下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.0/kafka_2.12-2.5.0.tgz

下载

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar zxvf kafka_2.12-2.5.0.tgz
cd kafka_2.12-2.5.0

开启服务器

# 开启zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# 开启kafka
bin/kafka-server-start.sh config/server.properties

创建一个topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

上面创建了一个叫test的topic

我们现在可以运行list topic命令查看刚才创建的topic

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

发送消息

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
这是一条信息
这是另外一条消息

开启一个消费者consumer

接收消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
这是一条信息
这是另一条信息

安装一个多broker集群,即kafka集群

# 首先为每个broker创建一个配置
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

# 然后为每个配置文件设置
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
   
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

broker.id是broker的唯一标识

现在启动另外两个节点

bin/kafka-server-start.sh config/server-1.properties &

...

bin/kafka-server-start.sh config/server-2.properties &

现在创建一个新的topic用三个副本

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

可以查看topic的描述

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic

制作systemd启动服务脚本

zookeeper启动服务脚本

mv /path/to/kafka_2.12-2.5.0 /usr/local/kafka
vim /etc/systemd/system/zookeeper.service

内容如下:

[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

kafka启动服务脚本

vim /etc/systemd/system/kafka.service

内容如下:

[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service

[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

最后reload

systemctl daemon-reload

之后就可以通过systemctl控制kafka的启动和停止了