Press "Enter" to skip to content

CentOS 7 安装配置 kafka

通常来说, logstash的处理能力有限, 为了防止高峰期日志数量太高导致kafka挂掉, 一般使用kafka来缓存日志消息.

kafka依赖zookeeper, 因此需要先安装配置zookeeper, 再安装配置kafka.

系统环境:

系统统一采用CentOS 7.8 64bit

IP地址 Zookeeper安装目录 Zookeeper DATA目录 kafka安装目录 kafka内网调用域名
172.29.4.168 /data/zookeeper /data/zookeeper/data /data/kafka kafka1.zhukun.net
172.29.4.169 /data/zookeeper /data/zookeeper/data /data/kafka kafka2.zhukun.net
172.29.4.170 /data/zookeeper /data/zookeeper/data /data/kafka kafka3.zhukun.net

1, 部署并配置Zookeeper

以下操作需要同时在3台服务器上操作

$ wget http://mirror.bit.edu.cn/apache/zookeeper/stable/apache-zookeeper-3.5.8-bin.tar.gz
$ tar zxvf apache-zookeeper-3.5.8-bin.tar.gz
$ mv apache-zookeeper-3.5.8-bin /data/
$ ln -s /data/apache-zookeeper-3.5.8-bin /data/zookeeper && mkdir /data/zookeeper/data

准备zk配置文件

$ vim /data/zookeeper/conf/zoo.cfg    # 写入如下配置
dataDir=/data/zookeeper/data
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
# admin.serverPort=8080
initLimit=10
syncLimit=5
server.1=172.29.4.168:2888:3888
server.2=172.29.4.169:2888:3888
server.3=172.29.4.170:2888:3888

准备系统服务

$ vim /usr/lib/systemd/system/zookeeper.service    # 写入如下配置
[Unit]
Description=Zookeeper
Requires=network.target
After=network.target
 
[Service]
Type=forking
ExecStart=/data/zookeeper/bin/zkServer.sh start
ExecStop=/data/zookeeper/bin/zkServer.sh stop
ExecReload=/data/zookeeper/bin/zkServer.sh restart
 
[Install]
WantedBy=multi-user.target

准备myid文件(需要在3台服务器上各自执行)

$ echo "1" > /data/zookeeper/data/myid    # 仅在172.29.4.168上执行
 
$ echo "2" > /data/zookeeper/data/myid    # 仅在172.29.4.169上执行
 
$ echo "3" > /data/zookeeper/data/myid    # 仅在172.29.4.170上执行

启动服务

$ systemctl daemon-reload
$ systemctl start zookeeper
$ systemctl enable zookeeper

验证服务启动成功

$ /data/zookeeper/bin/zkServer.sh status

2, 部署并配置kafka

以下操作需要同时在3台服务器上操作

$ wget http://apache.stu.edu.tw/kafka/2.4.1/kafka_2.13-2.4.1.tgz
$ tar zxvf kafka_2.13-2.4.1.tgz
$ mv kafka_2.13-2.4.1 /data/ && ln -s /data/kafka_2.13-2.4.1 /data/kafka
 
$ cp /data/kafka/config/server.properties /data/kafka/config/server.properties.ori    # 备份原始配置文件
$ grep -v ^$ /data/kafka/config/server.properties.ori | grep -v ^# > /data/kafka/config/server.properties    # 清理配置文件
 
# 配置kafka
$ sed -i '/^zookeeper.connect=/czookeeper.connect=172.29.4.168:2181,172.29.4.169:2181,172.29.4.170:2181' /data/kafka/config/server.properties
$ sed -i '/^log.retention.hours=/clog.retention.hours=72' /data/kafka/config/server.properties
$ sed -i '/^log.dirs=/clog.dirs=\/var\/log\/kafka' /data/kafka/config/server.properties

# 下面2条命令仅在172.29.4.168上执行
$ sed -i 's/broker.id=0/broker.id=1/g' /data/kafka/config/server.properties
$ echo 'listeners=PLAINTEXT://172.29.4.168:9092' >> /data/kafka/config/server.properties
# 下面2条命令仅在172.29.4.169上执行
$ sed -i 's/broker.id=0/broker.id=2/g' /data/kafka/config/server.properties
$ echo 'listeners=PLAINTEXT://172.29.4.168:9092' >> /data/kafka/config/server.properties

# 下面2条命令仅在172.29.4.170上执行
$ sed -i 's/broker.id=0/broker.id=3/g' /data/kafka/config/server.properties
$ echo 'listeners=PLAINTEXT://172.29.4.170:9092' >> /data/kafka/config/server.properties
 
# 确保上面的配置都修改成功
$ egrep -e '(listeners|broker.id|log.retention.hours|log.dirs|zookeeper.connect)' /data/kafka/config/server.properties

准备系统服务

$ vim /usr/lib/systemd/system/kafka.service    # 写入如下内容
[Unit]
Description=Apache Kafka
Requires=zookeeper.service
After=zookeeper.service
 
[Service]
Type=simple
ExecStart=/data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties
ExecStop=/data/kafka/bin/kafka-server-stop.sh
 
[Install]
WantedBy=multi-user.target

启动服务

$ systemctl daemon-reload
$ systemctl start kafka
$ systemctl enable kafka

3, 测试kafka & 一些有用命令

我们可以在kafka里创建一个topic, 然后模拟生产者写入消息, 并模拟消费者读取消息.

# 创建一个topic
$ ./bin/kafka-topics.sh --zookeeper 172.29.4.168:2181,172.29.4.169:2181,172.29.4.170:2181 --create --replication-factor 3 --partitions 3 --topic zhukun.net
 
# 列出所有topic
$ ./bin/kafka-topics.sh --zookeeper 172.29.4.168:2181,172.29.4.169:2181,172.29.4.170:2181 --list
 
# 查看某个topic的详细信息
$ ./bin/kafka-topics.sh --zookeeper 172.29.4.168:2181,172.29.4.169:2181,172.29.4.170:2181 --describe --topic zhukun.net
 
# 接下来我们来模拟生产/消费的过程

# 模拟生产者发送消息(仅在172.29.4.168上运行)
$ ./bin/kafka-console-producer.sh --broker-list 172.29.4.168:9092 --topic zhukun.net

# 模拟消费者消费消息(仅在172.29.4.169上运行, 如果加上--from-beginning则会读取topic里的全部消息,包括以前的消息)
$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka1.zhukun.net:9092,kafka2.zhukun.net:9092,kafka3.zhukun.net:9092 --topic zhukun.net
# 此时在生产者端随便输入内容并回车, 能在消费者端看到消息内容
 
# 删除某个topic
$ ./bin/kafka-topics.sh --zookeeper 172.29.4.168:2181,172.29.4.169:2181,172.29.4.170:2181 --delete --topic zhukun.net

参考文档:
centos7部署kafka+zookeeper集群
在 CentOS 上安裝 Apache Kafka cluster

Leave a Reply

Your email address will not be published. Required fields are marked *