通常来说, logstash的处理能力有限, 为了防止高峰期日志数量太高导致kafka挂掉, 一般使用kafka来缓存日志消息.
kafka依赖zookeeper, 因此需要先安装配置zookeeper, 再安装配置kafka.
系统环境:
系统统一采用CentOS 7.8 64bit
IP地址 | Zookeeper安装目录 | Zookeeper DATA目录 | kafka安装目录 | kafka内网调用域名 |
172.29.4.168 | /data/zookeeper | /data/zookeeper/data | /data/kafka | kafka1.zhukun.net |
172.29.4.169 | /data/zookeeper | /data/zookeeper/data | /data/kafka | kafka2.zhukun.net |
172.29.4.170 | /data/zookeeper | /data/zookeeper/data | /data/kafka | kafka3.zhukun.net |
1, 部署并配置Zookeeper
以下操作需要同时在3台服务器上操作
$ wget http://mirror.bit.edu.cn/apache/zookeeper/stable/apache-zookeeper-3.5.8-bin.tar.gz $ tar zxvf apache-zookeeper-3.5.8-bin.tar.gz $ mv apache-zookeeper-3.5.8-bin /data/ $ ln -s /data/apache-zookeeper-3.5.8-bin /data/zookeeper && mkdir /data/zookeeper/data
准备zk配置文件
$ vim /data/zookeeper/conf/zoo.cfg # 写入如下配置 dataDir=/data/zookeeper/data clientPort=2181 maxClientCnxns=0 admin.enableServer=false # admin.serverPort=8080 initLimit=10 syncLimit=5 server.1=172.29.4.168:2888:3888 server.2=172.29.4.169:2888:3888 server.3=172.29.4.170:2888:3888
准备系统服务
$ vim /usr/lib/systemd/system/zookeeper.service # 写入如下配置 [Unit] Description=Zookeeper Requires=network.target After=network.target [Service] Type=forking ExecStart=/data/zookeeper/bin/zkServer.sh start ExecStop=/data/zookeeper/bin/zkServer.sh stop ExecReload=/data/zookeeper/bin/zkServer.sh restart [Install] WantedBy=multi-user.target
准备myid文件(需要在3台服务器上各自执行)
$ echo "1" > /data/zookeeper/data/myid # 仅在172.29.4.168上执行 $ echo "2" > /data/zookeeper/data/myid # 仅在172.29.4.169上执行 $ echo "3" > /data/zookeeper/data/myid # 仅在172.29.4.170上执行
启动服务
$ systemctl daemon-reload $ systemctl start zookeeper $ systemctl enable zookeeper
验证服务启动成功
$ /data/zookeeper/bin/zkServer.sh status
2, 部署并配置kafka
以下操作需要同时在3台服务器上操作
$ wget http://apache.stu.edu.tw/kafka/2.4.1/kafka_2.13-2.4.1.tgz $ tar zxvf kafka_2.13-2.4.1.tgz $ mv kafka_2.13-2.4.1 /data/ && ln -s /data/kafka_2.13-2.4.1 /data/kafka $ cp /data/kafka/config/server.properties /data/kafka/config/server.properties.ori # 备份原始配置文件 $ grep -v ^$ /data/kafka/config/server.properties.ori | grep -v ^# > /data/kafka/config/server.properties # 清理配置文件 # 配置kafka $ sed -i '/^zookeeper.connect=/czookeeper.connect=172.29.4.168:2181,172.29.4.169:2181,172.29.4.170:2181' /data/kafka/config/server.properties $ sed -i '/^log.retention.hours=/clog.retention.hours=72' /data/kafka/config/server.properties $ sed -i '/^log.dirs=/clog.dirs=\/var\/log\/kafka' /data/kafka/config/server.properties # 下面2条命令仅在172.29.4.168上执行 $ sed -i 's/broker.id=0/broker.id=1/g' /data/kafka/config/server.properties $ echo 'listeners=PLAINTEXT://172.29.4.168:9092' >> /data/kafka/config/server.properties # 下面2条命令仅在172.29.4.169上执行 $ sed -i 's/broker.id=0/broker.id=2/g' /data/kafka/config/server.properties $ echo 'listeners=PLAINTEXT://172.29.4.168:9092' >> /data/kafka/config/server.properties # 下面2条命令仅在172.29.4.170上执行 $ sed -i 's/broker.id=0/broker.id=3/g' /data/kafka/config/server.properties $ echo 'listeners=PLAINTEXT://172.29.4.170:9092' >> /data/kafka/config/server.properties # 确保上面的配置都修改成功 $ egrep -e '(listeners|broker.id|log.retention.hours|log.dirs|zookeeper.connect)' /data/kafka/config/server.properties
准备系统服务
$ vim /usr/lib/systemd/system/kafka.service # 写入如下内容 [Unit] Description=Apache Kafka Requires=zookeeper.service After=zookeeper.service [Service] Type=simple ExecStart=/data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties ExecStop=/data/kafka/bin/kafka-server-stop.sh [Install] WantedBy=multi-user.target
启动服务
$ systemctl daemon-reload $ systemctl start kafka $ systemctl enable kafka
3, 测试kafka & 一些有用命令
我们可以在kafka里创建一个topic, 然后模拟生产者写入消息, 并模拟消费者读取消息.
# 创建一个topic $ ./bin/kafka-topics.sh --zookeeper 172.29.4.168:2181,172.29.4.169:2181,172.29.4.170:2181 --create --replication-factor 3 --partitions 3 --topic zhukun.net # 列出所有topic $ ./bin/kafka-topics.sh --zookeeper 172.29.4.168:2181,172.29.4.169:2181,172.29.4.170:2181 --list # 查看某个topic的详细信息 $ ./bin/kafka-topics.sh --zookeeper 172.29.4.168:2181,172.29.4.169:2181,172.29.4.170:2181 --describe --topic zhukun.net # 接下来我们来模拟生产/消费的过程 # 模拟生产者发送消息(仅在172.29.4.168上运行) $ ./bin/kafka-console-producer.sh --broker-list 172.29.4.168:9092 --topic zhukun.net # 模拟消费者消费消息(仅在172.29.4.169上运行, 如果加上--from-beginning则会读取topic里的全部消息,包括以前的消息) $ ./bin/kafka-console-consumer.sh --bootstrap-server kafka1.zhukun.net:9092,kafka2.zhukun.net:9092,kafka3.zhukun.net:9092 --topic zhukun.net # 此时在生产者端随便输入内容并回车, 能在消费者端看到消息内容 # 删除某个topic $ ./bin/kafka-topics.sh --zookeeper 172.29.4.168:2181,172.29.4.169:2181,172.29.4.170:2181 --delete --topic zhukun.net
参考文档:
centos7部署kafka+zookeeper集群
在 CentOS 上安裝 Apache Kafka cluster