Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限。当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响。

有2个接口可以实现限制。最简单和最安全的是调用kafka-reassign-partitions.sh时加限制。另外kafka-configs.sh也可以直接查看和修改限制值。

例如,当执行重新平衡时,用下面的命令,它在移动分区时,将不会超过50MB/s。

$ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181--execute --reassignment-json-file bigger-cluster.json —throttle 50000000

当你运行这个脚本,你会看到这个限制:

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file:

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000

  There is an existing assignment running.
  The throttle limit was set to 700000000 B/s

一旦重新平衡完成,可以使用--verify操作验证重新平衡的状态。如果重新平衡已经完成,限制也会通过--verify命令移除。这点很重要,因为一旦重新平衡完成,并通过--veriry操作及时移除限制。否则可能会导致定期复制操作的流量也受到限制。

当--verify执行,并且重新分配已完成时,此脚本将确认限制被移除:

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --verify --reassignment-json-file bigger-cluster.json
  Status of partition reassignment:
  Reassignment of partition [my-topic,1] completed successfully
  Reassignment of partition [mytopic,0] completed successfully
  Throttle was removed.

管理员还可以使用kafka-configs.sh验证已分配的配置。有2对限制配置用于管理限流。而限制值本身,是个broker级别的配置,用于动态属性配置:

leader.replication.throttled.rate
  follower.replication.throttled.rate

此外,还有枚举集合的限流副本:

leader.replication.throttled.replicas
  follower.replication.throttled.replicas

其中每个topic配置,所有4个配置值通过kafka-reassign-partitions.sh(下面讨论)自动分配。

查看限流配置:

$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
  Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
  Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000

这显示了应用于复制协议的leader和follower的限制。默认情况下,2个都分配了相同的限制值。

要查看限流副本的列表:

$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
  Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
      follower.replication.throttled.replicas=1:101,0:102

这里我们看到leader限制被应用到broker 102上的分区1和broker 101的分区0.同样,follower限制应用到broker 101的分区1和broker 102的分区0.

默认情况下,kafka-reassign-partitions.sh会将leader限制应用于重新平衡前存在的所有副本,任何一个副本都可能是leader。它将应用follower限制到所有移动目的地。因此,如果broker 101,102上有一个副本分区,被分配给102,103,则该分区的leader限制,将被应用到101,102,并且follower限制将仅被应用于103。

如果需要,你还可以使用kafka-configs.sh的--alter开关手动地更改限制配置。

安全的使用限制复制

在使用限制复制时应各位的小心,特别是:

(1) 限制移除:

一旦重新分配完成,限制应该及时的移除(通过运行kafka-reassign-partitions —verify移除)。

(2) 确保进展:

如果限制设置的太低,与传入的写入速率相比,复制可能无法进行:

max(BytesInPerSec) > throttle

其中BytesInPerSec是监控生产者写入到broker的吞吐量。

可以使用该命令监视重新平衡期间复制是否在进行,使用以下方式:

kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

在复制期间落后应不断地减少,如果没有缩小,则管理员通过上面介绍的方式增加限制的吞吐量。

设置配额

默认情况下,客户端的配额不受限制。可以为每个(user,client-id),user或client-id分组设置自定义的配额。

配置自定义的配额(user=user1,client-id=clientA):

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.

user=user1配置自定义的配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.

client-id=clientA配置自定义的配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-name clientA
Updated config for entity: client-id 'clientA'.

可以通过--entity-default为(user,client-id),userclient-id group设置默认的配额。
为user=userA配置默认client-id配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.

为user配置默认配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-default
Updated config for entity: default user-principal.

为client-id配置默认配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-default
Updated config for entity: default client-id.

为指定的(user,client-id)展示配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048

为指定的user展示配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048

为指定的client-id展示配额。

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048

如果没有指定名称,则展示指定的类型的,查看所有user:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048

(user,client)也是一样:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048

可以通过在broker中设置以下配置来设置适用于所有client-id的默认配额。仅当未在Zookeeper中配置配额覆盖或默认值时,才应用这些属性。 默认情况下,每个client-id接收一个无限制的配额。 以下将每个producerconsumer client-id的默认配额设置为10MB /秒。

quota.producer.default=10485760
quota.consumer.default=10485760

请注意,这些属性已被弃用,可能会在将来的版本中删除。 请优先使用kafka-configs.sh。

results matching ""

    No results matching ""