观察Java的并发编程时,手写“出产者-消费者模子”是一个经典问题。有如下几个考点:
本文主要归纳了4种写法,阅读后,最亏得白板上操练几遍,查抄本身是否把握。这4种写法可能编程接口差异,可能并发粒度差异,但本质是沟通的——都是在利用或实现BlockingQueue。
出产者-消费者模子
网上有许多出产者-消费者模子的界说和实现。本文研究最常用的有界出产者-消费者模子,简朴归纳综合如下:
可通过如下条件验证模子实现的正确性:
该模子的应用和变种很是多,不赘述。
几种写法
筹备
口试时可语言说明以下筹备代码。要害部门需要实现,如AbstractConsumer。
下面会涉及多种出产者-消费者模子的实现,可以先抽象出要害的接口,并实现一些抽象类:
public interface Consumer {
void consume() throws InterruptedException;
}
public interface Producer {
void produce() throws InterruptedException;
}
abstract class AbstractConsumer implements Consumer, Runnable {
@Override
public void run() {
while (true) {
try {
consume();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
abstract class AbstractProducer implements Producer, Runnable {
@Override
public void run() {
while (true) {
try {
produce();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
差异的模子实现中,出产者、消费者的详细实现也差异,所以需要为模子界说抽象工场要领:
public interface Model {
Runnable newRunnableConsumer();
Runnable newRunnableProducer();
}
我们将Task作为出产和消费的单元:
public class Task {
public int no;
public Task(int no) {
this.no = no;
}
}
假如需求还不明晰(这切合大部门工程事情的实际环境),发起边实现边抽象,不要“面向将来编程”。
实现一:BlockingQueue
BlockingQueue的写法最简朴。焦点思想是,把并发和容量节制封装在缓冲区中。而BlockingQueue的性质天生满意这个要求。
public class BlockingQueueModel implements Model {
private final BlockingQueue<Task> queue;
private final AtomicInteger increTaskNo = new AtomicInteger(0);
public BlockingQueueModel(int cap) {
// LinkedBlockingQueue 的行列是 lazy-init 的,但 ArrayBlockingQueue 在建设时就已经 init
this.queue = new LinkedBlockingQueue<>(cap);
}
@Override
public Runnable newRunnableConsumer() {
return new ConsumerImpl();
}
@Override
public Runnable newRunnableProducer() {
return new ProducerImpl();
}
private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable {
@Override
public void consume() throws InterruptedException {
Task task = queue.take();
// 固按时间范畴的消费,模仿相对不变的处事器处理惩罚进程
Thread.sleep(500 + (long) (Math.random() * 500));
System.out.println("consume: " + task.no);
}
}
private class ProducerImpl extends AbstractProducer implements Producer, Runnable {
@Override
public void produce() throws InterruptedException {
// 不按期出产,模仿随机的用户请求
Thread.sleep((long) (Math.random() * 1000));
Task task = new Task(increTaskNo.getAndIncrement());
queue.put(task);
System.out.println("produce: " + task.no);
}
}
public static void main(String[] args) {
Model model = new BlockingQueueModel(3);
for (int i = 0; i < 2; i++) {
new Thread(model.newRunnableConsumer()).start();
}
for (int i = 0; i < 5; i++) {
new Thread(model.newRunnableProducer()).start();
}
}
}
截取前面的一部门输出:
produce: 0 produce: 4 produce: 2 produce: 3 produce: 5 consume: 0 produce: 1 consume: 4 produce: 7 consume: 2 produce: 8 consume: 3 produce: 6 consume: 5 produce: 9 consume: 1 produce: 10 consume: 7