侧边栏壁纸
博主头像
AllInOne博主等级

随风来,随风去

  • 累计撰写 45 篇文章
  • 累计创建 27 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

Kafka客户端超时引起Rebalance(经验篇)

AllInOne
2024-04-22 / 0 评论 / 0 点赞 / 124 阅读 / 795 字
温馨提示:
点赞-关注-不迷路。

线上问题

org.apache.kafka.common.errors.DisconnectException: null

问题的主要原因是客户端连接Kafka的时候,由于超时的问题被Group Coordinator提出了,造成了连接错误。

处理方法

Kafka相关超时

{
    "auto.commit.interval.ms": 10000,
    "session.timeout.ms": 600000,
    "request.timeout.ms": 600000,
    "fetch.max.wait.ms": 600000,
    "max.poll.interval.ms": 600000,
    "max.partition.fetch.bytes": 10485760,
    "max.poll.records": 1000,
    "fetch.message.max.bytes": 10485760
  }

参数说明

  1. auto.commit.interval.ms: 这个参数指定了自动提交偏移量的时间间隔(单位为毫秒)。如果 enable.auto.commit 设置为 true,则每隔这个时间间隔,Kafka Consumer 将自动地把当前消费的最后一条消息的偏移量提交到 Kafka。默认值是 10000 毫秒(即10秒),这意味着每10秒Consumer会自动提交一次消费进度。

  2. session.timeout.ms: 这个配置参数直接影响 Consumer 与 Group Coordinator 的心跳机制。Consumer 必须定期发送心跳到 Group Coordinator 来表明自己仍然活跃并且正常工作。如果在 session.timeout.ms 时间内 Coordinator 没有收到 Consumer 的心跳,就会认为该 Consumer 已经宕机或无法通讯,并触发一次 Consumer Group 的重新平衡(rebalance)操作。

  3. request.timeout.ms: 这是消费者或生产者等待请求(如发送消息或获取消息)完成的最长等待时间。如果在这段时间内没有收到服务器的响应,则认为请求失败。这里的值也是 600000 毫秒(即10分钟)。

  4. fetch.max.wait.ms: 这是消费者在发起fetch请求后愿意等待新数据到达的最大时间(前提是可用数据量小于 fetch.min.bytes 设置的阈值)。设置为 600000 毫秒意味着如果Broker上没有足够的新数据,Consumer最多会等待10分钟。

  5. max.poll.interval.ms: 这是两次调用 poll() 方法之间的最大允许时间间隔。如果消费者的处理时间加上网络延迟超过了这个时间,那么消费者会被认为已经失效并退出消费组,从而触发 rebalance。此处设置为 600000 毫秒,即10分钟。

  6. max.partition.fetch.bytes: 这个参数限制了每次fetch请求从每个分区中能够获取的最大字节数。设置为 10485760 字节,即10MB,表示每个分区一次最多拉取10MB的数据。

  7. max.poll.records: 指定每次调用 poll() 方法时,Consumer一次性返回的最大记录数。设置为 1000 表示每次拉取请求最多返回1000条消息。

  8. fetch.message.max.bytes: 这个参数定义了单次fetch请求能获取的最大消息大小(包括消息体和元数据)。同样设置为 10485760 字节,即单条消息最大不能超过10MB。

"checkpoint.interval": 10000

Flink的Checkpoint不能设置的太多,不然会消费不了消息。

Kafka和消费者通信相关文章

文章链接

0

评论区