Flink自定义函数开发

用户可以根据需要自定义Flink函数,本小节介绍了函数的开发过程,并给出了参考样例。

准备开发环境

开发函数前,需要准备如下环境:

新建Maven工程

IDEA中新建一个Maven工程,Pom文件中导入如下依赖。

当前数据运营平台支持Flink 1.9.2版本的Flink自定义函数,请不要更改Pom文件中的Flink版本。

 

<dependencies>

  <dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-table-api-java-bridge_2.11</artifactId>

    <version>1.9.2</version>

  </dependency>

  <dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-table-common</artifactId>

    <version>1.9.2</version>

  </dependency>

</dependencies>

 

自定义函数实例

自定义函数主要有UDF(用户自定义标量值函数)、UDTF(自定义表值函数)、UDAF(自定义聚合函数)三种,关于三种函数的详细说明,请参考Flink官方文档(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/udfs.html)。现分别给出三种函数的示例,仅供参考,示例如下:

1.      UDF

如下代码示例,展示了计算字符串长度的函数。

import org.apache.flink.table.functions.FunctionContext;

import org.apache.flink.table.functions.ScalarFunction;

 

/**

 * 用户自定义标量值函数 UDF

 *

 * 必须继承 {@link ScalarFunction}

 */

public class StringLengthUdf extends ScalarFunction {

    /**

     * 可选

     */

    @Override

    public void open(FunctionContext context) {

        //初始化逻辑

    }

 

    /**

     * 必须有一个名为eval的方法

     */

    public int eval(String a) {

        return a == null ? 0 : a.length();

    }

 

    /**

     *可选

     */

    @Override

    public void close() {

        //关闭时的逻辑

    }

}

 

2.      UDTF

如下代码示例,展示了按照#对字符串进行分割,进而得到多个值的函数。

import org.apache.flink.table.functions.FunctionContext;

import org.apache.flink.table.functions.TableFunction;

 

/**

 * 用户自定义表值函数

 */

public class SplitUdtf extends TableFunction<String> {

    /**

     * 可选

     */

    @Override

    public void open(FunctionContext context) {

        //初始化逻辑

    }

 

    /**

     * 必须实现一个名为eval的方法

     */

    public void eval(String str) {

        String[] split = str.split("#");

        for (String s : split) {

            collect(s);

        }

    }

 

    /**

     * 可选

     */

    @Override

    public void close() {

        //关闭时的逻辑

    }

}

 

3.      UDAF

如下代码示例,展示了一个聚合函数。

import org.apache.flink.table.functions.AggregateFunction;

 

/**

 * 用户自定义聚合函数

 */

public class CountUdaf extends AggregateFunction<Integer, CountUdaf.CountAccum> {

    /**

     * 定义存放count udaf的状态的accumulator数据结构

     */

    static class CountAccum {

        int total;

    }

 

    /**

     * 初始化count udafaccumulator

     */

    @Override

    public CountAccum createAccumulator() {

        CountAccum acc = new CountAccum();

        acc.total = 0;

        return acc;

    }

 

    /**

     * {@link #getValue(CountAccum)} 提供了如何通过存放状态的accumulator计算count UDAF的结果的方法

     */

    @Override

    public Integer getValue(CountAccum accumulator) {

        return accumulator.total;

    }

 

    /**

     *{@link #accumulate(CountAccum, int)}提供了如何根据输入的数据更新count UDAF存放状态的accumulator

     */

    public void accumulate(CountAccum accumulator, int iValue) {

        accumulator.total += iValue;

    }

 

    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {

        for (CountAccum other : its) {

            accumulator.total += other.total;

        }

    }

}