3.1 Broker配置

基本配置如下:

  • broker.id
  • log.dirs
  • zookeeper.connect

下文将详细论述了主题级别配置和默认值。

名称 描述 类型 默认 有效值 重要程度
zookeeper.connect zookeeper host string string
advertised.host.name 过时的:当advertised.listenerslisteners没设置时候才使用。请改用advertised.listeners。Hostname发布到Zookeeper供客户端使用。在IaaS环境中,Broker可能需要绑定不同的接口。如果没有设置,将会使用host.name(如果配置了)。否则将从java.net.InetAddress.getCanonicalHostName()获取。 string null
advertised.listeners 发布到Zookeeper供客户端使用监听(如果不同)。在IaaS环境中,broker可能需要绑定不同的接口。如果没设置,则使用listeners string null
advertised.port 过时的:当advertised.listenerslisteners没有设置才使用。请改用advertised.listeners。端口发布到Zookeeper供客户端使用,在IaaS环境中,broker可能需要绑定到不同的端口。如果没有设置,将和broker绑定的同一个端口。 int null
auto.create.topics.enable 启用自动创建topic boolean true
auto.leader.rebalance.enable 启用自动平衡leader。如果需要,后台线程会定期检查并触发leader平衡。 boolean true
background.threads 用于各种后台处理任务的线程数 int 10 [1,...]
broker.id 服务器的broker id。如果未设置,将生成一个独一无二的broker id。要避免zookeeper生成的broker id和用户配置的broker id冲突,从reserved.broker.max.id + 1开始生成。 int -1
compression.type 为给定topic指定最终的压缩类型。支持标准的压缩编码器('gzip', 'snappy', 'lz4')。也接受'未压缩',就是没有压缩。保留由producer设置的原始的压缩编码。 string producer
delete.topic.enable 启用删除topic。如果此配置已关闭,通过管理工具删除topic将没有任何效果 boolean false
host.name 不赞成:当listeners没有设置才会使用。请改用listeners。如果设置,它将只绑定到此地址。如果没有设置,它将绑定到所有接口 string ""
leader.imbalance.check.interval.seconds 由控制器触发分区再平衡检查的频率 long 300
leader.imbalance.per.broker.percentage 允许每个broker的leader比例不平衡。如果每个broker的值高于此值,控制器将触发leader平衡,该值以百分比的形式指定。 int 10
listeners 监听列表 - 监听逗号分隔的URL列表和协议。指定hostname为0.0.0.0绑定到所有接口,将hostname留空则绑定到默认接口。合法的listener列表是:PLAINTEXT://myhost:9092,TRACE://:9091 PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093 string null
log.dir 保存日志数据的目录 (补充log.dirs属性) string /tmp/kafka-logs
log.dirs 保存日志数据的目录。如果未设置,则使用log.dir中的值 string null
log.flush.interval.messages 消息刷新到磁盘之前,累计在日志分区的消息数 long 9223372036854775807 [1,...]
log.flush.interval.ms topic中的消息在刷新到磁盘之前保存在内存中的最大时间(以毫秒为单位),如果未设置,则使用log.flush.scheduler.interval.ms中的值 null
log.flush.offset.checkpoint.interval.ms 我们更新的持续记录的最后一次刷新的频率。作为日志的恢复点。 int 60000 [0,...]
log.flush.scheduler.interval.ms 日志刷新的频率(以毫秒为单位)检查是否有任何日志需要刷新到磁盘 long 9223372036854775807
log.retention.bytes 删除日志之前的最大大小 long -1
log.retention.hours 删除日志文件保留的小时数(以小时为单位)。第三级是log.retention.ms属性 int 168
log.retention.minutes 删除日志文件之前保留的分钟数(以分钟为单位)。次于log.retention.ms属性。如果没设置,则使用log.retention.hours的值。 int null
log.retention.ms 删除日志文件之前保留的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes的值。 long null
log.roll.hours 新建一个日志段的最大时间(以小时为单位),次于log.roll.ms属性。 int 168 [1,...]
log.roll.jitter.hours 从logRollTimeMillis(以小时为单位)减去最大抖动,次于log.roll.jitter.ms属性。 int 0 [0,...]
log.roll.ms 新建一个日志段之前的最大事时间(以毫秒为单位)。如果未设置,则使用log.roll.hours的值。 long null
og.segment.bytes 单个日志文件的最大大小 int 1073741824 [14,...]
og.segment.delete.delay.ms 从文件系统中删除文件之前的等待的时间 long 60000 [0,...]
message.max.bytes 服务器可以接收的消息的最大大小 int 1000012 [0,...]
min.insync.replicas 当producer设置acks为"all"(或"-1")时。min.insync.replicas指定必须应答成功写入的replicas最小数。如果不能满足最小值,那么producer抛出一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。 当一起使用时,min.insync.replicas和acks提供最大的耐用性保证。一个典型的场景是创建一个复制因子3的topic,设置min.insync.replicas为2,并且ack是“all”。如果多数副本没有接到写入时,将会抛出一个异常。 int 1 [1,...]
num.io.threads 服务器用于执行网络请求的io线程数 int 8 [1,...]
num.network.threads 服务器用于处理网络请求的线程数。 int 3 [1,...]
num.recovery.threads.per.data.dir 每个数据的目录线程数,用于启动时日志恢复和关闭时flush。 int 1 [1,...]
num.replica.fetchers 从源broker复制消息的提取线程数。递增该值可提高follower broker的I/O的并发。 int 1
offset.metadata.max.bytes offset提交关联元数据条目的最大大小 int 4096
offsets.commit.required.acks commit之前需要的应答数,通常,不应覆盖默认的(-1) short -1
offsets.commit.timeout.ms Offset提交延迟,直到所有副本都收到提交或超时。 这类似于生产者请求超时。 int 5000 [1,...]
offsets.load.buffer.size 当加载offset到缓存时,从offset段读取的批量大小。 int 5242880 [1,...]
offsets.retention.check.interval.ms 检查过期的offset的频率。 long 600000 [1,...]
offsets.retention.minutes offset topic的日志保留时间(分钟) int 1440 [1,...]
offsets.topic.compression.codec 压缩编码器的offset topic - 压缩可以用于实现“原子”提交 int 0
offsets.topic.num.partitions offset commit topic的分区数(部署之后不应更改) int 50 [1,...]
offsets.topic.replication.factor offset topic复制因子(ps:就是备份数,设置的越高来确保可用性)。为了确保offset topic有效的复制因子,第一次请求offset topic时,活的broker的数量必须最少最少是配置的复制因子数。 如果不是,offset topic将创建失败或获取最小的复制因子(活着的broker,复制因子的配置) short 3 [1,...]
offsets.topic.segment.bytes offset topic段字节应该相对较小一点,以便于加快日志压缩和缓存加载 int 104857600 [1,...]
port 不赞成:当listener没有设置才使用。请改用listeners。该port监听和接收连接。 int 9092
queued.max.requests 在阻塞网络线程之前允许的排队请求数 int 500 [1,...]
quota.consumer.default 过时的:当默认动态的quotas没有配置或在Zookeeper时。如果每秒获取的字节比此值高,所有消费者将通过clientId/consumer区分限流。 long 9223372036854775807 [1,...]
quota.producer.default 过时的:当默认动态的quotas没有配置,或在zookeeper时。如果生产者每秒比此值高,所有生产者将通过clientId区分限流。 long 9223372036854775807 [1,...]
replica.fetch.min.bytes Minimum 每个获取响应的字节数。如果没有满足字节数,等待replicaMaxWaitTimeMs。 int 1
replica.fetch.wait.max.ms 跟随者副本发出每个获取请求的最大等待时间,此值应始终小于replica.lag.time.max.ms,以防止低吞吐的topic的ISR频繁的收缩。 int 500
replica.high.watermark. checkpoint.interval.ms 达到高“水位”保存到磁盘的频率。 long 5000
replica.lag.time.max.ms 如果一个追随者没有发送任何获取请求或至少在这个时间的这个leader的没有消费完。该leader将从isr中移除这个追随者。 long 10000
replica.socket.receive.buffer.bytes 用于网络请求的socket接收缓存区 int 65536
replica.socket.timeout.ms 网络请求的socket超时,该值最少是replica.fetch.wait.max.ms int 30000
request.timeout.ms 该配置控制客户端等待请求的响应的最大时间,。如果超过时间还没收到消费。客户端将重新发送请求,如果重试次数耗尽,则请求失败。 int 30000
socket.receive.buffer.bytes socket服务的SO_RCVBUF缓冲区。如果是-1,则默认使用OS的。 int 102400
socket.request.max.bytes socket请求的最大字节数 int 104857600 [1,...]
socket.send.buffer.bytes socket服务的SO_SNDBUF缓冲区。如果是-1,则默认使用OS的。 int 102400
unclean.leader.election.enable 是否启用不在ISR中的副本参与选举leader的最后的手段。这样做有可能丢失数据。 boolean true
zookeeper.connection.timeout.ms 连接zookeeper的最大等待时间,如果未设置,则使用zookeeper.session.timeout.ms。 int null
zookeeper.session.timeout.ms Zookeeper会话的超时时间 int 6000
zookeeper.set.acl 设置客户端使用安全的ACL boolean false
broker.id.generation.enable 启用自动生成broker id。启用该配置时应检查reserved.broker.max.id。 boolean true 中等
broker.rack broker机架,用于机架感知副本分配的失败容错。例如:RACK1, us-east-1d string null 中等
connections.max.idle.ms Idle 连接超时:闲置时间超过该设置,则服务器socket处理线程关闭这个连接。 long 600000 中等
controlled.shutdown.enable 启用服务器的关闭控制。 boolean true 中等
controlled.shutdown.max.retries 控制因多种原因导致的shutdown失败,当这样失败发生,尝试重试的次数 int 3 中等
controlled.shutdown.retry.backoff.ms 在每次重试之前,系统需要时间从导致先前故障的状态(控制器故障转移,复制延迟等)恢复。 此配置是重试之前等待的时间数。 long 5000 中等
controller.socket.timeout.ms 控制器到broker通道的sockt超时时间 int 30000
default.replication.factor 自动创建topic的默认的副本数 int 1
fetch.purgatory.purge.interval.requests 拉取请求清洗间隔(请求数) int 1000
group.max.session.timeout.ms 已注册的消费者允许的最大会话超时时间,设置的时候越长使消费者有更多时间去处理心跳之间的消息。但察觉故障的时间也拉长了。 int 300000
group.min.session.timeout.ms 已经注册的消费者允许最小的会话超时时间,更短的时间去快速的察觉到故障,代价是频繁的心跳,这可能会占用大量的broker资源。 int 6000
inter.broker.protocol.version 指定broker内部通讯使用的版本。通常在更新broker时使用。有效的值为:0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1。查看ApiVersion找到的全部列表。 string 0.10.1-IV2
log.cleaner.backoff.ms 当没有日志要清理时,休眠的时间 long 15000 [0,...]
log.cleaner.dedupe.buffer.size 用于日志去重的内存总量(所有cleaner线程) long 134217728
log.cleaner.delete.retention.ms 删除记录保留多长时间? long 86400000
log.cleaner.enable 在服务器上启用日志清洗处理?如果使用的任何topic的cleanup.policy=compact包含内部的offset topic,应启动。如果禁用,那些topic将不会被压缩并且会不断的增大。 boolean true
log.cleaner.io.buffer.load.factor 日志cleaner去重缓冲负载因子。去重缓冲区的百分比,较高的值将允许同时清除更多的日志,但将会导致更多的hash冲突。 double 0.9
log.cleaner.io.buffer.size 所有日志清洁器线程I/O缓存的总内存 int 524288 [0,...]
log.cleaner.io.max.bytes.per.second 日志清理器限制,以便其读写i/o平均小与此值。 double 1.7976931348623157E308
log.cleaner.min.cleanable.ratio 脏日志与日志的总量的最小比率,以符合清理条件 double 0.5
log.cleaner.min.compaction.lag.ms 一条消息在日志保留不压缩的最小时间,仅适用于正在压缩的日志。 long 0
log.cleaner.threads 用于日志清除的后台线程数 int 1 [0,...]
log.cleanup.policy 超过保留时间段的默认清除策略。逗号分隔的有效的策略列表。有效的策略有:“delete”和“compact” list [delete] [compact, delete]
log.index.interval.bytes 添加一个条目到offset的间隔 index int 4096 [0,...]
log.index.size.max.bytes offset index的最大大小(字节) int 10485760 [4,...]
log.message.format.version 指定追加到日志中的消息格式版本。例如: 0.8.2, 0.9.0.0, 0.10.0。通过设置一个特定消息格式版本,用户需要保证磁盘上所有现有的消息小于或等于指定的版本。错误的设置将导致旧版本的消费者中断,因为消费者接收一个不理解的消息格式。 string 0.10.1-IV2
log.message.timestamp.difference.max.ms 如果log.message.timestamp.type=CreateTime,broker接收消息时的时间戳和消息中指定的时间戳之间允许的最大差异。如果时间戳超过此阈值,则消息将被拒绝。如果log.message.timestamp.type=LogAppendTime,则此配置忽略。 long 9223372036854775807 [0,...]
log.message.timestamp.type 定义消息中的时间戳是消息创建时间或日志追加时间。该值可设置为CreateTimeLogAppendTime string CreateTime [CreateTime, LogAppendTime]
log.preallocate 在创建新段时预分配文件?如果你在Windowns上使用kafka,你可能需要设置它为true。 boolean false
log.retention.check.interval.ms 日志清除程序检查日志是否满足被删除的频率(以毫秒为单位) long 300000 [1,...]
max.connections.per.ip 允许每个ip地址的最大连接数。 int 2147483647 [1,...]
max.connections.per.ip.overrides per-ip或hostname覆盖默认最大连接数 string ""
num.partitions topic的默认分区数 int 1 [1,...]
principal.builder.class 实现PrincipalBuilder接口类的完全限定名,该接口目前用于构建与SSL SecurityProtocol连接的Principal。 class class org.apache.kafka. common.security.auth .DefaultPrincipalBuilder
producer.purgatory.purge.interval.requests 生产者请求purgatory的清洗间隔(请求数) int 1000
replica.fetch.backoff.ms 当拉取分区发生错误时休眠的时间 1000 [0,...]
replica.fetch.max.bytes 拉取每个分区的消息的字节数。这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。 int 1048576 [0,...]
replica.fetch.response.max.bytes 预计整个获取响应的最大字节数,这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。 int 10485760 [0,...]
reserved.broker.max.id broker.id的最大数 int 1000 [0,...]
sasl.enabled.mechanisms 可用的SASL机制列表,包含任何可用的安全提供程序的机制。默认情况下只有GSSAPI是启用的。 list [GSSAPI]
sasl.kerberos.kinit.cmd Kerberos kinit 命令路径。 string /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 登录线程在刷新尝试的休眠时间。 long 60000
sasl.kerberos.principal.to.local.rules principal名称映射到一个短名称(通常是操作系统用户名)。按顺序,使用与principal名称匹配的第一个规则将其映射其到短名称。忽略后面的规则。默认情况下,{username}/{hostname}@{REALM} 映射到 {username}。 list [DEFAULT]
sasl.kerberos.service.name Kafka运行的Kerberos principal名称。 可以在JAAS或Kafka的配置文件中定义。 string null
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动的百分比 time. double 0.05
sasl.kerberos.ticket.renew.window.factor 登录线程休眠,直到从上次刷新到ticket的到期的时间已到达(指定窗口因子),在此期间它将尝试更新ticket。 double 0.8
sasl.mechanism.inter.broker.protocol SASL机制,用于broker之间的通讯,默认是GSSAPI。 string GSSAPI
security.inter.broker.protocolSecurity broker之间的通讯协议,有效值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 string PLAINTEXT
ssl.cipher.suites 密码套件列表。认证,加密,MAC和秘钥交换算法的组合,用于使用TLS或SSL的网络协议交涉网络连接的安全设置,默认情况下,支持所有可用的密码套件。 list null
ssl.client.auth 配置请求客户端的broker认证。常见的设置: ssl.client.auth=required 需要客户端认证。 ssl.client.auth=requested 客户端认证可选,不同于requested ,客户端可选择不提供自身的身份验证信息 * ssl.client.auth=none 不需要客户端身份认证 string none [required, requested, none]
ssl.enabled.protocols 已启用的SSL连接协议列表。 list [TLSv1.2, TLSv1.1, TLSv1]
ssl.key.password 秘钥库文件中的私钥密码。对客户端是可选的。 password null
ssl.keymanager.algorithm 用于SSL连接的密钥管理工厂算法。默认值是Java虚拟机的密钥管理工厂算法。 string SunX509
ssl.keystore.location 密钥仓库文件的位置。客户端可选,并可用于客户端的双向认证。 string null
ssl.keystore.password 密钥仓库文件的仓库密码。客户端可选,只有ssl.keystore.location配置了才需要。 password null
ssl.keystore.type 密钥仓库文件的格式。客户端可选。 string JKS
ssl.protocol 用于生成SSLContext,默认是TLS,适用于大多数情况。允许使用最新的JVM,LS, TLSv1.1 和TLSv1.2。 SSL,SSLv2和SSLv3 老的JVM也可能支持,由于有已知的安全漏洞,不建议使用。 string TLS
ssl.provider 用于SSL连接的安全提供程序的名称。默认值是JVM的安全程序。 string null
ssl.trustmanager.algorithm 信任管理工厂用于SSL连接的算法。默认为Java虚拟机配置的信任算法。 string PKIX
ssl.truststore.location 信任仓库文件的位置 string null
ssl.truststore.password 信任仓库文件的密码 password null
ssl.truststore.type 信任仓库文件的文件格式 string JKS
authorizer.class.name 用于认证的授权程序类 string ""
metric.reporters 度量报告的类列表,通过实现MetricReporter接口,允许插入新度量标准类。JmxReporter包含注册JVM统计。 list []
metrics.num.samples 维持计算度量的样本数。 int 2 [1,...]
metrics.sample.window.ms 计算度量样本的时间窗口 long 30000 [1,...]
quota.window.num 在内存中保留客户端限额的样本数 int 11 [1,...]
quota.window.size.seconds 每个客户端限额的样本时间跨度 int 1 [1,...]
replication.quota.window.num 在内存中保留副本限额的样本数 int 11 [1,...]
replication.quota.window.size.seconds 每个副本限额样本数的时间跨度 int 1 [1,...]
ssl.endpoint.identification.algorithm 端点身份标识算法,使用服务器证书验证服务器主机名。 string null
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null
zookeeper.sync.time.ms ZK follower可落后与leader多久。 int 2000

更多关于broker配置的详情,可以在scala类中的kafka.server.KafkaConfig找到。

可查看之前的版本:Kafka Broker配置(0.8.2)

results matching ""

    No results matching ""