频道栏目
首页 > 网络 > 云计算 > 正文

SparkStreaming解析

2018-09-13 15:59:27           
收藏   我要投稿

1.UpdateBykey和mapWithState

俩个实现的功能都是累加,但是updateBykey是1.6版本之前的,mapWithState是之后的并且更加实用!!!源码记得去看

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

2.transform

(1)作用:DStream整合RDD

package Streaming02

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object LeftJoinStramingApp {

def main(args: Array[String]): Unit = {

val sparkconf=new SparkConf().setMaster("local[2]").setAppName("LeftJoinStramingApp")

//这三个真是一个都不能少

val ssc=new StreamingContext(sparkconf,Seconds(10))

//数据二:rdd

val input2=new ListBuffer[(String,Boolean)]

input2.append(("www.baidu.com",true))

val data2=ssc.sparkContext.parallelize(input2) //构建rdd是sparkContext的,但是我们没有,所以调用一个再parallelize

//因为下面的调用是先创建在调用,所以得移到上面来

//数据一:nc -lk 9999 过来

val lines=ssc.socketTextStream("192.168.137.251",9999)//创建输入流

lines.map(x=>(x.split(",")(0),x)) .transform(rdd=>{

rdd.leftOuterJoin(data2).filter(x=>{ //这一步就是DS结合RDD的操作!!!!这是一个实时过滤,

会有个test给你处理,你需要将他过滤,然而这不是最好的,最好的直接将test广播变量出去,然后直接filter就好

x._2._2.getOrElse(false)!=true

}).map(_._2._1)

}).print()

//这是一个DStream,DStram是由一系列的RDD构成的不加x的时候表示分割去除第一个值,加了x之后表示成为key,value的类型

ssc.start()

ssc.awaitTermination()

}

}

3.做leftjoin取出需要的数据!这是RddJoin

package Streaming02

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object LeftJoinApp {

def main(args: Array[String]): Unit = {

val sparkconf=new SparkConf().setAppName("LeftJoinApp").setMaster("local[2]")

//setMaster不能乱写!要记得为什么不能这样写?

val sc=new SparkContext(sparkconf)

//数据一。域名,和流量

val input1=new ListBuffer[(String,Long)] //本地创建数据

input1.append(("www.ruozedata.com",8888))

input1.append(("www.ruozedata.com",9999))

input1.append(("www.baidu.com",7777))

val data1=sc.parallelize(input1) //装成RDD

//数据二

val inpu2=new ListBuffer[(String,Boolean)]

inpu2.append(("www.baidu.com",true))

val data2=sc.parallelize(inpu2)

data1.leftOuterJoin(data2) //这个得到是左边表的所有数据但这不是我们想要的

.filter(x=>{

x._2._2.getOrElse(false)!=true //过滤取出第二个不是true的值,但还有none的值

})

.map(x=>(x._1,x._2._1)) //过滤之后直接根据数据模型来取出值

.collect().foreach(println)

sc.stop()

}

}

4.foreachRDD

作用:一般在输出的时候用,例如写到MySQL中去写数据库都是这样的

package Streaming02

import java.sql.DriverManager

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

object foreachRDDApp {

def main(args: Array[String]): Unit = {

val sparkconf=new SparkConf().setAppName("foreacRDDApp").setMaster("local[2]")

val ssc=new StreamingContext(sparkconf,Seconds(10))

//WC写到MySQL里面去

val lines=ssc.socketTextStream("192.168.137.251",9998)

val re=lines.flatMap(_.split(",")).map((_,1))reduceByKey(_+_)

//re结果写入MySQL

/*re.foreachRDD{rdd=>

//val con=createConnection() //将connection放在driver端,这是有问题的因为,connection应该是在work上的

rdd.foreach{ rec=>

val con=createConnection() //将connection放在excutor端来,这也有问题,如果是一百万的数据,那需要多少个connection

val word=rec._1

val count=rec._2

val sql=s"insert into wc(word,c) values('$word',$count)"

con.createStatement().execute(sql)

con.close()

}

}*/

re.foreachRDD{ rdd=>

rdd.foreachPartition{ //这是工作中最常用的方式

par=> val con=createConnection()

par.foreach(rec=>{

val word=rec._1

val count=rec._2

val sql=s"insert into wc(word,c) values('$word',$count)"

con.createStatement().execute(sql)

})

con.close()

}

}

ssc.start()

ssc.awaitTermination()

}

def createConnection()={

Class.forName("com.mysql.jdbc.Driver")

DriverManager.getConnection("jdbc:mysql://192.168.137.251:3306/g3","root","123456")

//注意这个url的格式!!!!!

}

}

(2)最好的方法是ConnectionPool

我们用的是BoneCP ,添加依赖

com.jolbox

bonecp

0.8.0.RELEASE

5.window

(1)这个spark官网看看

相关TAG标签 SparkStreaming 解析
上一篇:云数据库解析
下一篇:python2.7默认编码修改为utf-8
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站