1.问题配景
对付动静行列任务的监听,我们一般利用Java写一个独立的措施,在Linux处事器上运行。当订阅者措施启动后,会通过动静行列客户端吸收动静,放入线程池中并发的处理惩罚。
那么问题来了,当我们修改措施后,需要从头启动时,如何担保动静都可以或许被处理惩罚呢?
一些开源的动静行列中间件,会提供ACK机制(动静确认机制),当订阅者处理惩罚完动静后,会通知处事端删除对应动静,假如订阅者呈现异常,处事端未收到确认消费,则会重试发送。
那假如动静行列中间件没有提供ACK机制,可能为了高吞怀抱的思量封锁了ACK成果,如何最大大概担保动静都可以或许被处理惩罚呢?
正常来说,订阅者措施封锁后,动静会在行列中会萃,期待订阅者下次订阅消费,昆山软件开发,所以未吸收的动静是不会丢失的。大概呈现的问题就是在封锁的一瞬间,已经从动静行列中取出,劳务派遣管理系统,但还没有被处理惩罚的动静。
因此我们需要一套滑腻封锁的机制,担保在重启的时候,已吸收的动静可以获得正常处理惩罚。
2.问题阐明
滑腻封锁的思路如下:
封锁动静订阅:动静行列的客户端城市提供封锁毗连的要领,详细可以自行查察API。
封锁线程池:Java的ThreadPoolExecutor线程池提供shutdown()和shutdownNow()两个要领,区别是前者会期待线程池中的动静都处理惩罚完毕,后者会直接遏制所有线程并返回未处理惩罚完的线程List。因为我们需要利用shutdown()要领举办封锁,并通过isTerminated()要领,判定线程池是否已经封锁。
那么问题又来了,我们如何通知到措施,需要执行封锁操纵呢?
在Linux中,历程的封锁是通过信号通报的,我们可以用kill -9 pid封锁历程,除了-9之外,我们可以通过 kill -l,查察kill呼吁的其它信号量。

这里提供两种封锁要领:
增补说明:addShutdownHook要领和handle要领中假如再挪用System.exit,会造成deadlock,使历程无法正常退出。
伪代码别离如下
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
//封锁订阅者
//封锁线程池
//退出
}
});
//注册linux kill信号量 kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
@Override
public void handle(Signal signal) {
//封锁订阅者
//封锁线程池
//退出
}
});
模仿Demo
下面通过一个demo模仿相关逻辑操纵
首先模仿一个出产者,每秒出产5个动静
然后模仿一个订阅者,收到动静后,放入线程池举办处理惩罚,线程池牢靠4个线程,每个线程处理惩罚时间1秒,这样线程池每秒会积存1个动静。
package com.lujianing.demo;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import java.util.concurrent.*;
/**
* @author lujianing01@58.com
* @Description:
* @date 2016/11/14
*/
public class MsgClient {
//模仿消费线程池 同时4个线程处理惩罚
private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
//模仿动静出产任务
private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
//用于判定是否封锁订阅
private static volatile boolean isClose = false;
public static void main(String[] args) throws InterruptedException {
//注册钩子要领
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
close();
}
});
BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
producer(queue);
consumer(queue);
}
//模仿动静行列出产者
private static void producer(final BlockingQueue queue){
//每200毫秒向行列中放入一个动静
SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
public void run() {
queue.offer("");
}
}, 0L, 200L, TimeUnit.MILLISECONDS);
}
//模仿动静行列消费者 出产者每秒出产5个 消费者4个线程消费1个1秒 每秒积存1个
private static void consumer(final BlockingQueue queue) throws InterruptedException {
while (!isClose){
getPoolBacklogSize();
//从行列中拿到动静
final String msg = (String)queue.take();
//放入线程池处理惩罚
if(!THREAD_POOL.isShutdown()) {
THREAD_POOL.execute(new Runnable() {
public void run() {
try {
//System.out.println(msg);
TimeUnit.MILLISECONDS.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
//查察线程池会萃动静个数
private static long getPoolBacklogSize(){
long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));
return backlog;
}
private static void close(){
System.out.println("收到kill动静,执行封锁操纵");
//封锁订阅消费
isClose = true;
//封锁线程池,期待线程池积存动静处理惩罚
THREAD_POOL.shutdown();
//判定线程池是否封锁
while (!THREAD_POOL.isTerminated()) {
try {
//每200毫秒 判定线程池积存数量
getPoolBacklogSize();
TimeUnit.MILLISECONDS.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("订阅者封锁,昆山软件开发,线程池处理惩罚完毕");
}
static {
String osName = System.getProperty("os.name").toLowerCase();
if(osName != null && osName.indexOf("window") == -1) {
//注册linux kill信号量 kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
@Override
public void handle(Signal signal) {
close();
}
});
}
}
}
