Spark MLlib: Linear Regression Explained

2018-09-11 14:28:11

1. Linear least squares, Lasso, and ridge regression

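All of MLlib's methods use Java-friendly types, so you can import and call them from Java the same way you do in Scala. The one caveat is that these methods take Scala RDD objects, while the Spark Java API uses a separate JavaRDD class. You can convert a JavaRDD to a Scala RDD by calling .rdd() on the JavaRDD object; the static helper JavaRDD.toRDD(...), used in the example, does the same thing. The Java counterpart of the Scala snippet is given below: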

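    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.linalg.Vectors;
    import org.apache.spark.mllib.regression.LabeledPoint;
    import org.apache.spark.mllib.regression.LinearRegressionModel;
    import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
    import scala.Tuple2;

    public class JavaLinearRegressionWithSGDExample {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                .setAppName("JavaLinearRegressionWithSGDExample")
                .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Load the data: each line is "label,f1 f2 ... fn"
            String path = "F:\\Learning\\java\\project\\LearningSpark\\src\\main\\resources\\lpsa.data";
            JavaRDD<String> data = sc.textFile(path);
            JavaRDD<LabeledPoint> parsedData = data.map(line -> {
                String[] parts = line.split(",");
                String[] features = parts[1].split(" ");
                double[] v = new double[features.length];
                // Parse every feature; stopping at length - 1 would leave the last one at 0.0
                for (int i = 0; i < features.length; i++) {
                    v[i] = Double.parseDouble(features[i]);
                }
                return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
            });
            parsedData.cache();

            // Build and train the model
            int numIterations = 100;      // number of iterations
            double stepSize = 0.00000001; // learning rate
            LinearRegressionModel model =
                LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize);

            // Pair each prediction with its true label on the training data
            JavaPairRDD<Double, Double> valueAndPreds = parsedData
                .mapToPair(point -> new Tuple2<>(model.predict(point.features()), point.label()));

            // Mean squared error: the average of the squared differences
            double MSE = valueAndPreds.mapToDouble(pair -> {
                double diff = pair._1() - pair._2();
                return diff * diff;
            }).mean();
            System.out.println("Mean squared error on the training data: " + MSE);

            sc.stop();
        }
    }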
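The parsing and error computation in the example can also be checked without a cluster. The following is a minimal plain-Java sketch, assuming the `label,f1 f2 ...` line layout that the example parses. The class name `MseSketch`, its helper methods, the two sample lines, and the weights are all hypothetical and exist only for illustration. It mirrors, for a linear model, what `model.predict` followed by the mean of the squared differences computes; it is not Spark's implementation.

```java
import java.util.Arrays;
import java.util.List;

public class MseSketch {
    // Parse one "label,f1 f2 ... fn" line into {label, f1, ..., fn}.
    public static double[] parseLine(String line) {
        String[] parts = line.split(",");
        String[] features = parts[1].trim().split(" ");
        double[] row = new double[features.length + 1];
        row[0] = Double.parseDouble(parts[0]);
        // Note the bound: every feature is parsed, including the last one.
        for (int i = 0; i < features.length; i++) {
            row[i + 1] = Double.parseDouble(features[i]);
        }
        return row;
    }

    // Linear prediction: intercept + sum of weights[i] * feature i (features start at row[1]).
    public static double predict(double[] weights, double intercept, double[] row) {
        double sum = intercept;
        for (int i = 0; i < weights.length; i++) {
            sum += weights[i] * row[i + 1];
        }
        return sum;
    }

    // Mean of (prediction - label)^2 over all lines.
    public static double meanSquaredError(List<String> lines, double[] weights, double intercept) {
        double total = 0.0;
        for (String line : lines) {
            double[] row = parseLine(line);
            double diff = predict(weights, intercept, row) - row[0];
            total += diff * diff;
        }
        return total / lines.size();
    }

    public static void main(String[] args) {
        // Hypothetical sample data and weights, for illustration only.
        List<String> lines = Arrays.asList("1.0,1.0 2.0", "2.0,0.0 1.0");
        double[] weights = {0.5, 0.5};
        System.out.println(meanSquaredError(lines, weights, 0.0)); // prints 1.25
    }
}
```

With these two sample lines the predictions are 1.5 and 0.5 against labels 1.0 and 2.0, so the squared errors are 0.25 and 2.25 and their mean is 1.25.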

2. Streaming linear regression

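    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamingLinearRegressionExample {
      def main(args: Array[String]): Unit = {
        // The snippet uses ssc without defining it; the app name and the
        // 1-second batch interval here are assumptions.
        val conf = new SparkConf().setAppName("StreamingLinearRegressionExample")
        val ssc = new StreamingContext(conf, Seconds(1))

        // args(0): directory watched for training files, args(1): directory for test files
        val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse).cache()
        val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

        // The initial weight vector must match the number of features in the data
        val numFeatures = 3
        val model = new StreamingLinearRegressionWithSGD()
          .setInitialWeights(Vectors.zeros(numFeatures))

        // Update the model on each training batch and print predictions for the test stream
        model.trainOn(trainingData)
        model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }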
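Conceptually, a streaming regression model keeps a single weight vector, starting from the value passed to `setInitialWeights`, and refines it each time a new batch of labeled points arrives. The sketch below illustrates that idea in plain Java with no Spark dependency, taking one gradient step on the squared-error loss per batch. It is not Spark's implementation: the class `StreamingSgdSketch`, the method `updateOnBatch`, the step size, and the sample batch are hypothetical and chosen only to make the update rule visible.

```java
public class StreamingSgdSketch {
    private final double[] weights;
    private final double stepSize;

    // Weights start at zero, analogous to Vectors.zeros(numFeatures).
    public StreamingSgdSketch(int numFeatures, double stepSize) {
        this.weights = new double[numFeatures];
        this.stepSize = stepSize;
    }

    // Linear prediction: dot product of the current weights and the features.
    public double predict(double[] x) {
        double sum = 0.0;
        for (int i = 0; i < weights.length; i++) {
            sum += weights[i] * x[i];
        }
        return sum;
    }

    // One gradient step on the batch-averaged loss 1/2 * (w.x - y)^2.
    public void updateOnBatch(double[][] xs, double[] ys) {
        if (xs.length == 0) {
            return; // an empty batch leaves the model unchanged
        }
        double[] grad = new double[weights.length];
        for (int n = 0; n < xs.length; n++) {
            double err = predict(xs[n]) - ys[n];
            for (int i = 0; i < weights.length; i++) {
                grad[i] += err * xs[n][i];
            }
        }
        for (int i = 0; i < weights.length; i++) {
            weights[i] -= stepSize * grad[i] / xs.length;
        }
    }

    public double[] getWeights() {
        return weights.clone();
    }

    public static void main(String[] args) {
        StreamingSgdSketch model = new StreamingSgdSketch(3, 0.5);
        // Hypothetical batch generated from y = 1*x0 + 2*x1 + 3*x2.
        double[][] xs = {{1, 0, 0}, {0, 1, 0}, {0, 0, 1}};
        double[] ys = {1, 2, 3};
        // Feed the same batch repeatedly to stand in for a stream of batches.
        for (int batch = 0; batch < 100; batch++) {
            model.updateOnBatch(xs, ys);
        }
        System.out.println(java.util.Arrays.toString(model.getWeights()));
    }
}
```

In this toy setup the weights move toward (1, 2, 3) as more batches are processed. For the Spark example itself, the MLlib documentation describes the expected input as text files whose lines have the form `(y,[x1,x2,x3])`, which is what `LabeledPoint.parse` reads, and `numFeatures` has to match the length of that feature list.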