欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识

以及声明一个exclusive的 CAD加密 回调queue用来接收响应的消息

点击: 次  来源:宝鼎软件 时间:2017-06-01

原文出处: Listen

在上一小节中我们改造了log系统,由于利用fanout范例的exchange只能举办全局的广播,因此我们利用direct范例的exchange做了取代, 使得我们可以选择性的吸收动静。尽量利用fanout exchange改造了log系统,但它仍然有限制——不能基于多个条件做路由。

Topics

在log系统中大概不可是基于差异的日志级别作订阅,软件开发,也大概会基于日志的来历。你也许听过Unix下名为syslog的东西, 它把日志凭据严重级别(info/warn/crit…)和设备(auth/cron/ker…)举办路由。

这会给我们很多的机动性,也许我们只想监听’cron’中的’critical’级此外错误日志,以及所有’kern’中的日志。 为了实现这种日志系统,我们需要进修一个更巨大的topic范例的exchange。

Topic exchange

发送到topic exchange中的动静不能有一个任意的routing_key——它必需是一个利用点脱离的单词列表。单词可以是任意的, 可是凡是会指定动静的一些特定。一些有效的routing key例子:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。 routing key的长度限制为255个字节数。

binding key也必需是沟通的形式。topic exchange背后的逻辑雷同于direct——一条利用特定的routing key发送的动静将会被通报至所有利用与该routing key沟通的binding key举办绑定的行列中。 然而,对binding key来说有两种非凡的环境:

  1. *(star)可以取代任意一个单词
  2. #(hash)可以取代0个或多个单词

利用一张图可以很简朴地来说明:

在图中,我们将要发送被描写的动物的动静。动静的routing key将由三个单词构成(通过两个点脱离)。routing key中的第一个单词将描写速度, 第二个是颜色,第三个是物种:"<speed>.<colour>.<species>"

我们建设三个绑定:Q1利用binding key"*.orange.*"来绑定,Q2利用"*.*.rabbit"以及lazy.#绑定。

这些绑定可以被总结为:

  • Q1对所有橘色的的动物感乐趣
  • Q2想要吸收所有关于兔子的动静以及所有关于lazy的动物的动静
  • 一条利用routing key"quick.orange.rabbit"发送的动静将被同时通报到两个行列中。动静"lazy.orange.elephant"同样如此。 另一方面,"quick.orange.fox"只会被第一个queue吸收,"lazy.brown.fox"只会被第二个queue吸收。 "lazy.pink.rabbit"只会被通报到Q2一次,纵然它对两个binding key都匹配。"quick.brown.fox"与两个queue的binding key都不匹配, 因此将被扬弃。

    假如冲破我们的约定,利用一个单词可能四个单词的routing key譬喻"orange""quick.orange.male.rabbit"发送动静将会产生什么? 这些动静不会匹配任何绑定,因此会丢失。

    可是对付"lazy.orange.male.rabbit",纵然它有四个单词,可是它与第二个queue的binding key匹配,因此将会被发送到第二个queue中。

    当一个queue利用"#"(hash)作为binding key,那么它将会吸收所有的动静,忽略routing key,就仿佛利用了fanout exchange。 当非凡字符”*“(star)和”#“(hash)在绑定中没有用到,topic exchange将会与direct exchange的行为沟通。

    相识了topic exchange之后,我们将它用在我们的log系统中,我们界说的routing key将会有两个单词构成:"<facility>.<severity>"

    完成的EmitLogTopic.java

    public class EmitLogTopic {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv)
                      throws Exception {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            String routingKey = getRouting(argv);
            String message = getMessage(argv);
    
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    
            connection.close();
        }
        //...
    }

    完整的ReceiveLogsTopic.java:

    public class ReceiveLogsTopic {
      private static final String EXCHANGE_NAME = "topic_logs";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
    
        if (argv.length < 1) {
          System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
          System.exit(1);
        }
    
        for (String bindingKey : argv) {
          channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }
    
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
    }

    运行的时候从呼吁行中输入binding key来举办绑定,吸收差异的动静。

    Remote procedure call (RPC)