线上问题
org.apache.kafka.common.errors.DisconnectException: null
问题的主要原因是客户端连接Kafka的时候,由于超时的问题被Group Coordinator提出了,造成了连接错误。
处理方法
Kafka相关超时
{
"auto.commit.interval.ms": 10000,
"session.timeout.ms": 600000,
"request.timeout.ms": 600000,
"fetch.max.wait.ms": 600000,
"max.poll.interval.ms": 600000,
"max.partition.fetch.bytes": 10485760,
"max.poll.records": 1000,
"fetch.message.max.bytes": 10485760
}
参数说明
-
auto.commit.interval.ms: 这个参数指定了自动提交偏移量的时间间隔(单位为毫秒)。如果 enable.auto.commit 设置为 true,则每隔这个时间间隔,Kafka Consumer 将自动地把当前消费的最后一条消息的偏移量提交到 Kafka。默认值是 10000 毫秒(即10秒),这意味着每10秒Consumer会自动提交一次消费进度。 -
session.timeout.ms: 这个配置参数直接影响 Consumer 与 Group Coordinator 的心跳机制。Consumer 必须定期发送心跳到 Group Coordinator 来表明自己仍然活跃并且正常工作。如果在session.timeout.ms时间内 Coordinator 没有收到 Consumer 的心跳,就会认为该 Consumer 已经宕机或无法通讯,并触发一次 Consumer Group 的重新平衡(rebalance)操作。 -
request.timeout.ms: 这是消费者或生产者等待请求(如发送消息或获取消息)完成的最长等待时间。如果在这段时间内没有收到服务器的响应,则认为请求失败。这里的值也是 600000 毫秒(即10分钟)。 -
fetch.max.wait.ms: 这是消费者在发起fetch请求后愿意等待新数据到达的最大时间(前提是可用数据量小于 fetch.min.bytes 设置的阈值)。设置为 600000 毫秒意味着如果Broker上没有足够的新数据,Consumer最多会等待10分钟。 -
max.poll.interval.ms: 这是两次调用 poll() 方法之间的最大允许时间间隔。如果消费者的处理时间加上网络延迟超过了这个时间,那么消费者会被认为已经失效并退出消费组,从而触发 rebalance。此处设置为 600000 毫秒,即10分钟。 -
max.partition.fetch.bytes: 这个参数限制了每次fetch请求从每个分区中能够获取的最大字节数。设置为 10485760 字节,即10MB,表示每个分区一次最多拉取10MB的数据。 -
max.poll.records: 指定每次调用 poll() 方法时,Consumer一次性返回的最大记录数。设置为 1000 表示每次拉取请求最多返回1000条消息。 -
fetch.message.max.bytes: 这个参数定义了单次fetch请求能获取的最大消息大小(包括消息体和元数据)。同样设置为 10485760 字节,即单条消息最大不能超过10MB。
Flink 的Checkpoint
"checkpoint.interval": 10000
Flink的Checkpoint不能设置的太多,不然会消费不了消息。

评论区