8.2 用户指南

提供了一个快速入门的例子,运行一个单机版的Kafka Connect。本节更详细的介绍如何配置,运行和管理Kafka Connect。

运行Kafka Connect

Kafka Connect目前支持两种执行模式:独立(单进程)和分布式。在独立模式下,所有的工作都在一个单进程中进行的。这样易于配置,在一些情况下,只有一个在工作是好的(例如,收集日志文件),但它不会从kafka Connection的功能受益,如容错。通过下面的命令开始一个单进程的例子:

> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

第一个参数是worker(工人)的配置,这包括 Kafka连接的参数设置,序列化格式,以及频繁地提交offset(偏移量)。本节提供的例子用的是默认的配置 conf/server.properties。其余的参数是connector(连接器)配置文件。你可以配置你需要的,但是所有的执行都在同一个进程(在不同的线程)。分布式的模式会自动平衡。允许你动态的扩展(或缩减),并在执行任务期间和配置、偏移量提交中提供容错保障,非常类似于独立模式:

bin/connect-distributed.sh config/connect-distributed.properties

在不同的类中,配置参数定义了Kafka Connect如何处理,哪里存储配置,如何分配work,哪里存储offset和任务状态。在分布式模式中,Kafka Connect在topic中存储offset,配置和任务状态。建议手动创建offset的topic,可以自己来定义需要的分区数和副本数。如果启动Kafka Connect时还没有创建topic,那么topic将自动创建(使用默认的分区和副本),这可能不是最合适的(因为kafka可不知道业务需要,只能根据默认参数创建)。特别是以下配置参数尤为关键,启动集群之前设置:

  • group.id (默认connect-cluster) - Connect cluster group使用唯一的名称;注意这不能和consumer group ID(消费者组)冲突。

  • config.storage.topic (默认connect-configs) - topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic。你需要手动创建这个topic,以确保是单个partition(自动创建的可能会有多个partition)。

  • offset.storage.topic (默认 connect-offsets) - topic用于存储offsets;这个topic应该配置多个partition和副本。

  • status.storage.topic (默认 connect-status) - topic 用于存储状态;这个topic 可以有多个partitions和副本

注意,在分布式模式中,connector(连接器)配置不能使用命令行。要使用下面介绍的REST API来创建,修改和销毁connector。

配置连接器(connector)

Connector的配置是简单的key-value映射。对于独立模式,这些都是在属性文件中定义,并通过在命令行上的Connect处理。在分布式模式,JSON负载connector的创建(或修改)请求。大多数配置都是依赖的connector,有几个常见的选项:

  • name
    • 连接器唯一的名称,不能重复。
  • connector.calss
    • 连接器的Java类。
  • tasks.max
    • 连接器创建任务的最大数。
  • connector.class 配置支持多种格式:全名或连接器类的别名。比如连接器是 org.apache.kafka.connect.file.FileStreamSinkConnector ,你可以指定全名,也可以使用 FileStreamSinkFileStreamSinkConnector 。Sink connector也有一个额外的选项来控制它们的输入:
  • topics - 作为连接器的输入的topic列表。

对于其他的选项,你可以查看连接器的文档。

REST API

由于Kafka Connect的目的是作为一个服务运行,提供了一个用于管理connector的REST API。默认情况下,此服务的端口是8083。以下是当前支持的终端入口:

  • GET /connectors
    • 返回活跃的connector列表
  • POST /connectors
    • 创建一个新的connector;请求的主体是一个包含字符串name字段和对象config字段(connector的配置参数)的JSON对象。
  • GET /connectors/{name}
    • 获取指定connector的信息
  • GET /connectors/{name}/config
    • 获取指定connector的配置参数
  • PUT /connectors/{name}/config
    • 更新指定connector的配置参数
  • GET /connectors/{name}/status
    • 获取connector的当前状态,包括它是否正在运行,失败,暂停等。
  • GET /connectors/{name}/tasks
    • 获取当前正在运行的connector的任务列表。
  • GET /connectors/{name}/tasks/{taskid}/status
    • 获取任务的当前状态,包括是否是运行中的,失败的,暂停的等,
  • PUT /connectors/{name}/pause
    • 暂停连接器和它的任务,停止消息处理,直到connector恢复。
  • PUT /connectors/{name}/resume
    • 恢复暂停的connector(如果connector没有暂停,则什么都不做)
  • POST /connectors/{name}/restart
    • 重启connector(connector已故障)
  • POST /connectors/{name}/tasks/{taskId}/restart
    • 重启单个任务 (通常这个任务已失败)
  • DELETE /connectors/{name}
    • 删除connector, 停止所有的任务并删除其配置

Kafka Connector还提供了获取有关connector plugins信息的REST API:

  • GET /connector-plugins- 返回已在Kafka Connect集群安装的connector plugin列表。请注意,API仅验证处理请求的worker的connector。这以为着你可能看不不一致的结果,特别是在滚动升级的时候(添加新的connector jar)

  • PUT /connector-plugins/{connector-type}/config/validate - 对提供的配置值进行验证,执行对每个配置验证,返回验证的建议值和错误信息。

results matching ""

    No results matching ""