Java multithreaded communication mode [1]

Bayanbulake 2022-02-13 08:02:44

Thread communication

When multiple threads compete for the same resource, you can let them communicate so that they cooperate and make better use of the CPU. In Java this can be done with wait(), notify() and notifyAll():

  • wait(): Makes the current thread wait (block) until another thread calls notify() or notifyAll() on the same object.

  • notify(): Wakes up a single thread waiting on this object's monitor. If several threads are waiting on the monitor at the same time, one of them is chosen arbitrarily.

  • notifyAll(): Wakes up all threads waiting on this object's monitor.

In short, wait() blocks the calling thread, while notify() or notifyAll() wakes up waiting threads so that they can compete for the lock and run again.
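
As a minimal sketch of this hand-off, the example below lets one thread wait for a message that another thread delivers. The Mailbox and MailboxDemo names are hypothetical and exist only for illustration; they are not part of the article's car example.

```java
// Hypothetical one-slot mailbox used only to illustrate wait() and notifyAll().
class Mailbox {
    private String message;           // guarded by this object's monitor

    // Blocks until a message is available, then returns and clears it.
    public synchronized String take() throws InterruptedException {
        while (message == null) {     // re-check the condition after every wake-up
            wait();                   // releases the monitor while waiting
        }
        String result = message;
        message = null;
        return result;
    }

    // Stores a message and wakes up every thread waiting on this mailbox.
    public synchronized void put(String text) {
        message = text;
        notifyAll();
    }
}

class MailboxDemo {
    // Starts a waiting thread, delivers one message, and returns what the waiter received.
    static String roundTrip(String text) {
        Mailbox mailbox = new Mailbox();
        String[] received = new String[1];
        Thread waiter = new Thread(() -> {
            try {
                received[0] = mailbox.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        mailbox.put(text);
        try {
            waiter.join();            // after join(), received[0] is visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello")); // prints "hello"
    }
}
```

The sketch works in either order: if put() runs first, take() finds the message and never waits; if take() runs first, it waits until put() calls notifyAll().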

When using these methods in practice, also keep the following points in mind:

  • All three methods are final methods of Object, backed by native code; they do not belong to the Thread class. This is because the locking mechanism Java provides works at the object level, not at the thread level.

  • All three methods must be called inside a synchronized method (or code block) that holds the monitor of the object they are called on; otherwise a java.lang.IllegalMonitorStateException is thrown.

  • To avoid concurrency problems, such as acting on a condition that no longer holds after waking up, it is generally recommended to call wait() inside a loop. The JDK's own documentation for the method includes a note explaining this.
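
The second point can be checked with a small experiment. The sketch below (the MonitorRuleDemo class is a hypothetical name introduced here) calls notify() once without holding the object's monitor and once inside a synchronized block, and reports what happens in each case.

```java
// Hypothetical demo class: shows when notify() is accepted and when it is rejected.
class MonitorRuleDemo {
    // Calls notify() without holding the monitor; returns true if the call is rejected.
    static boolean notifyWithoutLock(Object lock) {
        try {
            lock.notify();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Calls notify() while holding the monitor; returns true if the call is accepted.
    static boolean notifyWithLock(Object lock) {
        synchronized (lock) {
            try {
                lock.notify();        // no thread is waiting, so this simply does nothing
                return true;
            } catch (IllegalMonitorStateException e) {
                return false;
            }
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        System.out.println(notifyWithoutLock(lock)); // prints "true": exception was thrown
        System.out.println(notifyWithLock(lock));    // prints "true": call was accepted
    }
}
```

The same rule applies to wait() and notifyAll(): the calling thread must own the monitor of the exact object the method is invoked on, not just any lock.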

The following producer/consumer example should help reinforce how thread communication works. Its logic is as follows:

  1. The producer, CarProducter, keeps adding data to a shared buffer (simulated with cars++).
  2. At the same time, the consumer keeps consuming data from the shared buffer (cars--).
  3. The shared buffer has a fixed capacity (20).
  4. When the stock reaches 20, the producer stops producing and blocks itself by calling wait(). Once a consumer has consumed some of the stock (<20), it wakes the producer with notify() or notifyAll() so that production can continue.
  5. When the stock is 0, the consumer blocks itself by calling wait(). Once a producer has added to the stock (>0), it wakes the consumer with notify() or notifyAll() so that consumption can continue.

With these rules, the stock keeps fluctuating between 0 and 20.

package com.geovis.bin.custom.study.wangbin.lock.contion;

/**
 * @Author: Wangb
 * @EMail[email protected]
 * @Date: 24/12/2021 11:50 AM
 * @Description
 */

// Car inventory
class CarStock {
    // At most 20 cars can be stored
    private static final int MAX_CARS = 20;
    // Current number of cars in stock
    int cars;

    // Called by a producer: produce one car
    public synchronized void productCar() {
        try {
            // Stock is full: block the current producer thread until a consumer
            // calls notifyAll(). The loop re-checks the condition after every wake-up.
            while (cars >= MAX_CARS) {
                wait();
            }
            cars++;
            System.out.println("Produced a car .... stock: " + cars);
            Thread.sleep(100);
            // Wake up the threads waiting on this CarStock (consumers blocked in wait())
            notifyAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // Called by a consumer: consume one car
    public synchronized void consumeCar() {
        try {
            // Stock is empty: block the current consumer thread until a producer
            // calls notifyAll(). The loop re-checks the condition after every wake-up.
            while (cars <= 0) {
                wait();
            }
            cars--;
            System.out.println("Sold a car .... stock: " + cars);
            Thread.sleep(100);
            // Wake up the threads waiting on this CarStock (producers blocked in wait())
            notifyAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

// Producer
class CarProducter implements Runnable {
    CarStock carStock;
    public CarProducter(CarStock clerk) {
        this.carStock = clerk;
    }

    @Override
    public void run() {
        while (true) {
            carStock.productCar(); // Produce a car
        }
    }
}

// Consumer
class CarConsumer implements Runnable {
    CarStock carStock;

    public CarConsumer(CarStock carStock) {
        this.carStock = carStock;
    }

    @Override
    public void run() {
        while (true) {
            carStock.consumeCar(); // Consume a car
        }
    }
}

// Test class
public class ProducerAndConsumer {
    public static void main(String[] args) {
        CarStock carStock = new CarStock();
        // Note: the producer and consumer threads share the same carStock object
        CarProducter product = new CarProducter(carStock);
        CarConsumer resumer = new CarConsumer(carStock);
        // 2 producers, 2 consumers
        Thread tProduct1 = new Thread(product);
        Thread tProduct2 = new Thread(product);
        Thread tResumer1 = new Thread(resumer);
        Thread tResumer2 = new Thread(resumer);
        tProduct1.start();
        tProduct2.start();
        tResumer1.start();
        tResumer2.start();
    }
}

Running the program produces the output shown in the figure below:

The code above is a very simple program in which producers and consumers share a variable.

Next, we improve the program using a queue and a thread pool. The shared data now lives in a BlockingQueue that can hold up to 100 CarData objects.
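
For background, a BlockingQueue already blocks callers on its own: put() waits while the queue is full and take() waits while it is empty. The sketch below is not the article's program; it is a minimal illustration with hypothetical names (BlockingQueueDemo, produceAndSum) that passes the numbers 1 to n through a small bounded queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: hand-off through a bounded BlockingQueue.
class BlockingQueueDemo {
    // Produces 1..n on a separate thread and sums them on the calling thread.
    static int produceAndSum(int n, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i);     // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += queue.take();  // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(produceAndSum(100, 5)); // prints 5050 (1 + 2 + ... + 100)
    }
}
```

Even with a capacity of only 5, no element is lost: the producer simply pauses inside put() until the consumer has made room.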

Car entity

package com.geovis.bin.custom.study.wangbin.lock.contion.producerconsumer;

public class CarData {
    private int id;

    // Other fields
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }
}

The car inventory class CarStock, which contains the shared buffer (a BlockingQueue object)

package com.geovis.bin.custom.study.wangbin.lock.contion.producerconsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class CarStock {
    // Total number of cars produced so far
    private static int count = 0;
    // Shared buffer holding CarData objects
    private BlockingQueue<CarData> queue;

    public CarStock(BlockingQueue<CarData> queue) {
        this.queue = queue;
    }

    // Produce a car
    public synchronized void productCar() {
        try {
            CarData carData = new CarData();
            // Try to add a CarData object to the queue, waiting up to 2 seconds
            boolean success = this.queue.offer(carData, 2, TimeUnit.SECONDS);
            if (success) {
                int id = ++count;
                carData.setId(id);
                System.out.println("Produced CarData, id: " + id + ", stock: " + queue.size());
                Thread.sleep((int) (1000 * Math.random()));
                // Wake up the consumers waiting on this CarStock
                notifyAll();
            } else {
                System.out.println("Failed to produce CarData ....");
            }
            // Queue is full: wait for a consumer, re-checking after every wake-up
            while (queue.size() >= 100) {
                System.out.println("Inventory is full, waiting for consumption ...");
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // Consume a car
    public synchronized void resumeCar() {
        try {
            // Try to take a CarData object from the queue, waiting up to 2 seconds
            CarData carData = this.queue.poll(2, TimeUnit.SECONDS);
            if (carData != null) {
                Thread.sleep((int) (1000 * Math.random()));
                // Wake up the producers waiting on this CarStock
                notifyAll();
                System.out.println("Consumed CarData, id: " + carData.getId() + ", stock: " + queue.size());
            } else {
                System.out.println("Failed to consume CarData ....");
            }
            // Queue is empty: wait for a producer, re-checking after every wake-up
            while (queue.size() <= 0) {
                System.out.println("Inventory is empty, waiting for production ...");
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Producer class

package com.geovis.bin.custom.study.wangbin.lock.contion.producerconsumer;

public class CarProducter implements Runnable {
    // Shared inventory
    private CarStock carPool;
    // Run flag, used to control when this producer's loop stops
    private volatile boolean isRunning = true;

    public CarProducter(CarStock carPool) {
        this.carPool = carPool;
    }

    @Override
    public void run() {
        while (isRunning) {
            carPool.productCar();
        }
    }

    // Ask this producer to stop after its current iteration
    public void stop() {
        this.isRunning = false;
    }
}
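
The run-flag pattern used by this producer can be tried in isolation. The sketch below uses hypothetical names (StoppableWorker, StopFlagDemo) and a short sleep as a stand-in for real work; it starts a worker, clears the flag from another thread, and checks that the worker's thread ends.

```java
// Hypothetical worker that loops until another thread clears its run flag.
class StoppableWorker implements Runnable {
    // volatile so that a write from another thread is seen by the loop below
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(5);      // stand-in for one unit of real work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Asks the loop to finish after its current iteration.
    public void stop() {
        running = false;
    }
}

class StopFlagDemo {
    // Starts a worker, asks it to stop, and returns true if its thread ended in time.
    static boolean startAndStop() {
        StoppableWorker worker = new StoppableWorker();
        Thread thread = new Thread(worker);
        thread.start();
        worker.stop();
        try {
            thread.join(2000);        // wait at most 2 seconds for the loop to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(startAndStop());
    }
}
```

One caveat worth keeping in mind: a flag is only read between iterations, so a thread that is currently blocked inside wait() will not notice the change until something wakes it up.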

Consumer class

package com.geovis.bin.custom.study.wangbin.lock.contion.producerconsumer;

// Consumer
public class CarConsumer implements Runnable {
    // Shared inventory, which wraps the CarData queue
    private CarStock carPool;

    public CarConsumer(CarStock carPool) {
        this.carPool = carPool;
    }

    @Override
    public void run() {
        while (true) {
            carPool.resumeCar();
        }
    }
}

Test class

package com.geovis.bin.custom.study.wangbin.lock.contion.producerconsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class TestProducerAndConsumer {

    public static void main(String[] args) throws Exception {
        // Shared buffer: a queue of CarData objects
        BlockingQueue<CarData> queue = new LinkedBlockingQueue<CarData>(100);
        // CarData inventory, which wraps the queue
        CarStock carStock = new CarStock(queue);
        // Producers
        CarProducter carProducter1 = new CarProducter(carStock);
        CarProducter carProducter2 = new CarProducter(carStock);
        CarProducter carProducter3 = new CarProducter(carStock);
        // Consumers
        CarConsumer carConsumer1 = new CarConsumer(carStock);
        CarConsumer carConsumer2 = new CarConsumer(carStock);
        CarConsumer carConsumer3 = new CarConsumer(carStock);
        // Submit the producers and consumers to a thread pool
        ExecutorService cachePool = Executors.newCachedThreadPool();
        cachePool.execute(carProducter1);
        cachePool.execute(carProducter2);
        cachePool.execute(carProducter3);
        cachePool.execute(carConsumer1);
        cachePool.execute(carConsumer2);
        cachePool.execute(carConsumer3);
//  carProducter1.stop(); // Stop producer 1
//  cachePool.shutdown(); // Shut down the thread pool
    }
}
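
The commented-out shutdown() call hints at how a pool is normally closed. As a minimal sketch under the assumption that the tasks are short and finite (unlike the endless loops above), the hypothetical PoolShutdownDemo class below submits a few tasks, stops accepting new ones, and waits for the pool to drain.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: orderly shutdown of a cached thread pool.
class PoolShutdownDemo {
    // Runs taskCount trivial tasks and returns how many completed before termination.
    static int runTasks(int taskCount) {
        ExecutorService pool = Executors.newCachedThreadPool();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown();              // no new tasks; already submitted tasks still run
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();   // interrupt tasks that are still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(6)); // prints 6
    }
}
```

Note that shutdown() does not stop a task that loops forever; such tasks need a stop flag or need to react to interruption before the pool can terminate.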

The output of the test class is shown in the figure below:

The above shows how to implement thread communication with the methods of the JDK's Object class. To keep this article from getting too long, other ways of thread communication will be covered in later articles.

Copyright: author [Bayanbulake]. Please include the original link when reprinting, thank you. https://en.javamana.com/2022/02/202202130802410342.html