概览
Kafka 模块本身目前仅在 Pigsty 专业版中提供 Beta 试用预览。
安装
如果您使用开源版 Pigsty,可以使用以下命令,在指定节点上安装 Kafka 及其 Java 依赖。
Pigsty 在官方 Infra 仓库中提供了 Kafka 3.8.0 的 RPM 与 DEB 安装包,如果需要使用,可以直接下载安装。
./node.yml -t node_install -e '{"node_repo_modules":"infra","node_packages":["kafka"]}'
Kafka 依赖 Java 运行环境,因此在安装 Kafka 时,需要安装可用的 JDK (默认使用 OpenJDK 17,但其他 JDK 与版本,例如 8,11 都可以使用)
# EL7 (没有 JDK 17 支持)
./node.yml -t node_install -e '{"node_repo_modules":"node","node_packages":["java-11-openjdk-headless"]}'
# EL8 / EL9 (使用 OpenJDK 17 )
./node.yml -t node_install -e '{"node_repo_modules":"node","node_packages":["java-17-openjdk-headless"]}'
# Debian / Ubuntu (使用 OpenJDK 17)
./node.yml -t node_install -e '{"node_repo_modules":"node","node_packages":["openjdk-17-jdk"]}'
配置
单节点 Kafka 配置样例,请注意,在 Pigsty 单机部署模式下,管理节点上的 9093 端口默认已经被 AlertManager 占用。
建议在管理节点上安装 Kafka 时,Peer Poort 使用其他端口,例如(9095)。
kf-main:
hosts:
10.10.10.10: { kafka_seq: 1, kafka_role: controller }
vars:
kafka_cluster: kf-main
kafka_data: /data/kafka
kafka_peer_port: 9095 # 9093 is already hold by alertmanager
三节点 Kraft 模式 Kafka 集群配置样例:
kf-test:
hosts:
10.10.10.11: { kafka_seq: 1, kafka_role: controller }
10.10.10.12: { kafka_seq: 2, kafka_role: controller }
10.10.10.13: { kafka_seq: 3, kafka_role: controller }
vars:
kafka_cluster: kf-test
管理
以下是基本的 Kafka 集群基本管理操作:
使用 kafka.yml
创建 Kafka 集群:
./kafka.yml -l kf-main
./kafka.yml -l kf-test
创建一个名为 test
的 Topic:
kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
这里 --replication-factor 1
表示每个数据只会复制一次,--partitions 1
表示只创建一个分区。
使用以下命令,查看 Kafka 中的 Topic 列表:
kafka-topics.sh --bootstrap-server localhost:9092 --list
使用 Kafka 自带的消息生产者,向 test
Topic 发送消息:
kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>haha
>xixi
>hoho
>hello
>world
> ^D
使用 Kafka 自带的消费者,从 test
Topic 中读取消息:
kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
剧本
Pigsty 提供了两个与 KAFKA 模块相关的剧本,分别用于纳管与移除节点。
node.yml
:纳管节点,并调整节点到期望的状态node-rm.yml
:从 pigsty 中移除纳管节点
此外, Pigsty 还提供了两个包装命令工具:node-add
与 node-rm
,用于快速调用剧本。
kafka.yml
用于部署 Kafka KRaft 模式集群的 kafka.yml
剧本包含以下子任务:
kafka-id : generate kafka instance identity
kafka_clean : remove existing kafka instance (DANGEROUS)
kafka_user : create os user kafka
kafka_pkg : install kafka rpm/deb packages
kafka_link : create symlink to /usr/kafka
kafka_path : add kafka bin path to /etc/profile.d
kafka_svc : install kafka systemd service
kafka_dir : create kafka data & conf dir
kafka_config : generate kafka config file
kafka_boot : bootstrap kafka cluster
kafka_launch : launch kafka service
kafka_exporter : launch kafka exporter
kafka_register : register kafka service to prometheus
监控
Pigsty 提供了两个与 KAFKA
模块有关的监控面板:
KAFKA Overview 展示了 Kafka 集群的整体监控指标。
KAFKA Instance 展示了单个 Kafka 实例的监控指标详情
参数
Kafka 的可用配置项:
#kafka_cluster: #CLUSTER # kafka cluster name, required identity parameter
#kafka_role: controller #INSTANCE # kafka role, controller, broker, or controller-only
#kafka_seq: 0 #INSTANCE # kafka instance seq number, required identity parameter
kafka_clean: false # cleanup kafka during init? false by default
kafka_data: /data/kafka # kafka data directory, `/data/kafka` by default
kafka_version: 3.8.0 # kafka version string
scala_version: 2.13 # kafka binary scala version
kafka_port: 9092 # kafka broker listen port
kafka_peer_port: 9093 # kafka broker peer listen port, 9093 by default (conflict with alertmanager)
kafka_exporter_port: 9308 # kafka exporter listen port, 9308 by default
kafka_parameters: # kafka parameters to be added to server.properties
num.network.threads: 3
num.io.threads: 8
socket.send.buffer.bytes: 102400
socket.receive.buffer.bytes: 102400
socket.request.max.bytes: 104857600
num.partitions: 1
num.recovery.threads.per.data.dir: 1
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
log.retention.hours: 168
log.segment.bytes: 1073741824
log.retention.check.interval.ms: 300000
#log.retention.bytes: 1073741824
#log.flush.interval.ms: 1000
#log.flush.interval.messages: 10000
资源
Pigsty 为 PostgreSQL 提供了一些 Kafka 相关的扩展插件:
kafka_fdw
,一个有趣的 FDW,允许用户直接从 PostgreSQL 中读写 Kafka Topic 数据wal2json
,用于从 PostgreSQL 中逻辑解码 WAL 日志,生成 JSON 格式的变更数据wal2mongo
,用于从 PostgreSQL 中逻辑解码 WAL 日志,生成 BSON 格式的变更数据decoder_raw
,用于从 PostgreSQL 中逻辑解码 WAL 日志,生成 SQL 格式的变更数据test_decoding
,用于从 PostgreSQL 中逻辑解码 WAL 日志,生成 RAW 格式的变更数据