RocketMQ 的 Java API
RocketMQ 是用 Java 语言开拓的,因此,其 Java API 相对是较量富厚的,虽然也有部门原因是 RocketMQ 自己提供的成果就较量多。RocketMQ API 提供的成果包罗,
单看成果的话,纵然不算事务动静,也不算 Tag,RocketMQ 也远超 Kafka,Kafka 应该只实现了 Pull 模式消费 + 顺序消费这2个成果。RocketMQ 的代码示例在 rocketmq-example 中,留意,代码是不能直接运行的,因为所有的代码都少了配置 name server 的部门,需要本身手动加上,譬喻,producer.setNamesrvAddr("192.168.232.23:9876");。
先来看一下出产者的 API,较量简朴,只有一种,如下,
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import java.util.List;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("192.168.232.23:9876");
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"OrderID188",// key
("RocketMQ "+String.format("%05d", i)).getBytes());// body
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, i));
System.out.println(String.format("%05d", i)+sendResult);
}
}
catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
可以发明,对比 Kafka 的 API,只多了 Tag,但实际上行为有很大差异。Kafka 的出产者客户端,有同步和异步两种模式,但都是阻塞模式,send 要领返回发送状态的 Future,可以通过 Future 的 get 要领阻塞得到发送状态。而 RocketMQ 回收的是同步非阻塞模式,发送之后立即返回发送状态(而不是 Future)。正常环境下,两者利用上不同不大,可是在高可用场景中产生主备切换的时候,Kafka 的同步可以期待切换完成并重连,最后返回;而 RocketMQ 只能立即报错,昆山软件开发,由出产者选择是否重发。所以,在出产者的 API 上,劳务派遣管理系统,其实 Kafka 是要强一些的。
别的,RocketMQ 可以通过指定 MessageQueueSelector 类的实现来指定将动静发送到哪个分区去,Kafka 是通过指定出产者的 partitioner.class 参数来实现的,机动性上 RocketMQ 略胜一筹。
再来看消费者的API,由于 RocketMQ 的成果较量多,我们先看 Pull 模式消费的API,如下,
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
public class PullConsumer {
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("192.168.232.23:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
SINGLE_MQ: while (true) {
try {
long offset = consumer.fetchConsumeOffset(mq, true);
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
if (null != pullResult.getMsgFoundList()) {
for (MessageExt messageExt : pullResult.getMsgFoundList()) {
System.out.print(new String(messageExt.getBody()));
System.out.print(pullResult);
System.out.println(messageExt);
}
}
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
// TODO
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null)
return offset;
return 0;
}
}