欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识
原文出处: Valleylord

RocketMQ 的 Java API

RocketMQ 是用 Java 语言开拓的,因此,其 Java API 相对是较量富厚的,虽然也有部门原因是 RocketMQ 自己提供的成果就较量多。RocketMQ API 提供的成果包罗,

  1. 广播消费,这个在之前已经提到过;
  2. 动静过滤,支持简朴的 Message Tag 过滤,也支持按 Message Header、body 过滤;
  3. 顺序消费和乱序消费,之前也提到过,这里的顺序消费应该指的是普通顺序性,这一点与 Kafka 沟通;
  4. Pull 模式消费,这个是相对 Push 模式来说的,Kafka 就是 Pull 模式消费;
  5. 事务动静,这个仿佛没有开源,可是 example 代码中有示例,总之,不推荐用;
  6. Tag,RocketMQ 在 Topic 下面又分了一层 Tag,用于暗示动静种别,可以用来过滤,可是顺序性照旧以 Topic 来看;

单看成果的话,纵然不算事务动静,也不算 Tag,RocketMQ 也远超 Kafka,Kafka 应该只实现了 Pull 模式消费 + 顺序消费这2个成果。RocketMQ 的代码示例在 rocketmq-example 中,留意,代码是不能直接运行的,因为所有的代码都少了配置 name server 的部门,需要本身手动加上,譬喻,producer.setNamesrvAddr("192.168.232.23:9876");

先来看一下出产者的 API,较量简朴,只有一种,如下,

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import java.util.List;
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.232.23:9876");
        producer.start();
        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest1",// topic
                        "TagA",// tag
                        "OrderID188",// key
                        ("RocketMQ "+String.format("%05d", i)).getBytes());// body
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, i));
                    System.out.println(String.format("%05d", i)+sendResult);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
}

可以发明,对比 Kafka 的 API,只多了 Tag,但实际上行为有很大差异。Kafka 的出产者客户端,有同步和异步两种模式,但都是阻塞模式,send 要领返回发送状态的 Future,可以通过 Future 的 get 要领阻塞得到发送状态。而 RocketMQ 回收的是同步非阻塞模式,发送之后立即返回发送状态(而不是 Future)。正常环境下,两者利用上不同不大,可是在高可用场景中产生主备切换的时候,Kafka 的同步可以期待切换完成并重连,最后返回;而 RocketMQ 只能立即报错,昆山软件开发,由出产者选择是否重发。所以,在出产者的 API 上,劳务派遣管理系统,其实 Kafka 是要强一些的。

别的,RocketMQ 可以通过指定 MessageQueueSelector 类的实现来指定将动静发送到哪个分区去,Kafka 是通过指定出产者的 partitioner.class 参数来实现的,机动性上 RocketMQ 略胜一筹。

再来看消费者的API,由于 RocketMQ 的成果较量多,我们先看 Pull 模式消费的API,如下,

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
public class PullConsumer {
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("192.168.232.23:9876");
        consumer.start();
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);
            SINGLE_MQ: while (true) {
                try {
                    long offset = consumer.fetchConsumeOffset(mq, true);
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    if (null != pullResult.getMsgFoundList()) {
                        for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                            System.out.print(new String(messageExt.getBody()));
                            System.out.print(pullResult);
                            System.out.println(messageExt);
                        }
                    }
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        // TODO
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }
}