用户可以根据需要自定义Spark函数,本小节介绍了函数的开发过程,并给出了参考样例。
开发函数前,需要准备如下环境:
下载并安装OpenJDK 1.8(关于OpenJDK的更多信息,请参考http://openjdk.java.net/用户可以根据需要选择不同厂商的OpenJDK发行版)。
下载并安装支持Java语言开发的IDE(集成开发环境),如Eclipse(详见https://www.eclipse.org/)等。
下载并安装Maven,详见https://maven.apache.org/介绍。
在IDE中新建一个Maven工程,Pom文件中导入如下依赖。
当前数据运营平台支持Hadoop、Hive版本均如下所示,请不要随意更改该版本。 |
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.0.0-cdh6.2.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.1.1-cdh6.2.0</version> </dependency> </dependencies> |
自定义函数主要有UDF(用户自定义标量值函数)、UDAF(自定义聚合函数)两种,关于两种函数的详细说明,请参考Spark官方文档(https://spark.apache.org/docs/2.4.0/api/java/index.html)。现分别对两种函数示例如下:
如下代码示例,展示了将字符串作为小写输出的函数。
import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.Text;
/** * 用户自定义标量值函数 UDF * * 必须继承 {@link UDF} */ public final class UdfLower extends UDF { // 须重写evaluate方法,自定义该方法的参数类型、个数和返回值 public Text evaluate(final Text str) { if (str == null) { return null; } return new Text(str.toString().toLowerCase()); } } |
如下代码示例,展示了聚合函数。
import org.apache.hadoop.hive.ql.exec.UDAF; import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
/** * This is a simple UDAF that calculates avg number. * <p> * It should be very easy to follow and can be used as an example for writing * new UDAFs. */ //用户自定义的UDAF必须是继承了UDAF,且内部包含多个实现了exec的静态类 public class AvgDemo extends UDAF { public static class AvgState { private long mCount; private double mSum;
}
// 声明一个静态内部类,实现了org.apache.hadoop.hive.ql.exec.UDAFEvaluator 接口 public static class AvgEvaluator implements UDAFEvaluator { AvgState state;
public AvgEvaluator() { super(); state = new AvgState(); init(); }
/** * Reset the state of the aggregation. * init()方法负责初始化 */
public void init() { state.mSum = 0; state.mCount = 0; }
/** * Iterate through one row of original data. * <p> * This UDF accepts a single IntWritable argument, so we use * IntWritable argument * <p> * This function should always return true. * iterate()方法处理读入的行数据,会根据计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true。 */
public boolean iterate(Double o) { if (o != null) { state.mSum += o; state.mCount++; } return true; }
/** * terminatePartial无参数,其为iterate函数遍历结束后,返回轮转数据,* terminatePartial类似于hadoop的Combiner * * @return */
public AvgState terminatePartial() { // combiner return state.mCount == 0 ? null : state; }
/** * Merge with a partial aggregation. * <p> * This function should always have a single argument which has the same * type as the return value of terminatePartial(). * <p> * This function should always return true. * merge()方法合并上述处理结果,只有一个形参,且参数类型应和terminatePartial()方法的返回值类型一样,该方法的返回值类型 应为true */
public boolean merge(AvgState avgState) { if (avgState != null) { state.mCount += avgState.mCount; state.mSum += avgState.mSum; } return true; }
/** * Terminates the aggregation and return the final result. * terminate()方法返回最终值 */ public Double terminate() { return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount); } } } |