Kafka 的东西和编程接口
Kafka 的东西
Kafka 提供的东西照旧较量全的,昆山软件开发,bin/ 目次下的东西有以下一些,
bin/connect-distributed.sh bin/kafka-consumer-offset-checker.sh bin/kafka-replica-verification.sh bin/kafka-verifiable-producer.sh bin/connect-standalone.sh bin/kafka-consumer-perf-test.sh bin/kafka-run-class.sh bin/zookeeper-security-migration.sh bin/kafka-acls.sh bin/kafka-mirror-maker.sh bin/kafka-server-start.sh bin/zookeeper-server-start.sh bin/kafka-configs.sh bin/kafka-preferred-replica-election.sh bin/kafka-server-stop.sh bin/zookeeper-server-stop.sh bin/kafka-console-consumer.sh bin/kafka-producer-perf-test.sh bin/kafka-simple-consumer-shell.sh bin/zookeeper-shell.sh bin/kafka-console-producer.sh bin/kafka-reassign-partitions.sh bin/kafka-topics.sh bin/kafka-consumer-groups.sh bin/kafka-replay-log-producer.sh bin/kafka-verifiable-consumer.sh
我常用的呼吁有以下几个,
bin/kafka-server-start.sh -daemon config/server.properties & bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1 bin/kafka-topics.sh --list --zookeeper 192.168.232.23:2181 bin/kafka-topics.sh --delete --zookeeper 192.168.232.23:2181 --topic topic1 bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1 bin/kafka-console-consumer.sh --zookeeper 192.168.232.23:2181 --topic topic1 --from-beginning bin/kafka-console-producer.sh --broker-list 192.168.232.23:9092 --topic topic1
kafka-server-start.sh 是用于 Kafka 的 Broker 启动的,主要就一个参数 config/server.properties,该文件中的设置项待会再说.尚有一个 -daemon 参数,这个是将 Kafka 放在靠山用守护历程的方法运行,假如不加这个参数,Kafka 会在运行一段时间后自动退出,听说这个是 0.10.0.0 版本才有的问题 5。kafka-topics.sh 是用于打点 Topic 的东西,我主要用的 --describe、--list、--delete、--create 这4个成果,上述的例子根基是不言自明的,--replication-factor 3、--partitions 2 这两个参数别离暗示3个副本(含 Leader),和2个分区。kafka-console-consumer.sh 和 kafka-console-producer.sh 是出产者和消费者的浅易终端东西,在调试的时候较量有用,我常用的是 kafka-console-consumer.sh。我没有用 Kafka 自带的 zookeeper,而是用的 zookeeper 官方的宣布版本 3.4.8,端口是默认2181,与 Broker 在同一台呆板上。
下面说一下 Broker 启动的设置文件 config/server.properties,我在默认设置的基本上,修改了以下一些,
broker.id=0 listeners=PLAINTEXT://192.168.232.23:9092 log.dirs=/tmp/kafka-logs delete.topic.enable=true
broker.id 是 Kafka 集群中的 Broker ID,不行反复,我在多副本的尝试中,将他们别离配置为0、1、2;listeners 是 Broker 监听的地点,默认是监听 localhost:9092,因为我不是单机尝试,所以修改为本机局域网地点,虽然,假如要监听所有地点的话,也可以配置为 0.0.0.0:9092,多副本尝试中,将监听端口别离配置为 9092、9093、9094;log.dirs 是 Broker 的 log 的目次,多副本尝试中,差异的 Broker 需要有差异的 log 目次;delete.topic.enable 设为 true 后,可以删除 Topic,而且连带 Topic 中的动静也一并删掉,不然,纵然挪用 kafka-topics.sh --delete 也无法删除 Topic,这是一个便利性的配置,对付开拓情况可以,出产情况必然要设为 false(默认)。尝试中发明, 假如有消费者在消费这个 Topic,那么也无法删除,照旧较量安详的。
剩下的东西大都在文档中也有提到。假如看一下这些剧本的话,会发明大都剧本的写法都是一致的,先做一些参数的校验,最后运行 exec $base_dir/kafka-run-class.sh XXXXXXXXX "$@",可见,这些东西都是利用运行 Java Class 的方法挪用的。
Kafka 的 Java API
在编程接口方面,官方提供了 Scala 和 Java 的接口,社区提供了更多的其他语言的接口,根基上,无论用什么语言开拓,都能找到相应的 API。下面说一下 Java 的 API 接口。
出产者的 API 只有一种,相比拟力简朴,代码如下,
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SimpleProducerDemo {
public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.232.23:9092,192.168.232.23:9093,192.168.232.23:9094");
props.put("zookeeper.connect", "192.168.232.23:2181");
props.put("client.id", "DemoProducer");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
String topic = "topic1";
Boolean isAsync = false;
int messageNo = 1;
while (true) {
String messageStr = "Message_" + String.format("%05d",messageNo);
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
++messageNo;
}
}
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"Send message: (" + String.format("%05d",key) + ", " + message + ") at offset "+ metadata.offset() +
" to partition(" + metadata.partition() +
") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}