超爱学习网
kafka入门
一级目录
基础使用
面试题
二级目录
基础概念
概念介绍
部署
普通的demo
顺序消费demo
Partition分配策略机制
批量发送与缓存
常见配置
生产者常见配置
broker常见配置
消费者常见配置
细节概念介绍
AR(Assigned Replicas)
ISR(In Sync Replicas)
OSR(Out Sync Replicas)
HW(High Watermark)
LEO(Log End Offset)
LeaderEpoch
存储
概念
LogSegment
稀疏索引
.timeIndex

Kafka 中的 broker.id 配置参数

broker.id 是 Kafka 集群中每个 broker 的唯一标识。这个参数对于集群中的每个 broker 而言是必须的,因为它用于区分不同的 broker。
每个 broker 必须有一个唯一的 ID。在一个 Kafka 集群中,不能有两个 broker 拥有相同的 ID。
在 Kafka 0.9.0.0 之前的版本中,需要手动为每个 broker 指定一个唯一的 ID。从 Kafka 0.9.0.0 版本开始,如果没有为 broker.id 指定值,它将会自动生成一个唯一的 ID。
broker.id 的默认值是 -1,在 Kafka 0.9.0.0 或更高版本中,这意味着 ID 会自动生成。
// Kafka broker 配置示例
val props = new Properties()
props.put("broker.id", "1") // 显式设置 broker.id

val broker = new KafkaBroker(props)

Kafka 中的 log.dirs 配置参数

log.dirs 参数定义了 Kafka 服务端用来存储日志数据的目录。这些目录可以位于文件系统的任意位置,通常应该是高性能的硬盘。在 Kafka 中,'日志'这个词并不是指日志文件,而是指 Kafka 管理的消息记录。
如果有多个目录被指定,每个 topic 的分区都会尽量平均地分布在这些目录中。这样可以提高性能,因为可以并行地从多个硬盘读写数据。
默认值通常为 '/tmp/kafka-logs',但在生产环境中,建议更改为具有更高数据吞吐量和更好可靠性的磁盘上的路径。
# Kafka 服务端配置示例
log.dirs=/var/lib/kafka/data
# 多个目录可以用逗号分隔
# log.dirs=/var/lib/kafka/data1,/var/lib/kafka/data2

Kafka 中的 listeners 配置参数

listeners 参数定义了 Kafka 服务器监听的地址列表,用于接收来自生产者和消费者的连接。这个参数对于 Kafka 集群的网络配置至关重要,因为它指定了客户端与 Kafka 交互所使用的端口和协议。
格式通常是 PROTOCOL://HOSTNAME:PORT,其中 PROTOCOL 可以是 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL 等。Kafka 支持同时监听多个协议和端口,以适应不同的网络安全需求。
默认情况下,listeners 参数的值可能未设置,这意味着 Kafka 将使用 broker 的默认地址(PLAINTEXT://0.0.0.0:9092)。这表示 Kafka 将接受连接在所有网络接口的 9092 端口上的 PLAINTEXT 连接。
正确配置 listeners 对于 Kafka 集群的安全和高效运行非常重要,尤其是在多网卡或需要安全通信的环境中。
// Kafka 服务器配置示例
val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")
props.put("broker.id", "0")
props.put("log.dirs", "/tmp/kafka-logs")
props.put("listeners", "PLAINTEXT://your.host.name:9092") // 设置 listeners

val server = new KafkaServerStartable(new KafkaConfig(props))

Kafka 中的 advertised.listeners 配置参数

advertised.listeners 是 Kafka 配置中的一个关键参数,用于定义 Kafka broker 对外广播给客户端的监听地址。这个地址是客户端用来连接到 Kafka 集群的。在 Kafka 集群中,每个 broker 可能有多个监听地址,但 advertised.listeners 指定了客户端应该使用哪个地址进行连接。
在多网卡、NAT 或负载均衡器等复杂网络环境下,advertised.listeners 特别重要。因为 broker 可能对内部网络使用一个地址,对外部网络使用另一个地址。
默认情况下,advertised.listeners 没有设置默认值,必须明确指定。如果没有正确配置,客户端可能无法连接到 Kafka 集群。
通常,advertised.listeners 的值应该是 broker 可以被外部网络访问的 IP 地址或域名。
# Kafka broker 配置示例
advertised.listeners=PLAINTEXT://your.host.name:9092
# 注意:'your.host.name' 需要替换为实际的主机名或 IP 地址

Kafka 中的 zookeeper.connect 配置参数

zookeeper.connect 参数指定了 Kafka 集群连接到 ZooKeeper 集群的地址。这个配置对于 Kafka 集群的运行至关重要,因为 Kafka 使用 ZooKeeper 来维护集群的元数据和进行领导选举。
这个参数的值是一个包含 ZooKeeper 服务地址的字符串,各个地址之间用逗号分隔。每个地址可以是 IP 地址或主机名,后面跟着端口号。例如:'host1:2181,host2:2181,host3:2181'。
如果你的 Kafka 集群和 ZooKeeper 集群部署在不同的服务器上,正确配置这个参数是非常重要的。不正确的配置可能导致 Kafka 集群无法正常启动或运行。
默认情况下,zookeeper.connect 参数没有设置默认值,需要在配置 Kafka 时显式提供。
// Kafka 服务器配置示例
val props = new Properties()
props.put("zookeeper.connect", "host1:2181,host2:2181,host3:2181") // 设置 zookeeper.connect

val server = new KafkaServerStartable(new KafkaConfig(props))

Kafka 中的 auto.create.topics.enable 配置参数

`auto.create.topics.enable` 参数控制着 Kafka 是否自动创建消费者请求的尚不存在的主题。这个特性可以方便用户,但在某些情况下可能导致意外的行为(如拼写错误导致创建了不必要的主题)。
当设置为 `true` 时,如果一个生产者或消费者尝试发送或接收一个不存在的主题的消息,Kafka 会自动创建这个主题。这种行为适用于那些希望在使用前无需手动创建主题的场景。
相反,当设置为 `false` 时,Kafka 不会自动创建不存在的主题。在这种情况下,如果尝试访问不存在的主题,将会发生错误。这对于需要严格控制主题创建的环境更为适合。
默认值:`auto.create.topics.enable` 参数的默认值是 `true`。这意味着 Kafka 默认配置下会自动创建不存在的主题。
// Kafka 服务器配置示例
val serverProps = new Properties()
serverProps.put("auto.create.topics.enable", "true") // 启用自动创建主题

// 启动 Kafka 服务器
val server = new KafkaServer(serverProps)

Kafka 中的 auto.leader.rebalance.enable 配置参数

auto.leader.rebalance.enable 参数用于控制 Kafka 集群是否自动平衡分区的领导者。在 Kafka 集群中,每个分区都有一个领导者负责处理读写请求。当领导者节点发生故障时,Kafka 需要从副本中选举新的领导者。
此参数的默认值是 true,意味着 Kafka 集群会自动执行领导者选举和平衡。当设置为 false 时,领导者平衡必须手动执行。自动平衡有助于确保集群在节点故障后能迅速恢复正常操作。
合理地使用此参数可以帮助提高 Kafka 集群的稳定性和性能。在大多数情况下,保持默认设置(即开启自动平衡)是推荐的,因为它减少了手动干预的需要,并且能够更快地恢复分区的领导者功能。
// Kafka 配置文件 server.properties 示例
auto.leader.rebalance.enable=true // 启用自动领导者平衡

Kafka 中的 leader.imbalance.check.interval.seconds 配置参数

leader.imbalance.check.interval.seconds 参数定义了 Kafka 控制器在分区领导者不平衡的情况下重新平衡的时间间隔。这个时间间隔是以秒为单位的。
当 Kafka 集群中的某个分区的领导者不均匀分布时,Kafka 控制器会根据该参数设置的时间间隔检查并尝试平衡领导者分布。
适当地设置这个参数对于保持 Kafka 集群的高性能和稳定性是非常重要的。过短的检查间隔可能导致频繁的领导者选举,影响性能;而过长的间隔可能导致领导者不平衡的问题持续存在,影响负载均衡。
默认值: '300' (即每 5 分钟检查一次)。
// Kafka 服务器配置示例
val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")
props.put("broker.id", "0")
props.put("log.dirs", "/tmp/kafka-logs")
props.put("leader.imbalance.check.interval.seconds", "300") // 设置 leader.imbalance.check.interval.seconds

val server = new KafkaServerStartable(props)

Kafka 中的 leader.imbalance.per.broker.percentage 配置参数

leader.imbalance.per.broker.percentage 参数定义了在触发领导者平衡之前,允许单个 broker 上的领导者数量与平均领导者数量的最大差异百分比。当 Kafka 集群中的某个 broker 拥有的领导者分区数量远高于平均水平时,这可能会导致该 broker 负载过重。
这个配置用于自动平衡集群中的领导者分布。如果 broker 的领导者数量超过了整体平均值的这个百分比,Kafka 将尝试重新平衡领导者,将一些领导者从负载较高的 broker 移动到其他 broker 上。
调整此配置可以帮助防止某些 broker 因为处理过多的领导者分区而过载。适当设置此参数对于保持 Kafka 集群的负载均衡非常重要。
默认值: Kafka 默认的 'leader.imbalance.per.broker.percentage' 值是 10。这意味着当任何单个 broker 的领导者数量超过整体平均水平的 10% 时,Kafka 将尝试进行领导者再平衡。

Kafka 中的 compression.type 配置参数

compression.type 参数用于设置消息压缩的类型。Kafka 支持多种压缩类型,包括 'none'(无压缩)、'gzip'、'snappy' 和 'lz4'。使用压缩可以减少网络和存储的使用量,但可能会增加一些计算开销。
此配置项决定了生产者在发送消息到 Kafka 服务器之前是否对数据进行压缩。如果启用压缩,所有的消息批次都会被压缩。Kafka 的消费者会自动解压缩这些数据。
默认值为 'none',意味着默认情况下不对消息进行压缩。选择合适的压缩类型取决于对吞吐量和延迟的需求,以及网络和CPU资源的可用性。
// Kafka 生产者配置示例
val props = new Properties()
props.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("compression.type", "none") // 设置 compression.type,默认为 'none'

val producer = new KafkaProducer(props)

Kafka 中的 delete.topic.enable 配置参数

delete.topic.enable 参数用于控制是否允许在 Kafka 集群中删除主题。当设置为 true 时,可以通过使用 Kafka 的管理员工具或者编程方式来删除主题。
如果设置为 false,那么尝试删除主题的操作将会被忽略。这个设置可以防止意外删除数据,特别是在生产环境中非常有用。
默认值:在 Kafka 0.11.0.0 及之后的版本中,默认值为 true,意味着删除操作是被允许的。在此之前的版本中,此功能可能默认被禁用。
// Kafka 配置文件(server.properties)示例
// 开启删除主题的功能
delete.topic.enable=true

Kafka 中的 log.flush.interval.messages 配置参数

log.flush.interval.messages 参数指定了在强制刷新提交到日志的消息之前,可以接收的消息数量。这个参数的值决定了在发生故障时可能丢失的数据量,因为 Kafka 只保证刷新到磁盘的消息是持久的。
在高吞吐量的场景中,设置一个较大的值可以提高性能,因为这减少了频繁的磁盘I/O操作。然而,这也意味着在故障情况下可能会丢失更多的数据。
相反,较小的值会增加数据的持久性,但可能会降低吞吐量,因为它会导致更频繁的磁盘同步操作。
默认值:log.flush.interval.messages 的默认值通常是 Long.MAX_VALUE,这意味着不会基于消息数量来自动触发日志刷新。
// Kafka 配置示例
val props = new Properties()
props.put("bootstrap.servers", "kafka-broker1:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("log.flush.interval.messages", "10000") // 设置 log.flush.interval.messages

val producer = new KafkaProducer(props)

Kafka 中的 log.flush.scheduler.interval.messages 配置参数

log.flush.scheduler.interval.messages 参数用于配置 Kafka 服务器在将消息写入日志之前在内存中缓存的消息数量。当这个设定的消息数量被累积后,就会触发日志的刷新操作。
这个配置有助于控制数据的持久性和一致性。通过增加此值,可以提高 Kafka 的吞吐量,因为它减少了磁盘 I/O 操作的频率。但是,这也意味着在发生故障时,可能会丢失更多的消息。
默认情况下,此配置通常设置为一个较高的值或者禁用(通过设置为 -1)。禁用此设置意味着 Kafka 仅依靠 log.flush.interval.ms 来控制日志刷新,而不是基于消息计数。
// Kafka 服务器配置示例
val serverProps = new Properties()
serverProps.put("log.flush.scheduler.interval.messages", "10000") // 设置 log.flush.scheduler.interval.messages

val server = new KafkaServer(serverProps)

Kafka 中的 log.flush.interval.ms 配置参数

log.flush.interval.ms 参数定义了 Kafka 服务器在自动将数据从内存刷新到磁盘的时间间隔。这个时间是以毫秒为单位的,指定了日志记录器在两次强制刷新之间的最长时间。
这个参数有助于控制数据的持久性和持续性。较短的刷新间隔可以增加数据安全性,因为更新的数据更频繁地被写入磁盘。然而,这可能会降低总体吞吐量,因为频繁的磁盘写入可能会成为性能瓶颈。
在默认情况下,log.flush.interval.ms 参数通常不会被显式设置,而是依赖于 Kafka 的默认行为。默认情况下,Kafka 依赖于 log.flush.interval.messages 配置(它定义了在强制刷新之间的最大消息数)或者日志段文件达到其最大大小时自动刷新。
默认值:log.flush.interval.ms 的默认值通常是不设置的,意味着 Kafka 会根据 log.flush.interval.messages 或日志段文件大小来自动处理刷新。
// Kafka 服务器配置示例
val props = new Properties()
props.put("log.flush.interval.ms", "1000") // 设置 log.flush.interval.ms
// 其他配置 ...

val server = new KafkaServerStartable(props)

Kafka 中的 log.retention.bytes 配置参数

log.retention.bytes 参数定义了 Kafka 服务器保留日志文件的最大大小。这个设置用于限制 Kafka 分区日志的大小。当日志文件大小达到这个阈值时,旧的日志文件会被删除,以便为新的日志腾出空间。
这个参数对于管理 Kafka 集群的磁盘空间非常重要。通过合理设置这个值,可以防止 Kafka 服务器上的磁盘空间被完全占用,同时也确保了数据的存储时间不会无限长。
默认情况下,log.retention.bytes 的值是 -1,表示没有限制。也就是说,除非达到日志保留时间(log.retention.hours 或 log.retention.minutes)的限制,否则日志文件不会因为大小的原因被删除。
// Kafka 服务器配置示例
val serverProps = new Properties()
serverProps.put("log.retention.bytes", "1073741824") // 设置 log.retention.bytes 为 1GB
// 其他配置...

Kafka 中的 log.retention.hours 配置参数

log.retention.hours 参数定义了 Kafka 服务器保留日志文件的时间长度,单位为小时。此配置项控制着 Kafka 中的数据保存时长,过期的数据将被自动删除。
该参数的默认值通常设为 168 小时(即 7 天),这意味着 Kafka 会保留 7 天的日志数据。在数据量大或者读取模式不同的情况下,可以根据需要调整此配置。
减少 log.retention.hours 的值可以减轻磁盘空间的压力,但同时也降低了数据可恢复的时间窗口。增加这个值可以延长数据的可用性,但会占用更多磁盘空间。
// Kafka 服务器配置示例
val serverProps = new Properties()
serverProps.put("log.retention.hours", "168") // 设置 log.retention.hours 的默认值为 168 小时

val server = new KafkaServer(serverProps)

Kafka 中的 log.retention.minutes 配置参数

log.retention.minutes 参数定义了 Kafka 服务器在删除旧日志之前保留它们的时间长度,单位为分钟。这个设置用于控制日志文件的存储时间,有助于管理磁盘空间。
当设置了 log.retention.minutes,Kafka 会保留指定时间长度的日志文件。一旦日志文件超出这个时间限制,它们将被自动删除。这有助于防止磁盘空间被无限期地占用,特别是在数据写入量很大的情况下。
默认情况下,log.retention.minutes 的值是未设置的,这意味着它会回退到 log.retention.hours 的设置。如果 log.retention.hours 也未设置,它会再回退到 log.retention.ms 的设置。Kafka 默认的 log.retention.ms 值是 168小时(即一周)。
// Kafka 服务器配置示例
val props = new Properties()
props.put("log.retention.minutes", "10080") // 设置 log.retention.minutes 为 10080分钟(即7天)

Kafka 中的 log.retention.ms 配置参数

log.retention.ms 参数定义了 Kafka 服务器在删除旧日志前保留日志的时间长度,单位是毫秒。这个配置项用于控制日志数据的保留期限,确保数据在被删除前可以被消费。
默认情况下,log.retention.ms 的值为 604800000 毫秒,即 7 天。这意味着 Kafka 会保留每个分区中的日志至少 7 天。在这段时间内,消费者可以读取这些日志数据。过了这个期限,日志数据将被自动删除以释放存储空间。
根据实际需求调整这个值是很重要的。较长的保留期限可以让消费者有更多时间来处理数据,但同时也会占用更多的存储空间。较短的保留期限可以减少存储需求,但可能导致消费者无法及时处理所有数据。
// Kafka 服务器配置示例
val props = new Properties()
props.put("log.retention.ms", "604800000") // 设置 log.retention.ms 为 7 天的毫秒数

val server = new KafkaServer(props)