01-生产者与消费者模式

参考资料1:http://c.biancheng.net/design_pattern/

参考资料2:https://refactoringguru.cn/design-patterns/catalog

1. Java生产者与消费者实现 - 极简版(单个商品)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public class TestProductCustomer {
public static void main(String[] args) {
Shop shop = new Shop(); // 共享资源对象
Thread p = new Thread(new Product(shop), "生产者");
Thread c = new Thread(new Custom(shop), "消费者");

p.start();
c.start();
}
}

class Goods{
private int id;
public Goods() {}
public Goods(int id) {
this.id = id;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
}

class Shop {
Goods goods;
boolean flag; // 表示商品是否充足

// 生产者调用 存 的方法
public synchronized void saveGoods(Goods goods) throws InterruptedException {
// 1.判断商品是否充足,生产者不用生产,等待消费者取+通知
if (flag) {
System.out.println("\n商品充足,等待购买中...\n");
this.wait();
}

// 2.商品不充足,生产者生产商品,存到商场里
System.out.println(Thread.currentThread().getName() + "生产-存到商场里了 " + goods.getId() + " 个商品");
this.goods = goods;
flag = true; // 已经有商品了,可以通知消费者了(消费者在等待)
this.notifyAll(); // 通知所有消费者
}
// 消费者调用 取 的方法
public synchronized void buyGoods() throws InterruptedException {
// 1.判断商品是否充足,消费者不能取,等待生产者存+通知
if (flag == false) {
System.out.println("\n商品不充足,等待生产中...\n");
this.wait();
}

// 2.商品充足,消费者购买商品,从商场里取
System.out.println(Thread.currentThread().getName() + "消费-取到商场里了 " + goods.getId() + " 个商品");
this.goods = null;
flag = false; // 已经没有商品了,可以通知生产者了(生产者在等待)
this.notifyAll(); // 通知所有生产者
}
}

class Product implements Runnable {
Shop shop;

public Product() {}
public Product(Shop shop) {
this.shop = shop;
}

public void run() {
// 循环放商品到shop里
int i = 0;
while (i++ < 10) {
try {
// 生产商品,存到商场
Thread.sleep(1000);
this.shop.saveGoods(new Goods(i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Custom implements Runnable {
Shop shop;

public Custom() {}
public Custom(Shop shop) {
this.shop = shop;
}

public void run() {
// 循环取商品从shop里
int i = 0;
while (i++ <= 10) {
try {
// 购买商品
Thread.sleep(2000);
this.shop.buyGoods();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

→ 输出结果截图

Java生产者与消费者实现

2. Java生产者与消费者实现 - 简单版(多个商品)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import java.util.ArrayList;
import java.util.List;

public class TestStorage {
public static void main(String[] args) {
Storage s = new Storage(); // 共享临界资源 - 仓库中的商品list
Thread p1 = new Thread(new Producer(s), "a厂");
Thread p2 = new Thread(new Producer(s), "b厂");
Thread p3 = new Thread(new Producer(s), "c厂");

Thread c1 = new Thread(new Customer(s), "A人");
Thread c2 = new Thread(new Customer(s), "B人");
Thread c3 = new Thread(new Customer(s), "C人");

p1.start();
p2.start();
p3.start();
c1.start();
c2.start();
c3.start();
}
}

// 仓库
class Storage {
// 仓库容量
private final int MAX_SIZE = 10;
// 仓库共享临界资源 - 货物统计载体
private List<Object> list = new ArrayList<Object>();
public Storage() {
super();
}
public Storage(List<Object> list) {
super();
this.list = list;
}
public List<Object> getList() {
return list;
}
public void setList(List<Object> list) {
this.list = list;
}

// 生产
public void produce() throws InterruptedException {
synchronized (list) {
while (list.size() + 1 > MAX_SIZE) {
System.out.println(Thread.currentThread().getName() + " - 仓库满了...");
list.wait(); // 仓库满了,释放锁,生产动作先等着(除非被通知)
}
list.add(new Object()); // 数组元素+1即增加1个商品
System.out.println(Thread.currentThread().getName() + "生产了1个产品,库存 " + list.size());
list.notifyAll(); // 生产+1,通知等着的消费者可以获取锁
}
}
// 消费
public void custome() throws InterruptedException {
synchronized (list) {
while (list.size() == 0) {
System.out.println(Thread.currentThread().getName() + " - 仓库空了...");
list.wait(); // 仓库空了,释放锁,消费动作先等着(除非被通知)
}
list.remove(list.size() - 1); // 数组元素-1即最大下标位置
System.out.println(Thread.currentThread().getName() + "消费了1个产品,库存 " + list.size());
list.notifyAll(); // 消费-1,通知等着的生产者可以获取锁
}
}
}

// 生产者
class Producer implements Runnable {
private Storage s;
public Producer() {}
public Producer(Storage s) {
super();
this.s = s;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep((int) (Math.random() * 3000));
this.s.produce(); // 只要仓库没满,且拿到了锁,就生产+1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

// 消费者
class Customer implements Runnable {
private Storage s;
public Customer() {}
public Customer(Storage s) {
super();
this.s = s;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep((int) (Math.random() * 3000));
this.s.custome(); // 只要仓库不空,且拿到了锁,就消费-1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

→ 输出结果截图

Java生产者与消费者实现

3. BlockingQueue实现生产者消费者模式

BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是await()/signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

  • put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞
  • take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import java.util.concurrent.LinkedBlockingQueue;
public class TestProduceAndCustomer2 {
public static void main(String[] args) {
StorageQ s = new StorageQ();
Thread p1 = new Thread(new ProducerQ(s), "A厂");
Thread p2 = new Thread(new ProducerQ(s), "B厂");
Thread p3 = new Thread(new ProducerQ(s), "C厂");

Thread c1 = new Thread(new CustomerQ(s), "a人");
Thread c2 = new Thread(new CustomerQ(s), "b人");
Thread c3 = new Thread(new CustomerQ(s), "c人");
p1.start();
p2.start();
p3.start();
c1.start();
c2.start();
c3.start();
}
}

// 仓库 - 共享资源对象
class StorageQ {
// 仓库存储的载体 - 使用无界阻塞队列,也可指定容量大小。
private LinkedBlockingQueue<Object> lbq = new LinkedBlockingQueue<>(10);
public StorageQ() {
super();
}
public StorageQ(LinkedBlockingQueue<Object> lbq) {
super();
this.lbq = lbq;
}
public LinkedBlockingQueue<Object> getLbq() {
return lbq;
}
public void setLbq(LinkedBlockingQueue<Object> lbq) {
this.lbq = lbq;
}

// 生产
public void produce() {
try{
lbq.put(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + lbq.size());
} catch (InterruptedException e){
e.printStackTrace();
}
}


// 消费
public void custome() {
try{
lbq.take();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费了一个产品,现库存" + lbq.size());
} catch (InterruptedException e){
e.printStackTrace();
}
}
}

// 生产者
class ProducerQ implements Runnable {
private StorageQ s;
public ProducerQ() {}
public ProducerQ(StorageQ s) {
this.s = s;
}
public void run() {
while (true) {
try {
Thread.sleep((int) (Math.random() * 2000));
this.s.produce(); // 没满 + 可锁 = 生产+1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

// 消费者
class CustomerQ implements Runnable {
private StorageQ s;
public CustomerQ() {}
public CustomerQ(StorageQ s) {
this.s = s;
}
public void run() {
while (true) {
try {
Thread.sleep((int) (Math.random() * 2000));
this.s.custome(); // 不空 + 可锁 = 消费-1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

→ 输出结果截图

Java队列实现生产者与消费者


01-生产者与消费者模式
https://janycode.github.io/2018/04/28/10_设计模式/01-生产者与消费者模式/
作者
Jerry(姜源)
发布于
2018年4月28日
许可协议