0%

Zookeeper + Kafka 部署与配置

Kafka 的服务端由称为 Broker 的服务进程构成, 一个 Kafka 集群由多个 Broker 组成,Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化。

推荐使用 Linux 部署 Kafka,得益于 epoll 的 IO 模型以及稳定的零拷贝技术。

因为 Kafka 实现机制多是顺序读写操作,普通机械硬盘可以胜任生产环境,虽然 Kafka 的分布式机制可以不用进行 RAID,但可以考虑使用 RAID,一方面提供磁盘级别的冗余(双保险),另一方面整体的读写吞吐量也会提升。另外尽量选用1T以内的机械硬盘组建 RAID5(大容量单盘恢复成功率低)。

Kafka 部署需要依赖 Zookeeper。 负责协调管理并保存 Kafka 集群的所有元数据信息,比如集群都有哪些Broker在运行、创建了哪些 Topic,每个Topic都有多少分区等等。

生产环境建议部署独立的 Zookeeper 集群 以及 Kafka 集群,至少需要 6 个节点。

版本信息

推荐使用 Kafka 2.0 +

版本信息

zookeeper 3.5.8
kafka 2.12-2.4.1
JDK 8+

准备工作

做好所有节点的时间同步

Zookeeper 部署

三个节点都绑定 hosts,如:

1
2
3
10.10.16.11 zk1
10.10.16.12 zk2
10.10.16.13 zk3

解压Zookeeper到目录,创建数据日志目录,复制配置文件:

1
2
3
4
5
mkdir -p /data/zookeeper/data
mkdir -p /data/zookeeper/log

cd /usr/local/zookeeper
cp conf/zoo_sample.cfg conf/zoo.cfg

修改配置文件,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
tickTime=2000

initLimit=10

syncLimit=5

dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/log

clientPort=2181

server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888

分别在三个节点上创建节点ID,写入到数据目录:

1
2
3
echo "1" > /data/zookeeper/data/myid
echo "2" > /data/zookeeper/data/myid
echo "3" > /data/zookeeper/data/myid

增加环境变量:

1
2
3
4
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

source /etc/profile

修改 JVM Heap 堆大小,此处修改为固定1G:

1
2
3
4
vi zkEnv.sh

ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-1000}"
export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m -Xms${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"

启动命令:

1
2
3
4
5
zkServer.sh start
zkServer.sh stop

# status 可以看到当前节点主备状态
zkServer.sh status

Zookeeper 默认会启动一个Web RestFul 管理界面,可以访问:

1
http://IP:8080/commands

根据需要写入 /etc/rc.local 开机自启动。

Kafka 部署

解压 Kafka 到目录,创建数据日志目录,并修改配置文件:

1
2
3
mkdir /data/kafka-logs/

vi /usr/local/kafka/conf/server.properties

三个节点都绑定 hosts:

1
2
3
10.10.16.131 kakfa1
10.10.16.132 kakfa2
10.10.16.133 kakfa3

加入环境变量,其中 JMX_PORT 是管理运维端口,KAFKA_HEAP_OPTS 是配置堆大小。:

1
2
3
4
5
6
export KAFKA_HOME=/usr/local/kafka

export JMX_PORT="9999"
export KAFKA_HEAP_OPTS="-Xms16G -Xmx16G"

export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin

启动与停止 Kafka:

1
2
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
/usr/local/kafka/bin/kafka-server-stop.sh

Server 运行日志

修改 config/log4j.properties 增加必要的日志滚动策略,默认是没有滚动,如:

1
2
log4j.appender.authorizerAppender.MaxFileSize=128MB
log4j.appender.authorizerAppender.MaxBackupIndex=14

运维工具

KafkaManager

1
docker run -d --name kafka-manager -p 9000:9000 -e ZK_HOSTS=10.10.16.111:2181,10.10.16.112:2181,10.10.16.113:2181 kafkamanager/kafka-manager

Kafka Eagle

https://github.com/smartloli/kafka-eagle

Kafka SASL 认证

我们期望 Kafka 监听两个地址与端口,其中内网端口监听 9092,无认证;外网端口监听 9093,需要 SASL 认证:

修改 server.properties

1
2
3
4
5
6
7
# 内网地址与外网地址
listeners=PLAINTEXT://10.0.x.x:9092,SASL_PLAINTEXT://x.x.x.x:9093

sasl.enabled.mechanisms=PLAIN

# 用户名密码
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_logkafka="4d5ccd1dxxx";

其中 user_logkafka="4d5ccd1dxxx" 表示:

1
用户名: logkafka,密码为:4d5ccd1dxxx

SpringBoot SASL 认证代码示例

一定要选对 SpringBoot 所对应的 Kafka 版本。

开启后,客户端需要进行认证使用,SpringBoot 配置示例:

1
2
3
4
5
6
7
8
9
10
11
spring:
kafka:
bootstrap-servers: kafka1:9093,kafka2:9093,kafka3:9093
consumer:
group-id: default
enable-auto-commit: false
auto-offset-reset: latest
properties:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="logkafka" password="4d5ccd1dxxx";