package com.bjsxt;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
* 用户自定义函数
* @author Administrator
*
*/
public class UDF {
public static void main(String[] args) {
SparkConf conf=new SparkConf().setAppName("udf").setMaster("local");
JavaSparkContext sc=new JavaSparkContext(conf);
SQLContext sqlContext=new SQLContext(sc);
JavaRDD
JavaRDD
@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
/**
* 动态创建schema的方式
*/
List
fields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/**
* 根据函数的个数来决定实现那个UDF,UDF1,UDF2...
*/
sqlContext.udf().register("StrLen", new UDF1
@Override
public Integer call(String t1) throws Exception {
return t1.length();
}
},DataTypes.IntegerType);
sqlContext.sql("select name,StrLen(name) as Length from user").show();
System.out.println("********************************************8");
sqlContext.udf().register("StrLen",new UDF2
@Override
public Integer call(String t1, Integer t2) throws Exception {
return t1.length()+t2;
}
}, DataTypes.IntegerType);
sqlContext.sql("select name , StrLen(name,10) as length from user").show();
sc.stop();
}
}