8.2 用户指南
提供了一个快速入门的例子,运行一个单机版的Kafka Connect。本节更详细的介绍如何配置,运行和管理Kafka Connect。
运行Kafka Connect
Kafka Connect目前支持两种执行模式:独立(单进程)和分布式。在独立模式下,所有的工作都在一个单进程中进行的。这样易于配置,在一些情况下,只有一个在工作是好的(例如,收集日志文件),但它不会从kafka Connection的功能受益,如容错。通过下面的命令开始一个单进程的例子:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
第一个参数是worker(工人)的配置,这包括 Kafka连接的参数设置,序列化格式,以及频繁地提交offset(偏移量)。本节提供的例子用的是默认的配置 conf/server.properties
。其余的参数是connector(连接器)配置文件。你可以配置你需要的,但是所有的执行都在同一个进程(在不同的线程)。分布式的模式会自动平衡。允许你动态的扩展(或缩减),并在执行任务期间和配置、偏移量提交中提供容错保障,非常类似于独立模式:
bin/connect-distributed.sh config/connect-distributed.properties
在不同的类中,配置参数定义了Kafka Connect如何处理,哪里存储配置,如何分配work,哪里存储offset和任务状态。在分布式模式中,Kafka Connect在topic中存储offset,配置和任务状态。建议手动创建offset的topic,可以自己来定义需要的分区数和副本数。如果启动Kafka Connect时还没有创建topic,那么topic将自动创建(使用默认的分区和副本),这可能不是最合适的(因为kafka可不知道业务需要,只能根据默认参数创建)。特别是以下配置参数尤为关键,启动集群之前设置:
group.id
(默认connect-cluster
) - Connect cluster group使用唯一的名称;注意这不能和consumer group ID(消费者组)冲突。config.storage.topic
(默认connect-configs
) - topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic。你需要手动创建这个topic,以确保是单个partition(自动创建的可能会有多个partition)。offset.storage.topic
(默认connect-offsets
) - topic用于存储offsets;这个topic应该配置多个partition和副本。status.storage.topic
(默认connect-status
) - topic 用于存储状态;这个topic 可以有多个partitions和副本
注意,在分布式模式中,connector(连接器)配置不能使用命令行。要使用下面介绍的REST API来创建,修改和销毁connector。
配置连接器(connector)
Connector的配置是简单的key-value映射。对于独立模式,这些都是在属性文件中定义,并通过在命令行上的Connect处理。在分布式模式,JSON负载connector的创建(或修改)请求。大多数配置都是依赖的connector,有几个常见的选项:
name
- 连接器唯一的名称,不能重复。
connector.calss
- 连接器的Java类。
tasks.max
- 连接器创建任务的最大数。
connector.class
配置支持多种格式:全名或连接器类的别名。比如连接器是org.apache.kafka.connect.file.FileStreamSinkConnector
,你可以指定全名,也可以使用FileStreamSink
或FileStreamSinkConnector
。Sink connector也有一个额外的选项来控制它们的输入:- topics - 作为连接器的输入的topic列表。
对于其他的选项,你可以查看连接器的文档。
REST API
由于Kafka Connect的目的是作为一个服务运行,提供了一个用于管理connector的REST API。默认情况下,此服务的端口是8083
。以下是当前支持的终端入口:
GET /connectors
- 返回活跃的connector列表
POST /connectors
- 创建一个新的connector;请求的主体是一个包含字符串name字段和对象config字段(connector的配置参数)的JSON对象。
GET /connectors/{name}
- 获取指定connector的信息
GET /connectors/{name}/config
- 获取指定connector的配置参数
PUT /connectors/{name}/config
- 更新指定connector的配置参数
GET /connectors/{name}/status
- 获取connector的当前状态,包括它是否正在运行,失败,暂停等。
GET /connectors/{name}/tasks
- 获取当前正在运行的connector的任务列表。
GET /connectors/{name}/tasks/{taskid}/status
- 获取任务的当前状态,包括是否是运行中的,失败的,暂停的等,
PUT /connectors/{name}/pause
- 暂停连接器和它的任务,停止消息处理,直到connector恢复。
PUT /connectors/{name}/resume
- 恢复暂停的connector(如果connector没有暂停,则什么都不做)
POST /connectors/{name}/restart
- 重启connector(connector已故障)
POST /connectors/{name}/tasks/{taskId}/restart
- 重启单个任务 (通常这个任务已失败)
DELETE /connectors/{name}
- 删除connector, 停止所有的任务并删除其配置
Kafka Connector还提供了获取有关connector plugins
信息的REST API:
GET /connector-plugins
- 返回已在Kafka Connect集群安装的connector plugin
列表。请注意,API仅验证处理请求的worker的connector。这以为着你可能看不不一致的结果,特别是在滚动升级的时候(添加新的connector jar)PUT /connector-plugins/{connector-type}/config/validate
- 对提供的配置值进行验证,执行对每个配置验证,返回验证的建议值和错误信息。