# Kafka

# 副本机制

为提高可靠性,kafka为每个partition维护一个AR(Assigned Replicas)列表,列表由ISR和OSR组成,由于(网络、磁盘、内存等)导致副本同步速度慢于参数 replica.lag.time.max.ms 指定的阈值时,副本会移出ISR到OSR中。

  • ISR:In Sync Replicas 与leader数据同步的副本
  • OSR:Out Sync Replicas 与leaer数据不同步的副本

# 截断机制

leader异常下线时,需要从所有Follow中选举新的Leader,Kafka从ISR列表中选举新的Leader。老Leader恢复上线后,发现与新的Leader数据不一致,旧leader会进行数据截断到宕机前的位置,然后同步新Leader的数据

# 消息存储

# 消息生产

# 重复生产

  • At Most Once:消息可能会丢,但不会重复传输
  • At Least Once:消息不会丢,但可能会重复传输
  • Exactly Once:肯定且只存在一条消息,不支持,需要在具体业务中实现

# 配置项

# 确保消息是CP,要么不工作,要么得到ack后,消息不会丢失并且状态一致

request.required.acks:ack策略,-1全量同步确认,强可靠性保证 1主分片确认收到则可 0不确认,吞吐量大

min.insync.replicas:保证当前集群中处理正常同步状态的副本的数量

unclean.leader.election.enable:是否请允许消息截断

# 其它配置项

batch.size: 当多条消息的目标 Partition 相同时,Producer 会尝试将它们组装成一个批量消息,以便减少请求的数量,有助于提升客户端和服务器的性能。批量消息的大小由参数 batch.size 控制。batch.size 为 Int 型,默认值为 16384,单位是字节,即默认大小 16KB 参数 batch.size 不宜过大或过小,过大浪费内存,过小则有可能降低吞吐量(大小为零将完全禁用批处理)

linger.ms: 在批处理模式下,如果消息到达的速度比后台线程发送到 Partition 的速度快,则会出现消息堆积的问题。不过,在某些情况下,客户端可能通过添加人工延迟来减少请求数量,也就是说,生产者将等待给定延迟后才允许发送其它消息,这样可以将待发送的消息批量处理,而不是立即发送。这种处理方式类似于 TCP 中的 Nagle 算法。人工延迟由参数 linger.ms 控制。设置批处理延迟上限时,请注意,一旦我们得到一个分区的 batch.size 值的记录,它将立即发送,而不管该设置如何;但如果这个分区的累积字节数少于 batch.size,我们将“逗留”指定的等待时间,到达 linger.ms 指定的延迟后,即使累积的消息字节数少于 batch.size,也会发送。

buffer.memory: Long 型,默认值为 33554432,单位为字节,即默认大小为 32MB 用来缓冲等待发送给服务器的消息的总字节数。如果消息的发送速度快于传送到服务器的速度,那么缓冲区将被占满,之后 Producer 将阻塞 max.block.ms,随后将抛出异常。buffer.memory 的大小大致与 Producer 可使用的总内存相对应,但不是硬绑定,因为并非 Producer 使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用压缩)以及维护等请求

max.block.ms 为 Long 型,默认值为 60000,单位为毫秒,即大小为 60s。指定的缓存被占满后,Producer 相关的方法可阻塞的最大时间由 max.block.ms 控制,之后将抛出异常

retries Int 型,默认值为 0

**max.in.flight.requests.per.connection ** 避免消息重排序 Int 型,默认值为 5 The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).

# 消息消费

**enable.auto.commit:**boolean型,默认值True 可能导致消息丢失。

log.flush.interval.ms: 单位ms,控制日志刷盘时间间隔

log.flush.interval.messages:控制日志刷盘消息量,每积累多少条刷到磁盘

# 主流消息队列方案比较

消息队列解决应用解耦合、异步消息、流量削锋等问题,可实现高性能、高可用、可伸缩和最终一致性架构

上次更新: : 7 months ago