泥土巢 - JUC包之并发编程 https://www.nituchao.com/category/juc-program/ zh-CN JUC包之并发编程的相关探讨。 Tue, 23 May 2017 16:18:00 +0800 Tue, 23 May 2017 16:18:00 +0800 Java通过Exchange机制实现生产者消费者 https://www.nituchao.com/juc-program/31.html https://www.nituchao.com/juc-program/31.html Tue, 23 May 2017 16:18:00 +0800 liang Java并发API提供了一个同步辅助类---Exchanger,它允许并发线程之间交换数据。具体来说,Exchanger类允许在两个线程之间定义同步点(Synchronization Point)。当两个线程都达到同步点时,它们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,同时第二个线程的数据结构进入到第一个线程。

生产者VS消费者模型本质上就是两个线程交换数据。因此,对于只有一个生产者和一个消费者的场景,就可以使用Exchanger类。

<h3 id="design">设计思想</h3>
为了通过Exchanger类实现生产者VS消费者模型,我们在设计的时候需要考虑以下三点:

1, 生产者线程和消费者线程需要各自持有一个自己的缓冲区对象。

2, 生产者线程和消费者线程需要持有一个共同的Exchanger对象,通过该对象实现两个线程的同步和数据结构交换。

3, 消费者每次交换前,需要清空自己的数据结构,因为消费者不需要给生产者传递数据。

<h3 id="code">代码实现</h3>
基于上面的设计,分别实现了生产者线程,消费者线程,主程序。

<h4 id="code-producer">生产者线程</h4>

package com.nituchao.jvm.prosumer.exchanger;

import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * 生产者
 * Created by liang on 2016/11/26.
 */
public class BufferProducer implements Runnable {
    private List<String> buffer;
    private final Exchanger<List<String>> exchanger;

    public BufferProducer(List<String> buffer, Exchanger<List<String>> exchanger) {
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        // 循环10次数据交换
        int cycle = 1;
        for (int i = 0; i < 10; i++) {
            System.out.printf("Buffer Producer: Cycle %d\n", cycle);

            // 在每个循环中,添加10个字符串到buffer列表中
            for (int j = 0; j < 10; j++) {
                String message = "Data " + ((i * 10) + j);
                System.out.printf("Buffer Producer: %s\n", message);
                buffer.add(message);
            }

            // 调用exchange()方法与消费者进行数据交换
            try {
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.printf("Exchange ok, Cycle %d, Buffer Producer size: %d\n", cycle, buffer.size());
            cycle++;
        }
    }
}

<h4 id="code-consumer">消费者线程</h4>

package com.nituchao.jvm.prosumer.exchanger;

import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * 消费者
 * Created by liang on 2016/11/26.
 */
public class BufferConsumer implements Runnable {
    private List<String> buffer;
    private final Exchanger<List<String>> exchanger;

    public BufferConsumer(List<String> buffer, Exchanger<List<String>> exchanger) {
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        // 循环10次交换数据
        int cycle = 1;
        for (int i = 0; i < 10; i++) {
            System.out.printf("Buffer Consumer: Cycle %d\n", cycle);

            // 在每个循环中,调用exchange()方法与生产者同步,消费数据
            try {
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.printf("Exchnage ok, Cycle %d, Buffer Consumer size: %d\n", cycle, buffer.size());
            // 消费buffer中的数据,并情况列表
            for (int j = 0; j < 10; j++) {
                String message = buffer.get(0);
                System.out.println("Buffer Consumer: " + message);
                buffer.remove(0);
            }

            cycle++;
        }
    }
}

<h4 id="code-main">主程序</h4>

package com.nituchao.jvm.prosumer.exchanger;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * 主程序
 * Created by liang on 2016/11/26.
 */
public class BufferMain {
    public static void main(String[] args) {
        // 为生产者和消费者各创建一个缓冲区
        List<String> buffer1 = new ArrayList<>();
        List<String> buffer2 = new ArrayList<>();

        // 创建Exchanger对象,用来同步生产者和消费者
        Exchanger<List<String>> exchanger = new Exchanger<>();

        // 创建生产者Producer对象和消费者对象Consumer对象
        BufferProducer bufferProducer = new BufferProducer(buffer1, exchanger);
        BufferConsumer bufferConsumer = new BufferConsumer(buffer2, exchanger);

        // 创建生产者线程和消费者线程
        Thread threadProducer = new Thread(bufferProducer);
        Thread threadConsumer = new Thread(bufferConsumer);

        // 启动
        threadProducer.start();
        threadConsumer.start();
    }
}

<h3 id="code-run">运行结果</h3>
运行上面的主程序,得到的输出信息如下:

Connected to the target VM, address: '127.0.0.1:55618', transport: 'socket'
Buffer Producer: Cycle 1
Buffer Consumer: Cycle 1
Buffer Producer: Data 0
Buffer Producer: Data 1
Buffer Producer: Data 2
Buffer Producer: Data 3
Buffer Producer: Data 4
Buffer Producer: Data 5
Buffer Producer: Data 6
Buffer Producer: Data 7
Buffer Producer: Data 8
Buffer Producer: Data 9
Exchange ok, Cycle 1, Buffer Producer size: 0
Buffer Producer: Cycle 2
Exchnage ok, Cycle 1, Buffer Consumer size: 10
Buffer Producer: Data 10
Buffer Consumer: Data 0
Buffer Producer: Data 11
Buffer Consumer: Data 1
Buffer Producer: Data 12
Buffer Consumer: Data 2
Buffer Producer: Data 13
Buffer Consumer: Data 3
Buffer Producer: Data 14
Buffer Consumer: Data 4
Buffer Producer: Data 15
Buffer Consumer: Data 5
Buffer Producer: Data 16
Buffer Consumer: Data 6
Buffer Producer: Data 17
Buffer Consumer: Data 7
Buffer Producer: Data 18
Buffer Consumer: Data 8
Buffer Producer: Data 19
Buffer Consumer: Data 9
Buffer Consumer: Cycle 2
Exchnage ok, Cycle 2, Buffer Consumer size: 10
Exchange ok, Cycle 2, Buffer Producer size: 0
Buffer Producer: Cycle 3
Buffer Consumer: Data 10
Buffer Producer: Data 20
Buffer Consumer: Data 11
Buffer Producer: Data 21
Buffer Consumer: Data 12
Buffer Producer: Data 22
Buffer Consumer: Data 13
Buffer Consumer: Data 14
Buffer Consumer: Data 15
Buffer Producer: Data 23
Buffer Producer: Data 24
Buffer Consumer: Data 16
Buffer Producer: Data 25
Buffer Consumer: Data 17
Buffer Producer: Data 26
Buffer Producer: Data 27
Buffer Consumer: Data 18
Buffer Producer: Data 28
Buffer Producer: Data 29
Buffer Consumer: Data 19
Buffer Consumer: Cycle 3
Exchnage ok, Cycle 3, Buffer Consumer size: 10
Exchange ok, Cycle 3, Buffer Producer size: 0
Buffer Producer: Cycle 4
Buffer Consumer: Data 20
Buffer Producer: Data 30
Buffer Consumer: Data 21
Buffer Producer: Data 31
Buffer Consumer: Data 22
Buffer Producer: Data 32
Buffer Consumer: Data 23
Buffer Producer: Data 33
Buffer Consumer: Data 24
Buffer Producer: Data 34
Buffer Producer: Data 35
Buffer Producer: Data 36
Buffer Producer: Data 37
Buffer Consumer: Data 25
Buffer Producer: Data 38
Buffer Consumer: Data 26
Buffer Producer: Data 39
Buffer Consumer: Data 27
Buffer Consumer: Data 28
Buffer Consumer: Data 29
Buffer Consumer: Cycle 4
Exchnage ok, Cycle 4, Buffer Consumer size: 10
Buffer Consumer: Data 30
Buffer Consumer: Data 31
Exchange ok, Cycle 4, Buffer Producer size: 0
Buffer Producer: Cycle 5
Buffer Producer: Data 40
Buffer Producer: Data 41
Buffer Producer: Data 42
Buffer Producer: Data 43
Buffer Producer: Data 44
Buffer Consumer: Data 32
Buffer Producer: Data 45
Buffer Producer: Data 46
Buffer Producer: Data 47
Buffer Producer: Data 48
Buffer Producer: Data 49
Buffer Consumer: Data 33
Buffer Consumer: Data 34
Buffer Consumer: Data 35
Buffer Consumer: Data 36
Buffer Consumer: Data 37
Buffer Consumer: Data 38
Buffer Consumer: Data 39
Buffer Consumer: Cycle 5
Exchnage ok, Cycle 5, Buffer Consumer size: 10
Exchange ok, Cycle 5, Buffer Producer size: 0
Buffer Producer: Cycle 6
Buffer Consumer: Data 40
Buffer Consumer: Data 41
Buffer Consumer: Data 42
Buffer Consumer: Data 43
Buffer Producer: Data 50
Buffer Consumer: Data 44
Buffer Producer: Data 51
Buffer Consumer: Data 45
Buffer Producer: Data 52
Buffer Consumer: Data 46
Buffer Producer: Data 53
Buffer Consumer: Data 47
Buffer Producer: Data 54
Buffer Producer: Data 55
Buffer Consumer: Data 48
Buffer Producer: Data 56
Buffer Producer: Data 57
Buffer Producer: Data 58
Buffer Consumer: Data 49
Buffer Consumer: Cycle 6
Buffer Producer: Data 59
Exchange ok, Cycle 6, Buffer Producer size: 0
Buffer Producer: Cycle 7
Exchnage ok, Cycle 6, Buffer Consumer size: 10
Buffer Producer: Data 60
Buffer Consumer: Data 50
Buffer Consumer: Data 51
Buffer Consumer: Data 52
Buffer Producer: Data 61
Buffer Consumer: Data 53
Buffer Producer: Data 62
Buffer Consumer: Data 54
Buffer Producer: Data 63
Buffer Consumer: Data 55
Buffer Consumer: Data 56
Buffer Consumer: Data 57
Buffer Producer: Data 64
Buffer Consumer: Data 58
Buffer Producer: Data 65
Buffer Consumer: Data 59
Buffer Consumer: Cycle 7
Buffer Producer: Data 66
Buffer Producer: Data 67
Buffer Producer: Data 68
Buffer Producer: Data 69
Exchange ok, Cycle 7, Buffer Producer size: 0
Buffer Producer: Cycle 8
Exchnage ok, Cycle 7, Buffer Consumer size: 10
Buffer Consumer: Data 60
Buffer Producer: Data 70
Buffer Consumer: Data 61
Buffer Producer: Data 71
Buffer Consumer: Data 62
Buffer Consumer: Data 63
Buffer Producer: Data 72
Buffer Consumer: Data 64
Buffer Producer: Data 73
Buffer Consumer: Data 65
Buffer Producer: Data 74
Buffer Consumer: Data 66
Buffer Producer: Data 75
Buffer Consumer: Data 67
Buffer Producer: Data 76
Buffer Consumer: Data 68
Buffer Producer: Data 77
Buffer Consumer: Data 69
Buffer Producer: Data 78
Buffer Consumer: Cycle 8
Buffer Producer: Data 79
Exchange ok, Cycle 8, Buffer Producer size: 0
Buffer Producer: Cycle 9
Buffer Producer: Data 80
Exchnage ok, Cycle 8, Buffer Consumer size: 10
Buffer Consumer: Data 70
Buffer Consumer: Data 71
Buffer Consumer: Data 72
Buffer Consumer: Data 73
Buffer Producer: Data 81
Buffer Consumer: Data 74
Buffer Producer: Data 82
Buffer Consumer: Data 75
Buffer Producer: Data 83
Buffer Consumer: Data 76
Buffer Producer: Data 84
Buffer Consumer: Data 77
Buffer Producer: Data 85
Buffer Consumer: Data 78
Buffer Producer: Data 86
Buffer Consumer: Data 79
Buffer Producer: Data 87
Buffer Consumer: Cycle 9
Buffer Producer: Data 88
Buffer Producer: Data 89
Exchange ok, Cycle 9, Buffer Producer size: 0
Buffer Producer: Cycle 10
Buffer Producer: Data 90
Exchnage ok, Cycle 9, Buffer Consumer size: 10
Buffer Producer: Data 91
Buffer Consumer: Data 80
Buffer Producer: Data 92
Buffer Consumer: Data 81
Buffer Producer: Data 93
Buffer Consumer: Data 82
Buffer Producer: Data 94
Buffer Consumer: Data 83
Buffer Producer: Data 95
Buffer Producer: Data 96
Buffer Producer: Data 97
Buffer Producer: Data 98
Buffer Producer: Data 99
Buffer Consumer: Data 84
Buffer Consumer: Data 85
Buffer Consumer: Data 86
Buffer Consumer: Data 87
Buffer Consumer: Data 88
Buffer Consumer: Data 89
Buffer Consumer: Cycle 10
Exchnage ok, Cycle 10, Buffer Consumer size: 10
Exchange ok, Cycle 10, Buffer Producer size: 0
Buffer Consumer: Data 90
Buffer Consumer: Data 91
Buffer Consumer: Data 92
Buffer Consumer: Data 93
Buffer Consumer: Data 94
Buffer Consumer: Data 95
Buffer Consumer: Data 96
Buffer Consumer: Data 97
Buffer Consumer: Data 98
Buffer Consumer: Data 99
Disconnected from the target VM, address: '127.0.0.1:55618', transport: 'socket'

Process finished with exit code 0

<h3 id="principle">工作原理</h3>
消费者先创建一个空的缓存区,然后通过调用Exchanger与生产者同步来获得可以消费的数据。生产者从一个空的缓存列表开始执行,它创建了10个字符串,然后存储在这个缓存中,并且使用exchanger对象与消费者同步。两者共享一个exchanger对象。

在这个同步点上,两个线程(生产者和消费者)都在Exchanger里,它们交换数据结构,当消费者从exchange()方法返回的时候,它的缓存列表有10个字符串。当生产者从exchange()返回的时候,它的缓存列表是空的。这个操作将循环执行10次。

]]>
0 https://www.nituchao.com/juc-program/31.html#comments https://www.nituchao.com/feed/juc-program/31.html
Java通过重入锁实现生产者消费者 https://www.nituchao.com/juc-program/30.html https://www.nituchao.com/juc-program/30.html Wed, 17 May 2017 16:17:00 +0800 liang ReentrantLock是一个可重入的互斥锁,又被称为"独占锁",ReentrantLock锁在同一个时间点只能被一个线程锁持有,而可重入的意思是,ReentrantLock可以被单个线程多次获取,ReentrantLock的性能并不高,优点是比价灵活。ReentrantLock比Synchronized关键词更加灵活,并且能支持条件变量,后面我还会单独介绍使用条件变量实现生产者消费者模型的方法。

设计思想

本文希望同ReentrantLock来实现一个共享缓冲区,生产者线程和消费者线程通过该共享缓冲区来实现相关的生产和消费操作,每个线程对共享缓冲区的访问是互斥的。

另外,由于共享缓冲区是有空间限制的,生产者在生产前要判断共享缓冲区空间是否充足。消费者在消费前要判断共享缓冲区内的元素是否足够消费。

代码实现

根据上面的设计思想,我们需要实现共享缓冲区,生产者线程,消费者线程,主程序等四个部分。

共享缓冲区

package com.nituchao.jvm.prosumer.reentrant;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 共享缓冲区
 * Created by liang on 2016/12/30.
 */
public class Buffer {
    private final List<String> list;
    private int MAX_SIZE = 10;
    private final Lock lock;

    public Buffer() {
        this.list = new ArrayList<String>();
        this.lock = new ReentrantLock();
    }

    /**
     * 生产num个元素
     *
     * @param num
     * @return
     */
    public boolean bufferProduct(int num) {
        boolean result = true;
        try {
            lock.lock();

            // 检查缓冲区是否能够容纳要生产的元素
            if (list.size() + num <= MAX_SIZE) {
                // 开始生产
                for (int i = 0; i < num; i++) {
                    list.add(0, Thread.currentThread().getName() + ":" + (i + 1));
                }

                result = true;

                System.out.printf("Thread %s: Buffer Product succ, num is %d, buffer size is %d\n", Thread.currentThread().getName(), num, list.size());
            } else {
                result = false;

                System.out.printf("Thread %s: Buffer Product succ, num is %d, buffer size is %d\n", Thread.currentThread().getName(), num, list.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
            result = false;
        } finally {
            lock.unlock();
        }

        return result;
    }

    /**
     * 消费num个元素
     *
     * @param num
     * @return
     */
    public boolean bufferConsume(int num) {
        boolean result = false;
        try {
            lock.lock();

            // 检查缓冲区是否有足够的元素供消费
            if (list.size() >= num) {
                // 开始消费
                for (int i = 0; i < num; i++) {
                    list.remove(0);
                }

                result = true;

                System.out.printf("Thread: %s, Buffer Consume succ, num is %d, buffer size is %d\n", Thread.currentThread().getName(), num, list.size());
            } else {
                result = false;

                System.out.printf("Thread: %s, Buffer Consume fail, num is %d, buffer size is %d\n", Thread.currentThread().getName(), num, list.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
            result = false;
        } finally {
            lock.unlock();
        }

        return result;
    }
}

生产者线程

package com.nituchao.jvm.prosumer.reentrant;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 生产者线程
 * Created by liang on 2016/12/30.
 */
public class BufferProducer implements Runnable {
    private int num;
    private Buffer buffer;

    public BufferProducer(Buffer buffer, int num) {
        this.num = num;
        this.buffer = buffer;
    }

    @Override
    public void run() {
        // 生产num个元素,如果生产失败,则休眠一段时间重新生产
        while (!buffer.bufferProduct(num)) {
            try {
                TimeUnit.MILLISECONDS.sleep(new Random(1000).nextInt());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者线程

package com.nituchao.jvm.prosumer.reentrant;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 消费者线程
 * Created by liang on 2016/12/30.
 */
public class BufferConsumer implements Runnable {
    private int num;
    private Buffer buffer;

    public BufferConsumer(Buffer buffer, int num) {
        this.num = num;
        this.buffer = buffer;
    }

    @Override
    public void run() {
        // 消费num个产品,如果消费失败,则休眠一段时间再重新消费
        while (!buffer.bufferConsume(num)) {
            try {
                TimeUnit.MILLISECONDS.sleep(new Random(1000).nextInt());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

主程序

package com.nituchao.jvm.prosumer.reentrant;

/**
 * 主程序
 * Created by liang on 2016/12/30.
 */
public class BufferMain {

    public static void main(String[] args) {
        int num = 10;
        // 实例化Buffer
        Buffer buffer = new Buffer();

        // 实例化生产者和消费者线程集合
        Thread[] threadProducers = new Thread[num];
        Thread[] threadConsumers = new Thread[num];

        // 实例化生产者和消费者线程
        for (int i = 0; i < num; i++) {
            Thread threadProducer = new Thread(new BufferProducer(buffer, i + 1));
            Thread threadConsumer = new Thread(new BufferConsumer(buffer, i + 1));

            threadProducers[i] = threadProducer;
            threadConsumers[i] = threadConsumer;
        }

        // 启动生产者和消费者线程
        for (int i = 0; i < num; i++) {
            threadConsumers[i].start();

            threadProducers[i].start();
        }
    }
}

运行结果

Connected to the target VM, address: '127.0.0.1:53178', transport: 'socket'
Thread: Thread-1, Buffer Consume fail, num is 1, buffer size is 0
Thread Thread-0: Buffer Product succ, num is 1, buffer size is 1
Thread: Thread-3, Buffer Consume fail, num is 2, buffer size is 1
Thread Thread-2: Buffer Product succ, num is 2, buffer size is 3
Thread: Thread-3, Buffer Consume succ, num is 2, buffer size is 1
Thread: Thread-5, Buffer Consume fail, num is 3, buffer size is 1
Thread: Thread-5, Buffer Consume fail, num is 3, buffer size is 1
Thread: Thread-5, Buffer Consume fail, num is 3, buffer size is 1
Thread Thread-4: Buffer Product succ, num is 3, buffer size is 4
Thread: Thread-7, Buffer Consume succ, num is 4, buffer size is 0
Thread Thread-6: Buffer Product succ, num is 4, buffer size is 4
Thread: Thread-9, Buffer Consume fail, num is 5, buffer size is 4
Thread: Thread-9, Buffer Consume fail, num is 5, buffer size is 4
Thread: Thread-9, Buffer Consume fail, num is 5, buffer size is 4
Thread Thread-8: Buffer Product succ, num is 5, buffer size is 9
Thread: Thread-11, Buffer Consume succ, num is 6, buffer size is 3
Thread Thread-10: Buffer Product succ, num is 6, buffer size is 9
Thread: Thread-13, Buffer Consume succ, num is 7, buffer size is 2
Thread Thread-12: Buffer Product succ, num is 7, buffer size is 9
Thread: Thread-15, Buffer Consume succ, num is 8, buffer size is 1
Thread Thread-14: Buffer Product succ, num is 8, buffer size is 9
Thread: Thread-17, Buffer Consume succ, num is 9, buffer size is 0
Thread Thread-16: Buffer Product succ, num is 9, buffer size is 9
Thread: Thread-19, Buffer Consume fail, num is 10, buffer size is 9
Thread: Thread-19, Buffer Consume fail, num is 10, buffer size is 9
Thread: Thread-19, Buffer Consume fail, num is 10, buffer size is 9
Thread: Thread-19, Buffer Consume fail, num is 10, buffer size is 9
Thread: Thread-19, Buffer Consume fail, num is 10, buffer size is 9
Thread Thread-18: Buffer Product succ, num is 10, buffer size is 9
Thread: Thread-1, Buffer Consume succ, num is 1, buffer size is 8
Thread: Thread-5, Buffer Consume succ, num is 3, buffer size is 5
Thread: Thread-9, Buffer Consume succ, num is 5, buffer size is 0
Thread: Thread-19, Buffer Consume fail, num is 10, buffer size is 0
Thread: Thread-19, Buffer Consume fail, num is 10, buffer size is 0
Thread: Thread-19, Buffer Consume fail, num is 10, buffer size is 0
Thread: Thread-19, Buffer Consume fail, num is 10, buffer size is 0
Thread: Thread-19, Buffer Consume fail, num is 10, buffer size is 0
Disconnected from the target VM, address: '127.0.0.1:53178', transport: 'socket'
Thread: Thread-19, Buffer Consume fail, num is 10, buffer size is 0
Thread Thread-18: Buffer Product succ, num is 10, buffer size is 10
Thread: Thread-19, Buffer Consume succ, num is 10, buffer size is 0

Process finished with exit code 0

工作原理

主程序首先实例化一个共享缓冲区对象buffer,然后将该共享缓冲区对象buffer作为构造参数生成若干个生产者线程,和若干个消费者线程。

生产者线程调用共享缓冲区的bufferProduce(num)方法生产元素,在生产前要先获取锁(lock.lock()方法),并判断当前共享缓冲区是否能够容纳所有元素,如果不能容纳,则直接返回false,并释放锁。如果可以容纳,则进行生产活动,返回true,并释放锁。生产者线程在一个while循环里判断,如果生产失败(返回false),则等待一段时间,重新开始执行生产操作。如果生产成功,则结束当前线程。

消费者线程调用共享缓冲区的bufferConsume(num)方法消费元素,在生产前要先获取锁(lock.lock()方法),并判断当前共享缓冲区是否有足够的元素供消费,如果元素数量不够,则直接返回false,并释放锁。如果元素足够,则进行消费活动,返回true,并释放锁。消费者线程在一个while循环里判断,如果消费失败(返回false),则等待一段时间,重新开始执行消费操作。如果消费成功,则结束当前线程。

]]>
0 https://www.nituchao.com/juc-program/30.html#comments https://www.nituchao.com/feed/juc-program/30.html
Java通过信号量机制实现生产者消费者 https://www.nituchao.com/juc-program/29.html https://www.nituchao.com/juc-program/29.html Tue, 16 May 2017 16:16:00 +0800 liang 信号量是一种计数器,用来保护一个或者多个共享资源的访问。Java提供了Semaphore类来实现信号量机制。

如果线程要访问一个共享资源,它必须先获得信号量。如果信号量的内部计数器大于0,信号量将减1,然后允许访问这个共享资源。计数器大于0意味着有可以使用的资源,因此线程将被允许使用其中一个资源。

否则,如果信号量的计数器等于0,信号量将会把线程置入休眠直至计数器大于0.计数器等于0的时候意味着所有的共享资源已经被其他线程使用了,所以需要访问这个共享资源的线程必须等待。

当线程使用完某个共享资源时,信号量必须被释放,以便其他线程能够访问共享资源。释放操作将使信号量的内部计数器增加1。

设计思想

为了使用信号量机制来实现生产者VS消费者模型,我们需要实例化一个二进制信号量对象,即内部计数器只有0和1两个值。多个生产者线程和多个消费者线程竞争这个信号量来互斥访问共享缓冲区。

另外,由于共享缓冲区是有空间限制的,生产者在生产前要判断共享缓冲区空间是否充足。消费者在消费前要判断共享缓冲区内的元素是否足够消费。

代码实现

基于上面的设计思想,我们需要实现共享缓冲区,生产者线程,消费者线程和主程序四部分。

共享缓冲区

package com.nituchao.jvm.prosumer.semaphore;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

/**
 * 缓冲区
 * Created by liang on 2016/12/29.
 */
public class Buffer {
    private int MAX_SIZE = 100;
    private final List<String> list;
    private final Semaphore semaphore;

    public Buffer() {
        this.list = new ArrayList<>();
        this.semaphore = new Semaphore(1);
    }

    /**
     * 生产Buffer元素
     *
     * @param num
     */
    public boolean BufferProduct(int num) {
        boolean result = true;

        try {
            // 获取信号量
            semaphore.acquire();

            // 如果缓冲区能够容纳要生成的元素,允许生产
            if (list.size() + num <= MAX_SIZE) {
                // 开始生产
                for (int i = 1; i <= num; i++) {
                    list.add(0, Thread.currentThread().getName() + " : " + i);
                }

                result = true;

                System.out.printf("Thread %s: Buffer Product succ, num is %d, buffer size is %d\n", Thread.currentThread().getName(), num, list.size());
            } else {
                // 缓冲区无法容纳要生成的元素,禁止生产
                result = false;

                System.out.printf("Thread %s: Buffer Product fail, num is %d, buffer size is %d\n", Thread.currentThread().getName(), num, list.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }

        return result;
    }

    /**
     * 消费Buffer元素
     *
     * @param num
     * @return
     */
    public boolean BufferConsume(int num) {
        boolean result = true;

        try {
            // 获取信号量
            semaphore.acquire();

            // 如果缓冲区中有足够的元素消费,允许消费
            if (list.size() - num >= 0) {
                // 开始消费
                for (int i = 1; i <= num; i++) {
                    String element = list.remove(0);
                }

                result = true;

                System.out.printf("Thread: %s, Buffer Consume succ, num is %d, buffer size is %d\n", Thread.currentThread().getName(), num, list.size());
            } else {
                // 缓冲取没有足够的元素供消费,禁止消费
                result = false;

                System.out.printf("Thread: %s, Buffer Consume fail, num is %d, buffer size is %d\n", Thread.currentThread().getName(), num, list.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }

        return result;
    }
}

生产者线程

package com.nituchao.jvm.prosumer.semaphore;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 生产者线程
 * Created by liang on 2016/12/29.
 */
public class BufferProducer implements Runnable {
    private Buffer buffer;
    private int num;

    public BufferProducer(Buffer buffer, int num) {
        this.buffer = buffer;
        this.num = num;
    }

    @Override
    public void run() {
        while (!buffer.BufferProduct(num)) {
            try {
                TimeUnit.MILLISECONDS.sleep(new Random(1000).nextInt());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者线程

package com.nituchao.jvm.prosumer.semaphore;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 消费者线程
 * Created by liang on 2016/12/29.
 */
public class BufferConsumer implements Runnable {
    private Buffer buffer;
    private int num;

    public BufferConsumer(Buffer buffer, int num) {
        this.buffer = buffer;
        this.num = num;
    }

    @Override
    public void run() {
        while (!buffer.BufferConsume(num)) {
            try {
                TimeUnit.MILLISECONDS.sleep(new Random(1000).nextInt());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

主程序

package com.nituchao.jvm.prosumer.semaphore;

/**
 * 主程序
 * Created by liang on 2016/12/29.
 */
public class BufferMain {

    public static void main(String[] args) {
        int num = 10;

        // 初始化Buffer对象
        Buffer buffer = new Buffer();

        // 实例化num个生产者和消费者线程
        Thread[] threadProducers = new Thread[num];
        Thread[] threadConsumers = new Thread[num];

        for (int i = 0; i < num; i++) {
            threadProducers[i] = new Thread(new BufferProducer(buffer, i + 1));
            threadConsumers[i] = new Thread(new BufferConsumer(buffer, i + 1));
        }

        // 分别启动生产者和消费者线程
        for (int i = 0; i < num; i++) {
            // 故意先开始消费
            threadConsumers[i].start();

            // 故意后开始生产
            threadProducers[i].start();
        }
    }
}

运行结果

Connected to the target VM, address: '127.0.0.1:61012', transport: 'socket'
Thread: Thread-1, Buffer Consume fail, num is 1, buffer size is 0
Thread Thread-0: Buffer Product succ, num is 1, buffer size is 1
Thread: Thread-3, Buffer Consume fail, num is 2, buffer size is 1
Thread Thread-2: Buffer Product succ, num is 2, buffer size is 3
Thread: Thread-5, Buffer Consume succ, num is 3, buffer size is 0
Thread Thread-4: Buffer Product succ, num is 3, buffer size is 3
Thread: Thread-7, Buffer Consume fail, num is 4, buffer size is 3
Thread: Thread-7, Buffer Consume fail, num is 4, buffer size is 3
Thread: Thread-7, Buffer Consume fail, num is 4, buffer size is 3
Thread: Thread-7, Buffer Consume fail, num is 4, buffer size is 3
Thread: Thread-7, Buffer Consume fail, num is 4, buffer size is 3
Thread: Thread-7, Buffer Consume fail, num is 4, buffer size is 3
Thread Thread-6: Buffer Product succ, num is 4, buffer size is 7
Thread: Thread-9, Buffer Consume succ, num is 5, buffer size is 2
Thread Thread-8: Buffer Product succ, num is 5, buffer size is 7
Thread: Thread-11, Buffer Consume succ, num is 6, buffer size is 1
Thread Thread-10: Buffer Product succ, num is 6, buffer size is 7
Thread: Thread-13, Buffer Consume succ, num is 7, buffer size is 0
Thread Thread-12: Buffer Product succ, num is 7, buffer size is 7
Thread: Thread-15, Buffer Consume fail, num is 8, buffer size is 7
Thread: Thread-15, Buffer Consume fail, num is 8, buffer size is 7
Thread: Thread-15, Buffer Consume fail, num is 8, buffer size is 7
Thread: Thread-15, Buffer Consume fail, num is 8, buffer size is 7
Thread: Thread-15, Buffer Consume fail, num is 8, buffer size is 7
Thread Thread-14: Buffer Product succ, num is 8, buffer size is 15
Thread: Thread-17, Buffer Consume succ, num is 9, buffer size is 6
Thread Thread-16: Buffer Product succ, num is 9, buffer size is 15
Thread: Thread-19, Buffer Consume succ, num is 10, buffer size is 5
Thread Thread-18: Buffer Product succ, num is 10, buffer size is 15
Thread: Thread-3, Buffer Consume succ, num is 2, buffer size is 13
Thread: Thread-1, Buffer Consume succ, num is 1, buffer size is 12
Thread: Thread-7, Buffer Consume succ, num is 4, buffer size is 8
Thread: Thread-15, Buffer Consume succ, num is 8, buffer size is 0
Disconnected from the target VM, address: '127.0.0.1:61012', transport: 'socket'

Process finished with exit code 0

工作原理

主程序首先实例化一个二进制信号量对象semaphore,指定该信号量内部计数器只有0和1,这样可以保证同一时间只有一个线程能够访问共享缓冲区。这样一来,共享缓冲区buffer本质上就成为一个队列。

主程序实例化num个生产者线程和num个消费者线程。

生产者线程负责向缓冲区里生产元素,每个生产者生产的数量都各不相同,从而能更好的观察效果。生产者首先会去尝试获取信号量(acquire),然后检查当前共享缓冲区是否有足够空间容纳自己可生产的元素。如果有,则进行生产,如果没有,则不进行生产。最后,释放信号量(release)并把生产的结果返回(boolen)。生产者线程如果发现自己生产失败,则会随机休眠一段时间再重复上面的操作,直到生产操作成功。

消费者线程负责从缓冲区里消费元素,每个消费者消费的数量都各不相同,从而能更好的观察效果。消费者首先回去尝试获取信号量(acquire),然后检查当前共享缓冲区中是否有足够的元素供自己消费。如果有,则进行消费,如果没有,则不进行消费。最后,释放信号量(release)并把消费的结果返回(boolean)。消费者线程如果发现自己消费失败,则会随机休眠一段时间再重复上面的操作,直到消费操作成功。

生产者线程和消费者线程访问是信号量是公平竞争,第一个获取信号量的线程将能够访问临界区,其余的线程将被信号量阻塞,直到信号量被释放。一旦信号量被释放,被被阻塞的线程就可以重新竞争信号量并访问临界区。

]]>
0 https://www.nituchao.com/juc-program/29.html#comments https://www.nituchao.com/feed/juc-program/29.html
Java通过对象同步机制实现生产者消费者 https://www.nituchao.com/juc-program/28.html https://www.nituchao.com/juc-program/28.html Fri, 05 May 2017 16:16:00 +0800 liang 每个Java类都是从Object类派生出来的,Object类原生提供了wait(),notify(),notifyAll()等方法来实现线程间的同步控制。

进一步讲,每个对象都能当做一个锁,每个对象也能当做一个条件队列,对象中的wait(), notify(), notifyAll()方法构成了内部条件队列的API,而队列正是生产者消费者模型的一个关键元素。当对象调用wait()方法时,当前线程会释放获得的对象锁,同时,当前对象会请求操作系统挂起当前线程,此时对象的对象锁就可用了,允许其他等待线程进入。当对象调用notify()或者notifyAll()方法时,当前线程也会释放获得的对象锁,同时,操作系统会结束当前线程的执行,并从阻塞在该对象上的线程列表中选择一个进行唤醒,该线程会获得对线锁并被让操作系统调度。

设计思想

为了设计基于Java对象同步机制的生产者消费者程序,并且是多个生产者线程VS多个消费者线程,可以从以下三点出发。

首先,我们需要有一个缓冲区来充当生产者和消费者之间交换数据的媒介,这个缓冲区可以是一个普通的列表对象,我们在该列表对象上进行生产者和消费者的互斥访问控制。本质上讲,这个缓冲器就相当于一个队列,一方面允许生产者往里面添加数据,一方面允许消费者从里面取走数据。这个列表对象非常重要,所有的wait()和notify()操作以及对象锁的控制都是针对该对象的。因此,这是一个共享对象,在各个生产者线程和消费者线程之间充当媒介。

其次,我们需要有若干个生产者线程。生产者线程要争夺缓冲区对象锁,如果未得到锁则wait()进入阻塞等待被唤醒。如果得到了对象锁,还要判断缓冲区是否已满。如果缓冲区已满,则要wait()进入阻塞等待被唤醒,并释放缓冲区对象锁。如果缓冲区未满,则可以进行生产活动,结束后释放缓冲区对象锁,并唤醒一个阻塞的线程。

最后,我们需要有若干个消费者线程。消费者线程要争夺缓冲区对象锁,如果未得到锁则wait()进行阻塞等待被唤醒。如果得到了对象锁,还要判断缓冲区里的元素是否满足自己的需要。如果缓冲区里的元素不够自己消费,则要wait()进入阻塞等待被唤醒,并释放缓冲区对象锁。如果缓冲区里的元素满足自己的需要,则进行消费操作,结束后释放缓冲区对象锁,并唤醒一个阻塞的线程。

代码实现

基于上面的设计,我们需要实现共享缓冲区,生产者线程,消费者线程和主程序四部分。

共享缓冲区

package com.nituchao.jvm.prosumer.objectsync;

import java.util.Date;
import java.util.LinkedList;

/**
 * 共享缓冲区
 * Created by liang on 2016/12/15.
 */
public class Buffer {
    private static final int MAX_SIZE = 100;
    private LinkedList<String> list;

    public Buffer(LinkedList<String> list) {
        this.list = list;
    }

    /**
     * 生产n个产品
     *
     * @param num
     * @throws InterruptedException
     */
    public void BufferProduce(int num) throws InterruptedException {
        synchronized (list) {
            while (list.size() + num > MAX_SIZE) {
                System.out.println("【要生产的产品数量】:" + num + "/t【库存量】:" + list.size() + "/t暂时不能执行生产任务!");

                // list进入等待状态
                list.wait();
            }

            // 生产n个产品
            for (int i = 0; i < num; i++) {
                list.add(new Date() + " : " + i);
            }

            System.out.println("【已经生产产品数】:" + num + "/t【现仓储量为】:" + list.size());

            list.notifyAll();
        }
    }

    /**
     * 消费n个产品
     *
     * @param num
     * @throws InterruptedException
     */
    public void bufferConsume(int num) throws InterruptedException {
        synchronized (list) {
            while (list.size() < num) {
                System.out.println("【要消费的产品数量】:" + num + "/t【库存量】:" + list.size() + " /t暂时不能执行消费任务!");

                // list进入等待状态
                list.wait();
            }

            // 消费n个产品
            for (int i = 0; i < num; i++) {
                list.removeFirst();
            }

            System.out.println("【已经消费产品数】:" + num + "/t【现仓储量为】:" + list.size());

            list.notifyAll();
        }
    }
}

生产者线程

package com.nituchao.jvm.prosumer.objectsync;

/**
 * 生产者线程
 * Created by liang on 2016/12/30.
 */
public class BufferProducer implements Runnable {
    private Buffer buffer;
    private int num;

    public BufferProducer(Buffer buffer, int num) {
        this.buffer = buffer;
        this.num = num;
    }

    @Override
    public void run() {
        try {
            buffer.BufferProduce(num);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者线程

package com.nituchao.jvm.prosumer.objectsync;

/**
 * 消费者线程
 * Created by liang on 2016/12/30.
 */
public class BufferConsumer implements Runnable {
    private Buffer buffer;
    private int num;

    public BufferConsumer(Buffer buffer, int num) {
        this.buffer = buffer;
        this.num = num;
    }

    @Override
    public void run() {
        try {
            buffer.bufferConsume(num);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

主程序

package com.nituchao.jvm.prosumer.objectsync;

import java.util.LinkedList;

/**
 * 主程序
 * Created by liang on 2016/12/30.
 */
public class BufferMain {
    public static void main(String[] args) {
        int num = 10;
        LinkedList<String> list = new LinkedList<>();
        Thread[] threadProducers = new Thread[num];
        Thread[] threadConsumers = new Thread[num];
        Buffer buffer = new Buffer(list);

        for (int i = 0; i < num; i++) {
            Thread threadTmp = new Thread(new BufferProducer(buffer, 10 + i));
            threadProducers[i] = threadTmp;
        }

        for (int i = 0; i < num; i++) {
            Thread threadTmp = new Thread(new BufferConsumer(buffer, 10 + i));
            threadConsumers[i] = threadTmp;
        }

        for (int i = 0; i < num; i++) {
            threadProducers[i].start();
            threadConsumers[i].start();
        }

        System.out.printf("list size is [%d].\n", list.size());
    }
}

运行结果

【要消费的产品数量】:10/t【库存量】:0 /t暂时不能执行消费任务!
【要消费的产品数量】:12/t【库存量】:0 /t暂时不能执行消费任务!
【要消费的产品数量】:13/t【库存量】:0 /t暂时不能执行消费任务!
list size is [0].
【已经生产产品数】:12/t【现仓储量为】:12
【已经消费产品数】:11/t【现仓储量为】:1
【已经生产产品数】:11/t【现仓储量为】:12
【已经生产产品数】:10/t【现仓储量为】:22
【已经消费产品数】:13/t【现仓储量为】:9
【要消费的产品数量】:12/t【库存量】:9 /t暂时不能执行消费任务!
【要消费的产品数量】:10/t【库存量】:9 /t暂时不能执行消费任务!
【要消费的产品数量】:19/t【库存量】:9 /t暂时不能执行消费任务!
【已经生产产品数】:19/t【现仓储量为】:28
【已经消费产品数】:18/t【现仓储量为】:10
【已经生产产品数】:18/t【现仓储量为】:28
【已经消费产品数】:17/t【现仓储量为】:11
【已经生产产品数】:17/t【现仓储量为】:28
【已经消费产品数】:16/t【现仓储量为】:12
【已经生产产品数】:16/t【现仓储量为】:28
【已经消费产品数】:15/t【现仓储量为】:13
【已经生产产品数】:15/t【现仓储量为】:28
【已经消费产品数】:14/t【现仓储量为】:14
【已经生产产品数】:14/t【现仓储量为】:28
【已经生产产品数】:13/t【现仓储量为】:41
【已经消费产品数】:19/t【现仓储量为】:22
【已经消费产品数】:10/t【现仓储量为】:12
【已经消费产品数】:12/t【现仓储量为】:0

Process finished with exit code 0

工作原理

共享缓冲区是此程序的核心,它里面内聚了一个普通的列表list。

在生产产品时,首先通过synchronized关键字获取列表list的对象锁,然后进行业务判断,如果当前集合的剩余空间无法容纳要生产的元素,则调用list.wait(),进入等待状态,并释放list对象锁,使其他线程有机会获取对象锁。如果可以生产,则进行相关操作,最后通过调用list.notifyAll()来唤醒等待状态中的线程,并释放list对象锁。

在消费产品时,首先通过synchronized关键字获取列表list的对象锁,然后进行业务判断,如果当前集合中没有足够的元素供消费,则调用list.wait(),进入等待状态,并释放list对象锁,使其他线程有机会获取对象锁。如果可以消费,则进行相关操作,最后通过调用list.notifyAll()来唤醒等待状态中的线程,并释放对象锁。

由于缓冲池实现了共享数据结构的访问和同步,因此生产者线程和消费者线程的实现就相对简单,只是调用共享缓冲池的生产方法和消费方法。

使用对象锁实现生产者消费者的另一个关键点在于,生产者和消费者要使用同一个共享缓冲池对象,即buffer对象。

]]>
0 https://www.nituchao.com/juc-program/28.html#comments https://www.nituchao.com/feed/juc-program/28.html