欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 宝鼎售后问题提交 | 后台管理


新闻资讯

MENU

软件开发知识

RabbitMQ指南 昆山软件开发 (上)

点击: 次  来源:宝鼎软件 时间:2017-06-01

原文出处: Listen

RabbitMQ是一个动静中间件,在一些需要异步处理惩罚、宣布/订阅等场景的时候,利用RabbitMQ可以完成我们的需求。 下面是我在进修RabbitMQ的进程中的一些记录,内容主要翻译自RabbitMQ官网的Tutorials, 再加上我的一些小我私家领略。我将会用三篇文章来从RabbitMQ的Hello World先容起,到最后的通过RabbitMQ实现RPC挪用, 相信看完这三篇文章各人应该会对RabbitMQ的根基观念和利用有必然的相识。

说明:

  1. 由于RabbitMQ支持很多种语言的client,在这里我利用的是Java语言的client。
  2. 所有的图片均来自RabbitMQ官网。

Hello World

首先需要安装RabbitMQ,关于RabbitMQ的安装这里就不赘述了,可以到RabbitMQ的官网去看相应的OS的安装要领。 安装完成后利用rabbitmq-server即可启动RabbitMQ,RabbitMQ还提供了一个UI打点界面,当地默认的地点为localhost:15672, 用户名和暗码均为guest。

安装完成之后,凭据老例,先来完成一个简朴的Hello World的例子。 最简朴的一种动静发送的模子为一个动静发送者(Producer)将动静发送到Queue中,另一端的动静接管者(Consumer)从Queue中接管动静, 大抵模子如下图所示:

RabbitMQ

先来看发送的代码,新建一个类定名为Send.java,代码的第一步为毗连server

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

connection抽象了socket的毗连,而且为我们处理惩罚了协议版本的协商、权限认证等等。这里我们毗连的是当地的中间件, 也就是localhost,接下来我们建设一个channel,这是大大都API完成任务的地址,也就是说我们的API操纵根基都是通过channel来完成的。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

首先是通过channel来声明一个queue,而且声明queue的操纵是幂等的,也等于说只有在这个queue不存在的环境下才会新建设一个queue。 这里发送一个Hello World!的动静,实际通报的动静内容为字节数组。

channel.close();
connection.close();

最后封锁channel和connection的毗连,留意封锁的顺序,是先封锁channel的毗连,再封锁connection的毗连。

完整的Send.java代码

public class Send {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }

}

完成发送的代码之后是接管动静的代码,新建一个类为Recv.java

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

      Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException {
          String message = new String(body, "UTF-8");
          System.out.println(" [x] Received '" + message + "'");
        }
      };
      channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

可以发明一开始的毗连部门的代码是沟通的,在吸收的时候我们也要声明一个queue,软件开发,留意这里queue的名称和之前发送动静声明的queue的名称必需是沟通的, 不然就收不到动静了。