欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识

会提供ACK机制( 劳务派遣管理系统 消息确认机制)

点击: 次  来源:劳务派遣管理系统 时间:2017-11-07

原文出处: 蛙牛

1.问题配景

对付动静行列任务的监听,我们一般利用Java写一个独立的措施,在Linux处事器上运行。当订阅者措施启动后,会通过动静行列客户端吸收动静,放入线程池中并发的处理惩罚。

那么问题来了,当我们修改措施后,需要从头启动时,如何担保动静都可以或许被处理惩罚呢?

一些开源的动静行列中间件,会提供ACK机制(动静确认机制),当订阅者处理惩罚完动静后,会通知处事端删除对应动静,假如订阅者呈现异常,处事端未收到确认消费,则会重试发送。

那假如动静行列中间件没有提供ACK机制,可能为了高吞怀抱的思量封锁了ACK成果,如何最大大概担保动静都可以或许被处理惩罚呢?

正常来说,订阅者措施封锁后,动静会在行列中会萃,期待订阅者下次订阅消费,昆山软件开发,所以未吸收的动静是不会丢失的。大概呈现的问题就是在封锁的一瞬间,已经从动静行列中取出,劳务派遣管理系统,但还没有被处理惩罚的动静。

因此我们需要一套滑腻封锁的机制,担保在重启的时候,已吸收的动静可以获得正常处理惩罚。

2.问题阐明

滑腻封锁的思路如下:

  • 在封锁措施时,首先封锁动静订阅,担保不再吸收新的动静。
  • 封锁线程池,期待线程池中的动静处理惩罚完毕。
  • 措施退出。
  • 封锁动静订阅:动静行列的客户端城市提供封锁毗连的要领,详细可以自行查察API。

    封锁线程池:Java的ThreadPoolExecutor线程池提供shutdown()和shutdownNow()两个要领,区别是前者会期待线程池中的动静都处理惩罚完毕,后者会直接遏制所有线程并返回未处理惩罚完的线程List。因为我们需要利用shutdown()要领举办封锁,并通过isTerminated()要领,判定线程池是否已经封锁。

    那么问题又来了,我们如何通知到措施,需要执行封锁操纵呢?

    在Linux中,历程的封锁是通过信号通报的,我们可以用kill -9 pid封锁历程,除了-9之外,我们可以通过 kill -l,查察kill呼吁的其它信号量。

    会提供ACK机制( 劳务调派打点系统 动静确认机制)

    这里提供两种封锁要领:

    1. 措施中添加Runtime.getRuntime().addShutdownHook钩子要领,SIGTERM,SIGINT,SIGHUP三种信号城市触发该要领(别离对应kill -1/kill -2/kill -15,Ctrl+C也会触发SIGINT信号)。
    2. 措施中通过Signal类注册信号监听,好比USR2(对应kill -12),在handle要领中执行封锁操纵。

    增补说明:addShutdownHook要领和handle要领中假如再挪用System.exit,会造成deadlock,使历程无法正常退出。

    伪代码别离如下

    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            //封锁订阅者
            //封锁线程池
            //退出
        }
    });
     //注册linux kill信号量  kill -12
    Signal sig = new Signal("USR2");
    Signal.handle(sig, new SignalHandler() {
        @Override
        public void handle(Signal signal) {
            //封锁订阅者
            //封锁线程池
            //退出
        }
    });

    模仿Demo

    下面通过一个demo模仿相关逻辑操纵

    首先模仿一个出产者,每秒出产5个动静

    然后模仿一个订阅者,收到动静后,放入线程池举办处理惩罚,线程池牢靠4个线程,每个线程处理惩罚时间1秒,这样线程池每秒会积存1个动静。

    package com.lujianing.demo;
    
    import sun.misc.Signal;
    import sun.misc.SignalHandler;
    import java.util.concurrent.*;
    
    /**
     * @author lujianing01@58.com
     * @Description:
     * @date 2016/11/14
     */
    public class MsgClient {
    
        //模仿消费线程池 同时4个线程处理惩罚
        private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        
        //模仿动静出产任务
        private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
        
        //用于判定是否封锁订阅
        private static volatile boolean isClose = false;
    
        public static void main(String[] args) throws InterruptedException {
        
            //注册钩子要领
            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    close();
                }
            });
    
            BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
            producer(queue);
            consumer(queue);
    
        }
    
        //模仿动静行列出产者
        private static void producer(final BlockingQueue  queue){
            //每200毫秒向行列中放入一个动静
            SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    queue.offer("");
                }
            }, 0L, 200L, TimeUnit.MILLISECONDS);
        }
    
        //模仿动静行列消费者 出产者每秒出产5个   消费者4个线程消费1个1秒  每秒积存1个
        private static void consumer(final BlockingQueue queue) throws InterruptedException {
            while (!isClose){
                getPoolBacklogSize();
                //从行列中拿到动静
                final String msg = (String)queue.take();
                //放入线程池处理惩罚
                if(!THREAD_POOL.isShutdown()) {
                    THREAD_POOL.execute(new Runnable() {
                        public void run() {
                            try {
                                //System.out.println(msg);
                                TimeUnit.MILLISECONDS.sleep(1000L);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }
    
        //查察线程池会萃动静个数
        private static long getPoolBacklogSize(){
            long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
            System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));
            return backlog;
        }
    
        private static void close(){
            System.out.println("收到kill动静,执行封锁操纵");
            //封锁订阅消费
            isClose = true;
            //封锁线程池,期待线程池积存动静处理惩罚
            THREAD_POOL.shutdown();
            //判定线程池是否封锁
            while (!THREAD_POOL.isTerminated()) {
                try {
                    //每200毫秒 判定线程池积存数量
                    getPoolBacklogSize();
                    TimeUnit.MILLISECONDS.sleep(200L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("订阅者封锁,昆山软件开发,线程池处理惩罚完毕");
        }
    
        static {
            String osName = System.getProperty("os.name").toLowerCase();
            if(osName != null && osName.indexOf("window") == -1) {
                //注册linux kill信号量  kill -12
                Signal sig = new Signal("USR2");
                Signal.handle(sig, new SignalHandler() {
                    @Override
                    public void handle(Signal signal) {
                        close();
                    }
                });
            }
        }
    
    }

    会提供ACK机制( 劳务调派打点系统 动静确认机制)