Spark自定义函数开发

用户可以根据需要自定义Spark函数,本小节介绍了函数的开发过程,并给出了参考样例。

准备开发环境

开发函数前,需要准备如下环境:

新建Maven工程

IDE中新建一个Maven工程,Pom文件中导入如下依赖。

当前数据运营平台支持HadoopHive版本均如下所示,请不要随意更改该版本。

 

<dependencies>

  <dependency>

    <groupId>org.apache.hadoop</groupId>

      <artifactId>hadoop-common</artifactId>

      <version>3.0.0-cdh6.2.0</version>

  </dependency>

  <dependency>

    <groupId>org.apache.hive</groupId>

    <artifactId>hive-exec</artifactId>

    <version>2.1.1-cdh6.2.0</version>

  </dependency>

</dependencies>

 

自定义函数实例

自定义函数主要有UDF(用户自定义标量值函数)、UDAF(自定义聚合函数)两种,关于两种函数的详细说明,请参考Spark官方文档(https://spark.apache.org/docs/2.4.0/api/java/index.html)。现分别对两种函数示例如下:

1.      UDF

如下代码示例,展示了将字符串作为小写输出的函数。

import org.apache.hadoop.hive.ql.exec.UDF;

import org.apache.hadoop.io.Text;

 

/**

 * 用户自定义标量值函数 UDF

 *

 * 必须继承 {@link UDF}

 */

public final class UdfLower extends UDF {

  // 须重写evaluate方法,自定义该方法的参数类型、个数和返回值

  public Text evaluate(final Text str) {

    if (str == null) {

        return null;

    }

    return new Text(str.toString().toLowerCase());

  }

}

 

2.      UDAF

如下代码示例,展示了聚合函数。

import org.apache.hadoop.hive.ql.exec.UDAF;

import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

 

/**

 * This is a simple UDAF that calculates avg number.

 * <p>

 * It should be very easy to follow and can be used as an example for writing

 * new UDAFs.

 */

//用户自定义的UDAF必须是继承了UDAF,且内部包含多个实现了exec的静态类

public class AvgDemo extends UDAF {

  public static class AvgState {

    private long mCount;

    private double mSum;

 

  }

 

// 声明一个静态内部类,实现了org.apache.hadoop.hive.ql.exec.UDAFEvaluator 接口

  public static class AvgEvaluator implements UDAFEvaluator {

    AvgState state;

 

    public AvgEvaluator() {

      super();

      state = new AvgState();

      init();

    }

 

    /**

     * Reset the state of the aggregation.

     * init()方法负责初始化

     */

 

    public void init() {

      state.mSum = 0;

      state.mCount = 0;

    }

 

    /**

     * Iterate through one row of original data.

     * <p>

     * This UDF accepts a single IntWritable argument, so we use

     * IntWritable argument

     * <p>

     * This function should always return true.

     * iterate()方法处理读入的行数据,会根据计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true

     */

 

    public boolean iterate(Double o) {

      if (o != null) {

        state.mSum += o;

        state.mCount++;

      }

      return true;

    }

 

    /**

     * terminatePartial无参数,其为iterate函数遍历结束后,返回轮转数据,* terminatePartial类似于hadoopCombiner * * @return

     */

 

    public AvgState terminatePartial() {

      // combiner

      return state.mCount == 0 ? null : state;

    }

 

    /**

     * Merge with a partial aggregation.

     * <p>

     * This function should always have a single argument which has the same

     * type as the return value of terminatePartial().

     * <p>

     * This function should always return true.

     * merge()方法合并上述处理结果,只有一个形参,且参数类型应和terminatePartial()方法的返回值类型一样,该方法的返回值类型

应为true

     */

 

    public boolean merge(AvgState avgState) {

      if (avgState != null) {

        state.mCount += avgState.mCount;

        state.mSum += avgState.mSum;

      }

      return true;

    }

 

    /**

     * Terminates the aggregation and return the final result.

     * terminate()方法返回最终值

     */

    public Double terminate() {

      return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount);

    }

  }

}