Categories
程式開發

Kafka系列8:一网打尽常用脚本及配置,宜收藏落灰!


前言

通过前面 7 篇文章的介绍,小伙伴们应该对 Kafka 运行工作原理有一个相对比较清晰的认识了。为了提高平时的工作效率,帮助我们快速定位一些线上问题,比如查看部分 Partition 堆积机器 IP 等操作,这篇文章总结了一些平时常用到的一些 Kafka 命令及常用配置,方便日后查阅(该文章中提到的相关配置会持续更新)。

文章概览

常用脚本及命令总结。常用配置及说明。

常用命令总结

一. kafka-topic.sh 脚本相关常用命令,主要操作 Topic。

创建名字为 “op_log” 的 Topic。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 3 --topic op_log

查看指定 ZK 管理的 Topic 列表

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

查看指定 Topic 的详细信息,包括 Partition 个数,副本数,ISR 信息

$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe op_log

修改指定 Topic 的 Partition 个数。注意:该命令只能增加 Partition 的个数,不能减少 Partition 个数,当修改的 Partition 个数小于当前的个数,会报如下错误:Error while executing topic command : The number of partitions for a topic can only be increased

$ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic op_log1 --partition 4

删除名字为 “op_log” 的 Topic。注意如果直接执行该命令,而没有设置 delete.topic.enable 属性为 true 时,是不会立即执行删除操作的,而是仅仅将指定的 Topic 标记为删除状态,之后会启动后台删除线程来删除。

$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic op_log

二. kafka-consumer-groups.sh 脚本常用命令,主要用于操作消费组相关的。

查看消费端的所有消费组信息(ZK 维护消费信息)

$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

查看消费端的所有消费组信息(Kafka 维护消费信息)

$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

查看指定 group 的详细消费情况,包括当前消费的进度,消息积压量等等信息(ZK 维护消息信息)

$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-1291 --describe

查看指定 group 的详细消费情况,包括当前消费的进度,消息积压量等等信息(Kafka 维护消费信息)

$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group mygroup --describe

三. kafka-consumer-offset-checker.sh 脚本常用命令,用于检查 OffSet 相关信息。(注意:该脚本在 0.9 以后可以使用 kafka-consumer-groups.sh 脚本代替,官方已经标注为 deprecated 了)

查看指定 group 的 OffSet 信息、所有者等信息

$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group console-consumer-1291

查看指定 group 的 OffSet 信息,包括消费者所在的 IP 信息等等

$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group console-consumer-1291 --broker-info

四. kafka-configs.sh 脚本常用命令,该脚本主要用于增加/修改 Kafka 集群的配置信息。

对指定 entry 添加配置,其中 entry 可以是 topics、clients;如下,给指定的 topic 添加两个属性,分别为 max.message.byte、flush.message。

$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mytopic --add-config 'max.message.bytes=50000000,flush.message=5'

对指定 entry 删除配置,其中 entry 可以是 topics、clients;如下。对指定的 topic 删除 flush.message 属性。

$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mytopic --delete-config 'flush.message'

查看指定 topic 的配置项。

$ bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name mytopic --describe

五. kafka-reassign-partitions.sh 脚本相关常用命令,主要操作 Partition 的负载情况。

生成 partition 的移动计划,–broker-list 参数指定要挪动的 Broker 的范围,–topics-to-move-json-file 参数指定 Json 配置文件

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --broker-list "0" --topics-to-move-json-file ~/json/op_log_topic-to-move.json --generate

执行 Partition 的移动计划,–reassignment-json-file 参数指定 RePartition 后 Assigned Replica 的分布情况。

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/json/op_log_reassignment.json --execute

检查当前 rePartition 的进度情况

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/json/op_log_reassignment.json --verify

常用配置及说明

kafka 常见重要配置说明,分为四部分

Broker Config:kafka 服务端的配置Producer Config:生产端的配置Consumer Config:消费端的配置Kafka Connect Config:kafka 连接相关的配置

Broker Config

zookeeper.connect

连接 zookeeper 集群的地址,用于将 kafka 集群相关的元数据信息存储到指定的 zookeeper 集群中

advertised.port

注册到 zookeeper 中的地址端口信息,在 IaaS 环境中,默认注册到 zookeeper 中的是内网地址,通过该配置指定外网访问的地址及端口,advertised.host.name 和 advertised.port 作用和 advertised.port 差不多,在 0.10.x 之后,直接配置 advertised.port 即可,前两个参数被废弃掉了。

auto.create.topics.enable

自动创建 topic,默认为 true。其作用是当向一个还没有创建的 topic 发送消息时,此时会自动创建一个 topic,并同时设置 -num.partition 1 (partition 个数) 和 default.replication.factor (副本个数,默认为 1) 参数。

一般该参数需要手动关闭,因为自动创建会影响 topic 的管理,我们可以通过 kafka-topic.sh 脚本手动创建 topic,通常也是建议使用这种方式创建 topic。在 0.10.x 之后提供了 kafka-admin 包,可以使用其来创建 topic。

auto.leader.rebalance.enable

自动 rebalance,默认为 true。其作用是通过后台线程定期扫描检查,在一定条件下触发重新 leader 选举;在生产环境中,不建议开启,因为替换 leader 在性能上没有什么提升。

background.threads

后台线程数,默认为 10。用于后台操作的线程,可以不用改动。

broker.id

Broker 的唯一标识,用于区分不同的 Broker。kafka 的检查就是基于此 id 是否在 zookeeper 中/brokers/ids 目录下是否有相应的 id 目录来判定 Broker 是否健康。

compression.type

压缩类型。此配置可以接受的压缩类型有 gzip、snappy、lz4。另外可以不设置,即保持和生产端相同的压缩格式。

delete.topic.enable

启用删除 topic。如果关闭,则无法使用 admin 工具进行 topic 的删除操作。

leader.imbalance.check.interval.seconds

partition 检查重新 rebalance 的周期时间

leader.imbalance.per.broker.percentage

标识每个 Broker 失去平衡的比率,如果超过改比率,则执行重新选举 Broker 的 leader

log.dir / log.dirs

保存 kafka 日志数据的位置。如果 log.dirs 没有设置,则使用 log.dir 指定的目录进行日志数据存储。

log.flush.interval.messages

partition 分区的数据量达到指定大小时,对数据进行一次刷盘操作。比如设置了 1024k 大小,当 partition 积累的数据量达到这个数值时则将数据写入到磁盘上。

log.flush.interval.ms

数据写入磁盘时间间隔,即内存中的数据保留多久就持久化一次,如果没有设置,则使用 log.flush.scheduler.interval.ms 参数指定的值。

log.retention.bytes

表示 topic 的容量达到指定大小时,则对其数据进行清除操作,默认为-1,永远不删除。

log.retention.hours

标示 topic 的数据最长保留多久,单位是小时

log.retention.minutes

表示 topic 的数据最长保留多久,单位是分钟,如果没有设置该参数,则使用 log.retention.hours 参数

log.retention.ms

表示 topic 的数据最长保留多久,单位是毫秒,如果没有设置该参数,则使用 log.retention.minutes 参数

log.roll.hours

新的 segment 创建周期,单位小时。kafka 数据是以 segment 存储的,当周期时间到达时,就创建一个新的 segment 来存储数据。

log.segment.bytessegment 的大小。当 segment 大小达到指定值时,就新创建一个 segment。message.max.bytes

topic 能够接收的最大文件大小。需要注意的是 producer 和 consumer 端设置的大小需要一致。

min.insync.replicas

最小副本同步个数。当 producer 设置了 request.required.acks 为-1 时,则 topic 的副本数要同步至该参数指定的个数,如果达不到,则 producer 端会产生异常。

num.io.threads

指定 io 操作的线程数

num.network.threads

执行网络操作的线程数

num.recovery.threads.per.data.dir

每个数据目录用于恢复数据的线程数

num.replica.fetchers

从 leader 备份数据的线程数

offset.metadata.max.bytes

允许消费者端保存 offset 的最大个数

offsets.commit.timeout.ms

offset 提交的延迟时间

offsets.topic.replication.factor

topic 的 offset 的备份数量。该参数建议设置更高保证系统更高的可用性

port

端口号,Broker 对外提供访问的端口号。

request.timeout.ms

Broker 接收到请求后的最长等待时间,如果超过设定时间,则会给客户端发送错误信息

zookeeper.connection.timeout.ms

客户端和 zookeeper 建立连接的超时时间,如果没有设置该参数,则使用 zookeeper.session.timeout.ms 值

connections.max.idle.ms

空连接的超时时间。即空闲连接超过该时间时则自动销毁连接。

Producer Config

bootstrap.servers

服务端列表。即接收生产消息的服务端列表

key.serializer

消息键的序列化方式。指定 key 的序列化类型

value.serializer

消息内容的序列化方式。指定 value 的序列化类型

acks

消息写入 Partition 的个数。通常可以设置为 0,1,all;当设置为 0 时,只需要将消息发送完成后就完成消息发送功能;当设置为 1 时,即 Leader Partition 接收到数据并完成落盘;当设置为 all 时,即主从 Partition 都接收到数据并落盘。

buffer.memory

客户端缓存大小。即 Producer 生产的数据量缓存达到指定值时,将缓存数据一次发送的 Broker 上。

compression.type

压缩类型。指定消息发送前的压缩类型,通常有 none, gzip, snappy, or, lz4 四种。不指定时消息默认不压缩。

retries

消息发送失败时重试次数。当该值设置大于 0 时,消息因为网络异常等因素导致消息发送失败进行重新发送的次数。

Consumer Config

bootstrap.servers

服务端列表。即消费端从指定的服务端列表中拉取消息进行消费。

key.deserializer

消息键的反序列化方式。指定 key 的反序列化类型,与序列化时指定的类型相对应。

value.deserializer

消息内容的反序列化方式。指定 value 的反序列化类型,与序列化时指定的类型相对应。

fetch.min.bytes

抓取消息的最小内容。指定每次向服务端拉取的最小消息量。

group.id

消费组中每个消费者的唯一表示。

heartbeat.interval.ms

心跳检查周期。即在周期性的向 group coordinator 发送心跳,当服务端发生 rebalance 时,会将消息发送给各个消费者。该参数值需要小于 session.timeout.ms,通常为后者的 1/3。

max.partition.fetch.bytes

Partition 每次返回的最大数据量大小。

session.timeout.ms

consumer 失效的时间。即 consumer 在指定的时间后,还没有响应则认为该 consumer 已经发生故障了。

auto.offset.reset

当 kafka 中没有初始偏移量或服务器上不存在偏移量时,指定从哪个位置开始消息消息。earliest:指定从头开始;latest:从最新的数据开始消费。

Kafka Connect Config

group.id

消费者在消费组中的唯一标识

internal.key.converter

内部 key 的转换类型。

internal.value.converter

内部 value 的转换类型。

key.converter

服务端接收到 key 时指定的转换类型。

value.converter

服务端接收到 value 时指定的转换类型。

bootstrap.servers

服务端列表。

heartbeat.interval.ms

心跳检测,与 consumer 中指定的配置含义相同。

session.timeout.ms

session 有效时间,与 consumer 中指定的配置含义相同。

总结

本文总结了平时经常用到的一些 Kafka 配置及命令说明,方便随时查看;喜欢的朋友可以收藏以备不时之需。

下篇文章我们来分析一些经常在面试中碰到的问题及相应的解决办法,敬请期待。

历史精彩文章推荐

Kafka系列第7篇:你必须要知道集群内部工作原理的一些事!

Kafka系列第6篇:消息是如何在服务端存储与读取的,你真的知道吗?

重要:Kafka第3篇之一条消息如何被存储到Broker上

微信公众号搜索【z小赵】,更多系列精彩文章等你解锁。

Kafka系列8:一网打尽常用脚本及配置,宜收藏落灰! 1