用户可以根据需要自定义Flink函数,本小节介绍了函数的开发过程,并给出了参考样例。
开发函数前,需要准备如下环境:
下载并安装OpenJDK 1.8(关于OpenJDK的更多信息,请参考http://openjdk.java.net/用户可以根据需要选择不同厂商的OpenJDK发行版)。
下载并安装支持Java语言开发的IDE(集成开发环境),如IDEA(详见https://www.jetbrains.com/idea/)、Eclipse(详见https://www.eclipse.org/)等。
下载并安装Maven,详见https://maven.apache.org/介绍。
在IDEA中新建一个Maven工程,Pom文件中导入如下依赖。
当前数据运营平台支持Flink 1.9.2版本的Flink自定义函数,请不要更改Pom文件中的Flink版本。 |
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>1.9.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.9.2</version> </dependency> </dependencies> |
自定义函数主要有UDF(用户自定义标量值函数)、UDTF(自定义表值函数)、UDAF(自定义聚合函数)三种,关于三种函数的详细说明,请参考Flink官方文档(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/udfs.html)。现分别给出三种函数的示例,仅供参考,示例如下:
如下代码示例,展示了计算字符串长度的函数。
import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.ScalarFunction;
/** * 用户自定义标量值函数 UDF * * 必须继承 {@link ScalarFunction} */ public class StringLengthUdf extends ScalarFunction { /** * 可选 */ @Override public void open(FunctionContext context) { //初始化逻辑 }
/** * 必须有一个名为eval的方法 */ public int eval(String a) { return a == null ? 0 : a.length(); }
/** *可选 */ @Override public void close() { //关闭时的逻辑 } } |
如下代码示例,展示了按照#对字符串进行分割,进而得到多个值的函数。
import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.TableFunction;
/** * 用户自定义表值函数 */ public class SplitUdtf extends TableFunction<String> { /** * 可选 */ @Override public void open(FunctionContext context) { //初始化逻辑 }
/** * 必须实现一个名为eval的方法 */ public void eval(String str) { String[] split = str.split("#"); for (String s : split) { collect(s); } }
/** * 可选 */ @Override public void close() { //关闭时的逻辑 } } |
如下代码示例,展示了一个聚合函数。
import org.apache.flink.table.functions.AggregateFunction;
/** * 用户自定义聚合函数 */ public class CountUdaf extends AggregateFunction<Integer, CountUdaf.CountAccum> { /** * 定义存放count udaf的状态的accumulator数据结构 */ static class CountAccum { int total; }
/** * 初始化count udaf的accumulator */ @Override public CountAccum createAccumulator() { CountAccum acc = new CountAccum(); acc.total = 0; return acc; }
/** * {@link #getValue(CountAccum)} 提供了如何通过存放状态的accumulator计算count UDAF的结果的方法 */ @Override public Integer getValue(CountAccum accumulator) { return accumulator.total; }
/** *{@link #accumulate(CountAccum, int)}提供了如何根据输入的数据更新count UDAF存放状态的accumulator */ public void accumulate(CountAccum accumulator, int iValue) { accumulator.total += iValue; }
public void merge(CountAccum accumulator, Iterable<CountAccum> its) { for (CountAccum other : its) { accumulator.total += other.total; } } } |