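The Kafka consumer client reads messages from a Kafka cluster and processes them.
A Kafka consumer can either assign itself manually to specific partitions of a topic (KafkaConsumer.assign), or subscribe to a topic with the subscribe method and have partitions assigned to it automatically. Once a consumer is bound to a partition, it connects to that partition's leader, sends fetch requests, and processes the messages it receives.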
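With subscribe, the consumer group decides who gets which partition. The sketch below is a simplified in-memory model of a range-style split of one topic's partitions among the subscribed consumers. The class `RangeAssignmentSketch` is hypothetical and is not the Kafka client API. Each partition ends up with exactly one consumer, and consumers beyond the partition count get nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a range-style assignment for a single topic.
// Not the Kafka client API: real assignment is done by the group's assignor.
class RangeAssignmentSketch {

    // Returns consumerId -> partition numbers; every partition goes to exactly one consumer.
    static Map<String, List<Integer>> assign(List<String> consumerIds, int numPartitions) {
        List<String> sorted = new ArrayList<>(consumerIds);
        sorted.sort(null); // natural (alphabetical) order, so the result is deterministic
        int perConsumer = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size(); // the first `withExtra` consumers get one more
        Map<String, List<Integer>> result = new TreeMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            result.put(sorted.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // 5 partitions, 2 consumers
        System.out.println(assign(List.of("c2", "c1"), 5)); // {c1=[0, 1, 2], c2=[3, 4]}
        // more consumers than partitions: the extra consumer stays idle
        System.out.println(assign(List.of("a", "b", "c"), 2)); // {a=[0], b=[1], c=[]}
    }
}
```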
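Offset management
In Kafka's consumption model, each partition is consumed by at most one consumer within a consumer group. The offset is controlled by the consumer, which can, for example, move forward or backward to any offset with seek.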
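Because the consumer owns its position, moving it backward with seek replays records. The model below is a minimal in-memory sketch. A partition is a list indexed by offset, and `PartitionCursorSketch` is a hypothetical class, not `KafkaConsumer`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one partition plus a consumer's position in it.
class PartitionCursorSketch {
    private final List<String> log; // the partition: list index == offset
    private long position;          // offset of the next record to read

    PartitionCursorSketch(List<String> log, long startOffset) {
        this.log = log;
        this.position = startOffset;
    }

    // Moves the position forward or backward, in the spirit of KafkaConsumer.seek.
    // This model rejects out-of-range offsets; the real client instead applies
    // auto.offset.reset on the next fetch.
    void seek(long offset) {
        if (offset < 0 || offset > log.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        position = offset;
    }

    // Returns up to maxRecords records from the current position and advances past them.
    List<String> poll(int maxRecords) {
        List<String> out = new ArrayList<>();
        while (out.size() < maxRecords && position < log.size()) {
            out.add(log.get((int) position));
            position++;
        }
        return out;
    }

    long position() {
        return position;
    }

    public static void main(String[] args) {
        PartitionCursorSketch c = new PartitionCursorSketch(List.of("m0", "m1", "m2", "m3"), 0);
        System.out.println(c.poll(3)); // [m0, m1, m2]
        c.seek(1);                     // step back and re-read
        System.out.println(c.poll(3)); // [m1, m2, m3]
    }
}
```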
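On first connection, when the group has no committed offset for a partition yet, the auto.offset.reset setting in the KafkaConsumer configuration decides where consumption starts: from the latest position (latest, the default) or from the earliest position (earliest).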
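The decision can be written as a small function. This is a simplified sketch. `StartOffsetSketch` and its enum are hypothetical names, the log start and end offsets are passed in directly, and the case of a committed offset that has fallen out of range (where the reset policy also applies) is ignored. The real client throws `NoOffsetForPartitionException` for `none`. This model throws `IllegalStateException` instead.

```java
import java.util.OptionalLong;

// Hypothetical model of choosing a partition's starting offset.
class StartOffsetSketch {
    // Mirrors the three auto.offset.reset values: latest, earliest, none
    enum ResetStrategy { LATEST, EARLIEST, NONE }

    static long startOffset(OptionalLong committed, ResetStrategy reset,
                            long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // a committed offset takes precedence
        }
        switch (reset) {
            case EARLIEST:
                return logStartOffset; // oldest record still retained
            case LATEST:
                return logEndOffset;   // only records produced from now on
            default:
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        System.out.println(startOffset(OptionalLong.empty(), ResetStrategy.LATEST, 10, 50));   // 50
        System.out.println(startOffset(OptionalLong.empty(), ResetStrategy.EARLIEST, 10, 50)); // 10
        System.out.println(startOffset(OptionalLong.of(42), ResetStrategy.LATEST, 10, 50));    // 42
    }
}
```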
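By default, enable.auto.commit is true, so the KafkaConsumer client commits offsets periodically. One consequence needs attention. If the ConsumerRecords returned by poll are processed asynchronously, the offsets can be committed before processing has finished. If the process crashes at that moment, the uncommitted work is never redone after restart, and those messages are lost. There are two solutions:
1. Process the records synchronously after poll. The auto-commit is attempted inside the next poll call, when the previous batch is already fully processed, which gives at-least-once semantics.
2. Set enable.auto.commit to false and commit offsets manually with KafkaConsumer.commitSync.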
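The loss scenario can be replayed deterministically. The sketch below is a hypothetical model, not the client API. It polls one batch, either processes it synchronously or leaves it unfinished on "another thread", then polls again. That second poll auto-commits the position reached by the first batch, and the process then crashes. It counts the offsets that sit below the committed offset but were never processed. The same reasoning covers option 2, where commitSync is called only after processing.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, deterministic model of the crash scenario described above.
class CommitTimingSketch {
    static final int BATCH = 5; // records returned per poll in this model

    // Returns the offsets that are skipped forever after a crash right after poll #2.
    static Set<Long> lostAfterCrash(boolean processSynchronously) {
        Set<Long> processed = new HashSet<>();
        long position = 0;

        // poll #1 returns offsets [0, BATCH) and the position moves past them
        long batchStart = position;
        position += BATCH;
        if (processSynchronously) {
            for (long o = batchStart; o < position; o++) {
                processed.add(o);
            }
        }
        // else: the batch was handed to another thread that has not finished yet

        // poll #2: auto-commit runs first and commits the current position,
        // then the next batch is returned
        long committed = position;
        position += BATCH;

        // crash: unfinished asynchronous work is gone and a restart resumes at `committed`
        Set<Long> lost = new HashSet<>();
        for (long o = 0; o < committed; o++) {
            if (!processed.contains(o)) {
                lost.add(o);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        System.out.println("async processing, lost: " + lostAfterCrash(false).size()); // 5
        System.out.println("sync processing, lost:  " + lostAfterCrash(true).size());  // 0
    }
}
```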
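max.poll.interval.ms sets the maximum time a consumer may spend processing the records returned by one poll before it calls poll again (default 300 s). If this time is exceeded, the consumer is considered dead and the group rebalances.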
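The settings discussed so far are plain consumer properties. The sketch below collects them in a `java.util.Properties`. The keys are real Kafka consumer configuration names. The broker address and group id are placeholders, and the class name is hypothetical. It also adds a rough, assumed back-of-the-envelope check: a full batch of max.poll.records records (default 500) must be processed within max.poll.interval.ms.

```java
import java.util.Properties;

// Hypothetical helper; the property keys are real Kafka consumer config names.
class ConsumerConfigSketch {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "demo-group");              // placeholder group id
        props.setProperty("enable.auto.commit", "false");         // commit manually with commitSync
        props.setProperty("auto.offset.reset", "earliest");       // start from the oldest offset if none is committed
        props.setProperty("max.poll.interval.ms", "300000");      // 300 s, the default
        props.setProperty("max.poll.records", "500");             // default maximum records per poll
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Rough check: can a full batch be processed before max.poll.interval.ms expires?
    static boolean fitsPollInterval(Properties props, long perRecordMillis) {
        long maxRecords = Long.parseLong(props.getProperty("max.poll.records"));
        long intervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        return maxRecords * perRecordMillis < intervalMs;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        System.out.println(fitsPollInterval(props, 100));  // true: 500 * 100 ms = 50 s
        System.out.println(fitsPollInterval(props, 1000)); // false: 500 * 1000 ms = 500 s
    }
}
```

If the check fails, the usual levers are lowering max.poll.records or raising max.poll.interval.ms.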
Consumer threading
There are several ways to process messages with multiple consumer threads.
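One pattern described in the KafkaConsumer Javadoc works as follows. A single thread calls poll, because KafkaConsumer is not thread-safe, and it hands the records to a worker pool. The sketch below models that step with a plain `ExecutorService`. The class and the "processing" (recording the offset in a set) are hypothetical. The key point ties back to the auto-commit discussion above. The offset to commit is computed only after every task has finished, and it follows Kafka's convention of committing the next offset to consume.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model: one polling thread, a pool of workers, commit after all workers finish.
class WorkerPoolSketch {

    // Assumes `offsets` is a non-empty batch in ascending order.
    // Returns the offset that would be passed to commitSync.
    static long processBatch(List<Long> offsets, ExecutorService pool, Set<Long> processed) {
        List<Future<?>> futures = new ArrayList<>();
        for (long offset : offsets) {
            // hypothetical "processing": just record that this offset was handled
            futures.add(pool.submit(() -> { processed.add(offset); }));
        }
        for (Future<?> f : futures) {
            try {
                f.get(); // committing before every task finishes could lose records on a crash
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("a worker failed; do not commit", e.getCause());
            }
        }
        // Kafka convention: commit the offset of the next record to consume
        return offsets.get(offsets.size() - 1) + 1;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Set<Long> processed = ConcurrentHashMap.newKeySet();
        long toCommit = processBatch(List.of(10L, 11L, 12L), pool, processed);
        pool.shutdown();
        System.out.println("processed " + processed.size() + ", commit " + toCommit); // processed 3, commit 13
    }
}
```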
KafkaConsumer.subscribe
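Subscribes to a topic.
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
When the consumer lets the Kafka cluster manage group membership, the ConsumerRebalanceListener is invoked during a consumer rebalance. A rebalance happens whenever the set of consumers in the group, or the assignment of partitions to them, changes.
A common use of this listener is to persist the latest consumed offset of each partition. In void onPartitionsRevoked(java.util.Collection<TopicPartition> partitions), save the current partitions and their offsets to a database. After reassignment completes, void onPartitionsAssigned(java.util.Collection<TopicPartition> partitions) reads the previous positions back from the database and calls seek to resume consumption from there.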
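The save-then-restore flow can be modeled without a broker. In the sketch below, `OffsetStoreListenerSketch` is a hypothetical stand-in for a `ConsumerRebalanceListener`. Partitions are plain strings instead of `TopicPartition`. A `HashMap` plays the database, and a second map plays the consumer's positions, where a real listener would call `consumer.seek`. Starting at 0 when nothing is stored is an assumption of the sketch. A real application might fall back to the committed offset or auto.offset.reset instead.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a ConsumerRebalanceListener that stores offsets externally.
class OffsetStoreListenerSketch {
    final Map<String, Long> database = new HashMap<>();  // stands in for an external database
    final Map<String, Long> positions = new HashMap<>(); // stands in for the consumer's positions

    // Mirrors onPartitionsRevoked: persist the current position before losing the partitions.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String p : partitions) {
            Long pos = positions.remove(p);
            if (pos != null) {
                database.put(p, pos);
            }
        }
    }

    // Mirrors onPartitionsAssigned: restore the stored position (the real code would call seek).
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String p : partitions) {
            positions.put(p, database.getOrDefault(p, 0L)); // 0 is this sketch's fallback
        }
    }

    public static void main(String[] args) {
        OffsetStoreListenerSketch l = new OffsetStoreListenerSketch();
        l.onPartitionsAssigned(List.of("orders-0"));           // nothing stored yet, starts at 0
        l.positions.put("orders-0", 42L);                       // pretend offsets 0..41 were consumed
        l.onPartitionsRevoked(List.of("orders-0"));            // rebalance begins: save 42
        l.onPartitionsAssigned(List.of("orders-0", "orders-1"));
        System.out.println(l.positions); // orders-0 resumes at 42, orders-1 starts at 0
    }
}
```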
KafkaConsumer.poll
public ConsumerRecords<K, V> poll(long timeout) {
    // KafkaConsumer is not thread-safe: this guards against use from another thread at the same time
    acquireAndEnsureOpen();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                    client.pollNoWakeup();
                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);
        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
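The do/while loop above is a generic "retry until data or deadline" pattern. The sketch below restates it with a pluggable single attempt and an injectable clock, so it can be run without a broker. `DeadlinePollSketch` and its parameters are hypothetical. As in the original, each attempt receives the time still remaining, and at least one attempt is made even when the timeout is 0.

```java
import java.util.List;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

// Hypothetical generic version of the loop in poll(long timeout).
class DeadlinePollSketch {

    static <T> List<T> pollUntil(LongFunction<List<T>> pollOnce, long timeoutMs, LongSupplier clockMs) {
        if (timeoutMs < 0)
            throw new IllegalArgumentException("Timeout must not be negative");
        long start = clockMs.getAsLong();
        long remaining = timeoutMs;
        do {
            // each attempt gets only what is left of the overall budget
            List<T> records = pollOnce.apply(remaining);
            if (!records.isEmpty())
                return records; // return as soon as anything arrives
            long elapsed = clockMs.getAsLong() - start;
            remaining = timeoutMs - elapsed;
        } while (remaining > 0);
        return List.of(); // budget used up: the equivalent of ConsumerRecords.empty()
    }

    public static void main(String[] args) {
        long[] fakeNow = {0};
        // hypothetical attempt that costs 30 ms and never finds data
        List<String> result = pollUntil(remaining -> {
            fakeNow[0] += 30;
            return List.of();
        }, 100, () -> fakeNow[0]);
        System.out.println(result.isEmpty() + " after " + fakeNow[0] + " ms"); // true after 120 ms
    }
}
```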
pollOnce processing
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    client.maybeTriggerWakeup();
    // one coordinator poll; if enable.auto.commit is true, this is also where offsets
    // are committed automatically once auto.commit.interval.ms has elapsed
    coordinator.poll(time.milliseconds(), timeout);
    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());
    // if data is available already, return it immediately
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;
    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches();
    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
    // wait for the fetch responses
    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });
    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.needRejoin())
        return Collections.emptyMap();
    // return the fetched records
    return fetcher.fetchedRecords();
}
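The auto-commit check mentioned at the top of pollOnce runs only as part of poll. The sketch below is a hypothetical timer model, not the coordinator's actual code. A commit fires when a poll happens at or after the deadline, and the deadline is then re-armed one interval later. The 5000 ms interval matches the default of auto.commit.interval.ms. A consequence worth noting: an application that stops calling poll also stops auto-committing.

```java
// Hypothetical model of an interval-based auto-commit check evaluated on every poll.
class AutoCommitTimerSketch {
    private final long intervalMs;
    private long nextDeadline;
    int commits = 0;

    AutoCommitTimerSketch(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.nextDeadline = nowMs + intervalMs;
    }

    // Called from every poll; returns true if this poll would commit offsets.
    boolean maybeAutoCommit(long nowMs) {
        if (nowMs >= nextDeadline) {
            commits++;
            nextDeadline = nowMs + intervalMs; // re-arm relative to this poll
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AutoCommitTimerSketch t = new AutoCommitTimerSketch(5000, 0);
        long[] pollTimes = {1000, 4000, 5200, 9000, 10300};
        for (long now : pollTimes) {
            System.out.println(now + " ms -> commit? " + t.maybeAutoCommit(now));
        }
        // commits at 5200 and 10300 only
    }
}
```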