实时作业组件配置参数介绍

数据源表组件

当前实时计算支持KafkaSTDBHudi类型数据源表,即使用Kafka类型数据源中的表、STDB类型数据源中的表、Hive类型数据源中Hudi类型的表,作为数据源输入。

将组件拖入画布中后,双击节点,页面右侧会弹出节点概览,可配置节点参数,这几种数据源输入参数说明如下表所示。

目前,采用数据源表组件Hudi作为输入源时,在画布中暂不支持与joinwindowJoinoverAggreatecep数据处理组件相连。

 

表-1 数据源表组件配置参数介绍

数据源表

说明

kafka

该组件可以从Kafka类型的数据源中读取数据,并作为实时作业的输入。该组件配置参数说明如下:

  • 节点类型:当前选定的流表类型kafka,不可修改

  • 表类型:输入组件的表类型均为数据源表,不可修改

  • 表名称:在下拉框中选择在表管理中已注册流表的Kafka输入类型的表名称。下拉框中的表名称按照数据源名.表名格式呈现,以区分不同数据源下的同名表

  • field:系统会自动选择表中的所有字段

  • groupidKafka消费者组ID,用于标识Kafka消费者组。选取流表后,该参数值会自动匹配

  • offset:设置从Kafka中开始消费数据的位置。latest表示消费最新的数据;earliest表示会从未消费数据的第一行数据开始消费。选取流表后,该参数值会自动匹配,也可手动修改

  • timeCharacteristic:设置流计算的时间属性。提供processTimeeventTimenone三种选项,缺省为none。其中,processTime表示若实时计算作业涉及到窗口操作,则以Flink处理数据的时间为时间基准;eventTime表示若实时计算作业涉及到窗口操作,则以数据中带有时间的字段值为时间基准

  • processTimeField:表示对数据进行处理的Flink系统时间,默认为proctime,不可修改。仅timeCharacteristic参数选择processTime选项后会出现本参数

  • eventTimeField:选择数据源表中数据类型为timestamp的字段,该字段会被作为时间基准。仅timeCharacteristic参数选择eventTime选项后会出现本参数

  • delay:配置对数据的延迟最大容忍时间, 取值范围为059,单位为秒。timeCharacteristic参数选择eventTime选项后会出现本参数

  • watermarkInterval:配置系统发送watermark的间隔时间,取值范围为059,单位为秒。仅timeCharacteristic参数选择eventTime选项后会出现本参数

STDB

该组件可以从STDB类型的数据源中读取数据,并作为实时作业的输入。该组件配置参数说明如下:

  • 节点类型:当前选定的流表类型STDB

  • 表类型:输入组件的表类型均为数据源表

  • 表名称:可下拉选择在流表管理中已注册的STDB类型的流表名称

  • Kafka主题名:所选表名称(流表)在注册时所使用的Kafka主题名,选择表名称后,该参数会自动匹配

  • field:系统会自动选择表中的所有字段

  • groupidKafka消费者组ID,用于标识Kafka消费者组。选取流表后,该参数值会自动匹配

  • offset:设置从Kafka中开始消费数据的位置。latest表示消费最新的数据;earliest表示会从未消费数据的第一行数据开始消费。选取流表后,该参数值会自动匹配

  • timeCharacteristic:设置流计算的时间属性。提供processTimeeventTimenone三种选项,缺省为none。其中,processTime表示若实时计算作业涉及到窗口操作,则以Flink处理数据的时间为时间基准;eventTime表示若实时计算作业涉及到窗口操作,则以数据中带有时间的字段值为时间基准

  • processTimeField:表示对数据进行处理的本地系统时间,默认为proctime,不可修改。仅timeCharacteristic参数选择processTime选项后会出现本参数

  • eventTimeField:选择数据源表中数据类型为timestamp的字段,该字段会被作为时间基准。仅timeCharacteristic参数选择eventTime选项后会出现本参数

  • delay:配置对数据的延迟最大容忍时间, 取值范围为059,单位为秒。timeCharacteristic参数选择eventTime选项后会出现本参数

  • watermarkInterval:配置系统发送watermark的间隔时间,取值范围为059,单位为秒。仅timeCharacteristic参数选择eventTime选项后会出现本参数

hudi

该组件可以从Hive类型的数据源中Hudi类型的数据表内读取数据,并作为实时作业的输入。该组件配置参数说明如下:

  • 节点类型:当前选定的流表类型hudi,不可修改

  • 表类型:输入组件的表类型均为数据源表,不可修改

  • 表名称:可下拉选择在表管理中已注册的Hudi类型的流表名称

  • field:系统会自动选择表中的所有字段

  • 增量数据读取时间:Hudi数据支持从某个提交时间(执行commit的时间)版本读取数据,指定形如yyyyMMddHHmmss格式的起始commit时间,若不选择增量读取时间,默认值为earliest,从最早开始读取

 

数据维表组件

当前实时计算支持PostgreSQL、达梦、VerticaMySQLHBase数据维表,分别代表PostgreSQL类型、达梦类型、Vertica类型、MySQL类型、HBase类型的数据源输入。这几种数据源输入参数说明如下表所示。

表-2 数据维表组件配置参数介绍

数据维表

说明

PostgreSQL、达梦、VerticaMySQLHBase

可以从PostgreSQL、达梦、VerticaMySQLHBase数据源读取数据,并进行数据处理

当前版本只支持HBase原生数据维表,且该维表的列族名必须为F

  • 节点类型:当前选定的流表类型PostgreSQL/达梦/Vertica/MySQL/HBase

  • 表类型:输入组件的表类型均为数据维表

  • 表名称:在流表管理中已注册的PostgreSQL/达梦/Vertica/MySQL/HBase输入类型的流表名称。下拉框中的表名称按照数据源名.表名格式呈现,以区分不同数据源下的同名表

  • field:系统会自动选择表中的所有字段

 

数据处理组件

数据处理可以对数据进行逻辑运算操作,具体数据处理组件说明如下表所示。

表-3 数据处理组件配置参数介绍

数据处理组件

说明

project

投影,需配置已选字段。关于选择/编辑字段的操作步骤如下:

1.     单击<选择/编辑字段>按钮,弹出选择字段窗口

2.     在窗口中,用户可以选择已有字段,也可以新增字段。选择字段时,可以通过在输入关键字搜索字段,选择后,还可对字段顺序进行排序

  • 选择左侧字段列表中的字段(父节点输出的字段),右侧已选字段列表中即会出现选择的字段,可设置字段的别名

  • 单击<增加字段>按钮,新增字段并指定别名(目前支持函数对已有的一个或者多个字段进行处理,而后生成一个新的字段)

3.     字段选择完成后,单击<确定>按钮,已选择的字段会展示在已选字段中

filter

过滤,包含输入字段和过滤表达式,需配置过滤表达式

  • 输入字段:父节点输出的字段即为该节点的输入字段,不可配置

  • 过滤表达式:配置对输入字段列表中字段进行过滤的表达式

split

拆分列,需配置节点概览和已选字段中的参数

  • 拆分列名:选择要拆分的字段

  • 分隔符:输入字段拆分的分隔字符,对于字段中存在该分隔字符的值,会以该分隔字符为分隔点,对值进行拆分

  • 拆分格式:选择拆分的格式,支持多行、多列两种格式

  • 增加列:如果按照多行拆分,输入增加列的名称;如果按照多列拆分,输入列名及其值对应拆分列名后的位置

  • 选择/编辑字段:单击<选择/编辑字段>按钮,在弹出的选择字段窗口中,从左侧表中选择字段(父节点输出的字段+增加列的列名,可通过关键字筛选),右侧即可显示已选字段,可以对已选字段顺序进行调整。字段选择完成后,单击<确定>按钮即可配置完成。

topN

取前N个字段值,需配置节点概览中的参数

  • 条数:配置取前几条数据

  • 排序字段:选择要排序的字段,并配置升序还是降序排序,支持对多个排序字段

  • 分区字段;选择要分区的字段,支持多选

unionAll

合并,需配置选择字段和已选字段

  • 选择字段:选择左表/右表中的字段(左右两个父节点的输出字段)

  • 已选字段:展示在选择字段中选择的字段。该区域中可为字段设置别名,修改字段的顺序及删除字段

join

连接,需配置选择字段、等值条件和已选字段

注意:若join组件的父节点为一个数据源表节点和一个数据维表节点,则join组件的左输入需要为数据源表节点,右输入需要为数据维表节点,请勿更改位置顺序。数据源表与数据源表进行Join操作非常消耗内存和计算资源,对于这种作业场景,建议根据服务器资源情况,尽可能调大作业的管理节点和工作节点的内存

  • 选择字段:选择左表/右表中的字段(左右两个父节点的输出字段)

  • 等值条件:选择左表与右表相等的字段,点击图标可以添加多个等值条件,点击图标可以删除多余的等值条件。若父节点为HBase数据维表,则等值条件只能配置一个。若两个父节点的均为数据源表,则等值条件的个数不限。(对于实时计算的VARCHAR类型所对应的若干关系型数据库的char类型,由于char类型会自动补全,在进行等值条件判断时会产生影响,所以在增加等值条件时,请务必注意是否严格符合join的等值条件)

  • 已选字段:展示在选择字段中选择的字段。该区域中可为字段设置别名(字段别名必须设置且不能重复。如果不填,则默认使用该字段的名称。但如果已选字段列表中有相同名称的字段,则需配置不同的别名),修改字段的顺序及删除字段

windowJoin

连接两个流表的元素,它们共享一个公共key并位于同一个窗口中

配置前需确保KafkaSTDB类型的数据源节点中已开启了timeCharacteristic参数(也即选择了除none以外的时间属性值)

  • 选择字段:选择左表/右表中的字段(左右两个父节点的输出字段)

  • 窗口条件:配置上限或者下限,两者必须选一个进行配置。填写左表时间属性字段与右表时间属性字段间的条件关系。时间最大支持7

  • 等值条件:选择左表与右表相等的字段,点击图标可以添加多个等值条件,点击图标可以删除多余的等值条件

  • 已选字段:展示在选择字段中选择的字段。该区域中可为字段设置别名(字段别名必须设置且不能重复。如果不填,则默认使用该字段的名称。但如果已选字段列表中有相同名称的字段,则需配置不同的别名),修改字段的顺序及删除字段

aggregate

对数据进行聚合

如聚合类型需选择窗口聚合和分组+窗口聚合,则需要确保KafkaSTDB类型的数据源节点中已开启了timeCharacteristic参数(也即选择了除none以外的时间属性值)

  • 输入字段:父节点输出的字段即为该节点的输入字段

  • 输出字段:填写需要输出的字段,多个字段间用英文逗号分隔,支持函数处理已有的字段

  • 聚合类型:选择聚合的类型,支持分组聚合、窗口聚合、分组+窗口聚合三种类型

  • 分组字段:选择分组聚合、分组+窗口聚合时,需输入分组的字段,可多选

  • 窗口类型:选择窗口聚合、分组+窗口聚合时,需选择窗口类型,支持滚动窗口、滑动窗口、SESSION窗口三种类型。

  • 窗口大小:选择滚动窗口、滑动窗口时,需填写窗口大小

  • 滑动距离:选择滑动窗口时,需填写滑动距离

  • 时间间隔:选择SESSION窗口时,需填写时间间隔

  • 过滤条件:配置对输入字段列表中字段进行过滤的表达式

overAggregate

窗口聚合

配置前需确保KafkaSTDB类型的数据源节点中已开启了timeCharacteristic参数(也即选择了除none以外的时间属性值)

  • 输入字段:父节点输出的字段即为该节点的输入字段

  • 输出字段:填写需要输出的字段,多个字段间用英文逗号分隔,支持函数处理已有的字段

  • 分组字段:选择进行分组聚合的依据字段,可多选

  • 窗口类型:支持全局窗口、计数窗口、时间窗口

  • 选择计数窗口,需填写对应的计数

  • 选择时间窗口,需添加对应的时间

cep

用于实时计算复杂事件处理

配置前需确保KafkaSTDB类型的数据源节点中已开启了timeCharacteristic参数(也即选择了除none以外的时间属性值)

  • 时间窗口:设置事件匹配的有效期(即第一个事件触发,到最后一个事件触发的整个生命周期)。在该生命周期内没有完成匹配的数据将会被丢弃

  • 匹配事件跳过策略:设置事件跳过策略

  • SKIP TO NEXT ROW:匹配成功后,从匹配成功的事件序列中的第一个事件的下一个事件开始进行下一次匹配。

  • SKIP PAST LAST ROW:匹配成功后,从匹配成功的事件序列中的最后一个事件的下一个事件开始进行下一次匹配。

  • SKIP_TO_LAST:匹配成功后,从匹配成功的事件序列中最后一个对应于指定pattern的事件开始进行下一次匹配。

  • SKIP_TO_FIRST:匹配成功后,从匹配成功的事件序列中第一个对应于指定pattern的事件开始进行下一次匹配。

  • 分组字段:选择事件的分组字段,支持多选。该字段对应为父节点的输出字段。

  • 排序字段:选择事件的排序字段及排序类型,其中默认按timeCharacteristic属性值升序排序,支持添加多个其父节点输出的其他字段的排序,最多为其父节点输出字段的个数。

  • 事件:每个cep算子最多可以添加三个事件。单击<编辑事件>按钮,弹出事件添加条件窗口。

  • 添加事件:单击右上角按钮,可以添加事件,最多可添加三个事件。每一个事件都是独立页签,分为事件1、事件2和事件3,其事件标识依次为EVENT1 EVENT2EVENT3,除EVENT1无邻接条件外,EVENT2EVENT3的邻接条件均为NEXT

  • 匹配次数:上下界必须填一项,上界值需大于下界值,下界最小值为1,最后一个事件不可选贪婪匹配,其他事件默认选贪婪匹配。

  • 条件:可根据需要配置多个字段匹配规则。除事件1外,事件2、事件3支持与之前的事件字段比较。条件配置完成后,单击<确定>按钮,即可添加条件。多个字段匹配规则会显示在下方的文本框中。选择文本框中的多个字段,单击<并列>按钮,可以进行规则的与运算;单击<或者>按钮,可以进行规则的或运算。选择文本框中的1个字段,单击<编辑>按钮,可以编辑该条件,否则新增一个文本框,用户可自行输入条件,点空白处或回车可新增该条件。一个事件只能有一个条件,若为多个,请通过单击<并列>按钮或<或者>按钮合为一个条件,最终的条件在提示处展示。除事件1外可不填写条件(不写默认为匹配任意事件),事件2、事件3的条件为必填项。

  • 已选字段:单击<选择/编辑字段>按钮,弹出选择字段窗口。在窗口中,用户可以选择左侧表中的已有字段,也可以在右侧表中新增字段。从左侧表中选择字段时,可以通过在输入关键字搜索字段,选择后,需要在右侧表中对已选字段配置别名,还可对字段顺序进行排序。

 

聚合组件(aggregate)的函数有五种,分别为:求和(SUM)、求平均值(AVG)、最大值(MAX)、最小值(MIN)、统计(COUNT)。

 

数据结果表组件

数据结果表用于存储FLINK_GRAPH作业的的运算结果。目前支持将结果存储到KafkaHivePostgreSQL、达梦、VerticaMySQLElasticsearch数据源的数据表中,以及Hive数据源中Hudi类型的数据表内。

各数据结果表组件的参数说明如下表所示。

PostgreSQL数据库9.5以上版本支持upsert流;upsert操作数据写入HBaseMySQLPostgreSQL、达梦、Verticasink表需要有主键。

如果实时作业需要写入Hive数据源中Hudi类型的数据表内,则需要先在[表管理]中选择Hive数据源下创建Hudi数据表(即创建数据表时,选择Hive类型数据源,并配置存储方式为Hudi),并确保选择的表在创建时,分区字段类型不能选择date,表结构中的字段类型不包括charvarchartinyintsmallinttimestamp类型,否则,会导致Hudi数据同步至Hive失败。

 

表-4 数据结果表组件配置参数介绍

参数

说明

节点类型

所选组件节点的类型

表类型

均为数据结果表,表示为输出

表名称

根据所选的节点类型,选择对应的在流表管理中已注册的流表。此名称按照数据源名.表名规则进行展开(以区分不同的数据源下的同名表)

字段映射

输入字段向数据结果表字段映射。输入字段为其父节点的输出字段(不可编辑),数据结果表的字段只能选同类型的且未被选择的字段

时间分区字段

配置Timestamp类型字段分区依据。当数据结果表为Elasticsearch,且有建周期表,才有此配置项。要求该Timestamp类型字段不为空。若为空,则会导致生成*.null形式的索引(其中*为通配符),该情况下可进入对应DataEngine大数据集群后台执行如下命令,删除该不受全文索引管理的索引:

curl --negotiate -u : -XDELETE http://DataEngine大数据集群IP:9200/索引名