参考资料1:http://c.biancheng.net/design_pattern/
参考资料2:https://refactoringguru.cn/design-patterns/catalog
1. Java生产者与消费者实现 - 极简版(单个商品)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| public class TestProductCustomer { public static void main(String[] args) { Shop shop = new Shop(); Thread p = new Thread(new Product(shop), "生产者"); Thread c = new Thread(new Custom(shop), "消费者"); p.start(); c.start(); } }
class Goods{ private int id; public Goods() {} public Goods(int id) { this.id = id; } public int getId() { return id; } public void setId(int id) { this.id = id; } }
class Shop { Goods goods; boolean flag; public synchronized void saveGoods(Goods goods) throws InterruptedException { if (flag) { System.out.println("\n商品充足,等待购买中...\n"); this.wait(); } System.out.println(Thread.currentThread().getName() + "生产-存到商场里了 " + goods.getId() + " 个商品"); this.goods = goods; flag = true; this.notifyAll(); } public synchronized void buyGoods() throws InterruptedException { if (flag == false) { System.out.println("\n商品不充足,等待生产中...\n"); this.wait(); } System.out.println(Thread.currentThread().getName() + "消费-取到商场里了 " + goods.getId() + " 个商品"); this.goods = null; flag = false; this.notifyAll(); } }
class Product implements Runnable { Shop shop; public Product() {} public Product(Shop shop) { this.shop = shop; } public void run() { int i = 0; while (i++ < 10) { try { Thread.sleep(1000); this.shop.saveGoods(new Goods(i)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
class Custom implements Runnable { Shop shop; public Custom() {} public Custom(Shop shop) { this.shop = shop; } public void run() { int i = 0; while (i++ <= 10) { try { Thread.sleep(2000); this.shop.buyGoods(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
|
→ 输出结果截图
2. Java生产者与消费者实现 - 简单版(多个商品)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
| import java.util.ArrayList; import java.util.List;
public class TestStorage { public static void main(String[] args) { Storage s = new Storage(); Thread p1 = new Thread(new Producer(s), "a厂"); Thread p2 = new Thread(new Producer(s), "b厂"); Thread p3 = new Thread(new Producer(s), "c厂"); Thread c1 = new Thread(new Customer(s), "A人"); Thread c2 = new Thread(new Customer(s), "B人"); Thread c3 = new Thread(new Customer(s), "C人"); p1.start(); p2.start(); p3.start(); c1.start(); c2.start(); c3.start(); } }
class Storage { private final int MAX_SIZE = 10; private List<Object> list = new ArrayList<Object>(); public Storage() { super(); } public Storage(List<Object> list) { super(); this.list = list; } public List<Object> getList() { return list; } public void setList(List<Object> list) { this.list = list; } public void produce() throws InterruptedException { synchronized (list) { while (list.size() + 1 > MAX_SIZE) { System.out.println(Thread.currentThread().getName() + " - 仓库满了..."); list.wait(); } list.add(new Object()); System.out.println(Thread.currentThread().getName() + "生产了1个产品,库存 " + list.size()); list.notifyAll(); } } public void custome() throws InterruptedException { synchronized (list) { while (list.size() == 0) { System.out.println(Thread.currentThread().getName() + " - 仓库空了..."); list.wait(); } list.remove(list.size() - 1); System.out.println(Thread.currentThread().getName() + "消费了1个产品,库存 " + list.size()); list.notifyAll(); } } }
class Producer implements Runnable { private Storage s; public Producer() {} public Producer(Storage s) { super(); this.s = s; } @Override public void run() { while (true) { try { Thread.sleep((int) (Math.random() * 3000)); this.s.produce(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
class Customer implements Runnable { private Storage s; public Customer() {} public Customer(Storage s) { super(); this.s = s; } @Override public void run() { while (true) { try { Thread.sleep((int) (Math.random() * 3000)); this.s.custome(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
|
→ 输出结果截图
3. BlockingQueue实现生产者消费者模式
BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是await()/signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。
- put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
- take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
| import java.util.concurrent.LinkedBlockingQueue; public class TestProduceAndCustomer2 { public static void main(String[] args) { StorageQ s = new StorageQ(); Thread p1 = new Thread(new ProducerQ(s), "A厂"); Thread p2 = new Thread(new ProducerQ(s), "B厂"); Thread p3 = new Thread(new ProducerQ(s), "C厂");
Thread c1 = new Thread(new CustomerQ(s), "a人"); Thread c2 = new Thread(new CustomerQ(s), "b人"); Thread c3 = new Thread(new CustomerQ(s), "c人"); p1.start(); p2.start(); p3.start(); c1.start(); c2.start(); c3.start(); } }
class StorageQ { private LinkedBlockingQueue<Object> lbq = new LinkedBlockingQueue<>(10); public StorageQ() { super(); } public StorageQ(LinkedBlockingQueue<Object> lbq) { super(); this.lbq = lbq; } public LinkedBlockingQueue<Object> getLbq() { return lbq; } public void setLbq(LinkedBlockingQueue<Object> lbq) { this.lbq = lbq; }
public void produce() { try{ lbq.put(new Object()); System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + lbq.size()); } catch (InterruptedException e){ e.printStackTrace(); } }
public void custome() { try{ lbq.take(); System.out.println("【消费者" + Thread.currentThread().getName() + "】消费了一个产品,现库存" + lbq.size()); } catch (InterruptedException e){ e.printStackTrace(); } } }
class ProducerQ implements Runnable { private StorageQ s; public ProducerQ() {} public ProducerQ(StorageQ s) { this.s = s; } public void run() { while (true) { try { Thread.sleep((int) (Math.random() * 2000)); this.s.produce(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
class CustomerQ implements Runnable { private StorageQ s; public CustomerQ() {} public CustomerQ(StorageQ s) { this.s = s; } public void run() { while (true) { try { Thread.sleep((int) (Math.random() * 2000)); this.s.custome(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
|
→ 输出结果截图