• 产品与解决方案
  • 行业解决方案
  • 服务
  • 支持
  • 合作伙伴
  • 关于我们

H3C Kafka 用户手册 E5137-5W100

01-正文

本章节下载  (1.18 MB)

01-正文


1 产品简介

1.1  产品概述

Kafka是一个分布式流平台(不只是消息系统),具备以下关键特性:

·     发布-订阅消息流(类似于消息队列或企业消息系统)

·     同时支持离线数据处理和实时数据处理

·     以容错持久的方式存储消息流

·     支持在线水平扩展、高吞吐

Kafka支持两类应用程序:

·     构建实时流数据管道,能够可靠地在系统之间获取数据

·     构建实时流应用程序,能够对数据流进行转换或响应

Kafka应用特性如下:

·     Kafka可以运行在一个或多个服务器上,跨越多个数据中心

·     Kafka将消息流存储在Topic中

·     Kafka消息由键(key)、值(value)和时间戳(timestamp)组成

1.2  组件架构

Kafka依赖Zookeeper,Zookeeper负责维护Kafka的元数据(broker、topic、partition)等信息,实现Kafka的动态管理。如图1-1所示,Kafka架构主要由Broker、Producer、Consumer以及Consumer Group组成。其中:

·     Broker:Kafka包含的一个或多个服务器

·     Producer:负责发布消息到Kafka Broker

·     Consumer:从Kafka Broker读取消息的客户端

·     Consumer Group:每个Consumer属于一个特定的Consumer Group

图1-1 Kafka架构

 

1.3  资源模式

在新建集群时,选择不同资源模式需要准备不同的资源类型,在Kafka云服务中,Kafka支持部署的资源模式有虚拟机部署、裸金属部署和容器部署。资源类型准备说明如下:

·     虚拟机集群:新建虚拟机集群前,要求在云操作系统中有可用资源,可通过[资源/计算可用域]完成虚拟机云资源的准备。

·     裸金属集群:裸金属的节点规格根据硬件配置自动获取,规格数据来自云平台的[资源/裸金属资源池]中处于“可分配”状态的节点。

·     容器集群:要先创建KaaS集群,KaaS集群可视为容器集群的资源域。可通过[云服务/云容器引擎/集群]新建KaaS集群。

1.4  应用场景

·     替代传统消息系统

相较于传统的消息系统,Kafka具备更好的吞吐量、低延迟、分区、副本、容错等特性,有利于处理大规模的消息。

·     网站活动追踪

Kafka可以用来记录用户的各种活动,例如网页浏览、搜索、点击等活动,这些活动被发布到Kafka的Topic中,可用于实时监控处理、实时监测或加载到Hadoop或离线数据仓库中。

·     日志聚合

日志聚合通常从服务器收集日志文件,并将它们放在日志收集中心(可以是文件服务器或HDFS)进行处理。Kafka抽象出文件的细节,将日志或事件数据更清晰地抽象为消息流,能够保证更低延迟的处理,也更容易支持多个生产者和消费者。

·     流处理

流处理通常包含多个阶段,可以通过Kafka进行中转。例如新闻推荐场景中,新闻内容可以从“articles”主题获取,经过进一步处理得到新内容后再推荐给用户。


2 快速入门

2.1  快速使用指导

本章主要介绍创建Kafka集群和集群状态检查。

2.2  新建Kafka集群

当前版本中,Kafka集群支持部署在虚拟机、裸金属和容器上;用户选择不同的集群模式,对应的集群配置不同。新建集群前,要求在云平台中已完成云资源的准备。

根据集群部署规划,在云资源准备完成以后,可新建Kafka集群,步骤详情可参见产品安装部署手册或在线联机帮助。

本文档内容以集群已创建成功为前提,介绍如何进行状态检查和对集群的基本操作等。

2.3  状态检查

Kafka集群创建成功后可以通过界面上的集群状态检查,确保创建的集群可用。可在集群管理页面查看集群列表中的运行状态,若运行状态为“运行中”、健康状态为“健康”,且未发生告警,则表示集群运行正常;否则,集群可能处于异常状态,需排查相应问题。如图2-1所示。

图2-1 集群管理

 


3 使用指南

3.1  虚拟机和裸金属集群使用指南

3.1.1  集群详情

进入集群详情页面,集群详情页主要展示该集群的基本信息、概览、组件、主机、Topic、消费者组、消息查询、告警和计费等相关信息,同时可对集群进行各项操作。

·     基本信息:展示该集群的基本信息,包含资源区域、资源类型、网络、密钥对、连接信息、运行状态、监控状态等更多信息。

·     概览:展示了该集群中组件的统计、CPU\磁盘\内存使用率的性能趋势、CPU\磁盘\内存使用率等。

·     组件:展示集群中安装的组件以及对组件的启动、停止、重启等操作;单击组件名,可查看组件中进程的部署拓扑、配置、配置修改历史信息。

·     主机:展示集群中的主机信息,可以执行主机扩容、查看监控详情等操作;单击主机名可跳转至主机详情页面。

·     Topic:展示集群中Topic列表,可以执行新建Topic、增加分区、删除Topic等操作。单击Topic名可跳转至Topic详情页面,查看订阅关系和分区状态信息。

·     消费者组:展示集群的消费者组信息。

·     消息查询:可查看消息查询列表;用户可自定义查询条件查询分区信息。

·     告警:展示集群中告警信息,可配置该集群的告警规则。

·     计费:根据在[系统/系统配置/运营配置/计费策略]页面配置的对应计费策略,按照使用集群的时长计算使用费用。

¡     计费开始时间:当计费策略时间早于集群创建时间时,将按照创建集群的时间开始计费;当集群配置时间早于配置计费策略时间时,将按照配置计费策略的时间开始计费。

¡     计费结束时间:展示当前时间,删除集群或计费策略时将停止计费。

¡     费用:(计费结束时间-计费开始时间)×单价

图3-1 集群详情

 

3.1.2  组件详情

Kafka组件详情页面主要展示部署拓扑、配置详情和配置修改历史等相关信息,可查看或修改组件的各配置项信息,查看组件的配置修改历史及当前使用配置版本,同时可对组件或组件进程执行相关管理操作。

主要功能如下:

·     部署拓扑:在[组件详情/部署拓扑]页签,可查看组件进程的安装详情以及运行状态详情,并可对组件进程执行停止、重启等相关操作。

¡     【说明】进程名:同一个进程可分别安装在多个主机节点上,所以进程列表中某一进程名可能重复出现,但同一进程名对应的主机名和主机IP不同。

·     配置:在[组件详情/配置]页签,可查看或修改组件各配置项的信息。

·     配置修改历史:在[组件详情/配置修改历史]页签,可查询组件的配置历史版本以及当前使用版本。

·     组件操作:在组件详情右上角,可对组件执行相关管理操作。比如:重启、停止组件等。

图3-2 组件详情

 

3.1.3  集群扩容

集群在使用过程中,根据实际需要,可执行主机扩容的操作。

(1)     在云服务管理页面的左侧导航树中选择[集群管理],进入集群管理页面。

(2)     在集群管理页面,选择[集群列表]页签,单击集群名称可跳转至集群详情页面。

(3)     在[集群详情/主机]页签下单击<主机扩容>按钮,或在集群详情页面右上角<集群操作>下拉框中选择“主机扩容”,会弹出主机扩容窗口。

¡     基本信息:显示节点对应的资源区域、虚拟化类型,与集群中其他主机节点保持一致。

¡     节点信息:主机扩容的节点类型为Kafka实例,节点规格默认为原有主机规格,扩容后节点数范围为1~99。进程列表中的所有进程均会在扩容的节点上部署。

(4)     集群节点配置完成后,单击<确定>按钮即可完成扩容。

图3-3 主机扩容

 

 

(5)     查看进程变化

Kafka扩容完成之后,在组件详情页面[部署拓扑]页签中可以查看Kafka Broker安装数量的变化以及状态。

(6)     重启组件(根据实际情况选择)

进入集群详情页面,选择[组件]页签,需根据页面提示重新启动相关组件。

3.1.4  主机、磁盘管理

通过顶部导航栏[云服务/消息中间件Kafka]进入Kafka后,在左侧导航树[主机管理]及[磁盘管理]进入对应页面,可以查看创建的所有Kafka集群主机,以及主机的资源使用情况和磁盘使用情况。

图3-4 主机管理配置

 

图3-5 磁盘管理

 

3.1.5  告警管理

消息中间件Kafka对Kafka集群提供监控告警的功能。通过顶部导航栏[云服务/消息中间件Kafka]进入消息中间件Kafka后,选择左侧导航树[告警管理]进入告警管理页面,可以查看告警信息,并对告警联系人及告警联系组进行管理。

图3-6 告警管理配置

 

3.1.6  权限访问控制

1. 权限管理

说明

仅创建Kafka集群时开启“安全管理/权限管理”且运行正常的集群可执行新建角色的操作。

·     开启权限管理后,组件权限需通过[系统管理/角色管理]中的角色分配给用户,用户通过绑定角色获得相应的权限后,才能对组件进行操作。

·     Kafka组件支持对Topic配置权限,权限包括:publish和consumer。

 

权限管理是安全管理的重要组成部分,在开启安全管理的集群中,权限基于角色进行统一管理,角色是权限的集合;用户对Kafka集群中Topic的操作需被赋予Topic相关权限后才能执行。

为用户赋予权限的整体流程如下:

(1)     新建角色,并为角色配置权限。

(2)     新建用户,并将角色分配给用户,用户即拥有角色所具有的权限。

2. 权限操作示例

下面以将“test”主题的“publish”权限授予“kafkarole”角色为例,介绍Kafka组件的权限访问控制。操作步骤如下:

(1)     新建角色

在Kafka云服务管理页面的左侧导航树中选择[系统管理/角色管理],进入角色管理页面,创建角色kafkarole,为角色配置相关Topic的权限,也可在创建角色时不选择任何组件权限,在角色创建后通过编辑角色修改权限配置;本例中暂不配置权限,如图3-7所示。

图3-7 新建角色

 

(2)     [系统管理/用户管理]页面,创建用户kafkauser,单击对应操作列的<修改用户授权>,在弹出的页面中为用户绑定角色kafkarole;用户kafkauser继承角色kafkarole的权限,此时不具备组件权限。

图3-8 用户授权

 

(3)     在[系统管理/角色管理]页面,修改角色kafkarole的权限设置,使kafkarole角色对主题test有publish消息的权限;已绑定角色kafkarole的用户kafkauser,此时继承对主题Test的publish权限。

图3-9 编辑角色权限

 

(4)     在[系统管理/用户管理]页面下载认证文件:单击用户列表中某用户对应的<下载认证文件>按钮,会下载用户对应的认证文件至本地,文件名称为“用户名-keytab”。将认证文件配置到客户端上,进行生产消费,可参考4.1.2  Kerberos环境

3.1.7  数据同步

一般情况下用户使用一套Kafka集群来完成业务,但是在某些情况下用户需要多套Kafka集群同时工作,比如一些灾难恢复场景或者为地理相近的客户提供低延时服务,此时可使用数据同步功能,将源Kafka集群中的数据同步至目标Kafka集群。

1. 新建Kafka数据同步链路

当前版本中,Kafka数据同步链路支持部署在虚拟机和裸金属上;用户选择不同的集群模式,对应的集群配置不同。新建集群前,要求在云平台中已完成云资源的准备。

根据集群部署规划,在云资源准备完成后,可新建Kafka数据同步链路,步骤详情可参见产品安装部署手册或在线联机帮助。

本文档内容将以集群已创建成功为前提,介绍如何进行状态检查。对Kafka数据同步链路的基本操作请参见产品在线联机帮助。

2. 状态检查

Kafka数据同步链路创建成功后可以通过界面上的集群状态检查,确保创建的集群可用。可在集群管理页面查看集群列表中的运行状态,若运行状态为“运行中”且健康状态为“健康”,则表示集群运行正常。如图3-10所示。

图3-10 数据同步链路列表

 

3. 数据同步链路详情

同步链路详情页面展示同步链路的基本信息、同步任务、组件、主机和计费等信息,并进行基本操作。如图3-11

(1)     在Kafka云服务管理页面的左侧导航树中选择[数据同步]菜单项,选择[同步链路列表]页签,单击同步链路名称可跳转至同步链路详情页面。

¡     基本信息:展示同步链路基本配置信息,以便于快速了解同步链路。

¡     同步任务列表:在[同步任务]页签中,可查看同步链路中正在运行的同步任务。

-     单击<新建同步任务>按钮,可执行新建同步任务操作,将源集群中未同步的Topic纳入同步范围内。

-     操作:可通过任务对应操作列的按钮,对同步任务执行停止/启动/删除/重启同步任务等操作。

¡     组件:在[组件]页签中,查看当前数据同步集群的组件状态和启停操作。

¡     主机:显示数据同步节点的基本信息,包括主机名、状态、主机IP、CPU/内存/磁盘使用率和CPU核数。

¡     计费:根据在[系统/系统配置/运营配置/计费策略]页面配置的对应计费策略,按照使用链路的时长计算使用费用。

-     计费开始时间:当计费策略时间早于链路创建时间时,将按照创建链路的时间开始计费;当链路配置时间早于配置计费策略时间时,将按照配置计费策略的时间开始计费。

-     计费结束时间:展示当前时间,删除链路或计费策略时将停止计费。

-     费用:(计费结束时间-计费开始时间)×单价。

图3-11 数据同步链路详情

 

3.1.8  容灾切换

在灾难恢复场景,通过容灾切换,实现将应用连接从主Kafka集群切换到备Kafka集群,本文中称之为正向切换;待主Kafka集群故障恢复,再将应用连接从备Kafka集群切换回主Kafka集群,本文称之为反向回切。

说明

·     待切换的两个主备Kafka集群之间,要成功建立正向Kafka数据同步链路和反向Kafka数据同步链路,且数据同步链路集群必须与目的端Kafka集群在同一个云平台。

·     若主Kafka集群作为源端集群,与备Kafka集群建立数据同步链路,安装了Kafka-Connector 3.2.3版本组件,则这两个集群之间反向的数据同步链路必须安装2.7.0版本的Kafka-Connector,以避免同步的Topic循环复制。

·     自E5137版本MQS支持容灾切换功能,E5137之前版本创建的数据同步链路,无法用于容灾切换;在此情况下,需要删除旧版本的数据同步链路,重新创建数据同步链路,方可使用容灾切换。

·     创建主Kafka集群到备Kafka集群之间正向和反向的数据同步任务;注意,配置同步任务的同步对象时,只能选择“自定义同步的Topic”,不能选择“同步全部Topic”。

·     当前版本,此功能暂不支持开启安全管理的Kafka集群。

 

1. 正向切换

主Kafka集群到备Kafka集群正向切换之前的必须工作,需要进行激活offset,使应用能够在本平台的Kafka集群上继续消费。本操作需要在备Kafka集群所在的云平台执行。

激活offset

(1)     在正向切换页面,显示主Kafka集群所有消费者组订阅的Topic。

(2)     单击<全部激活>按钮,进入激活offset过程。

(3)     页面上方正在激活offset的Kafka集群数量为0时,激活过程结束,请查看Topic的激活结果。

失败重试

执行激活offset操作后,若存在激活offset失败的Topic,可通过失败重试操作,对激活失败的Topic重新执行激活offset命令。

2. 反向回切

备Kafka集群到主Kafka集群反向回切之前的必须工作,需要进行消费状态检查和重置offset。

消费状态检查

容灾切换时,将应用重新连接到主Kafka集群之前,在此页面查看有效的消费者组是否消费完成,以防容灾切换后造成数据丢失。本操作需要在备Kafka集群所在的云平台执行,待消费完成的Kafka集群数量为0时,表示有效的消费者组已经完成消费,消费状态检查通过。

重置offset

(1)     在反向回切页面,选择[重置offset]页签。

(2)     单击<选择topic>,弹出选择Topic页面;勾选待重置offset的Topic,单击<保存>,回到重置offset页面。

(3)     单击<全部重置>,完成操作。

失败重试

可通过失败重试操作,对重置offset失败的Topic重新执行重置offset命令。

3.1.9  日志管理

日志管理主要是对Kafka服务运行日志及集群运行日志的收集并下载供用户使用的,提高快速定位异常问题的效率。在[日志管理]页面可收集运行日志、集群日志。本节仅支持系统管理员操作。

1. 运行日志收集

在服务运行过程中,若遇到异常或其他情况,可查看运行日志信息以便定位异常。

(1)     在日志管理页面,选择<运行日志收集>,弹出运行日志收集窗口。

(2)     输入SSH端口信息和root密码后,单击<测试远程连接>按钮,返回远程连接校验结果。

(3)     显示运行日志收集条件。部分参数说明如下:

¡     远程连接信息

-     SSH端口:云平台远程SSH连接端口。

-     root密码:root用户的远程登录密码。

¡     日志收集类型

-     基础日志:base服务中的相关日志,默认选中不可修改。

-     服务日志:Kafka服务运行的日志,默认选中不可修改。

¡     日志收集时间:日志产生的时间范围,默认选择前一天和当天的时间,最大范围30天。

(4)     相关信息配置完成后,单击<确定>按钮,即可开始运行日志收集并下载日志压缩文件。

2. 集群日志收集

在集群使用过程中,若出现集群运行异常或其他情况,可查看集群节点的日志信息以便定位异常。

说明

·     支持“Kafka集群”和“数据同步”两种集群类型的日志下载。

·     仅支持下载集群节点为“在线”状态的集群日志信息。

·     集群日志收集不支持容器集群日志的收集。用户可通过查看实例列表操作,进入实例列表页面;单击实例对应操作列的<日志>,跳转至该实例详情页面,查看实例日志,或单击日志上方<导出>按钮,导出日志。

 

(1)     在日志管理页面,选择<集群日志收集>,单击该页签,弹出集群日志收集窗口。

(2)     选择需要下载日志信息的集群类型和集群名称,会查询当前集群下的节点信息并展示在下方列表中,部分参数说明如下:

¡     节点名:集群节点的名称。

¡     状态:集群节点的状态,分为在线、离线两种状态。

¡     节点IP:集群节点的IP信息。

(3)     当集群节点状态为“在线”时,单击列表操作列的<下载>按钮,即可开始集群日志收集并下载日志压缩文件。

3.2  容器集群使用指南

3.2.1  集群详情

容器集群详情页面展示集群基本信息、集群中安装的组件和实例等,同时支持对集群执行扩容操作。

·     基本信息:展示集群基本配置信息,以便于快速了解集群。

·     组件:查看当前集群中已安装的所有组件列表、组件当前运行状态并对组件执行相关操作;单击组件名,可进入组件配置页面。

·     实例:查看集群中的实例列表、实例状态、重启次数、命名空间等信息,部分参数说明如下:

¡     实例名称:单击实例名后可跳转至实例详情页面。

¡     重启次数:该实例重启的次数。

¡     命名空间:该实例所属命名空间。

¡     PV名称:该实例使用的PV存储卷名称。

¡     实例IP:该实例的IP地址。

¡     所在节点:该实例所在Kubernetes宿主机IP。

¡     操作:单击<监控详情>查看实例监控信息;单击<查看YAML>查看该实例的YAML信息;单击<日志>,跳转到实例详情“日志”页签,并显示实例日志信息。

·     告警:展示集群中告警信息,可配置该集群的告警规则。

·     计费:根据在[系统/系统配置/运营配置/计费策略]页面配置的对应计费策略,按照使用集群的时长计算使用费用。

¡     计费开始时间:当计费策略时间早于集群创建时间时,将按照创建集群的时间开始计费;当集群配置时间早于配置计费策略时间时,将按照配置计费策略的时间开始计费。

¡     计费结束时间:展示当前时间,删除集群或计费策略时将停止计费。

¡     费用:(计费结束时间-计费开始时间)×单价

图3-12 容器集群详情

 

3.2.2  组件详情

在容器集群的组件详情页面可查看、修改组件的配置。

说明

查看或修改组件的配置信息,保存配置后,系统会自动重启集群,重启后才会生效。

图3-13 容器集群组件详情

 

3.2.3  集群扩容

集群在使用过程中,根据实际需要,可执行容器集群扩容的操作。

(1)     在Kafka云服务管理页面的左侧导航树中选择[集群管理],进入集群管理页面。

(2)     在集群管理页面,选择[集群列表]页签,单击集群名称可跳转至集群详情页面。

(3)     单击页面右上方<集群扩容>按钮,在弹出的页面中输入要增加的实例个数即可,最大可扩容至9个实例。

(4)     集群节点配置完成后,单击<确定>按钮即可。

图3-14 集群扩容

 

(5)     查看实例变化。

(6)     Kafka扩容完成之后,在集群详情页面[实例]页签中可以查看Kafka 实例数量的变化以及状态。

 

 


4 开发指南

4.1  常用API

开发所使用jar包,Maven引入

<dependency>

          <groupId>org.apache.kafka</groupId>

          <artifactId>kafka-clients</artifactId>

          <version>2.3.1</version>

      </dependency>

4.1.1  非Kerberos环境

场景描述:向指定Topic(in)发送数据,并打印发送内容。消费者指定Topic数据,并打印到控制台。为提高吞吐量,该案例结合多线程使用。

1. producerAPI案例代码(非Kerberos环境)

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class kafkaProducerDemo {

    public static void main(String[] args) throws Exception{

        System.out.println("Usage: java -cp YourJar YourPackageClassName");

        Properties props = new Properties();

        props.put("bootstrap.servers", "node1:6667,node2:6667,node3:6667");

        props.put("acks", "1");

        props.put("retries",1);

        props.put("batch.size", 16384);

        props.put("linger.ms", 10);

        props.put("buffer.memory", 33554432);

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

//线程数

ExecutorService executorService = Executors.newFixedThreadPool(1);

//kafka topic

final String sendTopic = "in";

 

executorService.execute(new Runnable() {

   @Override

   public void run() {

      while (true){

      //发送的数据内容

           String sendData = "timestamp:" + System.currentTimeMillis();

           producer.send(new ProducerRecord<String, String>(sendTopic, sendData));

           try{

               Thread.sleep(1000);

           }catch (Exception e){

               e.printStackTrace();

           }

           System.out.println( "Topic:" + sendTopic + " Data:" + sendData);

   }

}

});

}

}

2. consumerAPI案例代码(非Kerberos环境)

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;

import java.util.Properties;

 

public class kafkaConsumerDemo {

public static void main(String[] args) {

   System.out.println("java -cp ${jar} kafkaConsumerDemo ${bootstrap.servers} ${topic}");

 

   Properties props = new Properties();

   props.put("bootstrap.servers", args[0]);

   props.put("group.id", "kafkaConsumerDemo1");

   props.put("enable.auto.commit", "true");

   props.put("auto.offset.reset", "earliest");

   //props.put("auto.offset.reset", "latest");

   props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

       

      KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);

   consumer.subscribe(Arrays.asList(args[1]));

   while (true) {

            ConsumerRecords<String, String> records = consumer.poll(1000);

            for (ConsumerRecord<String, String> record : records) {

                System.out.println(record);

            }

            System.out.println("---");

        }

    }

}

 

4.1.2  Kerberos环境

如果集群开启kerberos安全认证,运行环境需要增加以下配置:

·     运行环境的hosts文件中需要配置Kafka集群的主机名和IP的映射。

·     运行环境的时间需要和Kafka集群的主机时间保持一致,或者配置ntp时间同步。

说明

认证文件的获取方式:

在CloudOS云平台进入[云服务/消息中间件Kafka]页面,在左侧导航树选择[系统管理/用户管理]下载认证文件,下载包中包含:krb5.conf、“用户名”_kafka_client_jaas.conf、“用户名”.keytab、hosts文件,其中:

“用户名”_kafka_client_jaas.conf在使用时将keyTab的值修改为认证文件“用户名”.keytab在客户端的实际路径。

 

场景描述:向指定Topic(in)发送数据,并打印发送内容。消费者指定Topic数据,并打印到控制台。为提高吞吐量,该案例结合多线程使用。

1. producerAPI案例代码(Kerberos环境)

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class kafkaProducerDemo {

    public static void main(String[] args) throws Exception{

        System.out.println("Usage: java -cp YourJar YourPackageClassName");

//kerberos环境下,需额外加载如下两项配置文件

System.setProperty("java.security.auth.login.config", "/home/”用户名”_kafka_client_jaas.conf");

System.setProperty("java.security.krb5.conf", "/home/krb5.conf");

        Properties props = new Properties();

        props.put("bootstrap.servers", "node1:6667,node2:6667,node3:6667");

        props.put("acks", "1");

        props.put("retries",1);

        props.put("batch.size", 16384);

        props.put("linger.ms", 10);

        props.put("buffer.memory", 33554432);

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//kerberos环境下,需额外添加如下三项配置

props.put("security.protocol", "SASL_PLAINTEXT");

props.put("sasl.mechanism", "GSSAPI");

props.put("sasl.kerberos.service.name", "kafka");

final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

//线程数

ExecutorService executorService = Executors.newFixedThreadPool(1);

//kafka topic

final String sendTopic = "in";

 

executorService.execute(new Runnable() {

   @Override

   public void run() {

      while (true){

           //发送的数据内容

           String sendData = "timestamp:" + System.currentTimeMillis();

           producer.send(new ProducerRecord<String, String>(sendTopic, sendData));

           try{

               Thread.sleep(1000);

           }catch (Exception e){

               e.printStackTrace();

           }

           System.out.println( "Topic:" + sendTopic + " Data:" + sendData);

   }

}});}}

2. consumerAPI案例代码(Kerberos环境)

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;

import java.util.Properties;

public class kafkaConsumerDemo {

public static void main(String[] args) {

   System.out.println("java -cp ${jar} kafkaConsumerDemo ${bootstrap.servers} ${topic}");

//kerberos环境下,需额外加载如下两项配置文件

System.setProperty("java.security.auth.login.config", "/home/kafka_kafka_client_jaas.conf");

System.setProperty("java.security.krb5.conf", "/home/krb5.conf");

   Properties props = new Properties();

   props.put("bootstrap.servers", "node1:6667,node2:6667,node3:6667");

   props.put("group.id", "kafkaConsumerDemo1");

   props.put("enable.auto.commit", "true");

   props.put("auto.offset.reset", "earliest");

   //props.put("auto.offset.reset", "latest");

   props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

       

//kerberos环境下,需额外添加如下三项配置

props.put("security.protocol", "SASL_PLAINTEXT");

props.put("sasl.mechanism", "GSSAPI");

props.put("sasl.kerberos.service.name", "kafka");

 

      KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);

   consumer.subscribe(Arrays.asList("out"));

   while (true) {

            ConsumerRecords<String, String> records = consumer.poll(1000);

            for (ConsumerRecord<String, String> record : records) {

                System.out.println(record);

            }

            System.out.println("---");

        }

    }

}

 

 


5 常见问题解答

5.1  性能调优

5.1.1  吞吐量调优

1. Broker调优

·     适当增加参数num.reolica.fetchers的值,但不要超过CPU的核数。该参数用于控制broker端follower副本从leader副本处获取消息的最大线程数,对于设置了acks=all的producer,适当增加该值通常能够缩短同步的时间间隔。

·     调优GC避免经常性的Full GC。

2. Producer调优

·     适当增加batch.size,适当增大该值,可以令更多的消息封装进同一个请求,减少producer的负载,也降低broker端CPU的开销。

·     适当增加linger.ms,适当增大该值,使producer等待更长的时间才发送消息,能够缓存更多消息填充batch,从而提升整体的TPS。

·     设置compression.type=lz4,设置消息压缩算法,对消息进行压缩可以极大地减少网络传输量,目前Kafka支持的压缩算法中,LZ4的性能最好。

·     ack=0或者1,producer发送消息时,指定必须有多少个分区副本收到消息,才认为该消息是写入成功的。

3. Consumer调优

·     采用多consumer实例。

·     增加fetch.min.bytes,该参数控制每次拉取消息的最小数据量。

5.1.2  可用性调优

1. Broker调优

·     设置unclean.leader.election.enabletrue=true,该参数允许Kafka从非ISR列表中选举leader,在保证服务高可用的同时,也增加了因unclean leader选举引起的数据丢失风险。

·     设置min.insync.replicas=1,当ISR缩减到小于该值的数量时,producer会停止对特定分区发送消息,从而影响服务的可用性。

2. Producer调优

·     设置acks=1,若将ack=all或者-1,将配合broker端的参数min.insync.replicas一起使用。

3. Consumer调优

·     设置session.time.ms为较低的值。

·     设置max.poll.interval.ms为大于预估的消息平均处理时间的值。

5.2  Kafka容器集群限制

5.2.1  使用限制

1. 连接限制

容器集群仅支持同一KaaS集群中的客户端连接,即需要连接Kafka容器集群的应用,需要和Kafka集群部署在同一个KaaS集群中。连接地址请到集群详情页面的基本信息中获取。

2. 权限访问限制

容器集群不支持Kerberos安全管理。

3. 数据同步限制

当前仅裸金属及虚拟机集群支持数据同步功能,容器集群暂不支持。

5.2.2  运维限制

1. 无主机、磁盘管理

容器集群中,Kafka的Broker运行在容器中,容器的资源是创建时给定的,不支持主机和磁盘的管理。如需查看Kafka实例的资源监控,请到云平台的云容器引擎页面查看。

5.3  运维类

1. Kafka JVM参数如何设置?

Kafka JVM参数在Kafka Broker节点的bin/kafka-server-start.sh中设置,目前Kafka Broker JVM内存大小可以根据节点内存自动进行设置。

·     节点内存大于32G:Kafka Broker堆内存为8G。

·     节点内存为16G~32G:Kafka Broker堆内存为4G。

·     节点内存小于16G:Kafka Broker堆内存为1G。

说明

一般Kafka Broker 堆内存不需要调整。

 

2. Kafka Topic删除后,Zookeeper里的保存的Consumer信息仍保留,怎么解决?

Kafka Topic删除后,Consumer并不一定已经停止消费。因此Kafka中使用命令删除Topic后,实际上Topic中保存的数据依旧存在,不影响正常使用。

3. Kafka使用过程中,出现超时异常类错误,怎么解决?

报错示例一

通过java客户端访问Kafka,当生产者线程向Kafka插入数据时候出现错误“org.apache.kafka.common.errors.TimeoutException: Batch Expired”或者

“org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms”

解决方法

(1)     检查Kafka Broker节点网络连接是否正常。

(2)     网络连接正常情况下,建议检查bootstrap.servers等配置是否正确。

报错示例二

Kafka使用过程中出现超时异常提示“java.util.concurrent.ExecutionException:org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for xx due to xx ms has passed since batch creation plus linger time”

解决方法

可以增大request.timeout.ms参数值。

4. Kafka使用中,出现错误提示“java.util.concurrent.ExecutionException:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition”时,怎么解决?

可能原因

Producer向Kafka Broker写数据时,若正在进行leader选举,本来是向Broker0上写的,选举之后Broker1成为leader,导致无法写成功,会抛异常“java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition”。

解决办法

修改Producer的重试参数retries,默认是0,生产环境中一般建议设置为10。

5. Kafka broker挂掉时,怎么解决?

可能原因

·     磁盘空间不足。例如,查看Kafka报错日志,看到报错信息“No space left on device”或查看组件磁盘使用情况,发现使用率为95%。

·     数据目录权限不足。例如,查看Kafka报错日志,看到报错信息“Permission denied”或查看数据目录权限为“root”。

·     数据磁盘损坏。例如,查看Kafka报错日志,看到关键报错信息“read-only file system:’/opt/disk9/kafka-logs’”或查看对应数据磁盘发现磁盘为只读。

·     Broker id已经被使用。例如,查看Kafka报错日志,看到报错信息“A broker is already registered on the path /brokers/ids/1”。

解决方法

·     磁盘空间不足:例如,现场跑了很多bulkload任务,且数据量较大,建议先停掉所有bulkload任务,然后清理磁盘中不需要的数据或者根据自身业务情况修改Kafka中磁盘数据保留时长,再重启Kafka Broker后正常。

·     数据目录权限不足:授予Kafka用户读写数据目录权限。

·     数据磁盘损坏:利用fsck进行磁盘修复。

·     Broker.id已经被使用:将meta.properties中的broker.id改为其他值后,重启所有的Kafka Broker。

6. Kafka生产者和消费者测试不通时,怎么解决?

可能原因

·     生产者和消费者访问端口输入错误,访问端口与Kafka组件的配置端口不一致。

·     语法错误:例如端口号错误,操作命令格式参数错误。

解决办法

定位思路:

·     检查生产者和消费者访问端口。

·     检查并修改错误语法。

处理方法:

生产者和消费者的访问端口修改为6667。

7. Topic无法删除时,怎么解决?

可能原因

·     参数delete.topic.enable=false。

·     对应Topic还有程序在读写。

解决办法

定位思路:

·     检查参数delete.topic.enable的配置。

·     检查是否有程序在读写该Topic。

处理步骤:

(1)     修改参数delete.topic.enable值为“true”。

(2)     停掉读写该Topic的进程。

8. Kafka使用partition数与硬盘分区不匹配时,怎么解决?

可能原因

Topic分区数设置不合理。

解决办法

定位思路:

使用./kafka-topics.sh --zookeeper <ip>:2181 --describe --topic <topic-name>命令查看topic详情,看分区个数与计划使用硬盘数是否相同。

处理步骤:

调整topic分区个数与规划的硬盘数一致,命令为:./kafka-topic.sh --zookeeper <ip>:2181 --alter --partitions <number> --topic <topic-name>。

 

不同款型规格的资料略有差异, 详细信息请向具体销售和400咨询。H3C保留在没有任何通知或提示的情况下对资料内容进行修改的权利!

新华三官网
联系我们