欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识

队列中堆积了 图纸加密 比较多的消息

点击: 次  来源:宝鼎软件 时间:2017-06-21

原文出处: 朱小厮

QueueingConsumer在Rabbitmq客户端3.x版本顶用的如火如荼,可是在4.x版本开初就被标志为@Deprecated,这是为什么呢?本文就此展开探讨。

在我的博文《RabbitMQ之Consumer消费模式(Push & Pull)》中讲到,Consumer的消费模式有Pull 和 Push两种,而常常用到的就是Push模式,Push模式在3.x的用法demo如下:

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer);

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [X] Received '" + message + "'");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    break;
}

在官方文档中推荐利用担任DefaultConsumer的方法:

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.getContentType();
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
});

在源码注释中有关QueueingConsumer的先容有这样一段:

QueueingConsumer was introduced to allow applications to overcome a limitation in the way Connection managed threads and consumer dispatching. When QueueingConsumer was introduced, callbacks to Consumers ware made on the Connection’s thread. This had two main drawbacks. Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel the Client would deadlock.

QueuingConsumer provided client code with an easy way to obviate the problem by queueing incoming messages and processing them on a separate, application-managed thread.

The threading behaviour of Connection and Channel has been changed so that each Channel uses a distinct thread for dispatching to Consumers. This prevents Consumers on one Channel holding up Consumers on another and it also prevents recursive calls from deadlocking the client.

As such, it is now safe to implement Consumer directly of to extend DefaultConsumer and QueueingConsumer is a lot less relevant.

上面提及了两个drawbacks:

  1. the Consumer could stall the processing of all Channels on the Connection. =>QueueingConsumer会拖累Connection的所有Channels的操纵
  2. if a Consumer made a recursive synchronous call into its Channel the Client would deadlock.=>同步递归挪用时会产存亡锁

对付这两句简朴的言辞,博主没有停下追求真理的脚步,既而去github上发问,当我咨询rabbitmq-Java-client的作者时(issue @265),他是这么回覆的:

Search rabbitmq-users archives. That consumer implementation was merely a workaround for the consumer operation dispatch deficiency that no longer exists. It has significant limitations in that automatic connection recovery does not support it and when deliveries happen faster than consumers actually process them, its internal j.u.c. queue data structure can grow very large. It has been deprecated for years prior to the removal.

上面提及的rabbitmq-users的链接是:https://groups.google.com/forum/#!forum/rabbitmq-users,虽然在我大天朝是会见不了的。博主的翻墙软件也失效了,就没法search,有乐趣的小同伴search到的话贫苦奉告下(下方留言,私信,可能留下你的资料地点~)。不外作者也提交了两点:1. automatic connection recovery不支持QueueingConsumer的这种形式;2. 内存溢出问题。

对付QueueingConsumer的内存溢出问题,我在博文《[九]RabbitMQ-客户端源码之Consumer》中讲到QueueingConsumer内部其实是一个LinkBlockingQueue,它将从broker端接管到的信息先暂存到这个LinkBlockingQueue中,然后消费端措施在从这个LinkBlockingQueue中take出动静。试下一下,假如我们不take动静可能说take的很是慢,那么LinkBlockingQueue中的动静就会越来越多,最终造成内存溢出。

这里我们来看一段英文先容: You have a queue in RabbitMQ. You have some clients consuming from that queue. If you don’t set a Qos setting at all (Basic.Qos), then RabbitMQ will push all the queue’s messages to the client as fast as the network and the clients will allow. 也就是说,假如由于某些原因,行列中会萃了较量多的动静,就大概导致Consumer内存溢出卡死,于是产生恶性轮回,行列动静不绝会萃得不到消化,彻底地悲剧了。其实这个问题可以通过配置Basic.Qos来很好的办理。