• 产品与解决方案
  • 行业解决方案
  • 服务
  • 支持
  • 合作伙伴
  • 新华三人才研学中心
  • 关于我们

H3C 绿洲融合集成平台 消息集成 配置指导-5W102

01-正文

本章节下载  (1.44 MB)

docurl=/cn/Service/Document_Software/Document_Center/Big_Data/Catalog/H3C_LZRH/H3C_LZRH/Configure/Operation_Manual/H3C-2889/202211/1719029_30005_0.htm

01-正文


1 功能简介

说明

本文以融合集成平台E5205版本为例(对接的资产市场也为E5205版本),介绍消息集成的整体操作流程及主要功能。其他版本类似,但页面和操作可能存在差异,请以实际情况为准。

 

1.1  功能介绍

消息集成定义为融合集成平台中各集成服务之间提供可靠的、可持久化的、高吞吐量的准实时消息管道系统。消息集成使用统一的消息接入机制,标准化的消息通道,具有如下优势:

·     支持原生的Kafka特性:具备原生Kafka所有消息处理特性。

·     支持安全的消息传输:通过SASL认证、消息存储加密等措施加强网络访问控制。

·     支持消息数据高可靠:支持消息持久化,多副本存储机制,支持节点级扩容与Topic重分配。

1.2  使用流程

用户可直接在消息集成模块中创建Topic供组织内的用户使用,也可将创建好的Topic上架到资产市场供组织外的用户使用。

1. Topic创建及使用

用户可以在消息集成模块中创建Topic然后供组织内的用户进行使用,整体配置流程如图1-1

图1-1 Topic创建及使用流程

 

表1-1 Topic创建及使用流程说明

操作

说明

开始

/

创建工作空间

工作空间是为了让用户更好的将项目相关资源进行统一管理。用户可根据实际需要在工程配置模块下的[工作空间]页面新增工作空间

创建Topic

新增工作空间后,可在[Topic管理/Topic列表]页面下,新建Topic

Topic权限配置

根据实际的需要,对Topic进行权限配置,为工作空间赋予生产、消费的权限

Topic使用

Topic创建后,可以使用命令行、开源Kafka客户端等方式连接Topic,并向Topic生产和消费消息

结束

/

 

2. Topic上线到资产市场(可选)

Topic创建后只有组织内的用户可见,如果组织外的用户需要使用该Topic,该Topic必须先通过上线功能,发布到资产市场,然后其他部门用户可以在资产市场进行订阅,订阅审批通过后即可使用。整体操作流程如图1-2所示。

图1-2 Topic上线及订阅

 

表1-2 Topic上线及订阅流程说明

操作

说明

开始

/

Topic上线

将创建好的Topic上线到资产市场。Topic上线需经过组织管理员审批,组织管理员审批通过后,Topic即可上线到资产市场中

Topic上架

Topic上线到资产市场后,由管理员进行上架,对外共享

Topic订阅

资产上架到资产市场后,其他组织用户可在资产市场进行查看,然后根据业务需要,对Topic进行订阅

Topic使用

Topic订阅需要经过管理员审批,审批通过后,用户可查看到Topic的详细信息,然后进行使用

结束

/

 

1.3  功能使用前提条件

·     用户已在绿洲平台上完成了消息集成模块的安装,具体安装要求及流程请参见《H3C 绿洲平台 安装部署手册》。

·     由于消息集成模块的正常使用依赖DataEngine大数据集群,请确认DataEngine集群的组件配置已经根据要求完成修改,并已在系统中完成了DataEngine集群的初始化。具体操作流程请参见《H3C 绿洲平台 安装部署手册》。

 


2 Topic创建及使用流程

2.1  Topic创建及分配权限

1. 创建工作空间

每个Topic都要归属到某个工作空间下,在创建Topic前需要有可用的工作空间,否则需要提前联系管理员创建工作空间。工作空间是为了让用户更好的将项目相关资源进行统一管理。

(1)     在工程配置模块下,选择左侧导航树中的[工作空间],进入工作空间页面。

(2)     工作空间页面单击<新增>按钮,弹出新增工作空间页面,用户可根据实际进行新增。

图2-1 新增工作空间

 

(3)     根据实际需要配置对应参数项的值,参数说明如下:

¡     工作空间ID:必填,根据实际需要配置工作空间ID。工作空间ID用于工作空间下Topic使用(生产、消费)时的账户。

¡     工作空间名:必填,根据实际使用需要配置工作空间名称。

¡     认证类型:必选,支持私钥认证。根据认证类型生成工作空间下Topic使用(生产、消费)时的密码。

¡     描述:非必填,工作空间相关的描述信息。

(4)     配置完成后,单击<确定>按钮完成工作空间的新建。

2. 创建Topic

创建用于存储消息的Topic,供消息生产方发布消息和消息消费方订阅消息。

(1)     在消息集成模块下选择[Topic管理/Topic列表],进入Topic列表页面。

(2)     在页面顶部导航栏选择工作空间,页面显示对应工作空间下的Topic列表。

(3)     单击<新建>按钮,弹出新建Topic窗口,在选择的工作空间下创建Topic。

图2-2 新建Topic

 

(4)     根据实际的需要,对Topic进行权限配置,为工作空间赋予生产、消费的权限。

¡     权限:必选,下拉列表有三个选项,分别是生产+消费、生产、消费,默认为生产+消费。

¡     分区数:必填,默认为1,取值范围为1-20。标识该Topic下的分区数量。

¡     副本数:必填,默认为1,取值范围受后端实际集群节点数量限制。表示每个分区有多少个数据的副本。

¡     老化时间(小时):必填,默认为72,取值范围1~168。定义了消息的存储时间,超过老化时间将进行删除处理。

¡     同步复制:必填,默认为关闭。开启后,消息生产后,均同步传输到该分区的所有副本后才进行响应。

¡     同步落盘,必填,默认为关闭。开启后,消息生产后,均持久到至硬盘后再进行响应。

¡     标签:为Topic配置标签,用户可在输入框中选择系统内已存在的标签,或者直接单击<新增>按钮,新增一个标签,支持一次为Topic配置多个标签(最多5个)。

¡     上传附件:单击<点击上传>按钮,可以上传Topic相关的使用资料。

(5)     配置完成后,单击<新建>按钮,即可完成Topic的新建。

3. Topic权限配置

注意

当消息集成所连接的Kafka集群使用PLAINTEXT协议,Kafka集群不支持权限设置(页面无<权限>按钮)。

 

客户端向Topic发布和订阅消息时,需要使用授权工作空间的ID和密钥信息进行安全认证。默认只有Topic所属的工作空间具备向Topic发布和订阅消息的权限,用户可通过权限设置功能为其他工作空间分配该Topic的操作权限。

(1)     Topic列表页面,单击Topic列表中的<权限>按钮,弹出Topic权限配置窗口。

¡     所有工作空间:显示当前用户所属组织下的所有工作空间。复选框选择某工作空间后,会显示在已授权工作空间列表中。

¡     已授权工作空间:显示要被授予权限的工作空间。用户可根据实际需要设置工作空间的Topic权限,默认权限设置为生产+消费。

¡     配置信息:单击<配置信息>按钮,可查看生产/消费该Topic所需的集群和工作空间配置信息,用户可根据实际需要获取这些信息进行使用。

图2-3 Topic权限配置

 

(2)     单击<确定>按钮,完成工作空间权限的设置。

2.2  Topic使用

Topic创建后,可以使用命令行、开源Kafka客户端等方式连接Topic,并向Topic生产和消费消息。

2.2.1  数据对接规范

(1)     Topic创建完成后,为保证业务的正常进行,请遵循如下使用要求。

¡     要求数据使用JSON格式写入对应的Topic。

¡     如果数据中某个字段是文件等二进制类型,需要将二进制字段的内容转换为字符串,且生产方和消费方需要统一二进制转换算法。推荐JDK自带的Base64.getEncoder().encodeToString()减少依赖。

¡     数据在消息集成Kafka中默认保存3天,最多保存7天,请及时消费。

¡     数据有大小限制,Kafka默认接收数据最大10M,如果数据大于10M,可通过在[Topic管理/Topic配置]页面修改键值进行调整,如图2-4

图2-4 根据业务需求修改Topic配置

 

(2)     消息生产者和消费者使用的Kafka客户端推荐使用集群信息对应的版本,否则可能会出现使用问题。Kafka集群版本可通过集群信息页面获得,如图2-5。Kafka客户端可通过Kafka官网进行获取。

图2-5 集群信息

 

(3)     用户在Topic列表页面,单击<权限>按钮,弹出Topic权限配置窗口,然后单击<配置信息>按钮,可查看使用对应工作空间下Topic的对接参数,如图2-6。sasl.jaas.config中的username、password对应工作空间的ID和秘钥。

图2-6 查看配置信息

 

2.2.2  内外网映射

跨网络生产消费需要配置IP和端口的内外网映射,需要一个外网IP,多个端口分别映射Kafka各个broker节点的IP和端口。

另外还需要修改DataEngine集群的相关配置,具体操作请联系DataEngine技术工程师进行修改。

2.2.3  Topic使用

可以使用命令行、Java代码等方式连接Topic,并向Topic生产和消费消息。

1. 通过命令行使用Topic

以下操作命令以Linux系统为例进行说明。

(1)     用户可通过Kafka官网获取Kafka客户端,然后解压到实际使用目录下。

(2)     修改Kafka命令行工具配置文件。

在Kafka命令行工具的/config目录中找到consumer.properties和producer.properties文件,并分别在文件中增加如下内容。

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \

username="**********" \

password="**********";

sasl.mechanism=PLAIN

security.protocol=SASL_PLAINTEXT

说明:username和password的值分别为Topic所属工作空间的ID和秘钥。

(3)     进入Kafka命令行工具的/bin目录下。

(4)     向Topic生产消息。

a.     执行以下命令,与Topic建立生产消息的连接。

./kafka-console-producer.sh --bootstrap-server <Address> --topic <TopicName> --producer.config ../config/producer.properties

其中:

-     <Address>为融合集成平台中消息集成连接地址,可以在集群信息页面进行查看,如图2-7。如果是公网访问,则使用公网连接地址。

-     <TopicName>为要生产消息的Topic名称。

-     ../config/producer.properties为配置文档所在的相对路径。

图2-7 消息集成链接地址

 

b.     输入消息内容,向Topic发送消息。

>Message1

>Message2

>Message3

其中,Message1、Message2、Message3为向Topic发送的实际消息内容,一行为一条消息。

c.     若要断开与Topic的连接,按“Ctrl+C”断开连接。

(5)     从Topic消费消息。

说明

一个消费者从一个Topic的多个分区消费消息时,一次只能消费一个分区的消息,多个分区会分多次进行消费。

 

a.     执行以下命令,与Topic建立消费消息的连接并读取消息。

./kafka-console-consumer.sh --bootstrap-server <Address> --topic <TopicName> --from-beginning --consumer.config ../config/consumer.properties

其中:

-     <Address>为融合集成平台中消息集成连接地址,可以在集群信息页面进行查看,如图2-7。如果是公网访问,则使用公网连接地址。

-     <TopicName>为要消费消息的Topic名称。

-     ../config/consumer.properties为配置文档所在的相对路径。

b.     执行命令后,会持续连接Topic并读取消息。若要断开与Topic的连接,按“Ctrl+C”断开连接。

2. 通过java代码使用Topic

注意

使用的kafka jar包版本需要与集群信息页面中的版本对应。

 

用户也可通过Java代码使用Topic,生产消息和消费消息代码示例如下。

·     Maven pom文件引用:

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.11</artifactId>

<version>2.3.0</version>

</dependency>

 

·     生产/消费消息代码示例:

package xxx.xxx.xxx;

 

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class KafkaDemo {

    /**
     * 入口
     * @param args 参数
     */
    public static void main(String[] args) {
        KafkaDemo kafkaDemo = new KafkaDemo();
        kafkaDemo.producer();
        kafkaDemo.consumer();
    }

    /**

    *生产消息代码

    */

    public void producer() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka broker对应的IP和端口,多个以逗号隔开");
        properties.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
        properties.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"请填写工作空间id\" password=\"请填写工作空间id对应的秘钥\";");
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "104857600");
        properties.put(ProducerConfig.RETRIES_CONFIG, "3");
        properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        try {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("tian", "jsonData");
            kafkaProducer.send(producerRecord, (recordMetadata, e) -> {
                if (e != null) {
                    //todo 记录错误日志
                }
            });
        } finally {
            kafkaProducer.close(Duration.ofMinutes(1L));
        }
    }

 

    /**

    *消费消息代码

    */

    public void consumer() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka broker对应的IP和端口,多个以逗号隔开");
        properties.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
        properties.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"请填写工作空间id\" password=\"请填写工作空间id对应的秘钥\";");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupid_xxxx");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        TopicPartition topicPartition = new TopicPartition("请填写真实的topic名称", 0);
        kafkaConsumer.assign(Collections.singleton(topicPartition)); //topicpartition 消费topic的哪些分区
        kafkaConsumer.seek(topicPartition, 0); //0 代表偏移量
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5L));
        List<ConsumerRecord<String, String>> recordList =  records.records(topicPartition); //读取哪个分区的数据
        for (ConsumerRecord<String, String> record : recordList) {
            System.out.println(record.value());
        }

    }
}

 


3 Topic上线到资产市场

说明

用户需要在系统中完成资产市场模块的部署,才能将Topic上线到资产市场。

 

Topic创建后只有组织内的用户可见,如果组织外的用户需要使用该Topic,该Topic必须先通过上线功能,发布到资产市场,然后其他部门用户可以在资产市场进行订阅,订阅审批通过后即可使用。

1. Topic上线

(1)     在消息集成模块下选择[Topic管理/Topic列表],进入Topic列表页面。

(2)     在页面顶部导航栏中选择工作空间,Topic列表页面显示对应工作空间下的Topic列表信息。

(3)     Topic列表中,单击<上线>按钮,弹出上线窗口,用户需配置如下属性:

¡     发布权限:配置该Topic上线到资产市场的权限,可选消费、生产。

¡     资源来源:选择资产所属的来源名称。资产来源会关联资产管理员,资产上线后会展示在资产管理员的资产列表中。

图3-1 Topic上线

 

(4)     配置权限及资产来源后,单击<确定>按钮,系统会自动发出上线审批流程,Topic状态变为“上线审批中”。

(5)     用户可在个人中心模块下[我的申请]页面中查看自己提交的申请的审批进度。

图3-2 我的申请

 

(6)     组织管理员可在个人中心模块下[待办审批]页面中审批组织内用户提交的申请。

图3-3 待办审批

 

2. Topic上架

资产上线后会展示在资产管理员的资产列表中。资产管理员可在资产列表页面对资产进行上架,上架的资产会展示在资产中心下让用户使用。

(1)     资产管理员登录系统后,在个人中心模块下,选择左侧导航树中的[我的资产/资产列表],进入资产列表页面。

(2)     资产列表页面中展示了当前资产管理员的所有资产,展示的信息包括资产名称、标识和状态、资产来源等。页面中包括已上架、待上架、已下架和不可用页签,分类展示了不同状态的资产。

图3-4 资产列表

 

(3)     选择待上架页签,可查看到上线的资产,单击对应资产的<上架>按钮,可对资产进行上架。上架时可配置该资产标签、资产来源、是否开启订阅审核等。

图3-5 上架资产

 

(4)     资产上架后,会展示在资产市场中,用户可在资产市场中查看已上架的资产。

图3-6 公共资产页面

 

3. Topic订阅

资产管理员将Topic上架后,用户可在资产市场中查看到该Topic,并进行订阅使用。

(1)     用户在资产市场页面下单击<订阅>按钮,弹出订阅对话框,选择订阅的Topic权限、订阅该资产的应用,填写申请理由,然后单击<提交>进行订阅申请。

图3-7 订阅申请

 

(2)     用户提交订阅申请后,系统会创建一个订阅申请流程,该流程发送给资产管理员进行审批。资产管理员可在[个人中心/待办审批]中查看收到的订阅流程。

图3-8 待办审批

 

(3)     资产管理员审批通过后,订阅者在[个人中心/资产订阅]页面可以查看审批通过的资产。

图3-9 订阅的资产

 

(4)     单击<详情>按钮,可查看订阅的Topic详情,如图3-10。可以查看到该Topic的具体信息,包括Topic名称、Topic权限以及生产及消费所需的配置信息,用户可获取相关信息,然后使用该Topic。

图3-10 Topic详情

 


4 Topic管理

消息集成模块还提供了消息查询、Topic配置、重分配、消费进度等功能,帮助用户更好的使用Topic。

4.1  消息查询

说明

当Kafka生产者开启事务时,消息列表中展示的数据会包含事务标记,查询的数据总条数因包含事务标记,所以会比实际生产的数据条数多。

 

消息集成模块提供了毫秒级、可视化的Kafka集群中的消息查询能力,支持按照分区和生产时间进行过滤。消息查询的列表中展示的当前Topic的指定分区中相应offset偏移量处的消息条目,单击<查看消息正文>按钮可以查看消息的具体内容。当用户查看消息内容的时候可以使用该功能。

(1)     在消息集成模块下选择[消息查询],进入消息查询页面。

(2)     在页面顶部导航栏选择工作空间,页面显示对应工作空间下的消息。

(3)     消息查询页面,在顶部搜索栏中,选择Topic名称(必选)、分区、消息生产时间,然后单击<搜索>按钮,可查看该Topic下对应分区在对应时间生产的消息列表。

(4)     消息列表中,选择需要查询的偏移量,单击<查看消息正文>可查看消息内容。

图4-1 消息查询

 

4.2  Topic配置

注意

·     添加Topic配置项可能导致生产或消费消息失败,如添加compression.type参数后,生产消息必须设置消息的key值,否则会生产消息失败。请谨慎执行本操作,仅建议对Topic各种配置项参数完全理解后进行此操作。

·     删除Topic配置项可能导致生产或消费消息失败,如删除max.message.bytes=1048576000参数后,如果生产者生产的消息体大于Kafka集群默认消息大小限制,则生产消息失败。请谨慎执行本操作,仅建议对Topic各种配置项参数完全理解后进行此操作。

 

Topic配置页面主要提供对Topic属性配置进行新增、删除、查看配置操作。当用户对已创建的Topic有属性配置的诉求时,可以使用该模块进行配置。

(1)     在消息集成模块下选择[Topic管理/Topic配置],进入Topic配置页面。

(2)     在页面顶部导航栏选择工作空间,页面显示对应工作空间下的Topic配置。

(3)     Topic配置页面支持查看配置、添加配置及删除配置操作。

¡     Topic配置页面,选择“类型”为“描述配置”。“Topic名称”中选择当前工作空间下的Topic名称,然后单击<查询>按钮,该Topic配置描述即显示在配置信息文本框中。

¡     Topic配置页面,选择“类型”为“添加配置”,然后配置Topic相关配置信息,单击<提交>,配置在集群中立即生效。

-     Topic名称:选择需要添加配置的Topic名称。

-     关键字:必选,选择需要新增的关键字配置。支持多种关键字配置,具体参数详见表4-1

-     键值:必选,选择关键字对应的值。不同的关键字有不同的取值,具体参数详见表4-1

¡     Topic配置页面,选择“类型”为“删除配置”,可删除Topic的配置信息。单击<提交>按钮,即可删除该Topic的配置,集群中可以立即生效,同时在配置信息文本框显示当前Topic配置信息。

-     Topic名称:选择需要删除配置的Topic名称。

-     关键字:必选,选择需要删除的关键字配置。

图4-2 Topic配置

 

表4-1 Topic关键字键值配置参数说明

关键字

说明

取值范围

cleanup.policy

清理策略

delete/compact,默认值为delete

compression.type

压缩类型

uncompressed、zstd、lz4、snappy、gzip、producer,默认值为producer

delete.retention.ms

被标记为删除的Topic多久后彻底删除

大于等于0的整数,默认值为86400000(一天)

file.delete.delay.ms

删除文件前等待的时间

大于等于0的整数,默认值为60000(一分钟)

flush.messages

几条消息刷新一次磁盘

大于等于0的整数,默认值为9223372036854775807

flush.ms

多久刷新一次磁盘

大于等于0的整数,默认值为9223372036854775807

follower.replication.throttled.replicas

限制从节点复制的副本列表

[partitionId]:[brokerId],默认值为空字符串

index.interval.bytes

索引间隔字节(不推荐修改)

大于等于0的整数,默认值为4096(4kb)

leader.replication.throttled.replicas

限制首领节点复制的副本列表

[partitionId]:[brokerId],默认值为空字符串

max.compaction.lag.ms

不符合日志压缩条件的消息的最大保留时间

大于等于1的整数,默认值为9223372036854775807

max.message.bytes

消息的最大字节数

大于等于0的整数,默认值为1000012

message.format.version

消息格式化版本

0.8.0、0.8.1、0.8.2、0.9.0、0.10.0-IV0、0.10.0-IV1、0.10.1-IV0、0.10.1-IV1、0.10.1-IV2、0.10.2-IV0、0.11.0-IV0、0.11.0-IV1、0.11.0-IV2、1.0-IV0、1.1-IV0、2.0-IV0、2.0-IV1、2.1-IV0、2.1-IV1、2.1-IV2、2.2-IV0、2.2-IV1、2.3-IV0、2.3-IV1、2.4-IV0、2.4-IV1、2.5-IV0,默认值为2.3-IV1

message.timestamp.difference.max.ms

客户端时间与服务端时间允许的最大差值

大于等于0的整数,默认值为9223372036854775807

message.timestamp.type

消息时间戳类型

CreateTime/LogAppendTime,默认值为CreateTime

min.cleanable.dirty.ratio

清理脏数据的频率

大于等于0,小于等于1,默认值为0.5

min.compaction.lag.ms

消息在日志中保持不压缩的最短时间。仅适用于正在压缩的日志

大于等于0的整数,默认值为0

min.insync.replicas

当生产者acks=-1时,此参数生效,即至少要写入几个副本broker才给生产者回应

大于等于1的整数,默认值为1

preallocate

是否在创建新的日志段时在磁盘上预分配文件

是/否,默认值为是

retention.bytes

此配置控制分区可以增长到的最大大小,然后丢弃旧的日志段以释放空间(-1不限制)

大于等于-1的整数,默认值为-1

retention.ms

老化时间(-1不限制)

大于等于-1的整数,默认值为604800000

segment.bytes

日志段文件大小

大于等于14的整数,默认值为1073741824

segment.index.bytes

日志段文件索引的大小

大于等于0的整数,默认值为10485760

segment.jitter.ms

从计划的段滚动时间减去最大随机抖动,以避免异常的段滚动

大于等于0的整数,默认值为0

segment.ms

达到此时间后即使日志段文件未满,也会创建新的log文件

大于等于1的整数,默认值为604800000

unclean.leader.election.enable

是否允许不在ISR集合中的副本被选作首领

是/否,默认为是

message.downconversion.enable

是否启用消息格式向下转换以满足低版本的消费者请求

是/否,默认为是

 

4.3  重分配

注意

重分配期间此Topic将无法进行生产和消费的操作。请谨慎执行本操作,仅建议此Topic无任何业务使用的情况下进行此操作。进行此操作的人员要求对Topic重分配特性非常了解且对Kafka集群配置及各节点使用情况非常了解。

 

Topic重分配功能用于Topic修改分区副本所在broker节点位置。

在Kafka集群资源使用已超出系统配置的资源时,需要通过扩容Kafka节点来实现Kafka集群的资源扩容。新增的Kafka节点,只有在创建新的Topic才会参与工作,对于之前其他Broker节点上的分区副本是不会自动均衡的,不能达到负载的效果。这时需要在Broker之间重新分配分区副本,该模块即提供这种场景下的分区重分配功能。

(1)     在消息集成模块下选择[Topic管理/Topic重分配],进入Topic重分配页面。

(2)     在Topic名称中下拉选择需要进行分区副本重分配的Topic。

(3)     单击<生成>按钮,可以看到待实施配置与当前配置。在“待实施配置”的编辑框中按照重分配的规则进行输入,例如可以修改“replicas”后对应的brokerid。当多次单击<生成>按钮时,待实施配置中的部分参数会随机生成待实施配置值,请注意核对是否为所需配置后再执行。

(4)     待实施配置修改完成后,单击<执行>按钮,即运行该待实施配置。

(5)     单击<核实>,确认结果是否执行完。Topic重分配页面,在Topic下拉框中选择Topic名称,单击<生成>按钮可以查看到Topic已经使用最新配置。

图4-3 Topic重分配

 

4.4  消息转发

说明

使用消息集成消息转发功能时,需要配置Flink客户端,具体配置方法可参见绿洲平台安装部署手册。

 

消息转发功能主要用于将一个Topic中的数据经过一定规则过滤后写入其他Topic。用户可在消息转发页面下配置新的消息转发规则,并对这些消息转发规则进行管理。

(1)     在消息集成模块下选择[消息转发],进入消息转发页面。

(2)     消息转发列表页面,单击<新建>按钮,进入新增消息转发规则页面,用户可根据实际需要进行转发规则设计。

(3)     如果需要修改已创建好的消息转发,可单击消息转发列表中<编辑>按钮进行修改。

(4)     消息转发规则设计页面部分参数说明如下:

¡     源Topic配置:

-     转发名称:必填,转发规则的名称,3-249位,以字母、数字开头,由字母、数字、"_"、"-"组成。

-     源Topic:选择消息转发的源Topic,从该Topic读取数据进行过滤。

-     消息体最大字节数:配置源Topic中消息体最大字节数。

¡     过滤规则:

-     关键字:过滤条件关键字。

-     条件:过滤规则条件,可选in、not in、=、like。

-     值:过滤值,可以是多个,以,隔开。

-     目的Topic:目的topic,将过滤后的数据写入到该Topic。

(5)     配置完成后,单击<确认>按钮,即可完成新增消息转发规则,新增消息规则后,默认为停止状态,如果想要该规则生效,需要在消息转发列表中单击<启动>按钮,启动该消息转发规则。

图4-4 新建消息转发规则

 

4.5  消费进度

消费进度页面展示当前系统中用户所在组织下Topic被消费者组消费的进度(包括消费组信息、消费偏移量、剩余堆积量等)。

(1)     在消息集成模块下选择[消费进度],进入消费进度页面。

(2)     消费进度页面展示了当前系统中的消费者组列表及活跃Topic的情况。

(3)     消费者组列表展示了系统中所有的消费者组。展示内容包括组ID,在用户所在组织内消费的Topic数量、使用哪个节点进行消费等信息。消费者组中单击组ID,可查看该消费者组消费的Topic名称,继续单击Topic名称,可以查看Topic的分区、总的数据量、偏移量、堆积量、所属用户。

图4-5 消费进度

 

不同款型规格的资料略有差异, 详细信息请向具体销售和400咨询。H3C保留在没有任何通知或提示的情况下对资料内容进行修改的权利!

新华三官网
联系我们