频道栏目
首页 > 资讯 > JAVA > 正文

JAVA纯代码关于用户创建自定义函数

18-07-27        来源:[db:作者]  
收藏   我要投稿

package com.bjsxt;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
* 用户自定义函数
* @author Administrator
*
*/

public class UDF {
public static void main(String[] args) {
SparkConf conf=new SparkConf().setAppName("udf").setMaster("local");
JavaSparkContext sc=new JavaSparkContext(conf);
SQLContext sqlContext=new SQLContext(sc);
JavaRDD parallelize = sc.parallelize(Arrays.asList("zhangsan","lisi","wangwu","maliu"));
JavaRDD rowRDD = parallelize.map(new Function() {

@Override
public Row call(String s) throws Exception {

return RowFactory.create(s);
}
});

/**
* 动态创建schema的方式
*/
List fields=new ArrayList();
fields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/**
* 根据函数的个数来决定实现那个UDF,UDF1,UDF2...
*/
sqlContext.udf().register("StrLen", new UDF1(){

@Override
public Integer call(String t1) throws Exception {

return t1.length();
}
},DataTypes.IntegerType);
sqlContext.sql("select name,StrLen(name) as Length from user").show();
System.out.println("********************************************8");
sqlContext.udf().register("StrLen",new UDF2(){

@Override
public Integer call(String t1, Integer t2) throws Exception {

return t1.length()+t2;
}

}, DataTypes.IntegerType);
sqlContext.sql("select name , StrLen(name,10) as length from user").show();
sc.stop();
}
}

相关TAG标签
上一篇:ErrorstartingApplicationContext.SpringBoot启动失败时该如何操作
下一篇:关于elasticsearchJavaHighLevelREST的封装操作
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站