消费者offset跟踪 (Consumer Offset Tracking)

The high-level consumer tracks the maximum offset it has consumed in each partition and periodically commits its offset vector so that it can resume from those offsets in the event of a restart. Kafka provides the option to store all the offsets for a given consumer group in a designated broker (for that group) called the offset manager. i.e., any consumer instance in that consumer group should send its offset commits and fetches to that offset manager (broker). The high-level consumer handles this automatically. If you use the simple consumer you will need to manage offsets manually. This is currently unsupported in the Java simple consumer which can only commit or fetch offsets in ZooKeeper. If you use the Scala simple consumer you can discover the offset manager and explicitly commit or fetch offsets to the offset manager. A consumer can look up its offset manager by issuing a ConsumerMetadataRequest to any Kafka broker and reading the ConsumerMetadataResponse which will contain the offset manager. The consumer can then proceed to commit or fetch offsets from the offsets manager broker. In case the offset manager moves, the consumer will need to rediscover the offset manager. If you wish to manage your offsets manually, you can take a look at these code samples that explain how to issue OffsetCommitRequest and OffsetFetchRequest.

高级别消费者跟踪每个分区已消费的最大的offset,并定期提交offset,在重新启动的情况下,可从这些offset恢复。Kafka提供了一个选项在指定的broker中来存储所有给定的消费者组的offset,称为offset manager。例如,该消费者组的所有消费者实例发送其offset,提交并获取该offset manager(broker)。高级别消费者都将会自动处理这些。如果你使用低级别的消费者,你将需要去手动管理offset。目前在低级别的java消费者不支持,只能在Zookeeper提交或获取offset。如果你使用简单的Scala消费者,你可拿到offset manager,并显式的提交或获取offset。消费者可以通过发送GroupCoordinatorRequest到任何的broker,并接受GroupCoordinatorResponse响应对象,对象包含offset manager,那么消费者可以继续从`offset manager broker`提交或获取offset。如果offset manager位置变动,消费者需要重新发现offset manager。如果你想手动管理你的offset,你可以看看OffsetCommitRequest 和 OffsetFetchRequest如何做的。

When the offset manager receives an OffsetCommitRequest, it appends the request to a special compacted Kafka topic named __consumer_offsets. The offset manager sends a successful offset commit response to the consumer only after all the replicas of the offsets topic receive the offsets. In case the offsets fail to replicate within a configurable timeout, the offset commit will fail and the consumer may retry the commit after backing off. (This is done automatically by the high-level consumer.) The brokers periodically compact the offsets topic since it only needs to maintain the most recent offset commit per partition. The offset manager also caches the offsets in an in-memory table in order to serve offset fetches quickly.

当offset manager接收一个OffsetCommitRequest,它追加请求到一个特定的压缩的名为__consumer_offsets的topic中,当offset topic的所有副本接收offset之后,offset manager发送一个成功的offset提交响应给消费者。万一offset无法在规定的时间内复制,offset将提交失败,消费者在回退之后可重试该提交(高级别消费者自动做的)。broker定期压缩offset topic,只需要保存每个分区最近的offset。offset manager也缓存offset在内存表中,以便offset快速获取。

When the offset manager receives an offset fetch request, it simply returns the last committed offset vector from the offsets cache. In case the offset manager was just started or if it just became the offset manager for a new set of consumer groups (by becoming a leader for a partition of the offsets topic), it may need to load the offsets topic partition into the cache. In this case, the offset fetch will fail with an OffsetsLoadInProgress exception and the consumer may retry the OffsetFetchRequest after backing off. (This is done automatically by the high-level consumer.)

当offset manager接收一个offset的获取请求,将从offset缓存中返回最新的的offset。如果offset manager刚启动或新的消费者组集刚成为offset manager(成为offset topic分区的leader),则需要加载offset topic的分区到缓存中,在这种情况下,offset将获取失败,并报出OffsetsLoadInProgress异常,消费者可后退后,重试OffsetFetchRequest(高级别消费者自动做这些)。

迁移offset从zookeeper到kafka

Kafka consumers in earlier releases store their offsets by default in ZooKeeper. It is possible to migrate these consumers to commit offsets into Kafka by following these steps:

Kafka消费者在早先的版本中offset默认存储在ZooKeeper。可以通过下面的步骤迁移这些消费者到Kafka。

  1. Set offsets.storage=kafka and dual.commit.enabled=true in your consumer config.

    在消费者配置设置offsets.storage=kafka和dual.commit.enabled=true。

  2. Do a rolling bounce of your consumers and then verify that your consumers are healthy.

    消费者做滚动消费,验证你的消费者是健康的。

  3. Set dual.commit.enabled=false in your consumer config.

    在你的消费者配置设置dual.commit.enabled=false

  4. Do a rolling bounce of your consumers and then verify that your consumers are healthy.

    消费者做滚动消费,验证你的消费者是健康的。

A roll-back (i.e., migrating from Kafka back to ZooKeeper) can also be performed using the above steps if you setoffsets.storage=zookeeper.

回滚(就是从kafka回到Zookeeper)也可以使用上面的步骤,通过设置 offsets.storage=zookeeper

ZooKeeper目录

The following gives the ZooKeeper structures and algorithms used for co-ordination between consumers and brokers.

下面给出了Zookeeper的结构和算法,用于协调消费者和经纪人之间。

符号(Notation)

When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a ZooKeeper znode for each possible value of xyz. For example /topics/[topic] would be a directory named /topics containing a sub-directory for each topic name. Numerical ranges are also given such as [0...5] to indicate the subdirectories 0, 1, 2, 3, 4. An arrow -> is used to indicate the contents of a znode. For example /hello -> world would indicate a znode /hello containing the value "world".

当一个path中的元素表示为[XYZ],这意味着xyz的值不是固定的,实际上每个xyz的值可能是Zookeeper的znode,例如`/topic/[topic]` 是一个目录,/topic包含一个子目录(每个topic名称)。数字的范围如[0...5]来表示子目录0,1,2,3,4。箭头`->`用于表示znode的内容,例如 /hello-> world 表示znode /hello包含值”world”。

Broker Node Registry

/brokers/ids/[0...N] --
>
 host:port (ephemeral node)

This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) is an error.

这是所有当前broker的节点列表,其中每个提供了一个唯一的逻辑broker的id标识它的消费者(必须作为配置的一部分)。在启动时,broker节点通过在/brokers/ids/下用逻辑broker id创建一个znode来注册它自己。逻辑broker id的目的是当broker移动到不同的物理机器,而不会影响消费者。尝试注册一个已存在的broker id时将返回错误(因为2个server配置了相同的broker id)。

Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).

由于broker在Zookeeper中用的是临时znode,这个注册是动态的,如果broker关闭或宕机,节点将消失(通知消费者不在可用)。

Broker Topic Registry

/brokers/topics/[topic]/[0...N] --
>
 nPartions (ephemeral node)

Each broker registers itself under the topics it maintains and stores the number of partitions for that topic.

每个broker在它自己的topic下注册,维护和存储该topic分区的数据。

Consumers and Consumer Groups

Consumers of topics also register themselves in ZooKeeper, in order to coordinate with each other and balance the consumption of data. Consumers can also store their offsets in ZooKeeper by settingoffsets.storage=zookeeper. However, this offset storage mechanism will be deprecated in a future release. Therefore, it is recommended to migrate offsets storage to Kafka.

topic的消费者也在zookeeper注册他们自己,以便相互协调和平衡数据的消耗,消费者还可以通过设置offsets.storage=zookeeper来存储offset,但是,这个机制在未来的版本将会弃用。因此,建议迁移数据到kafka。

Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id. For example if one consumer is your foobar process, which is run across three machines, then you might assign this group of consumers the id "foobar". This group id is provided in the configuration of the consumer, and is your way to tell the consumer which group it belongs to.
多个消费者可组成一组,共同消费一个topic,在同一组中的每个消费者共享一个group_id。例如,如果一个消费者是foobar,在三个机器上运行,你可能分配这个这个消费组的ID是“foobar”。这个组id是在消费者的配置文件中配置。

The consumers in a group divide up the partitions as fairly as possible, each partition is consumed by exactly one consumer in a consumer group.
每个分区正好被一个消费者组的消费者所消费,一组中的消费者尽可能公平地分配分区。

Consumer Id Registry

In addition to the group_id which is shared by all consumers in a group, each consumer is given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes. Consumer ids are registered in the following directory.
除了由所有消费者共享的group_id,每个消费者都有一个临时,唯一的consumer_id(主机名的形式:uuid)用于识别。消费者的id在注册到在以下目录中。

/consumers/[group_id]/ids/[consumer_id] --
>
 {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

Each of the consumers in the group registers under its group and creates a znode with its consumer_id. The value of the znode contains a map of

<

topic, #streams

>

. This id is simply used to identify each of the consumers which is currently active within a group. This is an ephemeral node so it will disappear if the consumer process dies.

组中的每个消费者用consumer_id注册znode。znode的值包含一个map

<

topic,#streams

>

。这个id只是用来识别在组里目前活跃的消费者,这是个临时节点,如果消费者在处理中挂掉,它就会消失。

Consumer Offsets

Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory ifoffsets.storage=zookeeper. This valued is stored in a ZooKeeper directory.
消费者追踪它们在每个分区消费的最大offset,如果offsets.storage=zookeeper,那此值就存在zookeeper的目录中。

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --
>
 offset_counter_value ((persistent node)

Partition Owner registry

Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming.
每个broker分区由一个消费者组里的单个消费者消费,任何消费者开始消费之前,消费者必须建立其所有权,为了建立所有权,消费者写入自己的ID到临时节点 。

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --
>
 consumer_node_id (ephemeral node)

Cluster Id(集群ID)

The cluster id is a unique and immutable identifier assigned to a Kafka cluster. The cluster id can have a maximum of 22 characters and the allowed characters are defined by the regular expression [a-zA-Z0-9_-]+, which corresponds to the characters used by the URL-safe Base64 variant with no padding. Conceptually, it is auto-generated when a cluster is started for the first time.

集群ID是Kafka集群中唯一且不可变的标识符。集群ID最多可以包含22个字符,对应于没有填充的URL安全Base64变量所使用的字符,允许通过正则表达式 [a-zA-Z0-9 _ \]]定义。从概念上讲,当第一次启动集群时,集群ID就会自动生成。

Implementation-wise, it is generated when a broker with version 0.10.1 or later is successfully started for the first time. The broker tries to get the cluster id from the /cluster/id znode during startup. If the znode does not exist, the broker generates a new cluster id and creates the znode with this cluster id.

实际上,当第一次成功启动时生成(0.10.1或更高版本)。broker尝试在启动期间从 /cluster/id znode获取集群ID。 如果znode不存在,broker将生成一个新的集群ID,并使用此集群ID创建znode。

Broker node registration

The broker nodes are basically independent, so they only publish information about what they have. When a broker joins, it registers itself under the broker node registry directory and writes information about its host name and port. The broker also register the list of existing topics and their logical partitions in the broker topic registry. New topics are registered dynamically when they are created on the broker.
broker节点基本上都是独立的, 所以它们只发布有关它们的信息,当broker连接,注册broker节点到其注册目录下,并写入它的host name和prot信息。broker还注册了其注册的现有topic和逻辑分区的列表。当创建一个新topic,就在broker上动态的注册新的topic。

Consumer registration algorithm

When a consumer starts, it does the following:
当一个消费者启动,它会做以下几步:

  1. Register itself in the consumer id registry under its group.

    在消费者
    id
    登记处组下
    注册它自己

  2. Register a watch on changes (new consumers joining or any existing consumers leaving) under the consumer id registry. (Each change triggers rebalancing among all consumers within the group to which the changed consumer belongs.)

    在消费ID登记处注册一个观察者(新加入的消费者或任何现有的消费者离开)。(每次变化就会触发,改变所属的组内的所有消费者重新平衡。)

  3. Register a watch on changes (new brokers joining or any existing brokers leaving) under the broker id registry. (Each change triggers rebalancing among all consumers in all consumer groups.)

    在broker的id登陆处注册一个观察者(新加入的broker和现有的离开)。(
    每次改变触发,在所有消费者组的所有消费者之间重新平衡。

  4. If the consumer creates a message stream using a topic filter, it also registers a watch on changes (new topics being added) under the broker topic registry. (Each change will trigger re-evaluation of the available topics to determine which topics are allowed by the topic filter. A new allowed topic will trigger rebalancing among all consumers within the consumer group.)

    如果消费者创建一个消费流用于topic过滤器,并在broker topic登记处注册一个观察者(监听新添加的topic)。(每次变化将触发可用的topic进行重新评估,以确定哪些topic是过滤器允许的。新的topic将触发消费者组中所有消费者之间进行重新平衡)

  5. Force itself to rebalance within in its consumer group.

    在其消费者组中进行强制重新平衡。

消费者再平衡算法(Consumer rebalancing algorithm)

The consumer rebalancing algorithms allows all the consumers in a group to come into consensus on which consumer is consuming which partitions. Consumer rebalancing is triggered on each addition or removal of both broker nodes and other consumers within the same group. For a given topic and a given consumer group, broker partitions are divided evenly among consumers within the group. A partition is always consumed by a single consumer. This design simplifies the implementation. Had we allowed a partition to be concurrently consumed by multiple consumers, there would be contention on the partition and some kind of locking would be required. If there are more consumers than partitions, some consumers won't get any data at all. During rebalancing, we try to assign partitions to consumers in such a way that reduces the number of broker nodes each consumer has to connect to.
消费者再平衡算法允许组中所有的消费者消费哪一个分区达成共识,同组中的broker和其他的消费者的每一次增加或移除触发消费者再平衡。对于一个给定的topic和给定的消费者组,组内的消费者之间均匀的分配broker分区。如果我们允许一个分区被多个消费者共同消费,这需要锁了,所有我们设计一个分区永远只有一个消费者进行消费。这样设计简化了很多。如果消费者比分区多,那么一些消费者将不会获得任何数据。在再平衡期间,我们试图分配分区给消费者。以这样的方式来减少每个消费者连接到broker的节点数。

Each consumer does the following during rebalancing:
以下是每个消费者再平衡的过程:

  1. For each topic T that Ci subscribes to 
   2.   let PT be all partitions producing topic T
   3.   let CG be all consumers in the same group as Ci that consume topic T
   4.   sort PT (so partitions on the same broker are clustered together)
   5.   sort CG 6.   let i be the index position of Ci in CG and let N = size(PT)/size(CG)
   7.   assign partitions from i*N to (i+1)*N - 1 to consumer Ci 8.   remove current entries owned by Ci from the partition owner registry
   9.   add newly assigned partitions to the partition owner registry
        (we may need to re-try this until the original partition owner releases its ownership)

When rebalancing is triggered at one consumer, rebalancing should be triggered in other consumers within the same group about the same time.

当一个消费者的再平衡被触发时,在同一时间内,相同的组内的其他消费者也会被触发。

results matching ""

    No results matching ""