频道栏目
首页 > 网络 > 云计算 > 正文

移动电商交互平台搭建方法

2018-06-12 10:14:23      个评论    来源:石印掌纹的博客  
收藏   我要投稿

搭建Centos7集群

安装centos7,jdk

vim /etc/hostname

vim /etc/hosts,写入所有集群节点ip

\

设置网关

vim /etc/sysconfig/network-scripts/ifcfg-eno16777736

设置ssh

在根目录下生成秘钥

ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa

将s110的公钥文件id_rsa.pub远程复制到111,112主机上。

scp id_rsa.pub root@s111:/root/.ssh/authorized_keys

这样就可以在s110上直接登录其他节点了

ssh s111

搭建hadoop集群(CDH)

CDH版(Cloudera Hadoop)的hadoop是企业级的hadoop,基于apache的hadoop做了完善以及封装

1、将hadoop-2.5.0-cdh5.3.6.tar.gz,上传到虚拟机的/usr/local目录下。

(http://archive.cloudera.com/cdh5/cdh/5/)

2、将hadoop包进行解压缩:tar -zxvf hadoop-2.5.0-cdh5.3.6.tar.gz

3、对hadoop目录进行重命名:mv hadoop-2.5.0-cdh5.3.6 hadoop

4、配置hadoop相关环境变量

vi ~/.bashrc

export HADOOP_HOME=/usr/local/hadoop

export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin

source ~/.bashrc

5、创建/usr/local/data目录

6 在根目录下创建软连接

ln -s/usr/local/soft/hadoop-2.5.0-cdh/ hadoop

7 cd hadoop/etc/hadoop/

8 vim core-site.xml

fs.default.name

hdfs://服务名:9000

9 vim hdfs-site.xml

dfs.name.dir

/usr/local/data/namenode

dfs.data.dir

/usr/local/data/datanode //上面设置的数据文件存放目录

dfs.tmp.dir

/usr/local/data/tmp

dfs.replication

2 //从节点数量

10vim mapred-site.xml

mapreduce.framework.name

yarn

11vim yarn-site.xml

yarn.resourcemanager.hostname

服务名

yarn.nodemanager.aux-services

mapreduce_shuffle

12 vim slaves

\

13 将已经配置好的hadoop复制到各个从节点下

14 在各个从节点配置hadoop的环境变量

vim ~/.bashrc

15 记得在各个从节点中创建/usr/local/data目录

16 启动hdfs集群

(1)格式化namenode:在主节点上执行命令:hdfs namenode -format

(2)启动hdfs集群:start-dfs.sh

(3)验证是否启动成功:jps,查看50070端口

\
\

上传一个文件测试是否已经搭建成功,输入

hadoop fs -put hello.txt /hello

\

浏览器输入http://192.168.25.110:50070

\
\

可以看到刚才的hello已经上传上来了

\
\

启动yarn集群

start-yarn.sh

\

浏览器访问:http://192.168.25.110:8088

\

安装mysql

1、在主节点上安装mysql。

2、使用yum安装mysql server。

yum install -y mysql-server

service mysqld start

chkconfig mysqld on

3、使用yum安装mysql connector

yum install -y mysql-connector-java

4、将mysql connector拷贝到hive的lib包中

cp /usr/share/java/mysql-connector-java-5.1.17.jar /usr/local/hive/lib

5、在mysql上创建hive元数据库,创建hive账号,并进行授权

create database if not exists hive_metadata;

grant all privileges on hive_metadata.* to 'hive'@'%' identified by 'hive';

grant all privileges on hive_metadata.* to 'hive'@'localhost' identified by 'hive';

grant all privileges on hive_metadata.* to 'hive'@'spark1' identified by 'hive';

flush privileges;

use hive_metadata;

安装hive

1 上传hive-0.13.1-cdh5.3.6.tar.gz

2 解压到/usr/local/soft下

3 在根目录下创建该目录的软连接

4 ln -s/usr/local/soft/hive-0.13.1-cdh5.3.6/

5 配置环境变量vim ~/.bashrc

6 配置hive-site.xml文件

mv hive-default.xml.template hive-site.xml

javax.jdo.option.ConnectionURL

jdbc:mysql://s110:3306/hive_metadatacreateDatabaseIfNotExist=true

javax.jdo.option.ConnectionDriverName

com.mysql.jdbc.Driver

javax.jdo.option.ConnectionUserName

hive

javax.jdo.option.ConnectionPassword

hive

7mv hive-env.sh.template hive-env.sh

8vi /usr/local/hive/bin/hive-config.sh

export JAVA_HOME=/usr/java/latest

export HIVE_HOME=/usr/local/hive

export HADOOP_HOME=/usr/local/hadoop

9 进入hive

hive

创建数据库

create database mydb;

use mydb;

创建表,按照 ',' 分割

create table users(id int,name string) row format delimited fields terminated by ',';

load data local inpath '/home/download/users.txt' into table users;

users.txt

1,leo

2,jack

3,jen

4,marry

5,tom

\

搭建Zookeeper集群

1、将zookeeper-3.4.5-cdh5.3.6.tar.gz使用WinSCP拷贝到sparkproject1的/usr/local目录下。

2、对zookeeper-3.4.5-cdh5.3.6.tar.gz进行解压缩:tar -zxvf zookeeper-3.4.5-cdh5.3.6.tar.gz。

3、对zookeeper目录进行重命名:mv zookeeper-3.4.5-cdh5.3.6 zk。

4、根目录下创建zookeeper的软连接

ln -s /usr/local/soft/zookeeper-3.4.5-cdh5.3.6/ zookeeper

5、配置zookeeper相关的环境变量

vi ~/.bashrc

export ZOOKEEPER_HOME=/usr/local/zk

export PATH=$ZOOKEEPER_HOME/bin

source ~/.bashrc

6cd zookeeper/conf/

mv zoo_sample.cfg zoo.cfg

vim zoo.cfg

安装scala

1 将scala上传至/usr/local目录下

2 解压至/usr/local/soft目录下

3 在根目录下创建软连接

4 配置环境变量

5 scala -version检查是否已经配置成功

\

搭建kafka集群

1、将kafka_2.9.2-0.8.1.tgz使用WinSCP拷贝到sparkproject1的/usr/local目录下。

2、对kafka_2.9.2-0.8.1.tgz进行解压缩:tar -zxvf kafka_2.9.2-0.8.1.tgz。

3、对kafka目录进行改名:mv kafka_2.9.2-0.8.1 kafka,可以不配置环境变量

4、配置kafka

vi /usr/local/kafka/config/server.properties

broker.id:依次增长的整数,0、1、2,集群中Broker的唯一id

zookeeper.connect=192.168.1.105:2181,192.168.1.106:2181,192.168.1.107:2181

5、安装slf4j

将课程提供的slf4j-1.7.6.zip上传到/usr/local目录下

unzip slf4j-1.7.6.zip

把slf4j中的slf4j-nop-1.7.6.jar复制到kafka的libs目录下面

启动kafka集群

1、解决kafka Unrecognized VM option 'UseCompressedOops'问题

vi /usr/local/kafka/bin/kafka-run-class.sh

if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then

KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC

-XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark

-XX:+DisableExplicitGC -Djava.awt.headless=true"

fi

去掉-XX:+UseCompressedOops即可

2、在三台机器上的kafka目录下,分别执行以下命令:nohup bin/kafka-server-start.sh config/server.properties &

3、使用jps检查启动是否成功

测试kafka集群是否搭建成功

使用基本命令检查kafka是否搭建成功

1 创建一个队列

bin/kafka-topics.sh --zookeeper 192.168.25.110:2181,192.168.25.111:2181,192.168.25.112:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create

2 创建一个生产者

bin/kafka-console-producer.sh --broker-list s110:9092,s111:9092,s112:9092 --topic TestTopic

3 创建一个消费者

bin/kafka-console-consumer.sh --zookeeper s110:2181,s111:2181,s112:2181 --topic TestTopic --from-beginning

安装flume

1、将flume-ng-1.5.0-cdh5.3.6.tar.gz使用WinSCP拷贝到s110的/usr/local目录下。

2、对flume进行解压缩:tar -zxvf flume-ng-1.5.0-cdh5.3.6.tar.gz

3、对flume目录进行重命名:mv apache-flume-1.5.0-cdh5.3.6-bin flume

4、配置scala相关的环境变量

vi ~/.bashrc

export FLUME_HOME=/usr/local/flume

export FLUME_CONF_DIR=$FLUME_HOME/conf

export PATH=$FLUME_HOME/bin

source ~/.bashrc

5、修改flume-conf.properties文件

vi /usr/local/flume/conf/flume-conf.properties

#agent1表示代理名称

agent1.sources=source1

agent1.sinks=sink1

agent1.channels=channel1

#配置source1

agent1.sources.source1.type=spooldir

agent1.sources.source1.spoolDir=/usr/local/logs

agent1.sources.source1.channels=channel1

agent1.sources.source1.fileHeader = false

agent1.sources.source1.interceptors = i1

agent1.sources.source1.interceptors.i1.type = timestamp

#配置channel1

agent1.channels.channel1.type=file

agent1.channels.channel1.checkpointDir=/usr/local/logs_tmp_cp

agent1.channels.channel1.dataDirs=/usr/local/logs_tmp

#配置sink1

agent1.sinks.sink1.type=hdfs

agent1.sinks.sink1.hdfs.path=hdfs://sparkproject1:9000/logs

agent1.sinks.sink1.hdfs.fileType=DataStream

agent1.sinks.sink1.hdfs.writeFormat=TEXT

agent1.sinks.sink1.hdfs.rollInterval=1

agent1.sinks.sink1.channel=channel1

agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d

6、创建需要的文件夹

本地文件夹:mkdir /usr/local/logs

HDFS文件夹:hdfs dfs -mkdir /logs

7、启动flume-agent

flume-ng agent -n agent1 -c conf -f /root/flume/conf/flume-conf.properties -Dflume.root.logger=DEBUG,console

8、测试flume

新建一份文件,移动到/usr/local/logs目录下,flume就会自动上传到HDFS的/logs目录中

hadoop fs -lsr /logs 或者 hdfs dfs -lsr /logs 查看目录是否有上传文件进去

Spark搭建

这里我们没有搭建Spark集群,只是给linux服务器部署一个Spark的客户端,你的应用系统底层执行封装了Spark-Submit命令的shell脚本,这个脚本提交你底层的Spark作业,可以采用yarn的提交模式,你的Spark客户端就会把这个Spark作业提交到yarn集群里面去跑。

1、将spark-1.5.1-bin-hadoop2.4.tgz使用WinSCP上传到/usr/local目录下。

2、解压缩spark包:tar -zxvf spark-1.5.1-bin-hadoop2.4.tgz。

3、重命名spark目录:mv spark-1.5.1-bin-hadoop2.4 spark

4、修改spark环境变量

vi ~/.bashrc

export SPARK_HOME=/usr/local/spark

export PATH=$SPARK_HOME/bin

export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib

source ~/.bashrc

修改spark-env.sh文件

1、cd /usr/local/spark/conf

2、cp spark-env.sh.template spark-env.sh

3、vi spark-env.sh

export JAVA_HOME=/usr/java/latest

export SCALA_HOME=/usr/local/scala

export HADOOP_HOME=/usr/local/hadoop

export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop

用yarn-client模式提交spark作业

/usr/local/spark/bin/spark-submit \

--class org.apache.spark.example.JavaSparkPi \

--master yarn-client \

--num-executors 1 \

--driver-memory 10m \

--executor-memory 10m \

--executor-cores 1 \

/usr/local/spark/lib/spark-examples-1.5.1-hadoop2.4.0.jar \

用yarn-cluster模式提交spark作业

/usr/local/spark/bin/spark-submit \

--class org.apache.spark.example.JavaSparkPi \

--master yarn-cluster \

--num-executors 1 \

--driver-memory 10m \

--executor-memory 10m \

--executor-cores 1 \

/usr/local/spark/lib/spark-examples-1.5.1-hadoop2.4.0.jar \

Spark以及生态圈介绍

一、Spark简介

官方解释:Apache Sparkis a fast and general engine for large-scale dataprocessing.

打开官网网站解释一下。

二、Spark关键词

术语描述

ApplicationSpark的应用程序,包含一个Driver program和若干Executor

SparkContextSpark应用程序的入口,负责调度各个运算资源,协调各个Worker Node上的Executor

Driver Program运行Application的main()函数并且创建SparkContext

Executor是为Application运行在Worker node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上。

每个Application都会申请各自的Executor来处理任务

Cluster Manager在集群上获取资源的外部服务

(例如:Standalone、Mesos、Yarn)

Worker Node集群中任何可以运行Application代码的节点,运行一个或多个Executor进程

Task运行在Executor上的工作单元

JobSparkContext提交的具体Action操作,常和Action对应

Stage每个Job会被拆分很多组task,每组任务被称为Stage,也称TaskSet

RDD是Resilient distributed datasets的简称,中文为弹性分布式数据集;是Spark最核心的模块和类

DAGScheduler根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler

TaskScheduler将Taskset提交给Worker node集群运行并返回结果

Transformations是Spark API的一种类型,Transformation返回值还是一个RDD,

所有的Transformation采用的都是懒策略,如果只是将Transformation提交是不会执行计算的

Action是Spark API的一种类型,Action返回值不是一个RDD,而是一个scala集合;计算只有在Action被提交的时候计算才被触发。

三、Spark生态系统

Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的

Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。

Spark Streaming:对实时数据流进行处理和控制。Spark Streaming允许程序能够像普通RDD一样处理实时数据

MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。

GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作

Spark Core:

提供了有向无环图(DAG)的分布式并行计算框架,并提供Cache机制来支持多次迭代计算或者数据共享,大大减少迭代计算之间读取数据局的开销,这对于需要进行多次迭代的数据挖掘和分析性能有很大提升

l 在Spark中引入了RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”对它们进行重建,保证了数据的高容错性;

l 移动计算而非移动数据,RDD Partition可以就近读取分布式文件系统中的数据块到各个节点内存中进行计算

l 使用多线程池模型来减少task启动开稍

l 采用容错的、高可伸缩性的akka作为通讯框架

SparkStreaming:

SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP套接字)进行类似Map、Reduce和Join等复杂操作,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。

计算流程:Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备。下图显示了Spark Streaming的整个流程。

l容错性:对于流式计算来说,容错性至关重要。首先我们要明确一下Spark中RDD的容错机制。每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。

对于Spark Streaming来说,其RDD的传承关系如下图所示,图中的每一个椭圆形表示一个RDD,椭圆形中的每个圆形代表一个RDD中的一个Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD。我们可以看到图中的每一个RDD都是通过lineage相连接的,由于Spark Streaming输入数据可以来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每一个数据流拷贝两份到其他的机器)都能保证容错性,所以RDD中任意的Partition出错,都可以并行地在其他机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。

l实时性:对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。

l扩展性与吞吐量:Spark目前在EC2上已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍,图4是Berkeley利用WordCount和Grep两个用例所做的测试,在Grep这个测试中,Spark Streaming中的每个节点的吞吐量是670k records/s,而Storm是115k records/s。

Spark SQL:

SparkSQL允许开发人员直接处理RDD,同时也可查询例如在 Apache Hive上存在的外部数据。Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析。

MLlib:

MLlib是Spark实现一些常见的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维以及底层优化,该算法可以进行可扩充;MLRuntime基于Spark计算框架,将Spark的分布式计算应用到机器学习领域。

GraphX:

GraphX是Spark中用于图(e.g., Web-Graphs and Social Networks)和图并行计算(e.g., PageRank and CollaborativeFiltering)的API,可以认为是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重写及优化,跟其他分布式图计算框架相比,GraphX最大的贡献是,在Spark之上提供一栈式数据解决方案,可以方便且高效地完成图计算的一整套流水作业。GraphX最先是伯克利AMPLAB的一个分布式图计算框架项目,后来整合到Spark中成为一个核心组件

Spark运行模式

一、 Spark运行架构介绍

术语定义:

Application:指的是用户编写的Spark应用程序,包含了一个Driver功能的代码和分布在集群中多节点上运行的Executor代码。

Driver:Spark中的Driver就是运行Application的main()函数,并且创建SparkContext。SparkContext为Spark准备运行环境,它负责和ClusterManager通信,进行资源的申请、任务的分配和监控,当Executor部分运行完毕后,负责将SparkContext关闭。

Worker:集群中运行Application代码的节点。

Executor:Application运行在Worker节点上的一个进程,该进程负责运行Task。

Cluster Manager:在集群上获取资源的外部服务。

作业(Job):包含多个Task组成的并行计算,job包括多个RDD以及作用于RDD上的各种操作。

阶段(Stage):每个Job会被拆分成很多组Task,每组任务被称为Stage。

任务(Task):被送到某个Executor上的工作任务。

Spark运行基本流程图

\

1、构建Spark Application的运行环境,启动SparkContext,SparkContext向资源管理器注册并且申请运行Executor资源;

2、资源管理器分配Executor资源并启动ExecutorBackend,Executor运行情况将随着心跳发送到资源管理器;

3、SparkContext构建DAG图,将DAG图分解成Stage,并把TaskSet发送给Task Scheduler。Executor向SparkContext申请Task,TaskScheduler将Task发放给Executor运行同时将SparkContext将应用程序代码发放给Executor;

4、Task在Executor上运行,运行完毕释放所有资源;

DAGScheduler

DAGSchuduler把一个Spark作业换成Stage的DAG,根据RDD和Stage之间的关系找出开销最小的调度方法,然后把Stage以TaskSet的形式提交给TaskScheduler。

TaskScheduler

TaskScheduler维护所有的TaskSet,当Executor向Driver发送心跳时,TaskScheduler会根据其资源剩余情况分配相应的Task。

Job、Stage、Task

\

通过观察,这段代码,只有一个job,两个stage,有66个task。

详细介绍一下job、stage、task

job:所谓一个 job,就是由一个 rdd 的 action 触发的动作,可以简单的理解为,当你需要执行一个 rdd 的 action 的时候,会生成一个 job。

stage: stage 是一个 job 的组成单位,就是说,一个 job 会被切分成 1 个或 1 个以上的 stage,然后各个 stage 会按照执行顺序依次执行。至于 job 根据什么标准来切分 stage,下几节我们再介绍RDD的时候详细介绍。这里只是说一下,是按照窄依赖和宽依赖来切分的。

task:即 stage 下的一个任务执行单元,一般来说,一个 rdd 有多少个 partition,就会有多少个 task,因为每一个 task 只是处理一个 partition 上的数据。从 web ui 截图上我们可以看到,这个 job 一共有 2 个 stage,66 个 task,平均下来每个 stage 有 33 个 task,相当于每个 stage 的数据都有 33 个 partition。

运行流程:

1、Job:spark应用里只有一个 job,那就是因为我们执行了一个collect操作,即把处理后的数据全部返回到我们的 driver 上。

2、Stage:

\

第一个 stage,即截图中 stage id 为 0 的 stage,其执行了sc.wholeTextFiles().map().flatMap().map().reduceByKey()这几个步骤,因为这是一个Shuffle操作,所以后面会有Shuffle Read和ShuffleWrite。具体来说,就是在 stage 0 这个 stage 中,发生了一个 Shuffle 操作,这个操作读入 22.5 MB 的数据,生成 41.7 KB 的数据,并把生成的数据写在了硬盘上。

第二个 stage,即截图中 stage id 为 1 到 stage,其执行了collect()这个操作,因为这是一个action操作,并且它上一步是一个 Shuffle 操作,且没有后续操作,所以这里collect()这个操作被独立成一个 stage 了。这里它把上一个 Shuffle 写下的数据读取进来,然后一起返回到 driver 端,所以这里可以看到他的Shuffle Read这里刚好读取了上一个 stage 写下的数据。

3、Task:这里就不用介绍了。就是根据分区运行的task。

二、 Local模式运行流程图

1、启动应用程序,在SparkContext启动过程中,初始化DAGScheduler和TaskSchedulerImpl调度器

\

Spark编程模型RDD设计以及运行原理

一、RDD介绍

RDD:弹性分布式数据集,是一个分区的只读记录的集合。也可以这样理解,是一个提供了许多操作接口的数据集合。它还包括容错、数据集内的数据可以并行处理等。

二、RDD操作类型

RDD的操作类型分为两类,转换(transformations)和行动(action),转换是根据原有的RDD创建一个新的RDD,行动是对RDD操作后把结果返回给driver。

RDD的所有转换操作都是lazy模式,即Spark不会立刻结算结果,而只是简单的记住所有对数据集的转换操作。这些转换只有遇到action操作的时候才会开始计算。

三、RDD依赖关系

RDD提供了许多转换操作,每个转换操作都会生成新的RDD,这时候新的RDD便依赖于原有的RDD,这种RDD之间的依赖关系最终形成DAG。

RDD之间的依赖关系分为两种,为窄依赖和宽依赖。

宽依赖:RDD的每个partition都依赖于父RDD的所有Partition。

窄依赖:只依赖一个或部分的Partition。

\

四、RDD partitioner与并行度

每个RDD都有Partitioner属性,它决定了该RDD如何分区,当然Partition的个数还将决定每个Stage的Task个数。当前Spark需要应用设置Stage的并行Task个数(配置项为:spark.default.parallelism),在未设置的情况下,子RDD会根据父RDD的Partition决定,如map操作下子RDD的Partition与父Partition完全一致,Union操作时子RDD的Partition个数为父Partition个数之和。

如何设置spark.default.parallelism对用户是一个挑战,它会很大程度上决定Spark程序的性能。

RDD简介

最外层是一个worker节点,worker节点中启动进程,woker启动executor,真正完成计算的是executor(基于内存),executor中由部分的partition,RDD就是所有节点上的partition

1 有很多的partition

2 一个函数会作用与所有的parition(准确的是作用于其中的数据)

3 rdd前后是有依赖的,也就是说当数据丢失时(服务器宕机),也可以依赖于前面的RDD进行数据恢复

4 如果是key-value的rdd会默认选用hash partitioner(分区器)

5 rdd会自动选取最佳位置,在有数据的节点上进行计算,移动计算而不是移动数据

\

reduceByKey的效率是比groupByKey高的,因为reduceByKey有局部聚合的功能,groupByKey并没有

离线日志采集流程简介

1我们的数据从哪里来

互联网行业:网站、app、系统(交易系统。。)

传统行业:电信,人们的上网、打电话、发短信等等数据

数据源:网站、app

都要往我们的后台去发送请求,获取数据,执行业务逻辑;app获取要展现的商品数据;发送请求到后台进行交易和结账

2网站/app会发送请求到后台服务器,通常会由Nginx接收请求,并进行转发

3 后台服务器,比如Tomcat、Jetty;但是,其实在面向大量用户,高并发(每秒访问量过万)的情况下,通常都不会直接是用Tomcat来接收请求。这种时候,通常,都是用Nginx来接收请求,并且后端接入Tomcat集群/Jetty集群,来进行高并发访问下的负载均衡。

比如说,Nginx,或者是Tomcat,你进行适当配置之后,所有请求的数据都会作为log存储起来;接收请求的后台系统(J2EE、PHP、Ruby On Rails),也可以按照你的规范,每接收一个请求,或者每执行一个业务逻辑,就往日志文件里面打一条log(到这里为止,我们的后台每天就至少可以产生一份日志文件)。

4日志文件(通常由我们预先设定的特殊的格式)通常每天一份。此时呢,由于可能有多份日志文件,因为有多个web服务器。

5一个日志转移的工具,比如自己用linux的crontab定时调度一个shell脚本/python脚本;或者自己用java开发一个后台服务,用quartz这样的框架进行定时调度。这个工具,负责将当天的所有日志的数据,都给采集起来,进行合并和处理,等操作;然后作为一份日志文件,给转移到flume agent正在监控的目录中。

6flume,按照我们上述所讲的;flume agent启动起来以后,可以实时的监控linux系统上面的某一个目录,看其中是否有新的文件进来。只要发现有新的日志文件进来,那么flume就会走后续的channel和sink。通常来说,sink都会配置为HDFS,flume负责将每天的一份log文件,传输到HDFS上。

7HDFS,Hadoop Distributed File System。Hadoop分布式文件系统。用来存储每天的log数据。为什么用hadoop进行存储呢。因为Hadoop可以存储大数据,大量数据。比如说,每天的日志,数据文件是一个T,那么,也许一天的日志文件,是可以存储在某个Linux系统上面,但是问题是,1个月的呢,1年的呢。当积累了大量数据以后,就不可能存储在单机上,只能存储在Hadoop大数据分布式存储系统中。

8使用Hadoop MapReduce,自己开发MR作业,可以用crontab定时调度工具来定时每天执行一次;也可以用Oozie来进行定时调度;也可以(百度、阿里、腾讯、京东、美团)自己组建团队来研发复杂、大型、分布式的调度系统,来承担全公司所有MapReduce / Hive作业的调度(对于大型公司来说,可能每天除了负责数据清洗的MR作业以外,后续的建立数据仓库、进行数据分析和统计的Hive ETL作业可能高达上万个,上十万、百万个),针对HDFS里的原始日志进行数据清洗,写入HDFS中另外一个文件

9Hadoop HDFS中的原始的日志数据,会经过数据清洗。为什么要进行数据清洗因为我们的数据中可能有很多是不符合预期的脏数据。

HDFS:存储一份经过数据清洗的日志文件。

10把HDFS中的清洗后的数据,给导入到Hive的某个表中。这里可以使用动态分区,Hive使用分区表,每个分区放一天的数据。

11Hive,底层也是基于HDFS,作为一个大数据的数据仓库。数据仓库内部,再往后,其实就是一些数据仓库建模的ETL。ETL会将原始日志所在的一个表,给转换成几十张,甚至上百张表。这几十,甚至上百张表,就是我们的数据仓库。然后呢,公司的统计分析人员,就会针对数据仓库中的表,执行临时的,或者每天定时调度的Hive SQL ETL作业。来进行大数据的统计和分析,Spark/Hdoop/Storm,大数据平台/系统,可能都会使用Hive中的数据仓库内部的表。

12我们的Spark大型大数据平台/系统,其实,通常来说,都会针对Hive中的数据来进行开发。也就是说,我们的Spark大数据系统,数据来源都是Hive中的某些表。这些表,可能都是经过大量的Hive ETL以后建立起来的数据仓库中的某些表。然后来开发特殊的,符合业务需求的大数据平台。通过大数据平台来给公司里的用户进行使用,来提供大数据的支持,推动公司的发展。

用户访问session分析模块功能介绍

1、对用户访问session进行分析

2、JDBC辅助类封装

3、用户访问session聚合统计

4、按时间比例随机抽取session

5、获取点击、下单和支付次数排名前10的品类

6、获取top10品类的点击次数最多的10个session

7、复杂性能调优全套解决方案

8、十亿级数据troubleshooting经验总结

9、数据倾斜全套完美解决方案

10、模块功能演示

在实际企业项目中的使用架构:

1、J2EE的平台(美观的前端页面),通过这个J2EE平台可以让使用者,提交各种各样的分析任务,其中就包括一个模块,就是用户访问session分析模块;可以指定各种各样的筛选条件,比如年龄范围、职业、城市等等。。

2、J2EE平台接收到了执行统计分析任务的请求之后,会调用底层的封装了spark-submit的shell脚本(Runtime、Process),shell脚本进而提交我们编写的Spark作业。

3、Spark作业获取使用者指定的筛选参数,然后运行复杂的作业逻辑,进行该模块的统计和分析。

4、Spark作业统计和分析的结果,会写入MySQL中,指定的表

5、最后,J2EE平台,使用者可以通过前端页面(美观),以表格、图表的形式展示和查看MySQL中存储的该统计分析任务的结果数据。

模块的目标:对用户访问session进行分析

1、可以根据使用者指定的某些条件,筛选出指定的一些用户(有特定年龄、职业、城市);

2、对这些用户在指定日期范围内发起的session,进行聚合统计,比如,统计出访问时长在0~3s的session占总session数量的比例;

3、按时间比例,比如一天有24个小时,其中12:00~13:00的session数量占当天总session数量的50%,当天总session数量是10000个,那么当天总共要抽取1000个session,ok,12:00~13:00的用户,就得抽取1000*50%=500。而且这500个需要随机抽取。

4、获取点击量、下单量和支付量都排名10的商品种类

5、获取top10的商品种类的点击数量排名前10的session

6、开发完毕了以上功能之后,需要进行大量、复杂、高端、全套的性能调优(大部分性能调优点,都是本人在实际开发过程中积累的经验,基本都是全网唯一)

7、十亿级数据量的troubleshooting(故障解决)的经验总结

8、数据倾斜的完美解决方案(全网唯一,非常高端,因为数据倾斜往往是大数据处理程序的性能杀手,很多人在遇到的时候,往往没有思路)

9、使用mock(模拟)的数据,对模块进行调试、运行和演示效果

用户访问session介绍:

用户在电商网站上,通常会有很多的点击行为,首页通常都是进入首页;然后可能点击首页上的一些商品;点击首页上的一些品类;也可能随时在搜索框里面搜索关键词;还可能将一些商品加入购物车;对购物车中的多个商品下订单;最后对订单中的多个商品进行支付。

用户的每一次操作,其实可以理解为一个action,比如点击、搜索、下单、支付

用户session,指的就是,从用户第一次进入首页,session就开始了。然后在一定时间范围内,直到最后操作完(可能做了几十次、甚至上百次操作)。离开网站,关闭浏览器,或者长时间没有做操作;那么session就结束了。

以上用户在网站内的访问过程,就称之为一次session。简单理解,session就是某一天某一个时间段内,某个用户对网站从打开/进入,到做了大量操作,到最后关闭浏览器。的过程。就叫做session。

session实际上就是一个电商网站中最基本的数据和大数据。那么大数据,面向C端,也就是customer,消费者,用户端的,分析,基本是最基本的就是面向用户访问行为/用户访问session。

基础数据结构介绍(数据调研)

表名:user_visit_action(Hive表)

date:日期,代表这个用户点击行为是在哪一天发生的

user_id:代表这个点击行为是哪一个用户执行的

session_id :唯一标识了某个用户的一个访问session

page_id :点击了某些商品/品类,也可能是搜索了某个关键词,然后进入了某个页面,页面的id

action_time :这个点击行为发生的时间点

search_keyword :如果用户执行的是一个搜索行为,比如说在网站/app中,搜索了某个关键词,然后会跳转到商品列表页面;搜索的关键词

click_category_id :可能是在网站首页,点击了某个品类(美食、电子设备、电脑)

click_product_id :可能是在网站首页,或者是在商品列表页,点击了某个商品(比如呷哺呷哺火锅XX路店3人套餐、iphone 6s)

order_category_ids :代表了可能将某些商品加入了购物车,然后一次性对购物车中的商品下了一个订单,这就代表了某次下单的行为中,有哪些

商品品类,可能有6个商品,但是就对应了2个品类,比如有3根火腿肠(食品品类),3个电池(日用品品类)

order_product_ids :某次下单,具体对哪些商品下的订单

pay_category_ids :代表的是,对某个订单,或者某几个订单,进行了一次支付的行为,对应了哪些品类

pay_product_ids:代表的,支付行为下,对应的哪些具体的商品

user_visit_action表,其实就是放,比如说网站,或者是app,每天的点击流的数据。可以理解为,用户对网站/app每点击一下,就会代表在这个表里面的一条数据。

这个表在任何企业中,都可能是不同的。为什么呢因为我们之前讲解过日志采集流程。实际上,用户在网页上真正的执行某些行为时,那么会往服务器端发送日志。但是日志的格式绝对不是这个格式的哦。实际上,我们之前也提过,企业中会有专门的大数据ETL开发工程师,对原始的日志数据,开发大量的ETL,对数据进行各种转换和抽取。然后可能会为了各种业务的需要,形成大量的各种各样的结构的表,可能已经进行了处理或者是某些聚合的操作。

我们做任何大数据系统/平台类的项目,首先第一步,就是要做数据调研。也就是分析平台要基于的底层的基础数据。分析表结构,弄清楚表之间的关系。表中的数据的更新粒度,一个小时更新一次,还是一天更新一次。会不会有脏数据。每天什么时候数据能够进来。

表名:user_info(Hive表)

user_id:其实就是每一个用户的唯一标识,通常是自增长的Long类型,BigInt类型

username:是每个用户的登录名

name:每个用户自己的昵称、或者是真实姓名

age:用户的年龄

professional:用户的职业

city:用户所在的城市

user_info表,实际上,就是一张最普通的用户基础信息表;这张表里面,其实就是放置了网站/app所有的注册用户的信息。那么我们这里也是对用户信息表,进行了一定程度的简化。比如略去了手机号等这种数据。因为我们这个项目里不需要使用到某些数据。那么我们就保留一些最重要的数据,即可。

表名:task(MySQL表)

task_id:表的主键

task_name:任务名称

create_time:创建时间

start_time:开始运行的时间

finish_time:结束运行的时间

task_type:任务类型,就是说,在一套大数据平台中,肯定会有各种不同类型的统计分析任务,比如说用户访问session分析任务,页面单跳转化率统计任务;所以这个字段就标识了每个任务的类型

task_status:任务状态,任务对应的就是一次Spark作业的运行,这里就标识了,Spark作业是新建,还没运行,还是正在运行,还是已经运行完毕

task_param:最最重要,用来使用JSON的格式,来封装用户提交的任务对应的特殊的筛选参数

task表,其实是用来保存平台的使用者,通过J2EE系统,提交的基于特定筛选参数的分析任务,的信息,就会通过J2EE系统保存到task表中来。之所以使用MySQL表,是因为J2EE系统是要实现快速的实时插入和查询的。

需求分析

1、按条件筛选session

2、统计出符合条件的session中,访问时长在1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m以上各个范围内的session占比;访问步长在1~3、4~6、7~9、10~30、30~60、60以上各个范围内的session占比

3、在符合条件的session中,按照时间比例随机抽取1000个session

4、在符合条件的session中,获取点击、下单和支付数量排名前10的品类

5、对于排名前10的品类,分别获取其点击次数排名前10的session

上述第一个步骤就是数据调研(就是对底层基于的基础数据的表结构进行调研、分析和研究);

这里相当于是项目开发流程的第二个步骤,就是需求分析(在实际的企业中,需求分析,可能会比这里更加复杂很多;在互联网企业中,需求分析,首先就是要跟PM,就是产品经理,也就是负责设计你开发的大数据平台产品的人,去大量开会,去沟通需求的细节;此外,你自己还得根据产品经理编写的需求文档,可能还会自己设计一些产品原型图出来,让你看,去看,去研究;

第三点,可能还需要作为一个项目的技术leader,去跟你的项目组内的成员,去讲解和讨论需求,要确保组内所有成员,都对需求清晰的理解了)

1、按条件筛选session

搜索过某些关键词的用户、访问时间在某个时间段内的用户、年龄在某个范围内的用户、职业在某个范围内的用户、所在某个城市的用户,发起的session。找到对应的这些用户的session,也就是我们所说的第一步,按条件筛选session。

这个功能,就最大的作用就是灵活。也就是说,可以让使用者,对感兴趣的和关系的用户群体,进行后续各种复杂业务逻辑的统计和分析,那么拿到的结果数据,就是只是针对特殊用户群体的分析结果;而不是对所有用户进行分析的泛泛的分析结果。比如说,现在某个企业高层,就是想看到用户群体中,28~35岁的,老师职业的群体,对应的一些统计和分析的结果数据,从而辅助高管进行公司战略上的决策制定。

2、统计出符合条件的session中,访问时长在1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m以上各个范围内的session占比;访问步长在1~3、4~6、7~9、10~30、30~60、60以上各个范围内的session占比

session访问时长,也就是说一个session对应的开始的action,到结束的action,之间的时间范围;还有,就是访问步长,指的是,一个session执行期间内,依次点击过多少个页面,比如说,一次session,维持了1分钟,那么访问时长就是1m,然后在这1分钟内,点击了10个页面,那么session的访问步长,就是10.

比如说,符合第一步筛选出来的session的数量大概是有1000万个。那么里面,我们要计算出,访问时长在1s~3s内的session的数量,并除以符合条件的总session数量(比如1000万),比如是100万/1000万,那么1s~3s内的session占比就是10%。依次类推,这里说的统计,就是这个意思。

这个功能的作用,其实就是,可以让人从全局的角度看到,符合某些条件的用户群体,使用我们的产品的一些习惯。比如大多数人,到底是会在产品中停留多长时间,大多数人,会在一次使用产品的过程中,访问多少个页面。那么对于使用者来说,有一个全局和清晰的认识。

3、在符合条件的session中,按照时间比例随机抽取1000个session

这个按照时间比例是什么意思呢随机抽取本身是很简单的,但是按照时间比例,就很复杂了。比如说,这一天总共有1000万的session。那么我现在总共要从这1000万session中,随机抽取出来1000个session。但是这个随机不是那么简单的。需要做到如下几点要求:首先,如果这一天的12:00~13:00的session数量是100万,那么这个小时的session占比就是1/10,那么这个小时中的100万的session,我们就要抽取1/10 * 1000 = 100个。然后再从这个小时的100万session中,随机抽取出100个session。以此类推,其他小时的抽取也是这样做。

这个功能的作用,是说,可以让使用者,能够对于符合条件的session,按照时间比例均匀的随机采样出1000个session,然后观察每个session具体的点击流/行为,比如先进入了首页、然后点击了食品品类、然后点击了雨润火腿肠商品、然后搜索了火腿肠罐头的关键词、接着对王中王火腿肠下了订单、最后对订单做了支付。

之所以要做到按时间比例随机采用抽取,就是要做到,观察样本的公平性。

4、在符合条件的session中,获取点击、下单和支付数量排名前10的品类

什么意思呢,对于这些session,每个session可能都会对一些品类的商品进行点击、下单和支付等等行为。那么现在就需要获取这些session点击、下单和支付数量排名前10的最热门的品类。也就是说,要计算出所有这些session对各个品类的点击、下单和支付的次数,然后按照这三个属性进行排序,获取前10个品类。

这个功能,很重要,就可以让我们明白,就是符合条件的用户,他最感兴趣的商品是什么种类。这个可以让后端的分析人员,清晰地了解到不同层次、不同类型的用户的心理和喜好。

5、对于排名前10的品类,分别获取其点击次数排名前10的session

这个就是说,对于top10的品类,每一个都要获取对它点击次数排名前10的session。

这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的session的行为。

数据表设计

在进行完了数据调研、需求分析、技术实现方案,进行数据设计。数据设计,往往包含两个环节,第一个呢,就是说,我们的上游数据,就是数据调研环节看到的项目基于的基础数据,是否要针对其开发一些Hive ETL,对数据进行进一步的处理和转换,从而让我们能够更加方便的和快速的去计算和执行spark作业;第二个,就是要设计spark作业要保存结果数据的业务表的结构,从而让J2EE平台可以使用业务表中的数据,来为使用者展示任务执行结果。

在本项目中,我们所有的数据设计环节,只会涉及第二个,不会涉及第一个。

设计MySQL中的业务表的结构。

第一表:session_aggr_stat表,存储第一个功能,session聚合统计的结果

CREATE TABLE `session_aggr_stat` (

`task_id` int(11) NOT NULL,

`session_count` int(11) DEFAULT NULL,

`1s_3s` double DEFAULT NULL,

`4s_6s` double DEFAULT NULL,

`7s_9s` double DEFAULT NULL,

`10s_30s` double DEFAULT NULL,

`30s_60s` double DEFAULT NULL,

`1m_3m` double DEFAULT NULL,

`3m_10m` double DEFAULT NULL,

`10m_30m` double DEFAULT NULL,

`30m` double DEFAULT NULL,

`1_3` double DEFAULT NULL,

`4_6` double DEFAULT NULL,

`7_9` double DEFAULT NULL,

`10_30` double DEFAULT NULL,

`30_60` double DEFAULT NULL,

`60` double DEFAULT NULL,

PRIMARY KEY (`task_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

第二个表:session_random_extract表,存储我们的按时间比例随机抽取功能抽取出来的1000个session

CREATE TABLE `session_random_extract` (

`task_id` int(11) NOT NULL,

`session_id` varchar(255) DEFAULT NULL,

`start_time` varchar(50) DEFAULT NULL,

`end_time` varchar(50) DEFAULT NULL,

`search_keywords` varchar(255) DEFAULT NULL,

PRIMARY KEY (`task_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

第三个表:top10_category表,存储按点击、下单和支付排序出来的top10品类数据

CREATE TABLE `top10_category` (

`task_id` int(11) NOT NULL,

`category_id` int(11) DEFAULT NULL,

`click_count` int(11) DEFAULT NULL,

`order_count` int(11) DEFAULT NULL,

`pay_count` int(11) DEFAULT NULL,

PRIMARY KEY (`task_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

第四个表:top10_category_session表,存储top10每个品类的点击top10的session

CREATE TABLE `top10_category_session` (

`task_id` int(11) NOT NULL,

`category_id` int(11) DEFAULT NULL,

`session_id` varchar(255) DEFAULT NULL,

`click_count` int(11) DEFAULT NULL,

PRIMARY KEY (`task_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

最后一张表:session_detail,用来存储随机抽取出来的session的明细数据、top10品类的session的明细数据

CREATE TABLE `session_detail` (

`task_id` int(11) NOT NULL,

`user_id` int(11) DEFAULT NULL,

`session_id` varchar(255) DEFAULT NULL,

`page_id` int(11) DEFAULT NULL,

`action_time` varchar(255) DEFAULT NULL,

`search_keyword` varchar(255) DEFAULT NULL,

`click_category_id` int(11) DEFAULT NULL,

`click_product_id` int(11) DEFAULT NULL,

`order_category_ids` varchar(255) DEFAULT NULL,

`order_product_ids` varchar(255) DEFAULT NULL,

`pay_category_ids` varchar(255) DEFAULT NULL,

`pay_product_ids` varchar(255) DEFAULT NULL,

PRIMARY KEY (`task_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

额外的一张表:task表,用来存储J2EE平台插入其中的任务的信息

CREATE TABLE `task` (

`task_id` int(11) NOT NULL AUTO_INCREMENT,

`task_name` varchar(255) DEFAULT NULL,

`create_time` varchar(255) DEFAULT NULL,

`start_time` varchar(255) DEFAULT NULL,

`finish_time` varchar(255) DEFAULT NULL,

`task_type` varchar(255) DEFAULT NULL,

`task_status` varchar(255) DEFAULT NULL,

`task_param` text,

PRIMARY KEY (`task_id`)

) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8

在数据设计以后,就正式进入一个漫长的环节,就是编码实现阶段,coding阶段。在编码实现阶段,每开发完一个功能,其实都会走后续的两个环节,就是本地测试和生产环境测试。

接下来,就是在完成了数据调研、需求分析、技术方案设计、数据设计以后,正式进入编码实现和功能测试阶段。最后才是性能调优阶段。

按session粒度进行分组聚合

用户访问session分析Spark作业

接收用户创建的分析任务,用户可能指定的条件如下:

1、时间范围:起始日期~结束日期

2、性别:男或女

3、年龄范围

4、职业:多选

5、城市:多选

6、搜索词:多个搜索词,只要某个session中的任何一个action搜索过指定的关键词,那么session就符合条件

7、点击品类:多个品类,只要某个session中的任何一个action点击过某个品类,那么session就符合条件

我们的spark作业如何接受用户创建的任务

J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,任务参数以JSON格式封装在task_param字段中

接着J2EE平台会执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本

spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数,传递给Spark作业的main函数

参数就封装在main函数的args数组中

这是spark本身提供的特性

1 首先得在数据库mysql中查询出来指定的任务,并获取任务的查询参数task_param

2 再从user_visit_action表中查询出指定日期范围内的行为数据

3 将行为数据按照session_id使用groupByKey进行分组

此时数据粒度就是session粒度了,然后与用户信息进行join

此时得到的数据就是session粒度的信息并且包含着session对应的user的信息

public void uservisit() {

String[] args = new String[]{"1"};

//构建Spark上下文

SparkConf conf = new SparkConf().setAppName(SparkContantsParam.USER_VISIT_APPNAME).setMaster(SparkContantsParam.USER_VISIT_MASTER);

JavaSparkContext jsc = new JavaSparkContext(conf);

SQLContext sqlContext = getSqlContext(jsc.sc());

//生成模拟数据

mockData(jsc ,sqlContext);

//从主函数入口args获取taskId

Long taskId = ParamUtils.getTaskIdFromArgs(args);

List tasks = taskDao.findTaskById(Math.toIntExact(taskId));

//得到task

Task task = null;

if(!tasks.isEmpty()){

task = tasks.get(0);

}

// 首先得查询出来指定的任务,并获取任务的查询参数

JSONObject paramJson = JSONObject.parseObject(task.getTaskParam());

//如果要进行session粒度的聚合,首先要从user_visit_action表中查询出指定日期范围内的行为数据

JavaRDD actionRDD = getActionRDDByDateRange(sqlContext,paramJson);

System.out.println(actionRDD.count());

//首先将行为数据按照session_id使用groupByKey进行分组

//此时数据粒度就是session粒度了,然后与用户信息进行join

//此时得到的数据就是session粒度的信息并且包含着session对应的user的信息

JavaPairRDD session2AggrInfo = aggregateBySession(sqlContext,actionRDD);

System.out.println(session2AggrInfo.count());

for(Tuple2 tuple: session2AggrInfo.take(10)){

System.out.println(tuple._2);

}

//接着我们就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤

//相当于我们自己编写的算子,是要访问外面的任务参数对象的

JavaPairRDD filtersession2AggrInfo = filterSession(session2AggrInfo,paramJson);

System.out.println(filtersession2AggrInfo.count());

for(Tuple2 tuple: filtersession2AggrInfo.take(10)){

System.out.println(tuple._2);

}

//关闭Spark上下文

jsc.close();

}

//首先要从user_visit_action表中查询出指定日期范围内的行为数据

private static JavaRDD getActionRDDByDateRange(SQLContext sqlContext, JSONObject paramJson) {

String startDate = ParamUtils.getParam(paramJson, SparkContantsParam.FIELD_STARTDATE);

String endDate = ParamUtils.getParam(paramJson,SparkContantsParam.FIELD_ENDDATE);

String sql = "select * from user_visit_action where date >='"+startDate+

"' and date <='"+endDate+"'";

DataFrame dataFrame = sqlContext.sql(sql);

return dataFrame.javaRDD();

}

//首先将行为数据按照session_id使用groupByKey进行分组

//此时数据粒度就是session粒度了,然后与用户信息进行join

//此时得到的数据就是session粒度的信息并且包含着session对应的user的信息

private static JavaPairRDD aggregateBySession(SQLContext sqlContext, JavaRDD actionRDD) {

//现在的actionRDD中包含着多个Row,每个Row都是一行用户的行为记录

//现在需要将这些记录映射成的格式

JavaPairRDD session2ActionRDD = actionRDD.mapToPair(row -> {

//取出sessionId与Row映射

return new Tuple2<>(row.getString(2), row);

});

//对这些行为数据按照sessionId进行分组

JavaPairRDD> sessionid2ActionRDDs = session2ActionRDD.groupByKey();

//对每个session进行分组,将session中所有的搜索词和点击品类聚合起来

//到此为止获取数据格式为:

JavaPairRDD userid2PartAggrRDD = sessionid2ActionRDDs.mapToPair(tuple -> {

String sessionid = tuple._1;

Iterator rows = tuple._2.iterator();

StringBuffer searchKeyWords = new StringBuffer();

StringBuffer clickCategoryIds = new StringBuffer();

Long userId = null;

while (rows.hasNext()) {

Row row = rows.next();

if (userId == null) {

userId = row.getLong(1);

}

String searchKeyWord = row.getString(5);

long clickCategory = row.getLong(6);

//这里对数据进行一下说明

//并不是所有数据都有searchKeyWord和clickCategory这两个字段的

//只有当进行了搜索才会有searchKeyWord记录

//只有进行了点击品类查询才会有clickCategory的记录

//所以任何一行数据都不可能两个行为都有,并且数据可能会出现null值

//我们决定是否将搜索词或者点击品类id拼接到字符串中

//首先,该值不能为null

//其次,字符串中还没有包含该值

if (StringUtils.isNotEmpty(searchKeyWord)) {

if (!searchKeyWords.toString().contains(searchKeyWord)) {

searchKeyWords.append(searchKeyWord + ",");

}

}

if (clickCategory != 0) {

if (!clickCategoryIds.toString().contains(clickCategory + "")) {

clickCategoryIds.append(clickCategory + ",");

}

}

}

String searchKW = StringUtils.trimComma(searchKeyWords.toString());

String clickCategoryid = StringUtils.trimComma(clickCategoryIds.toString());

// 思考一下

// 我们返回的数据格式,即使

// 但是,这一步聚合完了以后,其实,我们是还需要将每一行数据,跟对应的用户信息进行聚合

// 问题就来了,如果是跟用户信息进行聚合的话,那么key,就不应该是sessionid

// 就应该是userid,才能够跟格式的用户信息进行聚合

// 如果我们这里直接返回,还得再做一次mapToPair算子

// 将RDD映射成的格式,那么就多此一举

// 所以,我们这里其实可以直接,返回的数据格式,就是

// 然后跟用户信息join的时候,将partAggrInfo关联上userInfo

// 然后再直接将返回的Tuple的key设置成sessionid

// 最后的数据格式,还是

// 聚合数据,用什么样的格式进行拼接

// 我们这里统一定义,使用key=value|key=value

String partAggrInfo = SparkContantsParam.FIELD_SESSION_ID + "=" + sessionid + "|"

+ SparkContantsParam.FIELD_SEARCH_KEYWORD + "=" + searchKW + "|"

+ SparkContantsParam.FIELD_CLICK_CATEGORY_ID + "=" + clickCategoryid;

return new Tuple2(userId, partAggrInfo);

});

//查询所有用户信息,并且映射成的格式

String sql = "select * from user_info";

JavaRDD userRdds = sqlContext.sql(sql).javaRDD();

JavaPairRDD userid2RDD = userRdds.mapToPair(row -> {

return new Tuple2(row.getLong(0), row);

});

//将用户行为数据与用户信息进行join

JavaPairRDD> userid2FullInfo = userid2PartAggrRDD.join(userid2RDD);

JavaPairRDD sessionid2FullInfo = userid2FullInfo.mapToPair(tuple -> {

String partinfo = tuple._2._1;

Row userinfo = tuple._2._2;

String sessionid = StringUtils.getFieldFromConcatString(partinfo, "\\|", SparkContantsParam.FIELD_SESSION_ID);

int age = userinfo.getInt(3);

String professional = userinfo.getString(4);

String city = userinfo.getString(5);

String sex = userinfo.getString(6);

//拼接上用户的年龄,职业,城市,性别,形成完整信息

String fullinfo = partinfo + "|"

+ SparkContantsParam.FIELD_AGE + "=" + age + "|"

+ SparkContantsParam.FIELD_PROFESSIONAL + "=" + professional + "|"

+ SparkContantsParam.FIELD_CITY + "=" + city + "|"

+ SparkContantsParam.FIELD_SEX + "=" + sex;

//将sessionid和完成信息返回

return new Tuple2(sessionid, fullinfo);

});

return sessionid2FullInfo;

}

按筛选参数对session粒度聚合数据进行过滤

package com.spark.sparksession.sparkjob;

import com.alibaba.fastjson.JSONObject;

import com.spark.sparksession.dao.TaskDao;

import com.spark.sparksession.data.MockData;

import com.spark.sparksession.domain.Task;

import com.spark.sparksession.util.ParamUtils;

import com.spark.sparksession.util.SparkContantsParam;

import com.spark.sparksession.util.StringUtils;

import com.spark.sparksession.util.ValidUtils;

import org.apache.spark.SparkConf;

import org.apache.spark.SparkContext;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.sql.DataFrame;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SQLContext;

import org.apache.spark.sql.hive.HiveContext;

import org.springframework.stereotype.Component;

import scala.Tuple2;

import javax.annotation.Resource;

import java.util.Iterator;

import java.util.List;

@Component

public class UserVisitSessionAnalyzeSpark {

@Resource

private TaskDao taskDao;

public void uservisit() {

String[] args = new String[]{"1"};

//构建Spark上下文

SparkConf conf = new SparkConf().setAppName(SparkContantsParam.USER_VISIT_APPNAME).setMaster(SparkContantsParam.USER_VISIT_MASTER);

JavaSparkContext jsc = new JavaSparkContext(conf);

SQLContext sqlContext = getSqlContext(jsc.sc());

//生成模拟数据

mockData(jsc ,sqlContext);

//从主函数入口args获取taskId

Long taskId = ParamUtils.getTaskIdFromArgs(args);

List tasks = taskDao.findTaskById(Math.toIntExact(taskId));

//得到task

Task task = null;

if(!tasks.isEmpty()){

task = tasks.get(0);

}

// 首先得查询出来指定的任务,并获取任务的查询参数

JSONObject paramJson = JSONObject.parseObject(task.getTaskParam());

//如果要进行session粒度的聚合,首先要从user_visit_action表中查询出指定日期范围内的行为数据

JavaRDD actionRDD = getActionRDDByDateRange(sqlContext,paramJson);

System.out.println(actionRDD.count());

//首先将行为数据按照session_id使用groupByKey进行分组

//此时数据粒度就是session粒度了,然后与用户信息进行join

//此时得到的数据就是session粒度的信息并且包含着session对应的user的信息

JavaPairRDD session2AggrInfo = aggregateBySession(sqlContext,actionRDD);

System.out.println(session2AggrInfo.count());

for(Tuple2 tuple: session2AggrInfo.take(10)){

System.out.println(tuple._2);

}

//接着我们就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤

//相当于我们自己编写的算子,是要访问外面的任务参数对象的

JavaPairRDD filtersession2AggrInfo = filterSession(session2AggrInfo,paramJson);

System.out.println(filtersession2AggrInfo.count());

for(Tuple2 tuple: filtersession2AggrInfo.take(10)){

System.out.println(tuple._2);

}

//关闭Spark上下文

jsc.close();

}

//首先要从user_visit_action表中查询出指定日期范围内的行为数据

private static JavaRDD getActionRDDByDateRange(SQLContext sqlContext, JSONObject paramJson) {

如上

}

//首先将行为数据按照session_id使用groupByKey进行分组

//此时数据粒度就是session粒度了,然后与用户信息进行join

//此时得到的数据就是session粒度的信息并且包含着session对应的user的信息

private static JavaPairRDD aggregateBySession(SQLContext sqlContext, JavaRDD actionRDD) {

如上

}

//按照使用者(Task表中的task_param)指定的筛选参数进行数据过滤

private static JavaPairRDD filterSession(JavaPairRDD session2AggrInfo, JSONObject paramJson) {

String categoryIds = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_CATEGORY_IDS);

String cities = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_CITIES);

String startAge = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_START_AGE);

String endAge = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_END_AGE);

String professionals = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_PROFESSIONALS);

String sex = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_SEX);

String keyWords = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_KEYWORDS);

String _param = (StringUtils.isNotEmpty(categoryIds) SparkContantsParam.PARAM_CATEGORY_IDS+"="+categoryIds + "|":"")

+ (StringUtils.isNotEmpty(cities)SparkContantsParam.PARAM_CITIES+"="+cities + "|" : "")

+ (StringUtils.isNotEmpty(startAge)SparkContantsParam.PARAM_START_AGE+"="+startAge + "|" : "")

+ (StringUtils.isNotEmpty(endAge)SparkContantsParam.PARAM_END_AGE+"="+endAge + "|" : "")

+ (StringUtils.isNotEmpty(professionals)SparkContantsParam.PARAM_PROFESSIONALS+"="+professionals + "|" : "")

+ (StringUtils.isNotEmpty(sex)SparkContantsParam.PARAM_SEX+"="+sex + "|" : "")

+ (StringUtils.isNotEmpty(keyWords)SparkContantsParam.PARAM_KEYWORDS+"="+keyWords : "");

if(_param.endsWith("\\|")){

_param = _param.substring(0,_param.length()-1);

}

final String param = _param;

JavaPairRDD filter = session2AggrInfo.filter(tuple -> {

// 首先,从tuple中,获取聚合数据

String partInfo = tuple._2;

// 接着,依次按照筛选条件进行过滤

// 按照年龄范围进行过滤(startAge、endAge)

if (!ValidUtils.between(partInfo, SparkContantsParam.FIELD_AGE, param, SparkContantsParam.PARAM_START_AGE, SparkContantsParam.PARAM_END_AGE)) {

return false;

}

// 按照职业范围进行过滤(professionals)

// 互联网,IT,软件

// 互联网

if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_PROFESSIONAL, param, SparkContantsParam.PARAM_PROFESSIONALS)) {

return false;

}

// 按照城市范围进行过滤(cities)

// 北京,上海,广州,深圳

// 成都

if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_CITY, param, SparkContantsParam.PARAM_CITIES)) {

return false;

}

// 按照搜索词进行过滤

// 我们的session可能搜索了 火锅,蛋糕,烧烤

// 我们的筛选条件可能是 火锅,串串香,iphone手机

// 那么,in这个校验方法,主要判定session搜索的词中,有任何一个,与筛选条件中

// 任何一个搜索词相当,即通过

if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_SEARCH_KEYWORD, param, SparkContantsParam.PARAM_KEYWORDS)) {

return false;

}

// 按照点击品类id进行过滤

if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_CLICK_CATEGORY_ID, param, SparkContantsParam.PARAM_CATEGORY_IDS)) {

return false;

}

// 按照性别进行过滤

// 男/女

// 男,女

if (!ValidUtils.equal(partInfo, SparkContantsParam.FIELD_SEX, param, SparkContantsParam.PARAM_SEX)) {

return false;

}

return true;

});

return filter;

}

//生成模拟数据,只有在本地才会生成模拟数据

private static void mockData(JavaSparkContext jsc, SQLContext sqlContext) {

if(SparkContantsParam.USER_VISIT_LOCAL){

MockData.mock(jsc,sqlContext);

}

}

//获取SqlContext,如果在本地就生成SqlContext对象,如果在线上就生成HiveContext对象

private static SQLContext getSqlContext(SparkContext sc) {

if(SparkContantsParam.USER_VISIT_LOCAL){

return new SQLContext(sc);

}else{

return new HiveContext(sc);

}

}

}

自定义Accumulator

session聚合统计:

统计出来之前通过条件过滤的session,访问时长在0s~3s的session的数量,占总session数量的比例;4s~6s。。。。;

访问步长在1~3的session的数量,占总session数量的比例;4~6。。。;

Accumulator 1s_3s = sc.accumulator(0L);

。。

。。

。。

十几个Accumulator

可以对过滤以后的session,调用foreach也可以,遍历所有session;计算每个session的访问时长和访问步长;

访问时长:把session的最后一个action的时间,减去第一个action的时间

访问步长:session的action数量

计算出访问时长和访问步长以后,根据对应的区间,找到对应的Accumulator,1s_3s.add(1L)

同时每遍历一个session,就可以给总session数量对应的Accumulator,加1

最后用各个区间的session数量,除以总session数量,就可以计算出各个区间的占比了

这种传统的实现方式,有什么不好

最大的不好,就是Accumulator太多了,不便于维护

首先第一,很有可能,在写后面的累加代码的时候,比如找到了一个4s~6s的区间的session,但是却代码里面不小心,累加到7s~9s里面去了;

第二,当后期,项目如果要出现一些逻辑上的变更,比如说,session数量的计算逻辑,要改变,就得更改所有Accumulator对应的代码;或者说,又要增加几个范围,那么又要增加多个Accumulator,并且修改对应的累加代码;维护成本,相当之高(甚至可能,修改一个小功能,或者增加一个小功能,耗费的时间,比做一个新项目还要多;甚至于,还修改出了bug,那就耗费更多的时间)

所以,我们这里的设计,不打算采用传统的方式,用十几个,甚至二十个Accumulator,因为维护成本太高

这里的实现思路是,我们自己自定义一个Accumulator,实现较为复杂的计算逻辑,一个Accumulator维护了所有范围区间的数量的统计逻辑

低耦合,如果说,session数量计算逻辑要改变,那么不用变更session遍历的相关的代码;只要维护一个Accumulator里面的代码即可;

如果计算逻辑后期变更,或者加了几个范围,那么也很方便,不用多加好几个Accumulator,去修改大量的代码;只要维护一个Accumulator里面的代码即可;

维护成本,大大降低

自定义Accumulator,也是Spark Core中,属于比较高端的一个技术

使用自定义Accumulator,大家就可以任意的实现自己的复杂分布式计算的逻辑

如果说,你的task,分布式,进行复杂计算逻辑,那么是很难实现的(借助于redis,维护中间状态,借助于zookeeper去实现分布式锁)

但是,使用自定义Accumulator,可以更方便进行中间状态的维护,而且不用担心并发和锁的问题

package com.ibeifeng.sparkproject.spark;

import org.apache.spark.AccumulatorParam;

import com.ibeifeng.sparkproject.constant.Constants;

import com.ibeifeng.sparkproject.util.StringUtils;

/**

* session聚合统计Accumulator

*

* 大家可以看到

* 其实使用自己定义的一些数据格式,比如String,甚至说,我们可以自己定义model,自己定义的类(必须可序列化)

* 然后呢,可以基于这种特殊的数据格式,可以实现自己复杂的分布式的计算逻辑

* 各个task,分布式在运行,可以根据你的需求,task给Accumulator传入不同的值

* 根据不同的值,去做复杂的逻辑

*

* Spark Core里面很实用的高端技术

*

* @author Administrator

*

*/

public class SessionAggrStatAccumulator implements AccumulatorParam {

private static final long serialVersionUID = 6311074555136039130L;

/**

* zero方法,其实主要用于数据的初始化

* 那么,我们这里,就返回一个值,就是初始化中,所有范围区间的数量,都是0

* 各个范围区间的统计数量的拼接,还是采用一如既往的key=value|key=value的连接串的格式

*/

@Override

public String zero(String v) {

return Constants.SESSION_COUNT + "=0|"

+ Constants.TIME_PERIOD_1s_3s + "=0|"

+ Constants.TIME_PERIOD_4s_6s + "=0|"

+ Constants.TIME_PERIOD_7s_9s + "=0|"

+ Constants.TIME_PERIOD_10s_30s + "=0|"

+ Constants.TIME_PERIOD_30s_60s + "=0|"

+ Constants.TIME_PERIOD_1m_3m + "=0|"

+ Constants.TIME_PERIOD_3m_10m + "=0|"

+ Constants.TIME_PERIOD_10m_30m + "=0|"

+ Constants.TIME_PERIOD_30m + "=0|"

+ Constants.STEP_PERIOD_1_3 + "=0|"

+ Constants.STEP_PERIOD_4_6 + "=0|"

+ Constants.STEP_PERIOD_7_9 + "=0|"

+ Constants.STEP_PERIOD_10_30 + "=0|"

+ Constants.STEP_PERIOD_30_60 + "=0|"

+ Constants.STEP_PERIOD_60 + "=0";

}

/**

* addInPlace和addAccumulator

* 可以理解为是一样的

*

* 这两个方法,其实主要就是实现,v1可能就是我们初始化的那个连接串

* v2,就是我们在遍历session的时候,判断出某个session对应的区间,然后会用Constants.TIME_PERIOD_1s_3s

* 所以,我们,要做的事情就是

* 在v1中,找到v2对应的value,累加1,然后再更新回连接串里面去

*

*/

@Override

public String addInPlace(String v1, String v2) {

return add(v1, v2);

}

@Override

public String addAccumulator(String v1, String v2) {

return add(v1, v2);

}

/**

* session统计计算逻辑

* @param v1 连接串

* @param v2 范围区间

* @return 更新以后的连接串

*/

private String add(String v1, String v2) {

// 校验:v1为空的话,直接返回v2

if(StringUtils.isEmpty(v1)) {

return v2;

}

// 使用StringUtils工具类,从v1中,提取v2对应的值,并累加1

String oldValue = StringUtils.getFieldFromConcatString(v1, "\\|", v2);

if(oldValue != null) {

// 将范围区间原有的值,累加1

int newValue = Integer.valueOf(oldValue) + 1;

// 使用StringUtils工具类,将v1中,v2对应的值,设置成新的累加后的值

return StringUtils.setFieldInConcatString(v1, "\\|", v2, String.valueOf(newValue));

}

return v1;

}

}

重构实现思路和重构统计聚合

session聚合统计(统计出访问时长和访问步长,各个区间的session数量占总session数量的比例)

如果不进行重构,直接来实现,思路:

1、actionRDD,映射成的格式

2、按sessionid聚合,计算出每个session的访问时长和访问步长,生成一个新的RDD

3、遍历新生成的RDD,将每个session的访问时长和访问步长,去更新自定义Accumulator中的对应的值

4、使用自定义Accumulator中的统计值,去计算各个区间的比例

5、将最后计算出来的结果,写入MySQL对应的表中

普通实现思路的问题:

1、为什么还要用actionRDD,去映射其实我们之前在session聚合的时候,映射已经做过了。多此一举

2、是不是一定要,为了session的聚合这个功能,单独去遍历一遍session其实没有必要,已经有session数据

之前过滤session的时候,其实,就相当于,是在遍历session,那么这里就没有必要再过滤一遍了

重构实现思路:

1、不要去生成任何新的RDD(处理上亿的数据)

2、不要去单独遍历一遍session的数据(处理上千万的数据)

3、可以在进行session聚合的时候,就直接计算出来每个session的访问时长和访问步长

4、在进行过滤的时候,本来就要遍历所有的聚合session信息,此时,就可以在某个session通过筛选条件后

将其访问时长和访问步长,累加到自定义的Accumulator上面去

5、就是两种截然不同的思考方式,和实现方式,在面对上亿,上千万数据的时候,甚至可以节省时间长达

半个小时,或者数个小时

开发Spark大型复杂项目的一些经验准则:

1、尽量少生成RDD

2、尽量少对RDD进行算子操作,如果有可能,尽量在一个算子里面,实现多个需要做的功能

3、尽量少对RDD进行shuffle算子操作,比如groupByKey、reduceByKey、sortByKey(map、mapToPair)

shuffle操作,会导致大量的磁盘读写,严重降低性能

有shuffle的算子,和没有shuffle的算子,甚至性能,会达到几十分钟,甚至数个小时的差别

有shfufle的算子,很容易导致数据倾斜,一旦数据倾斜,简直就是性能杀手(完整的解决方案)

4、无论做什么功能,性能第一

在传统的J2EE或者.NET后者PHP,软件/系统/网站开发中,我认为是架构和可维护性,可扩展性的重要

程度,远远高于了性能,大量的分布式的架构,设计模式,代码的划分,类的划分(高并发网站除外)

在大数据项目中,比如MapReduce、Hive、Spark、Storm,我认为性能的重要程度,远远大于一些代码

的规范,和设计模式,代码的划分,类的划分;大数据,大数据,最重要的,就是性能

主要就是因为大数据以及大数据项目的特点,决定了,大数据的程序和项目的速度,都比较慢

如果不优先考虑性能的话,会导致一个大数据处理程序运行时间长度数个小时,甚至数十个小时

此时,对于用户体验,简直就是一场灾难

所以,推荐大数据项目,在开发和代码的架构中,优先考虑性能;其次考虑功能代码的划分、解耦合

我们如果采用第一种实现方案,那么其实就是代码划分(解耦合、可维护)优先,设计优先

如果采用第二种方案,那么其实就是性能优先

重构代码实现

在对session的统计聚合数据的方法中加入以下代码

//首先将行为数据按照session_id使用groupByKey进行分组

//此时数据粒度就是session粒度了,然后与用户信息进行join

//此时得到的数据就是session粒度的信息并且包含着session对应的user的信息

private static JavaPairRDD aggregateBySession(SQLContext sqlContext, JavaRDD actionRDD) {

//现在的actionRDD中包含着多个Row,每个Row都是一行用户的行为记录

//现在需要将这些记录映射成的格式

JavaPairRDD session2ActionRDD = actionRDD.mapToPair(row -> {

//取出sessionId与Row映射

return new Tuple2<>(row.getString(2), row);

});

//对这些行为数据按照sessionId进行分组

JavaPairRDD> sessionid2ActionRDDs = session2ActionRDD.groupByKey();

//对每个session进行分组,将session中所有的搜索词和点击品类聚合起来

//到此为止获取数据格式为:

JavaPairRDD userid2PartAggrRDD = sessionid2ActionRDDs.mapToPair(tuple -> {

String sessionid = tuple._1;

Iterator rows = tuple._2.iterator();

StringBuffer searchKeyWords = new StringBuffer();

StringBuffer clickCategoryIds = new StringBuffer();

Long userId = null;

Date startDate = null;

Date endDate = null;

int stepLength = 0;

while (rows.hasNext()) {

Row row = rows.next();

if (userId == null) {

userId = row.getLong(1);

}

String searchKeyWord = row.getString(5);

long clickCategory = row.getLong(6);

//这里对数据进行一下说明

//并不是所有数据都有searchKeyWord和clickCategory这两个字段的

//只有当进行了搜索才会有searchKeyWord记录

//只有进行了点击品类查询才会有clickCategory的记录

//所以任何一行数据都不可能两个行为都有,并且数据可能会出现null值

//我们决定是否将搜索词或者点击品类id拼接到字符串中

//首先,该值不能为null

//其次,字符串中还没有包含该值

if (StringUtils.isNotEmpty(searchKeyWord)) {

if (!searchKeyWords.toString().contains(searchKeyWord)) {

searchKeyWords.append(searchKeyWord + ",");

}

}

if (clickCategory != 0) {

if (!clickCategoryIds.toString().contains(clickCategory + "")) {

clickCategoryIds.append(clickCategory + ",");

}

}

//计算session访问步长(访问次数)

stepLength++;

String actionTime = row.getString(4);

//获取每次的访问时间

Date actionDate = DateUtils.parseTime(actionTime);

//计算出开始访问时间和结束访问时间

if(startDate == null){

startDate = actionDate;

}

if(endDate == null){

endDate = actionDate;

}

if(actionDate.before(startDate)){

startDate = actionDate;

}

if(actionDate.after(endDate)){

endDate = actionDate;

}

}

String searchKW = StringUtils.trimComma(searchKeyWords.toString());

String clickCategoryid = StringUtils.trimComma(clickCategoryIds.toString());

//计算出session的总访问时间

long visiTime = (endDate.getTime() - startDate.getTime())/1000;

// 思考一下

// 我们返回的数据格式,即使

// 但是,这一步聚合完了以后,其实,我们是还需要将每一行数据,跟对应的用户信息进行聚合

// 问题就来了,如果是跟用户信息进行聚合的话,那么key,就不应该是sessionid

// 就应该是userid,才能够跟格式的用户信息进行聚合

// 如果我们这里直接返回,还得再做一次mapToPair算子

// 将RDD映射成的格式,那么就多此一举

// 所以,我们这里其实可以直接,返回的数据格式,就是

// 然后跟用户信息join的时候,将partAggrInfo关联上userInfo

// 然后再直接将返回的Tuple的key设置成sessionid

// 最后的数据格式,还是

// 聚合数据,用什么样的格式进行拼接

// 我们这里统一定义,使用key=value|key=value

String partAggrInfo = SparkContantsParam.FIELD_SESSION_ID + "=" + sessionid + "|"

+ SparkContantsParam.FIELD_SEARCH_KEYWORD + "=" + searchKW + "|"

+ SparkContantsParam.FIELD_CLICK_CATEGORY_ID + "=" + clickCategoryid

+ SparkContantsParam.FIELD_VISIT_LENGTH + "=" + visiTime + "|"

+ SparkContantsParam.FIELD_STEP_LENGTH + "=" + stepLength;

return new Tuple2(userId, partAggrInfo);

});

过滤算子方法中增加统计各个阶段session的访问次数和访问时长

//按照使用者(Task表中的task_param)指定的筛选参数进行数据过滤

private static JavaPairRDD filterSession(JavaPairRDD session2AggrInfo, JSONObject paramJson,

Accumulator sessionAggrStatAccumulator) {

String categoryIds = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_CATEGORY_IDS);

String cities = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_CITIES);

String startAge = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_START_AGE);

String endAge = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_END_AGE);

String professionals = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_PROFESSIONALS);

String sex = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_SEX);

String keyWords = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_KEYWORDS);

String _param = (StringUtils.isNotEmpty(categoryIds) SparkContantsParam.PARAM_CATEGORY_IDS+"="+categoryIds + "|":"")

+ (StringUtils.isNotEmpty(cities)SparkContantsParam.PARAM_CITIES+"="+cities + "|" : "")

+ (StringUtils.isNotEmpty(startAge)SparkContantsParam.PARAM_START_AGE+"="+startAge + "|" : "")

+ (StringUtils.isNotEmpty(endAge)SparkContantsParam.PARAM_END_AGE+"="+endAge + "|" : "")

+ (StringUtils.isNotEmpty(professionals)SparkContantsParam.PARAM_PROFESSIONALS+"="+professionals + "|" : "")

+ (StringUtils.isNotEmpty(sex)SparkContantsParam.PARAM_SEX+"="+sex + "|" : "")

+ (StringUtils.isNotEmpty(keyWords)SparkContantsParam.PARAM_KEYWORDS+"="+keyWords : "");

if(_param.endsWith("\\|")){

_param = _param.substring(0,_param.length()-1);

}

final String param = _param;

JavaPairRDD filter = session2AggrInfo.filter(tuple -> {

// 首先,从tuple中,获取聚合数据

String partInfo = tuple._2;

// 接着,依次按照筛选条件进行过滤

// 按照年龄范围进行过滤(startAge、endAge)

if (!ValidUtils.between(partInfo, SparkContantsParam.FIELD_AGE, param, SparkContantsParam.PARAM_START_AGE, SparkContantsParam.PARAM_END_AGE)) {

return false;

}

// 按照职业范围进行过滤(professionals)

// 互联网,IT,软件

// 互联网

if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_PROFESSIONAL, param, SparkContantsParam.PARAM_PROFESSIONALS)) {

return false;

}

// 按照城市范围进行过滤(cities)

// 北京,上海,广州,深圳

// 成都

if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_CITY, param, SparkContantsParam.PARAM_CITIES)) {

return false;

}

// 按照搜索词进行过滤

// 我们的session可能搜索了 火锅,蛋糕,烧烤

// 我们的筛选条件可能是 火锅,串串香,iphone手机

// 那么,in这个校验方法,主要判定session搜索的词中,有任何一个,与筛选条件中

// 任何一个搜索词相当,即通过

if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_SEARCH_KEYWORD, param, SparkContantsParam.PARAM_KEYWORDS)) {

return false;

}

// 按照点击品类id进行过滤

if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_CLICK_CATEGORY_ID, param, SparkContantsParam.PARAM_CATEGORY_IDS)) {

return false;

}

// 按照性别进行过滤

// 男/女

// 男,女

if (!ValidUtils.equal(partInfo, SparkContantsParam.FIELD_SEX, param, SparkContantsParam.PARAM_SEX)) {

return false;

}

//当程序运行到这里说明session使我们需要统计的session

sessionAggrStatAccumulator.add(SparkContantsParam.SESSION_COUNT);

long visitLength = Long.valueOf(StringUtils.getFieldFromConcatString(partInfo,"\\|",SparkContantsParam.FIELD_VISIT_LENGTH));

long stepLength = Long.valueOf(StringUtils.getFieldFromConcatString(partInfo,"\\|",SparkContantsParam.FIELD_STEP_LENGTH));

calculateStepLength(visitLength,sessionAggrStatAccumulator);

calculateVisitLength(stepLength,sessionAggrStatAccumulator);

return true;

});

return filter;

}

private static void calculateVisitLength(long visitLength, Accumulator sessionAggrStatAccumulator) {

if (visitLength >= 1 && visitLength <= 3) {

sessionAggrStatAccumulator.add(SparkContantsParam.TIME_PERIOD_1s_3s);

} else if (visitLength >= 4 && visitLength <= 6) {

sessionAggrStatAccumulator.add(SparkContantsParam.TIME_PERIOD_4s_6s);

} else if (visitLength >= 7 && visitLength <= 9) {

sessionAggrStatAccumulator.add(SparkContantsParam.TIME_PERIOD_7s_9s);

} else if (visitLength >= 10 && visitLength <= 30) {

sessionAggrStatAccumulator.add(SparkContantsParam.TIME_PERIOD_10s_30s);

} else if (visitLength > 30 && visitLength <= 60) {

sessionAggrStatAccumulator.add(SparkContantsParam.TIME_PERIOD_30s_60s);

} else if (visitLength > 60 && visitLength <= 180) {

sessionAggrStatAccumulator.add(SparkContantsParam.TIME_PERIOD_1m_3m);

} else if (visitLength > 180 && visitLength <= 600) {

sessionAggrStatAccumulator.add(SparkContantsParam.TIME_PERIOD_3m_10m);

} else if (visitLength > 600 && visitLength <= 1800) {

sessionAggrStatAccumulator.add(SparkContantsParam.TIME_PERIOD_10m_30m);

}

}

private static void calculateStepLength(long stepLength, Accumulator sessionAggrStatAccumulator) {

if (stepLength >= 1 && stepLength <= 3) {

sessionAggrStatAccumulator.add(SparkContantsParam.STEP_PERIOD_1_3);

} else if (stepLength >= 4 && stepLength <= 6) {

sessionAggrStatAccumulator.add(SparkContantsParam.STEP_PERIOD_4_6);

} else if (stepLength >= 7 && stepLength <= 9) {

sessionAggrStatAccumulator.add(SparkContantsParam.STEP_PERIOD_7_9);

} else if (stepLength >= 10 && stepLength <= 30) {

sessionAggrStatAccumulator.add(SparkContantsParam.STEP_PERIOD_10_30);

} else if (stepLength > 30 && stepLength <= 60) {

sessionAggrStatAccumulator.add(SparkContantsParam.STEP_PERIOD_30_60);

} else if (stepLength > 60) {

sessionAggrStatAccumulator.add(SparkContantsParam.STEP_PERIOD_60);

}

}

session聚合统计之计算统计结果并写入Mysql数据库

package com.spark.sparksession.sparkjob;

import com.alibaba.fastjson.JSONObject;

import com.spark.sparksession.dao.SessionAggrStatDao;

import com.spark.sparksession.dao.TaskDao;

import com.spark.sparksession.data.MockData;

import com.spark.sparksession.domain.SessionAggr;

import com.spark.sparksession.domain.Task;

import com.spark.sparksession.util.*;

import org.apache.spark.Accumulator;

import org.apache.spark.SparkConf;

import org.apache.spark.SparkContext;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.sql.DataFrame;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SQLContext;

import org.apache.spark.sql.hive.HiveContext;

import org.springframework.stereotype.Component;

import scala.Tuple2;

import javax.annotation.Resource;

import java.util.Date;

import java.util.Iterator;

import java.util.List;

import java.util.Map;

/**

* 用户访问session分析Spark作业

*

* 接收用户创建的分析任务,用户可能指定的条件如下: *

* 1、时间范围:起始日期~结束日期 * 2、性别:男或女 * 3、年龄范围 * 4、职业:多选 * 5、城市:多选 * 6、搜索词:多个搜索词,只要某个session中的任何一个action搜索过指定的关键词,那么session就符合条件 * 7、点击品类:多个品类,只要某个session中的任何一个action点击过某个品类,那么session就符合条件 *

* 我们的spark作业如何接受用户创建的任务 *

* J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,任务参数以JSON格式封装在task_param * 字段中 *

* 接着J2EE平台会执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本 * spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数,传递给Spark作业的main函数 * 参数就封装在main函数的args数组中 *

* 这是spark本身提供的特性 * * @author Administrator */ @Component public class UserVisitSessionAnalyzeSpark { @Resource private TaskDao taskDao; @Resource private SessionAggrStatDao sessionAggrStatDao; public void uservisit() { String[] args = new String[]{"1"}; //构建Spark上下文 SparkConf conf = new SparkConf().setAppName(SparkContantsParam.USER_VISIT_APPNAME).setMaster(SparkContantsParam.USER_VISIT_MASTER); JavaSparkContext jsc = new JavaSparkContext(conf); SQLContext sqlContext = getSqlContext(jsc.sc()); //生成模拟数据 mockData(jsc, sqlContext); //从主函数入口args获取taskId Long taskId = ParamUtils.getTaskIdFromArgs(args); List tasks = taskDao.findTaskById(Math.toIntExact(taskId)); //得到task Task task = null; if (!tasks.isEmpty()) { task = tasks.get(0); } // 首先得查询出来指定的任务,并获取任务的查询参数 JSONObject paramJson = JSONObject.parseObject(task.getTaskParam()); //如果要进行session粒度的聚合,首先要从user_visit_action表中查询出指定日期范围内的行为数据 JavaRDD actionRDD = getActionRDDByDateRange(sqlContext, paramJson); System.out.println(actionRDD.count()); //首先将行为数据按照session_id使用groupByKey进行分组 //此时数据粒度就是session粒度了,然后与用户信息进行join //此时得到的数据就是session粒度的信息并且包含着session对应的user的信息 JavaPairRDD session2AggrInfo = aggregateBySession(sqlContext, actionRDD); //重构统计使用自定义的accumulator进行对用户访问时长和访问次数的统计 Accumulator sessionAggrStatAccumulator = jsc.accumulator("", new SparkSessionAggrAccumulator()); //接着我们就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤 //相当于我们自己编写的算子,是要访问外面的任务参数对象的 JavaPairRDD filtersession2AggrInfo = filterSession(session2AggrInfo, paramJson, sessionAggrStatAccumulator); //count:触发action执行的方法 System.out.println(filtersession2AggrInfo.count()); //计算出各个范围的session占比,并写入Mysql caculateAndAggrPersist(sessionAggrStatAccumulator.value(), task.getTaskId()); //关闭Spark上下文 jsc.close(); } //首先要从user_visit_action表中查询出指定日期范围内的行为数据 private static JavaRDD getActionRDDByDateRange(SQLContext sqlContext, JSONObject paramJson) { 如上 } //首先将行为数据按照session_id使用groupByKey进行分组 //此时数据粒度就是session粒度了,然后与用户信息进行join //此时得到的数据就是session粒度的信息并且包含着session对应的user的信息 private static JavaPairRDD aggregateBySession(SQLContext sqlContext, JavaRDD actionRDD) { 如上 } //按照使用者(Task表中的task_param)指定的筛选参数进行数据过滤 private static JavaPairRDD filterSession(JavaPairRDD session2AggrInfo, JSONObject paramJson, Accumulator sessionAggrStatAccumulator) { String categoryIds = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_CATEGORY_IDS); String cities = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_CITIES); String startAge = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_START_AGE); String endAge = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_END_AGE); String professionals = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_PROFESSIONALS); String sex = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_SEX); String keyWords = ParamUtils.getParam(paramJson, SparkContantsParam.PARAM_KEYWORDS); String _param = (StringUtils.isNotEmpty(categoryIds) SparkContantsParam.PARAM_CATEGORY_IDS + "=" + categoryIds + "|" : "") + (StringUtils.isNotEmpty(cities) SparkContantsParam.PARAM_CITIES + "=" + cities + "|" : "") + (StringUtils.isNotEmpty(startAge) SparkContantsParam.PARAM_START_AGE + "=" + startAge + "|" : "") + (StringUtils.isNotEmpty(endAge) SparkContantsParam.PARAM_END_AGE + "=" + endAge + "|" : "") + (StringUtils.isNotEmpty(professionals) SparkContantsParam.PARAM_PROFESSIONALS + "=" + professionals + "|" : "") + (StringUtils.isNotEmpty(sex) SparkContantsParam.PARAM_SEX + "=" + sex + "|" : "") + (StringUtils.isNotEmpty(keyWords) SparkContantsParam.PARAM_KEYWORDS + "=" + keyWords : ""); if (_param.endsWith("\\|")) { _param = _param.substring(0, _param.length() - 1); } final String param = _param; JavaPairRDD filter = session2AggrInfo.filter(tuple -> { // 首先,从tuple中,获取聚合数据 String partInfo = tuple._2; System.out.println("filter:"+partInfo); // 接着,依次按照筛选条件进行过滤 // 按照年龄范围进行过滤(startAge、endAge) if (!ValidUtils.between(partInfo, SparkContantsParam.FIELD_AGE, param, SparkContantsParam.PARAM_START_AGE, SparkContantsParam.PARAM_END_AGE)) { return false; } // 按照职业范围进行过滤(professionals) // 互联网,IT,软件 // 互联网 if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_PROFESSIONAL, param, SparkContantsParam.PARAM_PROFESSIONALS)) { return false; } // 按照城市范围进行过滤(cities) // 北京,上海,广州,深圳 // 成都 if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_CITY, param, SparkContantsParam.PARAM_CITIES)) { return false; } // 按照搜索词进行过滤 // 我们的session可能搜索了 火锅,蛋糕,烧烤 // 我们的筛选条件可能是 火锅,串串香,iphone手机 // 那么,in这个校验方法,主要判定session搜索的词中,有任何一个,与筛选条件中 // 任何一个搜索词相当,即通过 if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_SEARCH_KEYWORD, param, SparkContantsParam.PARAM_KEYWORDS)) { return false; } // 按照点击品类id进行过滤 if (!ValidUtils.in(partInfo, SparkContantsParam.FIELD_CLICK_CATEGORY_ID, param, SparkContantsParam.PARAM_CATEGORY_IDS)) { return false; } // 按照性别进行过滤 // 男/女 // 男,女 if (!ValidUtils.equal(partInfo, SparkContantsParam.FIELD_SEX, param, SparkContantsParam.PARAM_SEX)) { return false; } //当程序运行到这里说明session使我们需要统计的session sessionAggrStatAccumulator.add(SparkContantsParam.SESSION_COUNT); long visitLength = Long.valueOf(StringUtils.getFieldFromConcatString(partInfo, "\\|", SparkContantsParam.FIELD_VISIT_LENGTH)); long stepLength = Long.valueOf(StringUtils.getFieldFromConcatString(partInfo, "\\|", SparkContantsParam.FIELD_STEP_LENGTH)); calculateStepLength(stepLength, sessionAggrStatAccumulator); calculateVisitLength(visitLength, sessionAggrStatAccumulator); return true; }); return filter; } //生成模拟数据,只有在本地才会生成模拟数据 private static void mockData(JavaSparkContext jsc, SQLContext sqlContext) { if (SparkContantsParam.USER_VISIT_LOCAL) { MockData.mock(jsc, sqlContext); } } //获取SqlContext,如果在本地就生成SqlContext对象,如果在线上就生成HiveContext对象 private static SQLContext getSqlContext(SparkContext sc) { if (SparkContantsParam.USER_VISIT_LOCAL) { return new SQLContext(sc); } else { return new HiveContext(sc); } } private void caculateAndAggrPersist(String value, int taskId) { System.out.println(value); System.out.println(taskId); int session_count = Integer.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.SESSION_COUNT)); Long visit_length_1s_3s_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.TIME_PERIOD_1s_3s)); Long visit_length_4s_6s_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.TIME_PERIOD_4s_6s)); Long visit_length_7s_9s_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.TIME_PERIOD_7s_9s)); Long visit_length_10s_30s_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.TIME_PERIOD_10s_30s)); Long visit_length_30s_60s_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.TIME_PERIOD_30s_60s)); Long visit_length_1m_3m_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.TIME_PERIOD_1m_3m)); Long visit_length_3m_10m_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.TIME_PERIOD_3m_10m)); Long visit_length_10m_30m_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.TIME_PERIOD_10m_30m)); Long visit_length_30m_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.TIME_PERIOD_30m)); Long step_length_1_3_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.STEP_PERIOD_1_3)); Long step_length_4_6_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.STEP_PERIOD_4_6)); Long step_length_7_9_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.STEP_PERIOD_7_9)); Long step_length_10_30_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.STEP_PERIOD_10_30)); Long step_length_30_60_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.STEP_PERIOD_30_60)); Long step_length_60_ratio = Long.valueOf(StringUtils.getFieldFromConcatString(value, "\\|", SparkContantsParam.STEP_PERIOD_60)); double visit_length_1s_3s = NumberUtils.formatDouble((double)visit_length_1s_3s_ratio / (double)session_count, 2); double visit_length_4s_6s = NumberUtils.formatDouble((double)visit_length_4s_6s_ratio / (double)session_count, 2); double visit_length_7s_9s = NumberUtils.formatDouble((double)visit_length_7s_9s_ratio / (double)session_count, 2); double visit_length_10s_30s = NumberUtils.formatDouble((double)visit_length_10s_30s_ratio / (double)session_count, 2); double visit_length_30s_60s = NumberUtils.formatDouble((double)visit_length_30s_60s_ratio / (double)session_count, 2); double visit_length_1m_3m = NumberUtils.formatDouble((double)visit_length_1m_3m_ratio / (double)session_count, 2); double visit_length_3m_10m = NumberUtils.formatDouble((double)visit_length_3m_10m_ratio / (double)session_count, 2); double visit_length_10m_30m = NumberUtils.formatDouble((double)visit_length_10m_30m_ratio / (double)session_count, 2); double visit_length_30m = NumberUtils.formatDouble((double)visit_length_30m_ratio / (double)session_count, 2); double step_length_1_3 = NumberUtils.formatDouble((double)step_length_1_3_ratio / (double)session_count, 2); double step_length_4_6 = NumberUtils.formatDouble((double)step_length_4_6_ratio / (double)session_count, 2); double step_length_7_9 = NumberUtils.formatDouble((double)step_length_7_9_ratio / (double)session_count, 2); double step_length_10_30 = NumberUtils.formatDouble((double)step_length_10_30_ratio / (double)session_count, 2); double step_length_30_60 = NumberUtils.formatDouble((double)step_length_30_60_ratio / (double)session_count, 2); double step_length_60 = NumberUtils.formatDouble((double)step_length_60_ratio / (double)session_count, 2); SessionAggr aggr = new SessionAggr(); aggr.setTaskid(taskId); aggr.setSession_count(session_count); aggr.setStep_length_1_3_ratio(step_length_1_3); aggr.setStep_length_4_6_ratio(step_length_4_6); aggr.setStep_length_7_9_ratio(step_length_7_9); aggr.setStep_length_10_30_ratio(step_length_10_30); aggr.setStep_length_30_60_ratio(step_length_30_60); aggr.setStep_length_60_ratio(step_length_60); aggr.setVisit_length_1s_3s_ratio(visit_length_1s_3s); aggr.setVisit_length_4s_6s_ratio(visit_length_4s_6s); aggr.setVisit_length_7s_9s_ratio(visit_length_7s_9s); aggr.setVisit_length_10s_30s_ratio(visit_length_10s_30s); aggr.setVisit_length_30s_60s_ratio(visit_length_30s_60s); aggr.setVisit_length_1m_3m_ratio(visit_length_1m_3m); aggr.setVisit_length_3m_10m_ratio(visit_length_3m_10m); aggr.setVisit_length_10m_30m_ratio(visit_length_10m_30m); aggr.setVisit_length_30m_ratio(visit_length_30m); System.out.println(aggr); sessionAggrStatDao.insert(aggr); } }

按时间比例随机抽取session

package com.spark.sparksession.sparkjob;

import com.alibaba.fastjson.JSONObject;

import com.spark.sparksession.dao.SessionAggrStatDao;

import com.spark.sparksession.dao.TaskDao;

import com.spark.sparksession.data.MockData;

import com.spark.sparksession.domain.SessionAggr;

import com.spark.sparksession.domain.Task;

import com.spark.sparksession.util.*;

import org.apache.spark.Accumulator;

import org.apache.spark.SparkConf;

import org.apache.spark.SparkContext;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.sql.DataFrame;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SQLContext;

import org.apache.spark.sql.hive.HiveContext;

import org.springframework.stereotype.Component;

import scala.Tuple2;

import javax.annotation.Resource;

import java.util.Date;

import java.util.Iterator;

import java.util.List;

import java.util.Map;

/**

* 用户访问session分析Spark作业

*

* 接收用户创建的分析任务,用户可能指定的条件如下: *

* 1、时间范围:起始日期~结束日期 * 2、性别:男或女 * 3、年龄范围 * 4、职业:多选 * 5、城市:多选 * 6、搜索词:多个搜索词,只要某个session中的任何一个action搜索过指定的关键词,那么session就符合条件 * 7、点击品类:多个品类,只要某个session中的任何一个action点击过某个品类,那么session就符合条件 *

* 我们的spark作业如何接受用户创建的任务 *

* J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,任务参数以JSON格式封装在task_param * 字段中 *

* 接着J2EE平台会执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本 * spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数,传递给Spark作业的main函数 * 参数就封装在main函数的args数组中 *

* 这是spark本身提供的特性 * * @author Administrator */ @Component public class UserVisitSessionAnalyzeSpark { @Resource private TaskDao taskDao; @Resource private SessionAggrStatDao sessionAggrStatDao; public void uservisit() { String[] args = new String[]{"1"}; //构建Spark上下文 SparkConf conf = new SparkConf().setAppName(SparkContantsParam.USER_VISIT_APPNAME).setMaster(SparkContantsParam.USER_VISIT_MASTER); JavaSparkContext jsc = new JavaSparkContext(conf); SQLContext sqlContext = getSqlContext(jsc.sc()); //生成模拟数据 mockData(jsc, sqlContext); //从主函数入口args获取taskId Long taskId = ParamUtils.getTaskIdFromArgs(args); List tasks = taskDao.findTaskById(Math.toIntExact(taskId)); //得到task Task task = null; if (!tasks.isEmpty()) { task = tasks.get(0); } // 首先得查询出来指定的任务,并获取任务的查询参数 JSONObject paramJson = JSONObject.parseObject(task.getTaskParam()); //如果要进行session粒度的聚合,首先要从user_visit_action表中查询出指定日期范围内的行为数据 JavaRDD actionRDD = getActionRDDByDateRange(sqlContext, paramJson); System.out.println(actionRDD.count()); //首先将行为数据按照session_id使用groupByKey进行分组 //此时数据粒度就是session粒度了,然后与用户信息进行join //此时得到的数据就是session粒度的信息并且包含着session对应的user的信息 JavaPairRDD session2AggrInfo = aggregateBySession(sqlContext, actionRDD); //重构统计使用自定义的accumulator进行对用户访问时长和访问次数的统计 Accumulator sessionAggrStatAccumulator = jsc.accumulator("", new SparkSessionAggrAccumulator()); //接着我们就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤 //相当于我们自己编写的算子,是要访问外面的任务参数对象的 JavaPairRDD filtersession2AggrInfo = filterSession(session2AggrInfo, paramJson, sessionAggrStatAccumulator); //按时间比例随机抽取session randomExtractSession(filtersession2AggrInfo); System.out.println(filtersession2AggrInfo.count()); caculateAndAggrPersist(sessionAggrStatAccumulator.value(), task.getTaskId()); //关闭Spark上下文 jsc.close(); } }

随机抽取session方法

第一步 使用mapToPair()获取格式的RDD

使用countByKey得到每天每小时session的数量

第二步 使用按时间比例随机抽取算法,计算出每天每小时要抽取session的索引

第三步 遍历每天每小时的session根据第二步生成的随机索引进行抽取session

将第一步生成的RDD使用groupByKey算子得到格式的每天每小时的所有session数据

遍历这些session数据,如果该session索引是第二步生成的索引,将其对应的信息持久化到数据库中,并使用flatMapToPair将其sessionid抽取出来,组成 Tuple2(sessionid,sessionid)的格式,方便第四步 与session详情的RDD进行join,获取该session的详细信息

第四步:获取抽取出来的session的明细数据

private void randomExtractSession(final int taskId, JavaPairRDD session2AggrInfo,

JavaPairRDD sessionid2Action) {

/**

* 第一步 计算出每天每小时的session数量,获取,aggrInfo>格式的RDD

*/

JavaPairRDD session2TimeRDD = session2AggrInfo.mapToPair(tuple2 -> {

String aggrInfo = tuple2._2;

String startTime = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", SparkContantsParam.FIELD_STARTDATE);

String dateHour = DateUtils.getDateHour(startTime);

return new Tuple2<>(dateHour, aggrInfo);

});

/**

*

* 每天每小时的session数量,然后计算出每天每小时的session抽取索引,遍历每天每小时session

* 首先抽取出的session的聚合数据,写入session_random_extract表

* 所以第一个RDD的value,应该是session聚合数据

*

*/

//得到每天每小时的session数量

Map countMap = session2TimeRDD.countByKey();

/**

* 第二步 使用按时间比例随机抽取算法,计算出每天每小时要抽取session的索引

*/

// 将格式的map,转换成>的格式

Map> dateHourCountMap = new HashMap<>();

countMap.forEach((k,v)->{

String date = k.split("_")[0];

String hour = k.split("_")[1];

int count = Integer.parseInt(v.toString());

Map hourCountMap = dateHourCountMap.get(date);

if(hourCountMap == null){

hourCountMap = new HashMap<>();

dateHourCountMap.put(date,hourCountMap);

}

hourCountMap.put(hour,count);

});

// 开始实现我们的按时间比例随机抽取算法

// 总共要抽取100个session,先按照天数进行平分

// 每天要抽取的数量

int oneDayExtractNum = 100 / dateHourCountMap.size();

// >

final Map>> dateHourExtractMap = new HashMap<>();

Random random = new Random();

dateHourCountMap.forEach((k,v)->{

String date = k;

Map hourCountMap = v;

//计算出这一天的session总数

long dateTotalCount = 0;

for(long datecount:hourCountMap.values()){

dateTotalCount += datecount;

}

Map> hourExtractMap = dateHourExtractMap.get(date);

if(hourExtractMap == null){

hourExtractMap = new HashMap<>();

dateHourExtractMap.put(date,hourExtractMap);

}

for(Map.Entry hourCount : hourCountMap.entrySet()){

String hour = hourCount.getKey();

Integer count = hourCount.getValue();

//计算出每个小时session的数量,占据当天总session数量的比例,直接乘以每天要抽取的数量

//就可以计算出当前小时要抽取的数量

int hourExtractCount = (int)((double) count / (double) dateTotalCount) * oneDayExtractNum;

if(hourExtractCount > count) {

hourExtractCount = count;

}

//获取当前小时存放随机索引的list

List hourIndexList = hourExtractMap.get(hour);

if(hourIndexList == null){

hourIndexList = new ArrayList<>();

hourExtractMap.put(hour,hourIndexList);

}

//按照每个小时要抽取的数量,在每个小时对应的session数量范围内,生成随机的session索引

for(int i=0;i/**

* 第三步 遍历每天每小时的session根据第二步生成的随机索引进行抽取

*/

// 执行groupByKey算子,得到

JavaPairRDD> time2SessionRDD = session2TimeRDD.groupByKey();

JavaPairRDD extractSessionIdsRDD = time2SessionRDD.flatMapToPair(tuple -> {

String date = tuple._1.split("_")[0];

String hour = tuple._1.split("_")[1];

Iterator aggrInfos = tuple._2.iterator();

List> extractSessionIds = new ArrayList<>();

//或取到上面得到的当前小时随机索引

List indexs = dateHourExtractMap.get(date).get(hour);

for (int i = 0; aggrInfos.hasNext(); i++) {

//如果当前的session索引在刚才第二步生成的当前小时的随机索引中

if (indexs.contains(i)) {

String info = aggrInfos.next();

String sessionId = StringUtils.getFieldFromConcatString(info, "\\|", SparkContantsParam.FIELD_SESSION_ID);

String startDate = StringUtils.getFieldFromConcatString(info, "\\|", SparkContantsParam.FIELD_STARTDATE);

String searchKeywordds = StringUtils.getFieldFromConcatString(info, "\\|", SparkContantsParam.FIELD_SEARCH_KEYWORDS);

String categoryIds = StringUtils.getFieldFromConcatString(info, "\\|", SparkContantsParam.PARAM_CATEGORY_IDS);

SessionRandomExtract randomExtract = new SessionRandomExtract();

randomExtract.setTaskId(taskId);

randomExtract.setClickCategoryIds(categoryIds);

randomExtract.setSearchKeywords(searchKeywordds);

randomExtract.setSessionId(sessionId);

randomExtract.setStartTime(startDate);

//插入随机抽取到的session数据到mysql的session_random_extract表

sessionRandomExtractDao.insertSessionRandomExtract(randomExtract);

extractSessionIds.add(new Tuple2<>(sessionId, sessionId));

}

}

return extractSessionIds;

});

/**

* 第四步:获取抽取出来的session的明细数据

*/

JavaPairRDD> extractSessionDetailRDD =

extractSessionIdsRDD.join(sessionid2Action);

extractSessionDetailRDD.foreach(new VoidFunction>>() {

@Override

public void call(Tuple2> tuple) throws Exception {

Row row = tuple._2._2;

SessionDetail sd = new SessionDetail();

sd.setUserId((int) row.getLong(1));

sd.setSessionId(row.getString(2));

sd.setPageId((int) row.getLong(3));

sd.setActionTime(row.getString(4));

sd.setSearchKeyword(row.getString(5));

sd.setClickCategoryId((int)row.getLong(6));

sd.setClickProductId((int)row.getLong(7));

sd.setOrderCategoryIds(row.getString(8));

sd.setOrderProductIds(row.getString(9));

sd.setPayCategoryIds(row.getString(10));

sd.setPayProductIds(row.getString(11));

sessionDetailDao.insertSessionDetail(sd);

}

});

}

Top10热门品类排名功能

需求回顾:top10热门品类

计算出来通过筛选条件的那些session,他们访问过的所有品类(点击、下单、支付),按照各个品类的点击、下单和支付次数,降序排序,获取前10个品类,也就是筛选条件下的那一批session的top10热门品类;

点击、下单和支付次数:优先按照点击次数排序、如果点击次数相等,那么按照下单次数排序、如果下单次数相当,那么按照支付次数排序

这个需求是很有意义的,因为这样,就可以让数据分析师、产品经理、公司高层,随时随地都可以看到自己感兴趣的那一批用户,最喜欢的10个品类,从而对自己公司和产品的定位有清晰的了解,并且可以更加深入的了解自己的用户,更好的调整公司战略

二次排序:

如果我们就只是根据某一个字段进行排序,比如点击次数降序排序,那么就不是二次排序;

二次排序,顾名思义,就是说,不只是根据一个字段进行一次排序,可能是要根据多个字段,进行多次排序的

点击、下单和支付次数,依次进行排序,就是二次排序

sortByKey算子,默认情况下,它支持根据int、long等类型来进行排序,但是那样的话,key就只能放一个字段了

所以需要自定义key,作为sortByKey算子的key,自定义key中,封装n个字段,并在key中,自己在指定接口方法中,实现自己的根据多字段的排序算法

然后再使用sortByKey算子进行排序,那么就可以按照我们自己的key,使用多个字段进行排序

本模块中,最最重要和核心的一个Spark技术点

实现思路分析:

1、拿到通过筛选条件的那批session,访问过的所有品类

2、计算出session访问过的所有品类的点击、下单和支付次数,这里可能要跟第一步计算出来的品类进行join

3、自己开发二次排序的key

4、做映射,将品类的点击、下单和支付次数,封装到二次排序key中,作为PairRDD的key

5、使用sortByKey(false),按照自定义key,进行降序二次排序

6、使用take(10)获取,排序后的前10个品类,就是top10热门品类

7、将top10热门品类,以及每个品类的点击、下单和支付次数,写入MySQL数据库

8、本地测试

9、使用Scala来开发二次排序key

代码实现

第一步:获取符合条件的session访问过得所有品类

/**

* 获取top10热门品类

* @param filteredSessionid2AggrInfoRDD

* @param sessionid2actionRDD

*/

private static void getTop10Category(

JavaPairRDD filteredSessionid2AggrInfoRDD,

JavaPairRDD sessionid2actionRDD) {

/**

* 第一步:获取符合条件的session访问过的所有品类

*/

// 获取符合条件的session的访问明细

JavaPairRDD sessionid2detailRDD = filteredSessionid2AggrInfoRDD

.join(sessionid2actionRDD)

.mapToPair(new PairFunction>, String, Row>() {

private static final long serialVersionUID = 1L;

@Override

public Tuple2 call(

Tuple2> tuple) throws Exception {

return new Tuple2(tuple._1, tuple._2._2);

}

});

// 获取session访问过的所有品类id

// 访问过:指的是,点击过、下单过、支付过的品类

JavaPairRDD categoryidRDD = sessionid2detailRDD.flatMapToPair(

new PairFlatMapFunction, Long, Long>() {

private static final long serialVersionUID = 1L;

@Override

public Iterable> call(

Tuple2 tuple) throws Exception {

Row row = tuple._2;

List> list = new ArrayList>();

Long clickCategoryId = row.getLong(6);

if(clickCategoryId != null) {

list.add(new Tuple2(clickCategoryId, clickCategoryId));

}

String orderCategoryIds = row.getString(8);

if(orderCategoryIds != null) {

String[] orderCategoryIdsSplited = orderCategoryIds.split(",");

for(String orderCategoryId : orderCategoryIdsSplited) {

list.add(new Tuple2(Long.valueOf(orderCategoryId),

Long.valueOf(orderCategoryId)));

}

}

String payCategoryIds = row.getString(10);

if(payCategoryIds != null) {

String[] payCategoryIdsSplited = payCategoryIds.split(",");

for(String payCategoryId : payCategoryIdsSplited) {

list.add(new Tuple2(Long.valueOf(payCategoryId),

Long.valueOf(payCategoryId)));

}

}

return list;

}

});

第二步:计算各品类的点击、下单、支付次数

访问明细中,其中三种访问行为是:点击、下单和支付

分别来计算各品类点击、下单和支付的次数,可以先对访问明细数据进行过滤

分别过滤出点击、下单和支付行为,然后通过map、reduceByKey等算子来进行计算

/**

* 2 计算各品类的点击,下单,支付次数

*/

//访问明细中,三种访问行为是,点击,下单,支付

//分别计算以上三种各品类行为的次数,可以先对访问明细数据进行过滤

//分别过滤出点击,下单,支付三种行为,然后通过map,reduceByKey的等各种算子进行计算

JavaPairRDD clickCategoryIdCountRDD = getLongCategoryIDCountByRowNum(sessionid2Action,6);

JavaPairRDD orderCategoryIdCountRDD = getStringCategoryIDCountByRowNum(sessionid2Action,8);

JavaPairRDD payCategoryIdCountRDD = getStringCategoryIDCountByRowNum(sessionid2Action,10);

}

//获取点击品类的次数

private JavaPairRDD getLongCategoryIDCountByRowNum(JavaPairRDD sessionid2Action, int rowNum) {

JavaPairRDD filterCategoryIDCountRDD = sessionid2Action.filter(tuple -> {

Row row = tuple._2;

return Long.valueOf(row.getLong(rowNum)) == null false : true;

});

JavaPairRDD categoryIDRDD = filterCategoryIDCountRDD.mapToPair(tuple -> {

long categoryId = tuple._2.getLong(rowNum);

return new Tuple2<>(categoryId, 1L);

});

return categoryIDRDD.reduceByKey((aLong, aLong2) -> aLong + aLong2);

}

//获取支付或者下单的次数

private JavaPairRDD getStringCategoryIDCountByRowNum(JavaPairRDD sessionid2Action, int rownum) {

JavaPairRDD filterCategoryIdsRDD = sessionid2Action.filter(tuple -> {

String categoryIds = tuple._2.getString(rownum);

return StringUtils.isNotEmpty(categoryIds) true : false;

});

JavaPairRDD flatCategoryIdsRDD = filterCategoryIdsRDD.flatMapToPair(tuple -> {

String categoryIds = tuple._2.getString(rownum);

String[] splitcateids = categoryIds.split(",");

List> categoryIDS = new ArrayList<>();

for (String id : splitcateids) {

categoryIDS.add(new Tuple2<>(Long.valueOf(id), 1L));

}

return categoryIDS;

});

return flatCategoryIdsRDD.reduceByKey((aLong, aLong2) -> aLong + aLong2);

}

第三步:左外连接各品类RDD与数据RDD,拼接成

/**

* 3 连接品类RDD与数据RDD

*/

JavaPairRDD categoryId2CountRDD =

joinCategoryAndData(categoryIdsRDD, clickCategoryIdCountRDD, orderCategoryIdCountRDD, payCategoryIdCountRDD);

/**

* 连接品类RDD与数据RDD

*/

public JavaPairRDD joinCategoryAndData(JavaPairRDD categoryIdRDD,

JavaPairRDD clickCategoryIdRDD,

JavaPairRDD orderIdsCategoryIdRDD,

JavaPairRDD payCategoryIdRDD){

JavaPairRDD tmpMapRDD = categoryIdRDD.leftOuterJoin(clickCategoryIdRDD).mapToPair(tuple -> {

Long categoryId = tuple._1;

Optional optional = tuple._2._2;

Long clickCount = 0L;

if(optional.isPresent()){

clickCount = optional.get();

}

String value = SparkContantsParam.FIELD_CATEGORY_ID + "=" + categoryId +"\\|"

+ SparkContantsParam.FIELD_CLICK_COUNT + "=" + clickCount;

return new Tuple2<>(categoryId, value);

});

tmpMapRDD = tmpMapRDD.leftOuterJoin(orderIdsCategoryIdRDD).mapToPair(tuple -> {

Long categoryId = tuple._1;

Optional optional = tuple._2._2;

String value = tuple._2._1;

Long orderCount = 0L;

if(optional.isPresent()){

orderCount = optional.get();

}

value = value + "\\|" + SparkContantsParam.FIELD_ORDER_COUNT + "=" + orderCount;

return new Tuple2<>(categoryId,value);

});

return tmpMapRDD.leftOuterJoin(payCategoryIdRDD).mapToPair(tuple -> {

Long categoryId = tuple._1;

Optional optional = tuple._2._2;

String value = tuple._2._1;

Long payCount = 0L;

if(optional.isPresent()){

payCount = optional.get();

}

value = value + "\\|" + SparkContantsParam.FIELD_PAY_COUNT + "=" + payCount;

return new Tuple2<>(categoryId,value);

});

}

第四步:自定义二次排序key

package com.spark.sparksession.util;

import scala.math.Ordered;

public class CategorySortKey implements Ordered {

private Long clickCount;

private Long orderCount;

private Long payCount;

@Override

public boolean $less(CategorySortKey that) {

if(clickCount < that.clickCount){

return true;

}else if(clickCount == that.clickCount &&

orderCount < that.orderCount){

return true;

}else if((clickCount == that.clickCount &&

orderCount == that.orderCount &&

payCount < that.payCount)){

return true;

}

return false;

}

@Override

public boolean $greater(CategorySortKey that) {

if(clickCount > that.clickCount){

return true;

}else if(clickCount == that.clickCount &&

orderCount > that.orderCount){

return true;

}else if(clickCount == that.clickCount &&

orderCount == that.orderCount &&

payCount > that.payCount){

return true;

}

return false;

}

@Override

public boolean $less$eq(CategorySortKey that) {

if($less(that)){

return true;

}else if(clickCount == that.clickCount &&

orderCount == that.orderCount &&

payCount == that.payCount){

return true;

}

return false;

}

@Override

public boolean $greater$eq(CategorySortKey that) {

if($greater(that)){

return true;

}else if(clickCount == that.clickCount &&

orderCount == that.orderCount &&

payCount == that.payCount){

return true;

}

return false;

}

@Override

public int compareTo(CategorySortKey that) {

if(clickCount != that.clickCount){

return (int) (clickCount - that.clickCount);

}else if (orderCount != that.orderCount){

return (int) (orderCount - that.orderCount);

}else if(payCount != that.payCount){

return (int) (payCount - that.payCount);

}

return 0;

}

@Override

public int compare(CategorySortKey that) {

if(clickCount != that.clickCount){

return (int) (clickCount - that.clickCount);

}else if (orderCount != that.orderCount){

return (int) (orderCount - that.orderCount);

}else if(payCount != that.payCount){

return (int) (payCount - that.payCount);

}

return 0;

}

public Long getClickCount() {

return clickCount;

}

public void setClickCount(Long clickCount) {

this.clickCount = clickCount;

}

public Long getOrderCount() {

return orderCount;

}

public void setOrderCount(Long orderCount) {

this.orderCount = orderCount;

}

public Long getPayCount() {

return payCount;

}

public void setPayCount(Long payCount) {

this.payCount = payCount;

}

}

第五步:将数据映射成格式的RDD,然后进行二次排序

JavaPairRDD sortInfoRDD = categoryId2CountRDD.mapToPair(tuple -> {

String countInfo = tuple._2;

Long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|",

SparkContantsParam.FIELD_CLICK_COUNT));

Long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|",

SparkContantsParam.FIELD_ORDER_COUNT));

Long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|",

SparkContantsParam.FIELD_PAY_COUNT));

CategorySortKey sortKey = new CategorySortKey(clickCount, orderCount, payCount);

return new Tuple2<>(sortKey, countInfo);

});

//倒序排序

JavaPairRDD sortedCategoryCountRDD = sortInfoRDD.sortByKey(false);

第六步:用take(10)取出top10热门品类,并写入Mysql

List> takes = sortedCategoryCountRDD.take(10);

for (Tuple2 take : takes) {

String countInfo = take._2;

Top10Category top10Category = new Top10Category();

top10Category.setCategoryid(Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", SparkContantsParam.FIELD_CATEGORY_ID)));

top10Category.setClickCount(Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", SparkContantsParam.FIELD_CLICK_COUNT)));

top10Category.setOrderCount(Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", SparkContantsParam.FIELD_ORDER_COUNT)));

top10Category.setPayCount(Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", SparkContantsParam.FIELD_PAY_COUNT)));

top10Category.setTaskid(taskId);

top10CategoryDAO.insert(top10Category);

}

return takes;

top10活跃session

top10热门品类,获取每个品类点击次数最多的10个session,以及其对应的访问明细

实现思路分析:

1、拿到符合筛选条件的session的明细数据

2、按照session粒度进行聚合,获取到session对每个品类的点击次数,用flatMap,算子函数返回的是

3、按照品类id,分组取top10,获取到top10活跃session;groupByKey;自己写算法,获取到点击次数最多的前10个session,直接写入MySQL表;返回的是sessionid

4、获取各品类top10活跃session的访问明细数据,写入MySQL

代码实现

/**

* 获取top10活跃session

*/

private static void getTop10Session(JavaSparkContext jsc, int taskId, List> top10Category,

JavaPairRDD session2DetailRDD) {

/**

* 1 将top10热门品类的categoryid生成一份RDD

*/

List> top10CategoryIdsList = new ArrayList<>();

top10Category.forEach(tuple ->{

long categoryId = Long.valueOf(StringUtils.getFieldFromConcatString(tuple._2,"\\|",SparkContantsParam.FIELD_CATEGORY_ID));

top10CategoryIdsList.add(new Tuple2<>(categoryId,categoryId));

});

JavaPairRDD top10CategoryIdRDD = jsc.parallelizePairs(top10CategoryIdsList);

/**

* 2 计算top10的品类被各个session点击的次数

*/

JavaPairRDD> sessionDetailGroup = session2DetailRDD.groupByKey();

JavaPairRDD category2CountRDD = sessionDetailGroup.flatMapToPair(tuple -> {

String sessionid = tuple._1;

Iterator iterator = tuple._2.iterator();

Map categoryCount = new HashMap<>();

while (iterator.hasNext()) {

Row next = iterator.next();

if (next.get(6) != null) {

long categoryid = next.getLong(6);

Long count = categoryCount.get(categoryid);

if (count == null) {

count = 0L;

}

count++;

categoryCount.put(categoryid, count);

}

}

List> category2Count = new ArrayList<>();

//返回结果,的格式

for (Map.Entry category : categoryCount.entrySet()) {

Long categoryKey = category.getKey();

Long count = category.getValue();

String value = sessionid + "," + count;

category2Count.add(new Tuple2<>(categoryKey, value));

}

return category2Count;

});

// 获取到top10的品类被各个session点击的次数

JavaPairRDD top10CategorySessionCountRDD = top10CategoryIdRDD.join(category2CountRDD).mapToPair(tuple -> {

Long categoryId = tuple._1;

String value = tuple._2._2;

return new Tuple2<>(categoryId, value);

});

/**

* 第三步 分组取topN算法实现,获取每个品类的top10的活跃用户

*/

JavaPairRDD> categorySessionCountsRDD = top10CategorySessionCountRDD.groupByKey();

JavaPairRDD top10CategorySessionCountsRDD = categorySessionCountsRDD.flatMapToPair(tuple -> {

Long categoryId = tuple._1;

Iterator iterator = tuple._2.iterator();

String[] top10Sessions = new String[10];

while (iterator.hasNext()) {

String value = iterator.next();

long count = Long.valueOf(value.split(",")[1]);

for (int i = 0; i < top10Sessions.length; i++) {

// 如果当前i位,没有数据,那么直接将i位数据赋值为当前sessionCount

if (top10Sessions[i] == null) {

top10Sessions[i] = value;

break;

} else {

long _count = Long.valueOf(top10Sessions[i].split(",")[1]);

// 如果sessionCount比i位的sessionCount要大

if (count > _count) {

// 从排序数组最后一位开始,到i位,所有数据往后挪一位

for (int j = top10Sessions.length - 1; j > i; j--) {

top10Sessions[j] = top10Sessions[j - 1];

}

// 将i位赋值为sessionCount

top10Sessions[i] = value;

break;

}

// 比较小,继续外层for循环

}

}

}

//将数据写入mysql

List> list = new ArrayList<>();

for (String value : top10Sessions) {

String sessionId = value.split(",")[0];

String count = value.split(",")[1];

Top10Session top10Session = new Top10Session();

top10Session.setCategoryid(categoryId);

top10Session.setClickCount(Long.valueOf(count));

top10Session.setSessionid(sessionId);

top10Session.setTaskid(taskId);

top10SessionDAO.insert(top10Session);

list.add(new Tuple2<>(sessionId, sessionId));

}

return list;

});

/**

* 第四步:获取top10活跃session的明细数据,并写入MySQL

*/

JavaPairRDD> top10SessionDetailRDD = top10CategorySessionCountsRDD.join(session2DetailRDD);

top10SessionDetailRDD.foreach(tuple2 ->{

Row row = tuple2._2._2;

SessionDetail sessionDetail = new SessionDetail();

sessionDetail.setTaskId(taskId);

sessionDetail.setUserId(row.getLong(1));

sessionDetail.setSessionId(row.getString(2));

sessionDetail.setPageId(row.getLong(3));

sessionDetail.setActionTime(row.getString(4));

sessionDetail.setSearchKeyword(row.getString(5));

sessionDetail.setClickCategoryId(row.getLong(6));

sessionDetail.setClickProductId(row.getLong(7));

sessionDetail.setOrderCategoryIds(row.getString(8));

sessionDetail.setOrderProductIds(row.getString(9));

sessionDetail.setPayCategoryIds(row.getString(10));

sessionDetail.setPayProductIds(row.getString(11));

System.out.println(sessionDetail);

sessionDetailDAO.insert(sessionDetail);

});

}

用户访问session分析,开发小结

目前为止,我们做了些什么

1、公共组件

1.1 配置管理组件

1.2 JDBC辅助组件

1.3 工具类

1.4 模拟数据生成程序

1.5 单元测试

1.6 domain、dao

2、第一个模块:用户访问session分析模块

2.0 基础:session粒度聚合、按筛选条件进行过滤

2.1 session聚合统计:统计出访问时长和访问步长,各个区间范围的session数量,占总session数量的比例

2.2 session随机抽取:按时间比例,随机抽取出100个session

2.3 top10热门品类:获取通过筛选条件的session,点击、下单和支付次数最多的10个品类

2.4 top10活跃session:获取top10热门品类中,每个品类点击次数最多的10个session

3、技术点和知识点

3.1 正规的大型大数据项目的架构(公共组件的封装、包的划分、代码的规范)

3.2 复杂的大数据分析需求(纯spark作业代码,1500行+)

3.3 Spark Core大部分算子在实际项目中的综合应用实战:map、reduce、count、group

3.4 高级技术点:自定义Accumulator、按时间比例随机抽取算法、二次排序、分组取TopN算法

3.5 标准和正规的大数据项目开发流程:数据调研、需求分析、技术方案设计、数据库设计、编码实现、单元测试、本地测试

4、完成了第一个业务模块:用户访问session分析,开发

5、走完剩下的流程:性能调优、生产环境测试

6、接下来要做什么:

6.1 性能调优:按照本人开发过的大量的单个spark作业,处理10亿到100亿级别数据的经验,要针对我们写好的spark作业程序,实施十几个到二十个左右的复杂性调优技术;性能调优相关的原理讲解;性能调优技术的实施;实际经验中应用性能调优技术的经验总结;掌握一整套复杂的Spark企业级性能调优解决方案;而不只是简单的一些性能调优技巧(网上一些博客、其他一些视频、其他一些书)

6.2 数据倾斜解决方案:针对写好的spark作业,实施一整套数据倾斜解决方案:实际经验中积累的数据倾斜现象的表现,以及处理后的效果总结

6.3 troubleshooting:针对写好的spark作业,讲解实际经验中遇到的各种线上报错问题,以及解决方案

6.4 生产环境测试:Hive表

session分析之性能调优

分配更多资源:性能调优的王道,就是增加和分配更多的资源,性能和速度上的提升,是显而易见的;基本上,在一定范围之内,增加资源与性能的提升,是成正比的;写完了一个复杂的spark作业之后,进行性能调优的时候,首先第一步,我觉得,就是要来调节最优的资源配置;在这个基础之上,如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源了,公司资源有限;那么才是考虑去做后面的这些性能调优的点。

问题:

1、分配哪些资源

2、在哪里分配这些资源

3、为什么多分配了这些资源以后,性能会得到提升

答案:

1、分配哪些资源executor、cpu per executor、memory per executor、driver memory

2、在哪里分配这些资源在我们在生产环境中,提交spark作业时,用的spark-submit shell脚本,里面调整对应的参数

/usr/local/spark/bin/spark-submit \

--class cn.spark.sparktest.core.WordCountCluster \

--num-executors 3 \ 配置executor的数量

--driver-memory 100m \ 配置driver的内存(影响不大)

--executor-memory 100m \ 配置每个executor的内存大小

--executor-cores 3 \ 配置每个executor的cpu core数量

/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

3、调节到多大,算是最大呢

第一种,Spark Standalone,公司集群上,搭建了一套Spark集群,你心里应该清楚每台机器还能够给你使用的,大概有多少内存,多少cpu core;那么,设置的时候,就根据这个实际的情况,去调节每个spark作业的资源分配。比如说你的每台机器能够给你使用4G内存,2个cpu core;20台机器;executor,20;4G内存,2个cpu core,平均每个executor。

第二种,Yarn。资源队列。资源调度。应该去查看,你的spark作业,要提交到的资源队列,大概有多少资源500G内存,100个cpu core;executor,50;10G内存,2个cpu core,平均每个executor。

一个原则,你能使用的资源有多大,就尽量去调节到最大的大小(executor的数量,几十个到上百个不等;executor内存;executor cpu core)

4、为什么调节了资源以后,性能可以提升

\

增加每个executor的cpu core:

也是增加了执行的并行能力。原本20个executor,每个才2个cpu core。能够并行执行的task数量,就是40个task。

现在每个executor的cpu core,增加到了5个。能够并行执行的task数量,就是100个task。

执行的速度,提升了2.5倍。

增加executor:

如果executor数量比较少,那么,能够并行执行的task数量就比较少,就意味着,我们的Application的并行执行的能力就很弱。

比如有3个executor,每个executor有2个cpu core,那么同时能够并行执行的task,就是6个。6个执行完以后,再换下一批6个task。

增加了executor数量以后,那么,就意味着,能够并行执行的task数量,也就变多了。比如原先是6个,现在可能可以并行执行10个,甚至20个,100个。那么并行能力就比之前提升了数倍,数十倍。

相应的,性能(执行的速度),也能提升数倍~数十倍。

增加每个executor的内存量。增加了内存量以后,对性能的提升,有两点:

1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO。

2、对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘。减少了磁盘IO,提升了性能。

3、对于task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,速度变快了。

Spark基本运行原理:每个运行程序都是由一个个的job组成的,一个actoin操作触发一个job,每个job划分成多个stage,stage的划分在reduceByKey,groupByKey,sortByBey,join这种shuffle操作时,会拆分出一个新的stage,每个stage都有自己的一个或多个task,分布在各个服务器节点的excutor进程里面,每个task会有一个线程去执行他,每个task都会处理一小片数据,对一小片数据进行transformation操作,到拆分时给下一个stage每个task都写一份文件,1、每个文件中一定是存放相同的key对应的values,2、但是一个文件里面可能有多个key,以及对应的values,3、相同key的values一定进入同一个文件

Spark作业的task并行度

\

reduceByKey,上一个stage的task,在最后,执行到reduceByKey的时候,会为下一个stage每个的task,都创建一

份文件(也可能是合并在少量的文件里面);每个stage1的task,会去各个节点上的各个task创建的属于自己的那

一份文件里面,拉取数据;每个stage1的task,拉取到的数据,一定是相同key对应的数据。对相同的key,对应的

values,才能去执行我们自定义的function操作(_ + _)

并行度:其实就是指的是,Spark作业中,各个stage的task数量,也就代表了Spark作业的在各个阶段(stage)的并行度。

如果不调节并行度,导致并行度过低,会怎么样

假设,现在已经在spark-submit脚本里面,给我们的spark作业分配了足够多的资源,比如50个executor,每个executor有10G内存,每个executor有3个cpu core。基本已经达到了集群或者yarn队列的资源上限。

task没有设置,或者设置的很少,比如就设置了,100个task。50个executor,每个executor有3个cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,可以并行运行。但是你现在,只有100个task,平均分配一下,每个executor分配到2个task,ok,那么同时在运行的task,只有100个,每个executor只会并行运行2个task。每个executor剩下的一个cpu core,就浪费掉了。

你的资源虽然分配足够了,但是问题是,并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。

合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源;比如上面的例子,总共集群有150个cpu core,可以并行运行150个task。那么就应该将你的Application的并行度,至少设置成150,才能完全有效的利用你的集群资源,让150个task,并行执行;而且task增加到150个以后,即可以同时并行运行,还可以让每个task要处理的数据量变少;比如总共150G的数据要处理,如果是100个task,每个task计算1.5G的数据;现在增加到150个task,可以并行运行,而且每个task主要处理1G的数据就可以。

很简单的道理,只要合理设置并行度,就可以完全充分利用你的集群计算资源,并且减少每个task要处理的数据量,最终,就是提升你的整个Spark作业的性能和运行速度。

1、task数量,至少设置成与Spark application的总cpu core数量相同(最理想情况,比如总共150个cpu core,分配了150个task,一起运行,差不多同一时间运行完毕)

2、官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,比如150个cpu core,基本要设置task数量为300~500;

实际情况,与理想情况不同的,有些task会运行的快一点,比如50s就完了,有些task,可能会慢一点,要1分半才运行完,所以如果你的task数量,刚好设置的跟cpu core数量相同,可能还是会导致资源的浪费,因为,比如150个task,10个先运行完了,剩余140个还在运行,但是这个时候,有10个cpu core就空闲出来了,就导致了浪费。那如果task数量设置成cpu core总数的2~3倍,那么一个task运行完了以后,另一个task马上可以补上来,就尽量让cpu core不要空闲,同时也是尽量提升spark作业运行的效率和速度,提升性能。

3、如何设置一个Spark Application的并行度

spark.default.parallelism

SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")

“重剑无锋”:真正有分量的一些技术和点,其实都是看起来比较平凡,看起来没有那么“炫酷”,但是其实是你每次写完一个spark作业,进入性能调优阶段的时候,应该优先调节的事情,就是这些(大部分时候,可能资源和并行度到位了,spark作业就很快了,几分钟就跑完了)

“炫酷”:数据倾斜(100个spark作业,最多10个会出现真正严重的数据倾斜问题),感冒和发烧,你不能上来就用一些偏方(癌症,用癞蛤蟆熬煮汤药);JVM调优;

RDD重构以及持久化

\

第一,RDD架构重构与优化

尽量去复用RDD,差不多的RDD,可以抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。

第二,公共RDD一定要实现持久化

对于要多次计算和使用的公共RDD,一定要进行持久化。

持久化,也就是说,将RDD的数据缓存到内存中 / 磁盘中,(BlockManager),以后无论对这个RDD做多少次计算,那么都是直接取这个RDD的持久化的数据,比如从内存中或者磁盘中,直接提取一份数据。

第三,持久化,是可以进行序列化的

如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许,会导致OOM内存溢出。

当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减少内存的空间占用。

序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。

如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。

内存+磁盘,序列化

第四,为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化

持久化的双副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;持久化的每个数据单元,存储一份副本,放在其他节点上面;从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。

这种方式,仅仅针对你的内存资源极度充足

将RDD缓存到内存中

session2DetailRDD = session2DetailRDD.persist(StorageLevel.MEMORY_ONLY());

性能调优之在实际项目中广播大变量

用户访问session分析模块中的,按时间比例随机抽取

\

如果你是从哪个表里面读取了一些维度数据,比方说,所有商品品类的信息,在某个算子函数中要使用到。100M。

1000个task。100G的数据,网络传输。集群瞬间因为这个原因消耗掉100G的内存。

\

这种默认的,task执行的算子中,使用了外部的变量,每个task都会获取一份变量的副本,有什么缺点呢在什么情况下,会出现性能上的恶劣的影响呢

存在的问题:

map,本身是不小,存放数据的一个单位是Entry,还有可能会用链表的格式的来存放Entry链条。所以map是比较消耗内存的数据格式。

比如,map是1M。总共,你前面调优都调的特好,资源给的到位,配合着资源,并行度调节的绝对到位,1000个task。大量task的确都在并行运行。

这些task里面都用到了占用1M内存的map,那么首先,map会拷贝1000份副本,通过网络传输到各个task中去,给task使用。总计有1G的数据,会通过网络传输。网络传输的开销,不容乐观啊!!!网络传输,也许就会消耗掉你的spark作业运行的总时间的一小部分。

map副本,传输到了各个task上之后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,一下子就耗费掉1G的内存。对性能会有什么影响呢

不必要的内存的消耗和占用,就导致了,你在进行RDD持久化到内存,也许就没法完全在内存中放下;就只能写入磁盘,最后导致后续的操作在磁盘IO上消耗性能;

你的task在创建对象的时候,也许会发现堆内存放不下所有对象,也许就会导致频繁的垃圾回收器的回收,GC。GC的时候,一定是会导致工作线程停止,也就是导致Spark暂停工作那么一点时间。频繁GC的话,对Spark作业的运行的速度会有相当可观的影响。

性能调优值在项目中使用Kryo序列化

默认情况下,Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化

这种默认序列化机制的好处在于,处理起来比较方便;也不需要我们手动去做什么事情,只是,你在算子里面使用的变量,必须是实现Serializable接口的,可序列化即可。

但是缺点在于,默认的序列化机制的效率不高,序列化的速度比较慢;序列化以后的数据,占用的内存空间相对还是比较大。

可以手动进行序列化格式的优化

Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。

所以Kryo序列化优化以后,可以让网络传输的数据变少;在集群中耗费的内存资源大大减少。

Kryo序列化机制,一旦启用以后,会生效的几个地方:

1、算子函数中使用到的外部变量

算子函数中使用到的外部变量,使用Kryo以后:优化网络传输的性能,可以优化集群中内存的占用和消耗

2、持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER

持久化RDD,优化内存的占用和消耗;持久化RDD占用的内存越少,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC。

3、shuffle:可以优化网络传输的性能

在进行stage间的task的shuffle操作时,节点与节点之间的task会互相大量通过网络拉取和传输文件,此时,这些数据既然通过网络传输,也是可能要序列化的,就会使用Kryo

代码设置:SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

首先第一步,在SparkConf中设置一个属性,spark.serializer,org.apache.spark.serializer.KryoSerializer类;

Kryo之所以没有被作为默认的序列化类库的原因,就要出现了:主要是因为Kryo要求,如果要达到它的最佳性能的话,那么就一定要注册你自定义的类(比如,你的算子函数中使用到了外部自定义类型的对象变量,这时,就要求必须注册你的类,否则Kryo达不到最佳性能)。

第二步,注册你使用到的,需要通过Kryo序列化的,一些自定义类,SparkConf.registerKryoClasses()

项目中的使用:

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

.registerKryoClasses(new Class[]{CategorySortKey.class})

//构建Spark上下文

SparkConf conf = new SparkConf().setAppName(SparkContantsParam.USER_VISIT_APPNAME)

.setMaster(SparkContantsParam.USER_VISIT_MASTER)

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

.registerKryoClasses(new Class[]{CategorySortKey.class});

fastutil优化数据格式

fastutil介绍:

fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue;

fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,可以减小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度;

fastutil也提供了64位的array、set和list,以及高性能快速的,以及实用的IO类,来处理二进制和文本类型的文件;

fastutil最新版本要求Java 7以及以上版本;

fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实现了Java的Map接口),因此可以直接放入已有系统的任何代码中。

fastutil还提供了一些JDK标准类库中没有的额外功能(比如双向迭代器)。

fastutil除了对象和原始类型为元素的集合,fastutil也提供引用类型的支持,但是对引用类型是使用等于号(=)进行比较的,而不是equals()方法。

fastutil尽量提供了在任何场景下都是速度最快的集合类库。

Spark中应用fastutil的场景:

1、如果算子函数使用了外部变量;那么第一,你可以使用Broadcast广播变量优化;第二,可以使用Kryo序列化类库,提升序列化性能和效率;第三,如果外部变量是某种比较大的集合,那么可以考虑使用fastutil改写外部变量,首先从源头上就减少内存的占用,通过广播变量进一步减少内存占用,再通过Kryo序列化类库进一步减少内存占用。

2、在你的算子函数里,也就是task要执行的计算逻辑里面,如果有逻辑中,出现,要创建比较大的Map、List等集合,可能会占用较大的内存空间,而且可能涉及到消耗性能的遍历、存取等集合操作;那么此时,可以考虑将这些集合类型使用fastutil类库重写,使用了fastutil集合类以后,就可以在一定程度上,减少task创建出来的集合类型的内存占用。避免executor内存频繁占满,频繁唤起GC,导致性能下降。

关于fastutil调优的说明:

fastutil其实没有你想象中的那么强大,也不会跟官网上说的效果那么一鸣惊人。广播变量、Kryo序列化类库、fastutil,都是之前所说的,对于性能来说,类似于一种调味品,烤鸡,本来就很好吃了,然后加了一点特质的孜然麻辣粉调料,就更加好吃了一点。分配资源、并行度、RDD架构与持久化,这三个就是烤鸡;broadcast、kryo、fastutil,类似于调料。

比如说,你的spark作业,经过之前一些调优以后,大概30分钟运行完,现在加上broadcast、kryo、fastutil,也许就是优化到29分钟运行完、或者更好一点,也许就是28分钟、25分钟。

shuffle调优,15分钟;groupByKey用reduceByKey改写,执行本地聚合,也许10分钟;跟公司申请更多的资源,比如资源更大的YARN队列,1分钟。

fastutil的使用:

第一步:在pom.xml中引用fastutil的包

fastutil

fastutil

5.0.9

速度比较慢,可能是从国外的网去拉取jar包,可能要等待5分钟,甚至几十分钟,不等

List => IntList

基本都是类似于IntList的格式,前缀就是集合的元素类型;特殊的就是Map,Int2IntMap,代表了key-value映射的元素类型。除此之外,刚才也看到了,还支持object、reference。

调节数据本地化等待时长

PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中;性能最好

NODE_LOCAL:节点本地化,代码和数据在同一个节点中;比如说,数据作为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中;数据需要在进程间进行传输

NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分

RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上;数据需要通过网络在节点之间进行传输

ANY:数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差

spark.locality.wait,默认是3s

本地化的原理

Spark在Driver上,对Application的每一个stage的task,进行分配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partition;Spark的task分配算法,优先,会希望每个task正好分配到它要计算的数据所在的节点,这样的话,就不用在网络间传输数据;

但是呢,通常来说,有时,事与愿违,可能task没有机会分配到它的数据所在的节点,为什么呢,可能那个节点的计算资源和计算能力都满了;所以呢,这种时候,通常来说,Spark会等待一段时间,默认情况下是3s钟(不是绝对的,还有很多种情况,对不同的本地化级别,都会去等待),到最后,实在是等待不了了,就会选择一个比较差的本地化级别,比如说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,然后进行计算。

但是对于第二种情况,通常来说,肯定是要发生数据传输,task会通过其所在节点的BlockManager来获取数据,BlockManager发现自己本地没有数据,会通过一个getRemote()方法,通过TransferService(网络数据传输组件)从数据所在节点的BlockManager中,获取数据,通过网络传输回task所在节点。

对于我们来说,当然不希望是类似于第二种情况的了。最好的,当然是task和数据在一个节点上,直接从本地executor的BlockManager中获取数据,纯内存,或者带一点磁盘IO;如果要通过网络传输数据的话,那么实在是,性能肯定会下降的,大量网络传输,以及磁盘IO,都是性能的杀手。

各种情况

最佳情况:直接在一个executor进程内,走内存速度最佳

还不错的情况:一个worker中由两个excutor,当一个excutor中的计算资源已经满了,那么他的task就运行在另外一个excutor上,同样走内存,只是这个task需要跨进程取原来的excutor对应的BlockManager取数据,速度也不慢。

跨节点的情况:task被分配到了其他就近节点上的excutor中,当他需要取数据时就需要跨节点取数据,势必涉及到网络开销

跨机架的情况:最差的就是这种跨机架拉取数据的方式了。速度非常慢,对性能的影响,相当大。

我们什么时候要调节这个参数

观察日志,spark作业的运行日志,推荐大家在测试的时候,先用client模式,在本地就直接可以看到比较全的日志。

日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL

观察大部分task的数据本地化级别

如果大多都是PROCESS_LOCAL,那就不用调节了

如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长

调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志

看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短

你别本末倒置,本地化级别倒是提升了,但是因为大量的等待时长,spark作业的运行时间反而增加了,那就还是不要调节了

怎么调节

spark.locality.wait,默认是3s;6s,10s

默认情况下,下面3个的等待时长,都是跟上面那个是一样的,都是3s

spark.locality.wait.process

spark.locality.wait.node

spark.locality.wait.rack

new SparkConf().set("spark.locality.wait", "10")

JVM调优之降低cache内存占比

理论基础:

spark是用scala开发的。

spark的scala代码调用了很多java api。scala也是运行在java虚拟机中的。spark是运行在java虚拟机中的。

java虚拟机可能会产生什么样的问题:内存不足!!

我们的RDD的缓存、task运行定义的算子函数,可能会创建很多对象。都可能会占用大量内存,没搞好的话,可能导致JVM出问题。

1、常规性能调优:分配资源、并行度。。。等

2、JVM调优(Java虚拟机):JVM相关的参数,通常情况下,如果你的硬件配置、基础的JVM的配置,都ok的话,JVM通常不会造成太严重的性能问题;反而更多的是,在troubleshooting中,JVM占了很重要的地位;JVM造成线上的spark作业的运行报错,甚至失败(比如OOM)。

3、shuffle调优(相当重要):spark在执行groupByKey、reduceByKey等操作时的,shuffle环节的调优。这个很重要。shuffle调优,其实对spark作业的性能的影响,是相当之高!!!经验:在spark作业的运行过程中,只要一牵扯到有shuffle的操作,基本上shuffle操作的性能消耗,要占到整个spark作业的50%~90%。10%用来运行map等操作,90%耗费在两个shuffle操作。groupByKey、countByKey。

4、spark操作调优(spark算子调优,比较重要):groupByKey,countByKey或aggregateByKey来重构实现。有些算子的性能,是比其他一些算子的性能要高的。foreachPartition替代foreach。如果一旦遇到合适的情况,效果还是不错的。

1、分配资源、并行度、RDD架构与缓存

2、shuffle调优

3、spark算子调优

4、JVM调优、广播大变量。。。

原理概述

每一次放对象的时候,都是放入eden区域,和其中一个survivor区域;另外一个survivor区域是空闲的。

当eden区域和一个survivor区域放满了以后(spark运行过程中,产生的对象实在太多了),就会触发minor gc,小型垃圾回收。把不再使用的对象,从内存中清空,给后面新创建的对象腾出来点儿地方。

清理掉了不再使用的对象之后,那么也会将存活下来的对象(还要继续使用的),放入之前空闲的那一个survivor区域中。这里可能会出现一个问题。默认eden、survior1和survivor2的内存占比是8:1:1。问题是,如果存活下来的对象是1.5,一个survivor区域放不下。此时就可能通过JVM的担保机制(不同JVM版本可能对应的行为),将多余的对象,直接放入老年代了。

如果你的JVM内存不够大的话,可能导致频繁的年轻代内存满溢,频繁的进行minor gc。频繁的minor gc会导致短时间内,有些存活的对象,多次垃圾回收都没有回收掉。会导致这种短声明周期(其实不一定是要长期使用的)对象,年龄过大,垃圾回收次数太多还没有回收到,跑到老年代。

老年代中,可能会因为内存不足,囤积一大堆,短生命周期的,本来应该在年轻代中的,可能马上就要被回收掉的对象。此时,可能导致老年代频繁满溢。频繁进行full gc(全局/全面垃圾回收)。full gc就会去回收老年代中的对象。full gc由于这个算法的设计,是针对的是,老年代中的对象数量很少,满溢进行full gc的频率应该很少,因此采取了不太复杂,但是耗费性能和时间的垃圾回收算法。full gc很慢。

full gc / minor gc,无论是快,还是慢,都会导致jvm的工作线程停止工作,stop the world。简而言之,就是说,gc的时候,spark停止工作了。等着垃圾回收结束。

内存不充足的时候,问题:

1、频繁minor gc,也会导致频繁spark停止工作

2、老年代囤积大量活跃对象(短生命周期的对象),导致频繁full gc,full gc时间很长,短则数十秒,长则数分钟,甚至数小时。可能导致spark长时间停止工作。

3、严重影响咱们的spark的性能和运行的速度。

JVM调优的第一个点

降低cache操作的内存占比

spark中,堆内存又被划分成了两块儿,一块儿是专门用来给RDD的cache、persist操作进行RDD数据缓存用的;另外一块儿,就是我们刚才所说的,用来给spark算子函数的运行使用的,存放函数中自己创建的对象。

默认情况下,给RDD cache操作的内存占比,是0.6,60%的内存都给了cache操作了。但是问题是,如果某些情况下,cache不是那么的紧张,问题在于task算子函数中创建的对象过多,然后内存又不太大,导致了频繁的minor gc,甚至频繁full gc,导致spark频繁的停止工作。性能影响会很大。

针对上述这种情况,大家可以在之前我们讲过的那个spark ui。yarn去运行的话,那么就通过yarn的界面,去查看你的spark作业的运行统计,很简单,大家一层一层点击进去就好。可以看到每个stage的运行情况,包括每个task的运行时间、gc时间等等。如果发现gc太频繁,时间太长。此时就可以适当调价这个比例。

降低cache操作的内存占比,大不了用persist操作,选择将一部分缓存的RDD数据写入磁盘,或者序列化方式,配合Kryo序列化类,减少RDD缓存的内存占用;降低cache操作内存占比;对应的,算子函数的内存占比就提升了。这个时候,可能,就可以减少minor gc的频率,同时减少full gc的频率。对性能的提升是有一定的帮助的。

一句话,让task执行算子函数时,有更多的内存可以使用。

spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2

JVM优化调节excutor堆外内存以及连接等待时长

/usr/local/spark/bin/spark-submit \

--class com.ibeifeng.sparkstudy.WordCount \

--num-executors 80 \ (50-100)

--driver-memory 6g \(1g-5g)

--executor-memory 6g \(6g-10g)

--executor-cores 3 \ (每次占用3个cpu core)

--master yarn-cluster \

--queue root.default \

--conf spark.yarn.executor.memoryOverhead=2048 \ 堆外内存大小

--conf spark.core.connection.ack.wait.timeout=300 \ 连接超时时间

/usr/local/spark/spark.jar \

${1}(main函数需要的参数)

executor堆外内存

有时候,如果你的spark作业处理的数据量特别特别大,几亿数据量;然后spark作业一运行,时不时的报错,shuffle file cannot find,executor、task lost,out of memory(内存溢出);

可能是说executor的堆外内存不太够用,导致executor在运行的过程中,可能会内存溢出;然后可能导致后续的stage的task在运行的时候,可能要从一些executor中去拉取shuffle map output文件,但是executor可能已经挂掉了,关联的block manager也没有了;所以可能会报shuffle output file not found;resubmitting task;executor lost;spark作业彻底崩溃。

上述情况下,就可以去考虑调节一下executor的堆外内存。也许就可以避免报错;此外,有时,堆外内存调节的比较大的时候,对于性能来说,也会带来一定的提升。

\

--conf spark.yarn.executor.memoryOverhead=2048

spark-submit脚本里面,去用--conf的方式,去添加配置;一定要注意!!!切记,不是在你的spark作业代码中,用new SparkConf().set()这种方式去设置,不要这样去设置,是没有用的!一定要在spark-submit脚本中去设置。

spark.yarn.executor.memoryOverhead(看名字,顾名思义,针对的是基于yarn的提交模式)

默认情况下,这个堆外内存上限大概是300多M;后来我们通常项目中,真正处理大数据的时候,这里都会出现问题,导致spark作业反复崩溃,无法运行;此时就会去调节这个参数,到至少1G(1024M),甚至说2G、4G

通常这个参数调节上去以后,就会避免掉某些JVM OOM的异常问题,同时呢,会让整体spark作业的性能,得到较大的提升。

\

此时呢,就会没有响应,无法建立网络连接;会卡住;ok,spark默认的网络连接的超时时长,是60s;如果卡住60s都无法建立连接的话,那么就宣告失败了。

碰到一种情况,偶尔,偶尔,偶尔!!!没有规律!!!某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。

这种情况下,很有可能是有那份数据的executor在jvm gc。所以拉取数据的时候,建立不了连接。然后超过默认60s以后,直接宣告失败。

报错几次,几次都拉取不到数据的话,可能会导致spark作业的崩溃。也可能会导致DAGScheduler,反复提交几次stage。TaskScheduler,反复提交几次task。大大延长我们的spark作业的运行时间。

可以考虑调节连接的超时时长。

--conf spark.core.connection.ack.wait.timeout=300

spark-submit脚本,切记,不是在new SparkConf().set()这种方式来设置的。

spark.core.connection.ack.wait.timeout(spark core,connection,连接,ack,wait timeout,建立不上连接的时候,超时等待时长)

调节这个值比较大以后,通常来说,可以避免部分的偶尔出现的某某文件拉取失败,某某文件lost掉了。。。

为什么在这里讲这两个参数呢

因为比较实用,在真正处理大数据(不是几千万数据量、几百万数据量),几亿,几十亿,几百亿的时候。很容易碰到executor堆外内存,以及gc引起的连接超时的问题。file not found,executor lost,task lost。

调节上面两个参数,还是很有帮助的。

Shuffle调优原理介绍

什么样的情况下,会发生shuffle

在spark中,主要是以下几个算子:groupByKey、reduceByKey、countByKey、join,等等。

什么是shuffle

groupByKey,要把分布在集群各个节点上的数据中的同一个key,对应的values,都给集中到一块儿,集中到集群中同一个节点上,更严密一点说,就是集中到一个节点的一个executor的一个task中。

然后呢,集中一个key对应的values之后,才能交给我们来进行处理,>;reduceByKey,算子函数去对values集合进行reduce操作,最后变成一个value;countByKey,需要在一个task中,获取到一个key对应的所有的value,然后进行计数,统计总共有多少个value;join,RDD,RDD,只要是两个RDD中,key相同对应的2个value,都能到一个节点的executor的task中,给我们进行处理。

\

reduceByKey(_+_)

问题在于,同一个单词,比如说(hello, 1),可能散落在不同的节点上;对每个单词进行累加计数,就必须让所有单词都跑到同一个节点的一个task中,给一个task来进行处理。

每一个shuffle的前半部分stage的task,每个task都会创建下一个stage的task数量相同的文件,比如下一个stage会有100个task,那么当前stage每个task都会创建100份文件;会将同一个key对应的values,一定是写入同一个文件中的;不同节点上的task,也一定会将同一个key对应的values,写入下一个stage,同一个task对应的文件中。

shuffle的后半部分stage的task,每个task都会从各个节点上的task写的属于自己的那一份文件中,拉取key, value对;然后task会有一个内存缓冲区,然后会用HashMap,进行key, values的汇聚;(key ,values);

task会用我们自己定义的聚合函数,比如reduceByKey(_+_),把所有values进行一对一的累加;聚合出来最终的值。就完成了shuffle。

shuffle前半部分的task在写入数据到磁盘文件之前,都会先写入一个一个的内存缓冲,内存缓冲满溢之后,再spill溢写到磁盘文件中。

shuffle,一定是分为两个stage来完成的。因为这其实是个逆向的过程,不是stage决定shuffle,是shuffle决定stage。

reduceByKey(_+_),在某个action触发job的时候,DAGScheduler,会负责划分job为多个stage。划分的依据,就是,如果发现有会触发shuffle操作的算子,比如reduceByKey,就将这个操作的前半部分,以及之前所有的RDD和transformation操作,划分为一个stage;shuffle操作的后半部分,以及后面的,直到action为止的RDD和transformation操作,划分为另外一个stage。

Shuffle调优之合并map端输出文件

\

问题来了:默认的这种shuffle行为,对性能有什么样的恶劣影响呢

实际生产环境的条件:

100个节点(每个节点一个executor):100个executor

每个executor:2个cpu core

总共1000个task:每个executor平均10个task

每个节点,10个task,每个节点会输出多少份map端文件10 * 1000=1万个文件

总共有多少份map端输出文件100 * 10000 = 100万。

第一个stage,每个task,都会给第二个stage的每个task创建一份map端的输出文件

第二个stage,每个task,会到各个节点上面去,拉取第一个stage每个task输出的,属于自己的那一份文件。

shuffle中的写磁盘的操作,基本上就是shuffle中性能消耗最为严重的部分。

通过上面的分析,一个普通的生产环境的spark job的一个shuffle环节,会写入磁盘100万个文件。

磁盘IO对性能和spark作业执行速度的影响,是极其惊人和吓人的。

基本上,spark作业的性能,都消耗在shuffle中了,虽然不只是shuffle的map端输出文件这一个部分,但是这里也是非常大的一个性能消耗点。

优化

针对于这种情况进行设置

开启shuffle map端输出文件合并的机制;

new SparkConf().set("spark.shuffle.consolidateFiles", "true");

默认情况下,是不开启的,就是会发生如上所述的大量map端输出文件的操作,严重影响性能。

\

开启了map端输出文件的合并机制之后:

第一个stage,同时就运行cpu core个task,比如cpu core是2个,并行运行2个task;每个task都创建下一个stage的task数量个文件;

第一个stage,并行运行的2个task执行完以后;就会执行另外两个task;另外2个task不会再重新创建输出文件;而是复用之前的task创建的map端输出文件,将数据写入上一批task的输出文件中。

第二个stage,task在拉取数据的时候,就不会去拉取上一个stage每一个task为自己创建的那份输出文件了;而是拉取少量的输出文件,每个输出文件中,可能包含了多个task给自己的map端输出。

提醒一下(map端输出文件合并):

只有并行执行的task会去创建新的输出文件;下一批并行执行的task,就会去复用之前已有的输出文件;但是有一个例外,比如2个task并行在执行,但是此时又启动要执行2个task;那么这个时候的话,就无法去复用刚才的2个task创建的输出文件了;而是还是只能去创建新的输出文件。

要实现输出文件的合并的效果,必须是一批task先执行,然后下一批task再执行,才能复用之前的输出文件;负责多批task同时起来执行,还是做不到复用的。

开启了map端输出文件合并机制之后,生产环境上的例子,会有什么样的变化

实际生产环境的条件:

100个节点(每个节点一个executor):100个executor

每个executor:2个cpu core

总共1000个task:每个executor平均10个task

每个节点,2个cpu core,有多少份输出文件呢2 * 1000 = 2000个

总共100个节点,总共创建多少份输出文件呢100 * 2000 = 20万个文件

相比较开启合并机制之前的情况,100万个

map端输出文件,在生产环境中,立减5倍!

合并map端输出文件,对咱们的spark的性能有哪些方面的影响呢

1、map task写入磁盘文件的IO,减少:100万文件 -> 20万文件

2、第二个stage,原本要拉取第一个stage的task数量份文件,1000个task,第二个stage的每个task,都要拉取1000份文件,走网络传输;合并以后,100个节点,每个节点2个cpu core,第二个stage的每个task,主要拉取100 * 2 = 200个文件即可;网络传输的性能消耗是不是也大大减少

分享一下,实际在生产环境中,使用了spark.shuffle.consolidateFiles机制以后,实际的性能调优的效果:对于上述的这种生产环境的配置,性能的提升,还是相当的客观的。spark作业,5个小时 -> 2~3个小时。

大家不要小看这个map端输出文件合并机制。实际上,在数据量比较大,你自己本身做了前面的性能调优,executor上去->cpu core上去->并行度(task数量)上去,shuffle没调优,shuffle就很糟糕了;大量的map端输出文件的产生。对性能有比较恶劣的影响。

这个时候,去开启这个机制,可以很有效的提升性能。

Shuffel调优之map端内存缓冲和reduce端内存占比

默认情况下,shuffle的map task,输出到磁盘文件的时候,统一都会先写入每个task自己关联的一个内存缓冲区。

这个缓冲区大小,默认是32kb。

每一次,当内存缓冲区满溢之后,才会进行spill操作,溢写操作,溢写到磁盘文件中去。

\

默认情况下,shuffle的map task,输出到磁盘文件的时候,统一都会先写入每个task自己关联的一个内存缓冲区。

这个缓冲区大小,默认是32kb。

每一次,当内存缓冲区满溢之后,才会进行spill操作,溢写操作,溢写到磁盘文件中去。

运行原理

reduce端task,在拉取到数据之后,会用hashmap的数据格式,来对各个key对应的values进行汇聚。针对每个key对应的values,执行我们自定义的聚合函数的代码,比如_ + _(把所有values累加起来)

reduce task,在进行汇聚、聚合等操作的时候,实际上,使用的就是自己对应的executor的内存,executor(jvm进程,堆),默认executor内存中划分给reduce task进行聚合的比例,是0.2。

问题来了,因为比例是0.2,所以,理论上,很有可能会出现,拉取过来的数据很多,那么在内存中,放不下;这个时候,默认的行为,将在内存放不下的数据,都spill(溢写)到磁盘文件中去。

可能出现的问题

原理说完之后,来看一下,默认情况下,不调优,可能会出现什么样的问题

默认,map端内存缓冲是每个task,32kb。

默认,reduce端聚合内存比例,是0.2,也就是20%。

如果map端的task,处理的数据量比较大,但是呢,你的内存缓冲大小是固定的。可能会出现什么样的情况

每个task就处理320kb,32kb,总共会向磁盘溢写320 / 32 = 10次。

每个task处理32000kb,32kb,总共会向磁盘溢写32000 / 32 = 1000次。

在map task处理的数据量比较大的情况下,而你的task的内存缓冲默认是比较小的,32kb。可能会造成多次的map端往磁盘文件的spill溢写操作,发生大量的磁盘IO,从而降低性能。

reduce端聚合内存,占比。默认是0.2。如果数据量比较大,reduce task拉取过来的数据很多,那么就会频繁发生reduce端聚合内存不够用,频繁发生spill操作,溢写到磁盘上去。而且最要命的是,磁盘上溢写的数据量越大,后面在进行聚合操作的时候,很可能会多次读取磁盘中的数据,进行聚合。

默认不调优,在数据量比较大的情况下,可能频繁地发生reduce端的磁盘文件的读写。

这两个点之所以放在一起,是因为他们俩是有关联的。数据量变大,map端肯定会出点问题;reduce端肯定也会出点问题;出的问题是一样的,都是磁盘IO频繁,变多,影响性能。

调优

调节map task内存缓冲:spark.shuffle.file.buffer,默认32k(spark 1.3.x不是这个参数,后面还有一个后缀,kb;spark 1.5.x以后,变了,就是现在这个参数)

调节reduce端聚合内存占比:spark.shuffle.memoryFraction,0.2

在实际生产环境中,我们在什么时候来调节两个参数

看Spark UI,如果你的公司是决定采用standalone模式,那么狠简单,你的spark跑起来,会显示一个Spark UI的地址,4040的端口,进去看,依次点击进去,可以看到,你的每个stage的详情,有哪些executor,有哪些task,每个task的shuffle write和shuffle read的量,shuffle的磁盘和内存,读写的数据量;如果是用的yarn模式来提交,课程最前面,从yarn的界面进去,点击对应的application,进入Spark UI,查看详情。

如果发现shuffle 磁盘的write和read,很大。这个时候,就意味着最好调节一些shuffle的参数。进行调优。首先当然是考虑开启map端输出文件合并机制。

调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer,每次扩大一倍,然后看看效果,64,128;spark.shuffle.memoryFraction,每次提高0.1,看看效果。

不能调节的太大,太大了以后过犹不及,因为内存资源是有限的,你这里调节的太大了,其他环节的内存使用就会有问题了。

调节了以后,效果map task内存缓冲变大了,减少spill到磁盘文件的次数;reduce端聚合内存变大了,减少spill到磁盘的次数,而且减少了后面聚合读取磁盘文件的数量。

Shuffle调优之HashShuffleManager与SortShuffleManager

spark.shuffle.manager:hash、sort、tungsten-sort(自己实现内存管理)

spark.shuffle.sort.bypassMergeThreshold:200

首先先声明一点:

之前我们所讲的,其实都是已经属于Spark中,比较老旧的一种shuffle manager,HashShuffleManager;这种manager,实际上,从spark 1.2.x版本以后,就不再是默认的选择了。

HashShuffleManager的原理,以及对应的一些性能调优的点,基本上,之前几讲,咱们就都讲过了。

spark 1.2.x版本以后,默认的shuffle manager,是什么呢SortShuffleManager。

\

SortShuffleManager与HashShuffleManager两点不同:

1、SortShuffleManager会对每个reduce task要处理的数据,进行排序(默认的)。

2、SortShuffleManager会避免像HashShuffleManager那样,默认就去创建多份磁盘文件。一个task,只会写入一个磁盘文件,不同reduce task的数据,用offset来划分界定。

\

自己可以设定一个阈值,默认是200,当reduce task数量少于等于200;map task创建的输出文件小于等于200的;最后会将所有的输出文件合并为一份文件。

这样做的好处,就是避免了sort排序,节省了性能开销。而且还能将多个reduce task的文件合并成一份文件。节省了reduce task拉取数据的时候的磁盘IO的开销。

在spark 1.5.x以后,对于shuffle manager又出来了一种新的manager,tungsten-sort(钨丝),钨丝sort shuffle manager。官网上一般说,钨丝sort shuffle manager,效果跟sort shuffle manager是差不多的。

但是,唯一的不同之处在于,钨丝manager,是使用了自己实现的一套内存管理机制,性能上有很大的提升, 而且可以避免shuffle过程中产生的大量的OOM,GC,等等内存相关的异常。

来一个总结,现在相当于把spark的shuffle的东西又多讲了一些。大家理解的更加深入了。hash、sort、tungsten-sort。如何来选择

1、需不需要数据默认就让spark给你进行排序就好像mapreduce,默认就是有按照key的排序。如果不需要的话,其实还是建议搭建就使用最基本的HashShuffleManager,因为最开始就是考虑的是不排序,换取高性能;

2、什么时候需要用sort shuffle manager如果你需要你的那些数据按key排序了,那么就选择这种吧,而且要注意,reduce task的数量应该是超过200的,这样sort、merge(多个文件合并成一个)的机制,才能生效把。但是这里要注意,你一定要自己考量一下,有没有必要在shuffle的过程中,就做这个事情,毕竟对性能是有影响的。

3、如果你不需要排序,而且你希望你的每个task输出的文件最终是会合并成一份的,你自己认为可以减少性能开销;可以去调节bypassMergeThreshold这个阈值,比如你的reduce task数量是500,默认阈值是200,所以默认还是会进行sort和直接merge的;可以将阈值调节成550,不会进行sort,按照hash的做法,每个reduce task创建一份输出文件,最后合并成一份文件。(一定要提醒大家,这个参数,其实我们通常不会在生产环境里去使用,也没有经过验证说,这样的方式,到底有多少性能的提升)

4、如果你想选用sort based shuffle manager,而且你们公司的spark版本比较高,是1.5.x版本的,那么可以考虑去尝试使用tungsten-sort shuffle manager。看看性能的提升与稳定性怎么样。

总结:

1、在生产环境中,不建议大家贸然使用第三点和第四点:

2、如果你不想要你的数据在shuffle时排序,那么就自己设置一下,用hash shuffle manager。

3、如果你的确是需要你的数据在shuffle时进行排序的,那么就默认不用动,默认就是sort shuffle manager;或者是什么如果你压根儿不care是否排序这个事儿,那么就默认让他就是sort的。调节一些其他的参数(consolidation机制)。(80%,都是用这种)

spark.shuffle.manager:hash、sort、tungsten-sort

new SparkConf().set("spark.shuffle.manager", "hash")

new SparkConf().set("spark.shuffle.manager", "tungsten-sort")

// 默认就是,new SparkConf().set("spark.shuffle.manager", "sort")

new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")

算子调优之MapPartitions提升Map类操作性能

spark中,最基本的原则,就是每个task处理一个RDD的partition。

MapPartitions操作的优点:

如果是普通的map,比如一个partition中有1万条数据;ok,那么你的function要执行和计算1万次。

但是,使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。

MapPartitions的缺点:一定是有的。

如果是普通的map操作,一次function的执行就处理一条数据;那么如果内存不够用的情况下,比如处理了1千条数据了,那么这个时候内存不够了,那么就可以将已经处理完的1千条数据从内存里面垃圾回收掉,或者用其他方法,腾出空间来吧。

所以说普通的map操作通常不会导致内存的OOM异常。

但是MapPartitions操作,对于大量数据来说,比如甚至一个partition,100万数据,一次传入一个function以后,那么可能一下子内存不够,但是又没有办法去腾出内存空间来,可能就OOM,内存溢出。

什么时候比较适合用MapPartitions系列操作

就是说,数据量不是特别大的时候,都可以用这种MapPartitions系列操作,性能还是非常不错的,是有提升的。比如原来是15分钟,(曾经有一次性能调优),12分钟。10分钟->9分钟。

但是也有过出问题的经验,MapPartitions只要一用,直接OOM,内存溢出,崩溃。

在项目中,自己先去估算一下RDD的数据量,以及每个partition的量,还有自己分配给每个executor的内存资源。看看一下子内存容纳所有的partition数据,行不行。如果行,可以试一下,能跑通就好。性能肯定是有提升的。

但是试了一下以后,发现,不行,OOM了,那就放弃吧。

算子调优之filter过后使用coalesce减少分区数量

\

默认情况下,经过了这种filter之后,RDD中的每个partition的数据量,可能都不太一样了。(原本每个partition的数据量可能是差不多的)

问题:

1、每个partition数据量变少了,但是在后面进行处理的时候,还是要跟partition数量一样数量的task,来进行处理;有点浪费task计算资源。

2、每个partition的数据量不一样,会导致后面的每个task处理每个partition的时候,每个task要处理的数据量就不同,这个时候很容易发生什么问题数据倾斜。。。。

比如说,第二个partition的数据量才100;但是第三个partition的数据量是900;那么在后面的task处理逻辑一样的情况下,不同的task要处理的数据量可能差别达到了9倍,甚至10倍以上;同样也就导致了速度的差别在9倍,甚至10倍以上。

这样的话呢,就会导致有些task运行的速度很快;有些task运行的速度很慢。这,就是数据倾斜。

针对上述的两个问题,我们希望应该能够怎么样

1、针对第一个问题,我们希望可以进行partition的压缩吧,因为数据量变少了,那么partition其实也完全可以对应的变少。比如原来是4个partition,现在完全可以变成2个partition。那么就只要用后面的2个task来处理即可。就不会造成task计算资源的浪费。(不必要,针对只有一点点数据的partition,还去启动一个task来计算)

2、针对第二个问题,其实解决方案跟第一个问题是一样的;也是去压缩partition,尽量让每个partition的数据量差不多。那么这样的话,后面的task分配到的partition的数据量也就差不多。不会造成有的task运行速度特别慢,有的task运行速度特别快。避免了数据倾斜的问题。

有了解决问题的思路之后,接下来,我们该怎么来做呢实现

coalesce算子

主要就是用于在filter操作之后,针对每个partition的数据量各不相同的情况,来压缩partition的数量。减少partition的数量,而且让每个partition的数据量都尽量均匀紧凑。

从而便于后面的task进行计算操作,在某种程度上,能够一定程度的提升性能。

算子调优之使用repartition解决Spark SQL低并行度的性能问题

并行度:之前说过,并行度是自己可以调节,或者说是设置的。

1、spark.default.parallelism

2、textFile(),传入第二个参数,指定partition数量(比较少用)

咱们的项目代码中,没有设置并行度,实际上,在生产环境中,是最好自己设置一下的。官网有推荐的设置方式,你的spark-submit脚本中,会指定你的application总共要启动多少个executor,100个;每个executor多少个cpu core,2~3个;总共application,有cpu core,200个。

官方推荐,根据你的application的总cpu core数量(在spark-submit中可以指定,200个),自己手动设置spark.default.parallelism参数,指定为cpu core总数的2~3倍。400~600个并行度。600。

承上启下

你设置的这个并行度,在哪些情况下会生效哪些情况下,不会生效

如果你压根儿没有使用Spark SQL(DataFrame),那么你整个spark application默认所有stage的并行度都是你设置的那个参数。(除非你使用coalesce算子缩减过partition数量)

问题来了,Spark SQL,用了。用Spark SQL的那个stage的并行度,你没法自己指定。Spark SQL自己会默认根据hive表对应的hdfs文件的block,自动设置Spark SQL查询所在的那个stage的并行度。你自己通过spark.default.parallelism参数指定的并行度,只会在没有Spark SQL的stage中生效。

比如你第一个stage,用了Spark SQL从hive表中查询出了一些数据,然后做了一些transformation操作,接着做了一个shuffle操作(groupByKey);下一个stage,在shuffle操作之后,做了一些transformation操作。hive表,对应了一个hdfs文件,有20个block;你自己设置了spark.default.parallelism参数为100。

你的第一个stage的并行度,是不受你的控制的,就只有20个task;第二个stage,才会变成你自己设置的那个并行度,100。

问题在哪里

Spark SQL默认情况下,它的那个并行度,咱们没法设置。可能导致的问题,也许没什么问题,也许很有问题。Spark SQL所在的那个stage中,后面的那些transformation操作,可能会有非常复杂的业务逻辑,甚至说复杂的算法。如果你的Spark SQL默认把task数量设置的很少,20个,然后每个task要处理为数不少的数据量,然后还要执行特别复杂的算法。

这个时候,就会导致第一个stage的速度,特别慢。第二个stage,1000个task,刷刷刷,非常快。

解决上述Spark SQL无法设置并行度和task数量的办法,是什么呢

repartition算子,你用Spark SQL这一步的并行度和task数量,肯定是没有办法去改变了。但是呢,可以将你用Spark SQL查询出来的RDD,使用repartition算子,去重新进行分区,此时可以分区成多个partition,比如从20个partition,分区成100个。

然后呢,从repartition以后的RDD,再往后,并行度和task数量,就会按照你预期的来了。就可以避免跟Spark SQL绑定在一个stage中的算子,只能使用少量的task去处理大量数据以及复杂的算法逻辑。

\
\

代码使用

\

算子调优之使用foreachPartition优化写数据库性能

foreach写库原理

\

默认的foreach的性能缺陷在哪里

首先,对于每条数据,都要单独去调用一次function,task为每个数据,都要去执行一次function函数。

如果100万条数据,(一个partition),调用100万次。性能比较差。

另外一个非常非常重要的一点

如果每个数据,你都去创建一个数据库连接的话,那么你就得创建100万次数据库连接。

但是要注意的是,数据库连接的创建和销毁,都是非常非常消耗性能的。虽然我们之前已经用了数据库连接池,只是创建了固定数量的数据库连接。

你还是得多次通过数据库连接,往数据库(MySQL)发送一条SQL语句,然后MySQL需要去执行这条SQL语句。如果有100万条数据,那么就是100万次发送SQL语句。

以上两点(数据库连接,多次发送SQL语句),都是非常消耗性能的。

优化

foreachPartition,在生产环境中,通常来说,都使用foreachPartition来写数据库的

用了foreachPartition算子之后,好处在哪里

1、对于我们写的function函数,就调用一次,一次传入一个partition所有的数据

2、主要创建或者获取一个数据库连接就可以

3、只要向数据库发送一次SQL语句和多组参数即可

在实际生产环境中,清一色,都是使用foreachPartition操作;但是有个问题,跟mapPartitions操作一样,如果一个partition的数量真的特别特别大,比如真的是100万,那基本上就不太靠谱了。

一下子进来,很有可能会发生OOM,内存溢出的问题。

一组数据的对比:生产环境

一个partition大概是1千条左右

用foreach,跟用foreachPartition,性能的提升达到了2~3分钟。

reduceByKey本地聚合介绍

reduceByKey,相较于普通的shuffle操作(比如groupByKey),它的一个特点,就是说,会进行map端的本地聚合。

对map端给下个stage每个task创建的输出文件中,写数据之前,就会进行本地的combiner操作,也就是说对每一个key,对应的values,都会执行你的算子函数() + _)

\

用reduceByKey对性能的提升:

1、在本地进行聚合以后,在map端的数据量就变少了,减少磁盘IO。而且可以减少磁盘空间的占用。

2、下一个stage,拉取数据的量,也就变少了。减少网络的数据传输的性能消耗。

3、在reduce端进行数据缓存的内存占用变少了。

4、reduce端,要进行聚合的数据量也变少了。

总结

reduceByKey在什么情况下使用呢

1、非常普通的,比如说,就是要实现类似于wordcount程序一样的,对每个key对应的值,进行某种数据公式或者算法的计算(累加、累乘)

2、对于一些类似于要对每个key进行一些字符串拼接的这种较为复杂的操作,可以自己衡量一下,其实有时,也是可以使用reduceByKey来实现的。但是不太好实现。如果真能够实现出来,对性能绝对是有帮助的。(shuffle基本上就占了整个spark作业的90%以上的性能消耗,只要能对shuffle进行一定的调优,都是有价值的)

troubleshooting之解决JVM GC导致的shuffle文件拉取失败

shuffle操作时的运行原理

1 首先stage0的每个task会生成stage1的task数量个文件,便于stage1的task拉取数据,并将数据信息写入Driver和BlockManager中

2 stage1中的task拉取数据时,首先找到Driver获取数据信息,知道去哪个Executor中获取去数据,然后找到对应的executor中的bolockManager,获取对应应该拉取的文件信息,再去拉取文件。

\

比如,executor的JVM进程,可能内存不是很够用了。那么此时可能就会执行GC。minor GC or full GC。总之一旦发生了JVM之后,就会导致executor内,所有的工作线程全部停止,比如BlockManager,基于netty的网络通信。

有时会出现的一种情况,非常普遍,在spark的作业中;shuffle file not found。(spark作业中,非常非常常见的)而且,有的时候,它是偶尔才会出现的一种情况。有的时候,出现这种情况以后,会重新去提交stage、task。重新执行一遍,发现就好了。没有这种错误了。

log怎么看用client模式去提交你的spark作业。比如standalone client;yarn client。一提交作业,直接可以在本地看到刷刷刷更新的log。

原因

下一个stage的executor,可能是还没有停止掉的,task想要去上一个stage的task所在的exeuctor,去拉取属于自己的数据,结果由于对方正在gc,就导致拉取了半天没有拉取到。

就很可能会报出,shuffle file not found。但是,可能下一个stage又重新提交了stage或task以后,再执行就没有问题了,因为可能第二次就没有碰到JVM在gc了。

参数配置

spark.shuffle.io.maxRetries 3

第一个参数,意思就是说,shuffle文件拉取的时候,如果没有拉取到(拉取失败),最多或重试几次(会重新拉取几次文件),默认是3次。

spark.shuffle.io.retryWait 5s

第二个参数,意思就是说,每一次重试拉取文件的时间间隔,默认是5s钟。

默认情况下,假如说第一个stage的executor正在进行漫长的full gc。第二个stage的executor尝试去拉取文件,结果没有拉取到,默认情况下,会反复重试拉取3次,每次间隔是五秒钟。最多只会等待3 * 5s = 15s。如果15s内,没有拉取到shuffle file。就会报出shuffle file not found。

针对这种情况,我们完全可以进行预备性的参数调节。增大上述两个参数的值,达到比较大的一个值,尽量保证第二个stage的task,一定能够拉取到上一个stage的输出文件。避免报shuffle file not found。然后可能会重新提交stage和task去执行。那样反而对性能也不好。

spark.shuffle.io.maxRetries 60

spark.shuffle.io.retryWait 60s

最多可以忍受1个小时没有拉取到shuffle file。只是去设置一个最大的可能的值。full gc不可能1个小时都没结束吧。

这样呢,就可以尽量避免因为gc导致的shuffle file not found,无法拉取到的问题。

troubleshooting之解决YARN队列资源不足导致的application直接失败

现象:

如果说,你是基于yarn来提交spark。比如yarn-cluster或者yarn-client。你可以指定提交到某个hadoop队列上的。每个队列都是可以有自己的资源的。

跟大家说一个生产环境中的,给spark用的yarn资源队列的情况:500G内存,200个cpu core。

比如说,某个spark application,在spark-submit里面你自己配了,executor,80个;每个executor,4G内存;每个executor,2个cpu core。你的spark作业每次运行,大概要消耗掉320G内存,以及160个cpu core。

乍看起来,咱们的队列资源,是足够的,500G内存,280个cpu core。

首先,第一点,你的spark作业实际运行起来以后,耗费掉的资源量,可能是比你在spark-submit里面配置的,以及你预期的,是要大一些的。400G内存,190个cpu core。

那么这个时候,的确,咱们的队列资源还是有一些剩余的。但是问题是,如果你同时又提交了一个spark作业上去,一模一样的。那就可能会出问题。

第二个spark作业,又要申请320G内存+160个cpu core。结果,发现队列资源不足。。。。

此时,可能会出现两种情况:(备注,具体出现哪种情况,跟你的YARN、Hadoop的版本,你们公司的一些运维参数,以及配置、硬件、资源肯能都有关系)

1、YARN,发现资源不足时,你的spark作业,并没有hang在那里,等待资源的分配,而是直接打印一行fail的log,直接就fail掉了。

2、YARN,发现资源不足,你的spark作业,就hang在那里。一直等待之前的spark作业执行完,等待有资源分配给自己来执行。

采用如下方案:

1、在你的J2EE(我们这个项目里面,spark作业的运行,之前说过了,J2EE平台触发的,执行spark-submit脚本),限制,同时只能提交一个spark作业到yarn上去执行,确保一个spark作业的资源肯定是有的。

2、你应该采用一些简单的调度区分的方式,比如说,你有的spark作业可能是要长时间运行的,比如运行30分钟;有的spark作业,可能是短时间运行的,可能就运行2分钟。此时,都提交到一个队列上去,肯定不合适。很可能出现30分钟的作业卡住后面一大堆2分钟的作业。分队列,可以申请(跟你们的YARN、Hadoop运维的同学申请)。你自己给自己搞两个调度队列。每个队列的根据你要执行的作业的情况来设置。在你的J2EE程序里面,要判断,如果是长时间运行的作业,就干脆都提交到某一个固定的队列里面去把;如果是短时间运行的作业,就统一提交到另外一个队列里面去。这样,避免了长时间运行的作业,阻塞了短时间运行的作业。

3、你的队列里面,无论何时,只会有一个作业在里面运行。那么此时,就应该用我们之前讲过的性能调优的手段,去将每个队列能承载的最大的资源,分配给你的每一个spark作业,比如80个executor;6G的内存;3个cpu core。尽量让你的spark作业每一次运行,都达到最满的资源使用率,最快的速度,最好的性能;并行度,240个cpu core,720个task。

4、在J2EE中,通过newSingleThreadExecutor线程池的方式(一个线程池对应一个资源队列,分别用来执行需要长时间运行的作业,和短时间运行的作业,每个线程池中都只有一个线程,实现一个时间段只有一个作业可以执行,其他作业进入线程池的队列中),来实现上述我们说的方案。

ExecutorService threadPool = Executors.newSingleThreadExecutor(1);

threadPool.submit(new Runnable() {

@Override

public void run() {

}

});

troubleshooting之解决各种序列化导致的报错

你会看到什么样的序列化导致的报错

用client模式去提交spark作业,观察本地打印出来的log。如果出现了类似于Serializable、Serialize等等字眼,报错的log,那么恭喜大家,就碰到了序列化问题导致的报错。

虽然是报错,但是序列化报错,应该是属于比较简单的了,很好处理。

序列化报错要注意的三个点:

1、你的算子函数里面,如果使用到了外部的自定义类型的变量,那么此时,就要求你的自定义类型,必须是可序列化的。

final Teacher teacher = new Teacher("leo");

studentsRDD.foreach(new VoidFunction() {

public void call(Row row) throws Exception {

String teacherName = teacher.getName();

....

}

});

public class Teacher implements Serializable {

}

2、如果要将自定义的类型,作为RDD的元素类型,那么自定义的类型也必须是可以序列化的

JavaPairRDD teacherRDD

JavaPairRDD studentRDD

studentRDD.join(teacherRDD)

public class Teacher implements Serializable {

}

public class Student implements Serializable {

}

3、不能在上述两种情况下,去使用一些第三方的,不支持序列化的类型

Connection conn = newConnection( );

studentsRDD.foreach(new VoidFunction() {

public void call(Row row) throws Exception {

conn.....

}

});

Connection是不支持序列化的

troubleshooting之解决算子函数返回NULL导致的问题

在算子函数中,返回null

// return actionRDD.mapToPair(new PairFunction() {

//

// private static final long serialVersionUID = 1L;

//

// @Override

// public Tuple2 call(Row row) throws Exception {

// return new Tuple2("-999", RowFactory.createRow("-999"));

// }

//

// });

大家可以看到,在有些算子函数里面,是需要我们有一个返回值的。但是,有时候,我们可能对某些值,就是不想有什么返回值。我们如果直接返回NULL的话,那么可以不幸的告诉大家,是不行的,会报错的。

Scala.Math(NULL),异常

如果碰到你的确是对于某些值,不想要有返回值的话,有一个解决的办法:

1、在返回的时候,返回一些特殊的值,不要返回null,比如“-999”

2、在通过算子获取到了一个RDD之后,可以对这个RDD执行filter操作,进行数据过滤。filter内,可以对数据进行判定,如果是-999,那么就返回false,给过滤掉就可以了。

3、大家不要忘了,之前咱们讲过的那个算子调优里面的coalesce算子,在filter之后,可以使用coalesce算子压缩一下RDD的partition的数量,让各个partition的数据比较紧凑一些。也能提升一些性能。

troubleshooting之解决yarn-client模式导致的网卡流量激增问题

yarn-client模式下,会产生什么样的问题呢

由于咱们的driver是启动在本地机器的,而且driver是全权负责所有的任务的调度的,也就是说要跟yarn集群上运行的多个executor进行频繁的通信(中间有task的启动消息、task的执行统计消息、task的运行状态、shuffle的输出结果)。

咱们来想象一下。比如你的executor有100个,stage有10个,task有1000个。每个stage运行的时候,都有1000个task提交到executor上面去运行,平均每个executor有10个task。接下来问题来了,driver要频繁地跟executor上运行的1000个task进行通信。通信消息特别多,通信的频率特别高。运行完一个stage,接着运行下一个stage,又是频繁的通信。

在整个spark运行的生命周期内,都会频繁的去进行通信和调度。所有这一切通信和调度都是从你的本地机器上发出去的,和接收到的。这是最要人命的地方。你的本地机器,很可能在30分钟内(spark作业运行的周期内),进行频繁大量的网络通信。那么此时,你的本地机器的网络通信负载是非常非常高的。会导致你的本地机器的网卡流量会激增!!!

你的本地机器的网卡流量激增,当然不是一件好事了。因为在一些大的公司里面,对每台机器的使用情况,都是有监控的。不会允许单个机器出现耗费大量网络带宽等等这种资源的情况。运维人员。可能对公司的网络,或者其他(你的机器还是一台虚拟机),对其他机器,都会有负面和恶劣的影响。

解决的方法:

实际上解决的方法很简单,就是心里要清楚,yarn-client模式是什么情况下,可以使用的yarn-client模式,通常咱们就只会使用在测试环境中,你写好了某个spark作业,打了一个jar包,在某台测试机器上,用yarn-client模式去提交一下。因为测试的行为是偶尔为之的,不会长时间连续提交大量的spark作业去测试。还有一点好处,yarn-client模式提交,可以在本地机器观察到详细全面的log。通过查看log,可以去解决线上报错的故障(troubleshooting)、对性能进行观察并进行性能调优。

实际上线了以后,在生产环境中,都得用yarn-cluster模式,去提交你的spark作业。

yarn-cluster模式,就跟你的本地机器引起的网卡流量激增的问题,就没有关系了。也就是说,就算有问题,也应该是yarn运维团队和基础运维团队之间的事情了。使用了yarn-cluster模式以后,就不是你的本地机器运行Driver,进行task调度了。是yarn集群中,某个节点会运行driver进程,负责task调度。

troubleshooting之解决yarn-cluster模式的JVM栈内存溢出问题

总结一下yarn-client和yarn-cluster模式的不同之处:

yarn-client模式,driver运行在本地机器上的;yarn-cluster模式,driver是运行在yarn集群上某个nodemanager节点上面的。

yarn-client会导致本地机器负责spark作业的调度,所以网卡流量会激增;yarn-cluster模式就没有这个问题。

yarn-client的driver运行在本地,通常来说本地机器跟yarn集群都不会在一个机房的,所以说性能可能不是特别好;yarn-cluster模式下,driver是跟yarn集群运行在一个机房内,性能上来说,也会好一些。

实践经验,碰到的yarn-cluster的问题:

有的时候,运行一些包含了spark sql的spark作业,可能会碰到yarn-client模式下,可以正常提交运行;yarn-cluster模式下,可能是无法提交运行的,会报出JVM的PermGen(永久代)的内存溢出,OOM。

yarn-client模式下,driver是运行在本地机器上的,spark使用的JVM的PermGen的配置,是本地的spark-class文件(spark客户端是默认有配置的),JVM的永久代的大小是128M,这个是没有问题的;但是呢,在yarn-cluster模式下,driver是运行在yarn集群的某个节点上的,使用的是没有经过配置的默认设置(PermGen永久代大小),82M。

spark-sql,它的内部是要进行很复杂的SQL的语义解析、语法树的转换等等,特别复杂,在这种复杂的情况下,如果说你的sql本身特别复杂的话,很可能会比较导致性能的消耗,内存的消耗。可能对PermGen永久代的占用会比较大。

所以,此时,如果对永久代的占用需求,超过了82M的话,但是呢又在128M以内;就会出现如上所述的问题,yarn-client模式下,默认是128M,这个还能运行;如果在yarn-cluster模式下,默认是82M,就有问题了。会报出PermGen Out of Memory error log。

如何解决这种问题

既然是JVM的PermGen永久代内存溢出,那么就是内存不够用。咱们呢,就给yarn-cluster模式下的,driver的PermGen多设置一些。

spark-submit脚本中,加入以下配置即可:

--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

这个就设置了driver永久代的大小,默认是128M,最大是256M。那么,这样的话,就可以基本保证你的spark作业不会出现上述的yarn-cluster模式导致的永久代内存溢出的问题。

Spark sql 多重or递归导致栈溢出

spark sql,sql,要注意,一个问题

sql,有大量的or语句。比如where keywords='' or keywords='' or keywords=''

当达到or语句,有成百上千的时候,此时可能就会出现一个driver端的jvm stack overflow,JVM栈内存溢出的问题

JVM栈内存溢出,基本上就是由于调用的方法层级过多,因为产生了大量的,非常深的,超出了JVM栈深度限制的,递归。递归方法。我们的猜测,spark sql,有大量or语句的时候,spark sql内部源码中,在解析sql,比如转换成语法树,或者进行执行计划的生成的时候,对or的处理是递归。or特别多的话,就会发生大量的递归。

JVM Stack Memory Overflow,栈内存溢出。

这种时候,建议不要搞那么复杂的spark sql语句。采用替代方案:将一条sql语句,拆解成多条sql语句来执行。每条sql语句,就只有100个or子句以内;一条一条SQL语句来执行。根据生产环境经验的测试,一条sql语句,100个or子句以内,是还可以的。通常情况下,不会报那个栈内存溢出。

troubleshooting之错误的持久化方式以及checkpoint的使用

错误的持久化使用方式:

usersRDD,想要对这个RDD做一个cache,希望能够在后面多次使用这个RDD的时候,不用反复重新计算RDD;可以直接使用通过各个节点上的executor的BlockManager管理的内存 / 磁盘上的数据,避免重新反复计算RDD。

usersRDD.cache()

usersRDD.count()

usersRDD.take()

上面这种方式,不要说会不会生效了,实际上是会报错的。会报什么错误呢会报一大堆file not found的错误。

正确的持久化使用方式:

usersRDD = usersRDD.cache()

val cachedUsersRDD = usersRDD.cache()

之后再去使用usersRDD,或者cachedUsersRDD,就可以了。就不会报错了。所以说,这个是咱们的持久化的正确的使用方式。

\

持久化,大多数时候,都是会正常工作的。但是就怕,有些时候,会出现意外。

比如说,缓存在内存中的数据,可能莫名其妙就丢失掉了。

或者说,存储在磁盘文件中的数据,莫名其妙就没了,文件被误删了。

出现上述情况的时候,接下来,如果要对这个RDD执行某些操作,可能会发现RDD的某个partition找不到了。

对消失的partition重新计算,计算完以后再缓存和使用。

有些时候,计算某个RDD,可能是极其耗时的。可能RDD之前有大量的父RDD。那么如果你要重新计算一个partition,可能要重新计算之前所有的父RDD对应的partition。

这种情况下,就可以选择对这个RDD进行checkpoint,以防万一。进行checkpoint,就是说,会将RDD的数据,持久化一份到容错的文件系统上(比如hdfs)。

在对这个RDD进行计算的时候,如果发现它的缓存数据不见了。优先就是先找一下有没有checkpoint数据(到hdfs上面去找)。如果有的话,就使用checkpoint数据了。不至于说是去重新计算。

checkpoint,其实就是可以作为是cache的一个备胎。如果cache失效了,checkpoint就可以上来使用了。

checkpoint有利有弊,利在于,提高了spark作业的可靠性,一旦发生问题,还是很可靠的,不用重新计算大量的rdd;但是弊在于,进行checkpoint操作的时候,也就是将rdd数据写入hdfs中的时候,还是会消耗性能的。

checkpoint,用性能换可靠性。

checkpoint原理:

1、在代码中,用SparkContext,设置一个checkpoint目录,可以是一个容错文件系统的目录,比如hdfs;

\

2、在代码中,对需要进行checkpoint的rdd,执行RDD.checkpoint();

\

3、RDDCheckpointData(spark内部的API),接管你的RDD,会标记为marked for checkpoint,准备进行checkpoint

4、你的job运行完之后,会调用一个finalRDD.doCheckpoint()方法,会顺着rdd lineage,回溯扫描,发现有标记为待checkpoint的rdd,就会进行二次标记,inProgressCheckpoint,正在接受checkpoint操作

5、job执行完之后,就会启动一个内部的新job,去将标记为inProgressCheckpoint的rdd的数据,都写入hdfs文件中。(备注,如果rdd之前cache过,会直接从缓存中获取数据,写入hdfs中;如果没有cache过,那么就会重新计算一遍这个rdd,再checkpoint)

6、将checkpoint过的rdd之前的依赖rdd,改成一个CheckpointRDD*,强制改变你的rdd的lineage。后面如果rdd的cache数据获取失败,直接会通过它的上游CheckpointRDD,去容错的文件系统,比如hdfs,中,获取checkpoint的数据。

数据倾斜解决方案之原理以及现象分析项目,第一个模块,用户访问session分析模块最后一个部分

1、大数据开发流程(需求分析。。。性能调优)

2、用户行为分析的业务(聚合统计、随机抽取、topn、排序)

3、技术点:大数据项目模块的技术架构、spark core各种算子、自定义Accumulator、随机抽取算法、分组取topn、二次排序

4、大数据项目中的性能调优和troubleshooting

5、完整的数据倾斜解决方案

\

在执行shuffle操作的时候,大家都知道,我们之前讲解过shuffle的原理。是按照key,来进行values的数据的输出、拉取和聚合的。

同一个key的values,一定是分配到一个reduce task进行处理的。

多个key对应的values,总共是90万。但是问题是,可能某个key对应了88万数据,key-88万values,分配到一个task上去面去执行。

另外两个task,可能各分配到了1万数据,可能是数百个key,对应的1万条数据。

想象一下,出现数据倾斜以后的运行的情况。很糟糕!极其糟糕!无比糟糕!

第一个和第二个task,各分配到了1万数据;那么可能1万条数据,需要10分钟计算完毕;第一个和第二个task,可能同时在10分钟内都运行完了;第三个task要88万条,88 * 10 = 880分钟 = 14.5个小时;

大家看看,本来另外两个task很快就运行完毕了(10分钟),但是由于一个拖后腿的家伙,第三个task,要14.5个小时才能运行完,就导致整个spark作业,也得14.5个小时才能运行完。

导致spark作业,跑的特别特别特别特别慢!!!像老牛拉破车!

数据倾斜,一旦出现,是不是性能杀手。。。。

发生数据倾斜以后的现象

spark数据倾斜,有两种表现:

1、你的大部分的task,都执行的特别特别快,刷刷刷,就执行完了(你要用client模式,standalone client,yarn client,本地机器主要一执行spark-submit脚本,就会开始打印log),task175 finished;剩下几个task,执行的特别特别慢,前面的task,一般1s可以执行完5个;最后发现1000个task,998,999 task,要执行1个小时,2个小时才能执行完一个task。

出现数据倾斜了

还算好的,因为虽然老牛拉破车一样,非常慢,但是至少还能跑。

2、运行的时候,其他task都刷刷刷执行完了,也没什么特别的问题;但是有的task,就是会突然间,啪,报了一个OOM,JVM Out Of Memory,内存溢出了,task failed,task lost,resubmitting task。反复执行几次都到了某个task就是跑不通,最后就挂掉。

某个task就直接OOM,那么基本上也是因为数据倾斜了,task分配的数量实在是太大了!!!所以内存放不下,然后你的task每处理一条数据,还要创建大量的对象。内存爆掉了。

出现数据倾斜了

这种就不太好了,因为你的程序如果不去解决数据倾斜的问题,压根儿就跑不出来。

作业都跑不完,还谈什么性能调优这些东西。扯淡。。。

定位原因与出现问题的位置

根据log去定位

出现数据倾斜的原因,基本只可能是因为发生了shuffle操作,在shuffle的过程中,出现了数据倾斜的问题。因为某个,或者某些key对应的数据,远远的高于其他的key。

1、你在自己的程序里面找找,哪些地方用了会产生shuffle的算子,groupByKey、countByKey、reduceByKey、join

2、看log

log一般会报是在你的哪一行代码,导致了OOM异常;或者呢,看log,看看是执行到了第几个stage!!!

我们这里不会去剖析stage的划分算法,(如果之前不了解,但是想要了解,建议先学习北风网的《Spark从入门到精通》),spark代码,是怎么划分成一个一个的stage的。哪一个stage,task特别慢,就能够自己用肉眼去对你的spark代码进行stage的划分,就能够通过stage定位到你的代码,哪里发生了数据倾斜

去找找,代码那个地方,是哪个shuffle操作。

数据倾斜解决方案之聚合源数据

数据倾斜的解决,跟之前讲解的性能调优,有一点异曲同工之妙。

性能调优,跟大家讲过一个道理,“重剑无锋”。性能调优,调了半天,最有效,最直接,最简单的方式,就是加资源,加并行度,注意RDD架构(复用同一个RDD,加上cache缓存);shuffle、jvm等,次要的。

数据倾斜,解决方案,第一个方案和第二个方案,一起来讲。最朴素、最简谱、最直接、最有效、最简单的,解决数据倾斜问题的方案。

第一个方案:聚合源数据

第二个方案:过滤导致倾斜的key

重剑无锋。后面的五个方案,尤其是最后4个方案,都是那种特别炫酷的方案。双重group聚合方案;sample抽样分解聚合方案;如果碰到了数据倾斜的问题。上来就先考虑考虑第一个和第二个方案,能不能做,如果能做的话,后面的5个方案,都不用去搞了。

有效。简单。直接。效果是非常之好的。彻底根除了数据倾斜的问题。

第一个方案:聚合源数据

咱们现在,做一些聚合的操作,groupByKey、reduceByKey;groupByKey,说白了,就是拿到每个key对应的values;reduceByKey,说白了,就是对每个key对应的values执行一定的计算。

现在这些操作,比如groupByKey和reduceByKey,包括之前说的join。都是在spark作业中执行的。

spark作业的数据来源,通常是哪里呢90%的情况下,数据来源都是hive表(hdfs,大数据分布式存储系统)。hdfs上存储的大数据。hive表,hive表中的数据,通常是怎么出来的呢有了spark以后,hive比较适合做什么事情hive就是适合做离线的,晚上凌晨跑的,ETL(extract transform load,数据的采集、清洗、导入),hive sql,去做这些事情,从而去形成一个完整的hive中的数据仓库;说白了,数据仓库,就是一堆表。

spark作业的源表,hive表,其实通常情况下来说,也是通过某些hive etl生成的。hive etl可能是晚上凌晨在那儿跑。今天跑昨天的数九。

数据倾斜,某个key对应的80万数据,某些key对应几百条,某些key对应几十条;现在,咱们直接在生成hive表的hive etl中,对数据进行聚合。比如按key来分组,将key对应的所有的values,全部用一种特殊的格式,拼接到一个字符串里面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火锅|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。

对key进行group,在spark中,拿到key=sessionid,values;hive etl中,直接对key进行了聚合。那么也就意味着,每个key就只对应一条数据。在spark中,就不需要再去执行groupByKey+map这种操作了。直接对每个key对应的values字符串,map操作,进行你需要的操作即可。key,values串。

spark中,可能对这个操作,就不需要执行shffule操作了,也就根本不可能导致数据倾斜。

或者是,对每个key在hive etl中进行聚合,对所有values聚合一下,不一定是拼接起来,可能是直接进行计算。reduceByKey,计算函数,应用在hive etl中,每个key的values。

聚合源数据方案,第二种做法

你可能没有办法对每个key,就聚合出来一条数据;

那么也可以做一个妥协;对每个key对应的数据,10万条;有好几个粒度,比如10万条里面包含了几个城市、几天、几个地区的数据,现在放粗粒度;直接就按照城市粒度,做一下聚合,几个城市,几天、几个地区粒度的数据,都给聚合起来。比如说

city_id date area_id

select ... from ... group by city_id

尽量去聚合,减少每个key对应的数量,也许聚合到比较粗的粒度之后,原先有10万数据量的key,现在只有1万数据量。减轻数据倾斜的现象和问题。

对于我们的程序来说,完全可以将aggregateBySession( )这一步操作,放在一个hive etl中来做,形成一个新的表。对每天的用户访问行为数据,都按session粒度进行聚合,写一个hive sql。

在spark程序中,就不要去做groupByKey+mapToPair这种算子了。直接从当天的session聚合表中,用Spark SQL查询出来对应的数据,即可。这个RDD在后面就可以使用了。

第二个方案:过滤导致倾斜的key

如果你能够接受某些数据,在spark作业中直接就摒弃掉,不使用。比如说,总共有100万个key。只有2个key,是数据量达到10万的。其他所有的key,对应的数量都是几十。

这个时候,你自己可以去取舍,如果业务和需求可以理解和接受的话,在你从hive表查询源数据的时候,直接在sql中用where条件,过滤掉某几个key。

那么这几个原先有大量数据,会导致数据倾斜的key,被过滤掉之后,那么在你的spark作业中,自然就不会发生数据倾斜了。

数据倾斜解决方案之提高shuffle操作reduce并行度

第一个和第二个方案,都不适合做。

第三个方案,提高shuffle操作的reduce并行度

将reduce task的数量,变多,就可以让每个reduce task分配到更少的数据量,这样的话,也许就可以缓解,或者甚至是基本解决掉数据倾斜的问题。

提升shuffle reduce端并行度,怎么来操作

很简单,主要给我们所有的shuffle算子,比如groupByKey、countByKey、reduceByKey。在调用的时候,传入进去一个参数。一个数字。那个数字,就代表了那个shuffle操作的reduce端的并行度。那么在进行shuffle操作的时候,就会对应着创建指定数量的reduce task。

这样的话,就可以让每个reduce task分配到更少的数据。基本可以缓解数据倾斜的问题。

比如说,原本某个task分配数据特别多,直接OOM,内存溢出了,程序没法运行,直接挂掉。按照log,找到发生数据倾斜的shuffle操作,给它传入一个并行度数字,这样的话,原先那个task分配到的数据,肯定会变少。就至少可以避免OOM的情况,程序至少是可以跑的。

提升shuffle reduce并行度的缺陷

治标不治本的意思,因为,它没有从根本上改变数据倾斜的本质和问题。不像第一个和第二个方案(直接避免了数据倾斜的发生)。原理没有改变,只是说,尽可能地去缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题。

实际生产环境中的经验。

1、如果最理想的情况下,提升并行度以后,减轻了数据倾斜的问题,或者甚至可以让数据倾斜的现象忽略不计,那么就最好。就不用做其他的数据倾斜解决方案了。

2、不太理想的情况下,就是比如之前某个task运行特别慢,要5个小时,现在稍微快了一点,变成了4个小时;或者是原先运行到某个task,直接OOM,现在至少不会OOM了,但是那个task运行特别慢,要5个小时才能跑完。

那么,如果出现第二种情况的话,各位,就立即放弃第三种方案,开始去尝试和选择后面的四种方案。

代码示例

统计每个品类的点击次数

普通品类点击也就几十万次,但热门品类会有上千万次,那就很容易发生数据倾斜

\
\

数据倾斜解决方案之使用随机key实现双重聚合

使用随机key实现双重聚合

原理

第一轮聚合的时候,对key进行打散,将原先一样的key,变成不一样的key,相当于是将每个key分为多组;

先针对多个组,进行key的局部聚合;接着,再去除掉每个key的前缀,然后对所有的key,进行全局的聚合。

\
\

使用场景

(1)groupByKey

(2)reduceByKey

比较适合使用这种方式;join,咱们通常不会这样来做,后面会讲三种,针对不同的join造成的数据倾斜的问题的解决方案。

对groupByKey、reduceByKey造成的数据倾斜,有比较好的效果。

如果说,之前的第一、第二、第三种方案,都没法解决数据倾斜的问题,那么就只能依靠这一种方式了。

代码实现

/**

* 使用随机key实现双重聚合

*/

/*

* 第一步:首先将key打散

*/

JavaPairRDD mappedCategoryIds = categoryIDRDD.mapToPair(tuple2 -> {

Long categoryId = tuple2._1;

Long count = tuple2._2;

return new Tuple2<>(new Random().nextInt(10) + "_" + categoryId, count);

});

/*

* 第二步:局部统计打散后的key的数量

*/

JavaPairRDD mappedCategoryCount =

mappedCategoryIds.reduceByKey((aLong, aLong2) -> aLong + aLong2);

/*

* 第三步:将局部统计出来的key重新全局聚合

*/

JavaPairRDD recoverCateIds = mappedCategoryCount.mapToPair(tuple -> {

return new Tuple2<>(Long.valueOf(tuple._1.split("_")[1]), tuple._2);

});

/*

* 第四步:统计出最终结果

*/

return recoverCateIds.reduceByKey((aLong, aLong2) -> aLong + aLong2);

数据倾斜解决方案之将reduce join转换为map join

原理

普通的join,那么肯定是要走shuffle;那么,所以既然是走shuffle,那么普通的join,就肯定是走的是reduce join。

先将所有相同的key,对应的values,汇聚到一个task中,然后再进行join。

\

reduce join转换为map join,适合在什么样的情况下,可以来使用

如果两个RDD要进行join,其中一个RDD是比较小的。一个RDD是100万数据,一个RDD是1万数据。(一个RDD是1亿数据,一个RDD是100万数据)

其中一个RDD必须是比较小的,broadcast出去那个小RDD的数据以后,就会在每个executor的block manager中都驻留一份。要确保你的内存足够存放那个小RDD中的数据

这种方式下,根本不会发生shuffle操作,肯定也不会发生数据倾斜;从根本上杜绝了join操作可能导致的数据倾斜的问题;

对于join中有数据倾斜的情况,大家尽量第一时间先考虑这种方式,效果非常好;如果某个RDD比较小的情况下。

不适合的情况:

两个RDD都比较大,那么这个时候,你去将其中一个RDD做成broadcast,就很笨拙了。很可能导致内存不足。最终导致内存溢出,程序挂掉。

而且其中某些key(或者是某个key),还发生了数据倾斜;此时可以采用最后两种方式。

建议

对于join这种操作,不光是考虑数据倾斜的问题;即使是没有数据倾斜问题,也完全可以优先考虑,用我们讲的这种高级的reduce join转map join的技术,不要用普通的join,去通过shuffle,进行数据的join;完全可以通过简单的map,使用map join的方式,牺牲一点内存资源;在可行的情况下,优先这么使用。

不走shuffle,直接走map,是不是性能也会高很多这是肯定的。

代码实现

/**

* 使用map join替换掉reduce join

*/

List> userid2RDDCollect = userid2RDD.collect();

//将该变量广播出去,这样不用在每个task中都复制一份,直接在每个executor对应的blockManager中取出数据即可

Broadcast>> broadcast = jsc.broadcast(userid2RDDCollect);

JavaPairRDD sessidfullinfo = userid2PartAggrRDD.mapToPair(tuple -> {

//首先将user详细信息转换成Map

Map userRDD = new HashMap<>();

broadcast.value().forEach(user -> userRDD.put(user._1, user._2));

Long userid = tuple._1;

String partinfo = tuple._2;

Row userinfo = userRDD.get(userid);

String sessionid = StringUtils.getFieldFromConcatString(partinfo, "\\|", SparkContantsParam.FIELD_SESSION_ID);

int age = userinfo.getInt(3);

String professional = userinfo.getString(4);

String city = userinfo.getString(5);

String sex = userinfo.getString(6);

//拼接上用户的年龄,职业,城市,性别,形成完整信息

String fullinfo = partinfo + "|"

+ SparkContantsParam.FIELD_AGE + "=" + age + "|"

+ SparkContantsParam.FIELD_PROFESSIONAL + "=" + professional + "|"

+ SparkContantsParam.FIELD_CITY + "=" + city + "|"

+ SparkContantsParam.FIELD_SEX + "=" + sex;

//将sessionid和完成信息返回

return new Tuple2<>(sessionid, fullinfo);

});

各区域商品统计模块介绍

需求:根据用户指定的日期范围,统计各个区域下的最热门的top3商品

1、区域信息在哪里,各个城市的信息,城市是不怎么变化的,没有必要存储在hive里MySQL,Hive和MySQL异构数据源使用,技术点

2、hive用户行为数据,和mysql城市信息,join,关联之后是RDDRDD转换DataFrame,注册临时表,技术点

3、各个区域下各个商品的点击量,保留每个区域的城市列表数据自定义UDAF函数,group_concat_distinct()

4、product_id,join hive表中的商品信息,商品信息在哪里Hive。商品的经营类型是什么自定义UDF函数,get_json_object(),if()

5、获取每个区域的点击量top3商品开窗函数;给每个区域打上级别的标识,西北大区,经济落后,区域上的划分,C类区域;北京、上海,发达,标记A类

6、Spark SQL的数据倾斜解决方案双重group by、随机key以及扩容表(自定义UDF函数,random_key())、内置reduce join转换为map join、shuffle并行度

各区域商品统计模块之需求分析,技术方案,数据设计

需求:根据用户指定的日期范围,统计各个区域下的最热门的top3商品

Spark作业接收taskid,查询对应的MySQL中的task,获取用户指定的筛选参数;统计出指定日期范围内的,各个区域的top3热门商品;最后将结果写入MySQL表中。

技术方案设计:

1、查询task,获取日期范围,通过Spark SQL,查询user_visit_action表中的指定日期范围内的数据,过滤出,商品点击行为,click_product_id is not null;click_product_id != 'NULL';click_product_id != 'null';city_id,click_product_id

2、使用Spark SQL从MySQL中查询出来城市信息(city_id、city_name、area),用户访问行为数据要跟城市信息进行join,city_id、city_name、area、product_id,RDD,转换成DataFrame,注册成一个临时表

3、Spark SQL内置函数(case when),对area打标记(华东大区,A级,华中大区,B级,东北大区,C级,西北大区,D级),area_level

4、计算出来每个区域下每个商品的点击次数,group by area, product_id;保留每个区域的城市名称列表;自定义UDAF,group_concat_distinct()函数,聚合出来一个city_names字段,area、product_id、city_names、click_count

5、join商品明细表,hive(product_id、product_name、extend_info),extend_info是json类型,自定义UDF,get_json_object()函数,取出其中的product_status字段,if()函数(Spark SQL内置函数),判断,0 自营,1 第三方;(area、product_id、city_names、click_count、product_name、product_status)

6、开窗函数,根据area来聚合,获取每个area下,click_count排名前3的product信息;area、area_level、product_id、city_names、click_count、product_name、product_status

7、结果写入MySQL表中

8、Spark SQL的数据倾斜解决方案双重group by、随机key以及扩容表(自定义UDF函数,random_key())、Spark SQL内置的reduce join转换为map join、提高shuffle并行度

9、本地测试和生产环境的测试

各区域热门商品统计-查询用户指定日期范围内的点击行为数据

第一步:查询用户指定时间范围内的点击行为(city_id,在哪个城市的点击行为)

hive数据源的使用

private static JavaPairRDD getcityid2ClickActionRDDByDate(String startDate,

String endDate, SQLContext sqlContext) {

//从user_visit_action中,查询用户访问行为数据

//第一个限定:click_product_id,限定不能为空,就意味着一定是点击行为

//第二个限定:获取在用户指定范围内的数据

String sql = "select city_id,click_product_id product_id from user_visit_action" +

"where click_product_id is not 'NULL'" +

" and click_product_id is not 'null'" +

" and action_time >=" + startDate +

" and action_time <=" + endDate;

DataFrame dataFrame = sqlContext.sql(sql);

JavaRDD rowJavaRDD = dataFrame.javaRDD();

return rowJavaRDD.mapToPair(tuple-> new Tuple2<>(tuple.getLong(0),tuple));

}

各区域热门商品统计-异构数据源之从MySQL中查询城市数据

第二步:从mysql中查询城市信息

异构数据源Mysql的使用

private static JavaPairRDD getcityid2CityInfo(SQLContext sqlContext) {

String url = SparkContantsParam.JDBC_URL;

Map map = new HashMap<>();

map.put("url",url);

map.put("dbtable","city_info");

map.put("user","hive");

map.put("password","hive");

JavaRDD javaRDD = sqlContext.read().format("jdbc")

.options(map).load().javaRDD();

return javaRDD.mapToPair(tuple -> new Tuple2<>(Long.valueOf(tuple.get(0)+""),tuple));

}

第三步:生成点击商品基础信息临时表

/**

* 生成点击商品基础信息临时表(productid + 城市信息的临时表)

* @param sqlContext

* @param clickActionRDDByDate

* @param cityInfoRDD

*/

private static void generateTmpClickProductBasicTable(SQLContext sqlContext,

JavaPairRDD clickActionRDDByDate, JavaPairRDD cityInfoRDD) {

// 执行join操作,进行点击行为数据和城市数据的关联

JavaPairRDD> joinRDD = clickActionRDDByDate.join(cityInfoRDD);

// 将上面的JavaPairRDD,转换成一个JavaRDD(才能将RDD转换为DataFrame)

JavaRDD map = joinRDD.map(tuple -> {

Long cityId = tuple._1;

Row action = tuple._2._1;

Row cityInfo = tuple._2._2;

long productId = action.getLong(1);

String cityName = cityInfo.getString(1);

String area = cityInfo.getString(2);

return RowFactory.create(cityId, cityName, productId, area);

});

// 基于JavaRDD的格式,就可以将其转换为DataFrame

List list = new ArrayList<>();

list.add(DataTypes.createStructField("cityId",DataTypes.LongType,true));

list.add(DataTypes.createStructField("cityName",DataTypes.StringType,true));

list.add(DataTypes.createStructField("productId",DataTypes.LongType,true));

list.add(DataTypes.createStructField("area",DataTypes.StringType,true));

StructType scema = DataTypes.createStructType(list);

DataFrame dataFrame = sqlContext.createDataFrame(map, scema);

// 将DataFrame中的数据,注册成临时表(tmp_clk_prod_basic)

dataFrame.registerTempTable("tmp_clk_prod_basic");

}

各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()

注册UDA函数

package com.spark.sparkproject.product;

import org.apache.spark.sql.api.java.UDF3;

public class ConcatLongStringUDF implements UDF3 {

@Override

public String call(Long v1, String v2, String split) {

return v1 + split + v2;

}

}

注册UDAF函数

package com.spark.sparkproject.product;

import com.spark.sparkproject.util.StringUtils;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.expressions.MutableAggregationBuffer;

import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;

import org.apache.spark.sql.types.DataType;

import org.apache.spark.sql.types.DataTypes;

import org.apache.spark.sql.types.StructType;

import java.util.Arrays;

public class GroupConcatDistinctUDAF extends UserDefinedAggregateFunction {

//指定输入字段的数据与类型

private StructType inputScema = DataTypes.createStructType(Arrays.asList(

DataTypes.createStructField("cityInfo",DataTypes.StringType,true)));

//指定缓冲字段的数据与类型

private StructType bufferInputScema = DataTypes.createStructType(Arrays.asList(

DataTypes.createStructField("bufferCityInfo",DataTypes.StringType,true)));

//指定返回类型

private DataType dataType = DataTypes.StringType;

//是否确定

private boolean deterministic = true;

@Override

public StructType inputSchema() {

return inputScema;

}

@Override

public StructType bufferSchema() {

return bufferInputScema;

}

@Override

public DataType dataType() {

return dataType;

}

@Override

public boolean deterministic() {

return deterministic;

}

/**

* 初始化,在自己内部指定一个初始的值

* @param buffer

*/

@Override

public void initialize(MutableAggregationBuffer buffer) {

buffer.update(0,"");

}

/**

* 更新,可以认为是将组内一个一个字段传递进来

* 实现拼接的逻辑

* @param buffer

* @param input

*/

@Override

public void update(MutableAggregationBuffer buffer, Row input) {

//缓冲中已经拼接过的字符串

String bufferCityInfo = buffer.getString(0);

//刚刚传递进来的某个字符串信息

String cityInfo = input.getString(0);

//实现去重的逻辑

//判断:之前没有拼接过该城市信息,才去拼接

if(!bufferCityInfo.contains(cityInfo)){

if(StringUtils.isEmpty(bufferCityInfo

)){

bufferCityInfo += cityInfo;

}else{

// 比如1:北京

// 1:北京,2:上海

bufferCityInfo += "," + cityInfo;

}

buffer.update(0,bufferCityInfo);

}

}

/**

* 合并

* 上述的update操作是针对分组内的一个节点上的操作,是部分操作

* 因为一个分组内的操作可能会分布在多个节点上

* 此时就要用merge操作,将多个节点上的串全部合并起来

* @param buffer1

* @param buffer2

*/

@Override

public void merge(MutableAggregationBuffer buffer1, Row buffer2) {

String bufferInfo = buffer1.getString(0);

String cityinfos = buffer2.getString(0);

for(String cityInfo : cityinfos.split(",")){

if(!bufferInfo.contains(cityInfo)){

if(StringUtils.isEmpty(bufferInfo)){

bufferInfo += cityInfo;

}else{

bufferInfo += "," + cityInfo;

}

}

}

buffer1.update(0,bufferInfo);

}

@Override

public Object evaluate(Row buffer) {

return buffer.getString(0);

}

}

第四步:注册生成各区域各商品点击次数的临时表

/**

* 生成各区域各商品的点击次数 + 城市信息

* @param sqlContext

*/

private static void generateTmpAreaProductClickCountTable(SQLContext sqlContext) {

/**

* 按照area,productId进行分组

* 计算出各个区域的商品点击次数

* 可以获取到每个area下的各个productId的城市信息拼接起来的串

*/

String sql = "select " +

"area,productId," +

"count(*) click_count" +

"group_concat_distinct(concat_long_string(cityId,cityName,':')) city_infos" +

" from tmp_clk_prod_basic group by area,productId";

DataFrame df = sqlContext.sql(sql);

/**

* 再次将查询出的数据注册为一个临时表

* 各区域商品的点击次数(以及额外的城市列表)

*/

df.registerTempTable("tmp_area_product_clk_count");

}

第五步:使用自定义get_json_object函数和内置if函数标记经营类型,生成各区域商品点击次数 + 商品完整信息

将之前得到的各区域商品点击次数表

去关联商品信息表

product_status要特殊处理,0,1,分别表示自营商品和第三方商品,放在product_info的extend_info的json串中

技术点:get_json_field()函数,可以从json串中获取指定字段的值

if()函数判断,如果product_status是0,则为自营商品,否则就是第三方商品

为什么要计算商品经营类型

拿到某个区域top3热门的商品,是自营还是第三方的,对于公司来讲是一个很重要的信息

/**

* 生成各区域商品点击次数 + 商品完整信息

*/

public static void generateAreaClickFullProductClickCount(SQLContext sqlContext){

/**

* 将之前得到的各区域商品点击次数表

* 去关联商品信息表

* product_status要特殊处理,0,1,分别表示自营商品和第三方商品,放在product_info的extend_info json串中

* get_json_field()函数,可以从json串中获取指定字段的值

* if()函数判断,如果product_status是0,则为自营商品,否则就是第三方商品

*

* 为什么要计算商品经营类型

* 拿到某个区域top3热门的商品,是自营还是第三方的是一个很重要的信息

*/

String sql = "select " +

"tapcc.area,tapcc.productId,tapcc.click_count,tapcc.city_infos," +

"pi.product_name" +

"if(get_json_field(pi.extend_info,'product_status')=0,'自营商品','第三方商品') product_status" +

" from tmp_area_product_clk_count tapcc " +

"left join product_info pi on tapcc.productId = pi.product_id ";

DataFrame df = sqlContext.sql(sql);

df.registerTempTable("tmp_area_fullproduct_clk_count");

}

第六步:使用开窗函数 + case when得到各区域点击次数前三的商品信息

/**

* 使用开窗函数row_number() over(partition by ... order by ...)得到各区域点击次数前三的商品信息

* @param sqlContext

* @return

*/

public static JavaRDD getAreaTop3ProductRDD(SQLContext sqlContext){

// 技术点:开窗函数

// 使用开窗函数先进行一个子查询

// 按照area进行分组,给每个分组内的数据,按照点击次数降序排序,打上一个组内的行号

// 接着在外层查询中,过滤出各个组内的行号排名前3的数据

// 其实就是咱们的各个区域下top3热门商品

// 华北、华东、华南、华中、西北、西南、东北

// A级:华北、华东

// B级:华南、华中

// C级:西北、西南

// D级:东北

// case when

// 根据多个条件,不同的条件对应不同的值

// case when then ... when then ... else ... end

String sql =

"select " +

"area," +

"CASE " +

"WHEN area = '华北' OR area = '华东' THEN 'A级'" +

"WHEN area = '华南' OR area = '华中' THEN 'B级'" +

"WHEN area = '西北' OR area = '西南' THEN 'C级'" +

"ELSE '东北' " +

"END area_level" +

"productId," +

"click_count," +

"city_infos," +

"product_name," +

"product_status " +

"from (" +

"select " +

"area," +

"productId," +

"click_count," +

"city_infos," +

"product_name," +

"product_status," +

"row_number() OVER ( partition by area order by click_count desc ) rank" +

" from tmp_area_fullproduct_clk_count " +

") t "+

"where rank <=3";

return sqlContext.sql(sql).javaRDD();

}

第七步:将各区域top3商品批量插入数据库中

这边的写入mysql和之前不太一样

因为实际上,就这个业务需求而言,计算出来的最终数据量是比较小的

总共就不到10个区域,每个区域还是top3热门商品,总共最后数据量也就是几十个

所以可以直接将数据collect()到本地

用批量插入的方式,一次性插入mysql即可

private static void presistAreaTop3Product(int taskId, List collect) {

List list = new ArrayList();

for(Row row:collect){

AreaTop3Product product = new AreaTop3Product();

product.setTaskid(taskId);

product.setArea(row.getString(0));

product.setAreaLevel(row.getString(1));

product.setProductid(row.getLong(2));

product.setClickCount(Long.valueOf(row.get(3)+""));

product.setCityInfos(row.getString(4));

product.setProductName(row.getString(5));

product.setProductStatus(row.getString(6));

list.add(product);

}

areaTop3ProductDAO.insertBatch(list);

}

各区域热门商品统计:Spark SQL数据倾斜解决方案

1、聚合源数据

2、过滤导致倾斜的key

3、提高shuffle并行度:spark.sql.shuffle.partitions

4、双重group by

5、reduce join转换为map join:spark.sql.autoBroadcastJoinThreshold

6、采样倾斜key并单独进行join

7、随机key与扩容表

由于Spark的这种都是基于RDD的特性;哪怕是Spark SQL,原本你是用纯的SQL来实现的;各位想一想,其实你用纯RDD,也能够实现一模一样的功能。

之前使用在Spark Core中的数据倾斜解决方案,全部都可以直接套用在Spark SQL上。

我们要讲一下,之前讲解的方案,如果是用纯的Spark SQL来实现,应该如何来实现。

1、聚合源数据:Spark Core和Spark SQL没有任何的区别

2、过滤导致倾斜的key:在sql中用where条件

3、提高shuffle并行度:groupByKey(1000),spark.sql.shuffle.partitions(默认是200)

4、双重group by:改写SQL,两次group by

5、reduce join转换为map join:spark.sql.autoBroadcastJoinThreshold(默认是10485760 )

你可以自己将表做成RDD,自己手动去实现map join

Spark SQL内置的map join,默认是如果有一个小表,是在10M以内,默认就会将该表进行broadcast,然后执行map join;调节这个阈值,比如调节到20M、50M、甚至1G。20 971 520

6、采样倾斜key并单独进行join:纯Spark Core的一种方式,sample、filter等算子

7、随机key与扩容表:Spark SQL+Spark Core

广告点击流量实时统计-需求分析、技术方案设计以及数据设计

需求分析

1、实现实时的动态黑名单机制:将每天对某个广告点击超过100次的用户拉黑

2、基于黑名单的非法广告点击流量过滤机制:

3、每天各省各城市各广告的点击流量实时统计:

4、统计每天各省top3热门广告

5、统计各广告最近1小时内的点击量趋势:各广告最近1小时内各分钟的点击量

6、使用高性能方式将实时统计结果写入MySQL

7、实现实时计算程序的HA高可用性(Spark Streaming HA方案)

8、实现实时计算程序的性能调优(Spark Streaming Performence Tuning方案)

数据格式介绍

timestamp 1450702800(实时数据需要将大量数据通过网络进行传输,timestamp是long型的,数据格式会紧凑一些,字符串形式中会有一些不必要的字符串)

province Jiangsu

city Nanjing

userid 100001

adid 100001

技术方案设计

1、实时计算各batch中的每天各用户对各广告的点击次数

2、使用高性能方式将每天各用户对各广告的点击次数写入MySQL中(更新)

3、使用filter过滤出每天对某个广告点击超过100次的黑名单用户,并写入MySQL中

4、使用transform操作,对每个batch RDD进行处理,都动态加载MySQL中的黑名单生成RDD,然后进行join后,过滤掉batch RDD中的黑名单用户的广告点击行为

5、使用updateStateByKey操作,实时计算每天各省各城市各广告的点击量,并时候更新到MySQL

6、使用transform结合Spark SQL,统计每天各省份top3热门广告:首先以每天各省各城市各广告的点击量数据作为基础,首先统计出每天各省份各广告的点击量;然后启动一个异步子线程,使用Spark SQL动态将数据RDD转换为DataFrame后,注册为临时表;最后使用Spark SQL开窗函数,统计出各省份top3热门的广告,并更新到MySQL中

7、使用window操作,对最近1小时滑动窗口内的数据,计算出各广告各分钟的点击量,并更新到MySQL中

8、实现实时计算程序的HA高可用性

9、对实时计算程序进行性能调优

数据设计

CREATE TABLE `ad_user_click_count` (

`date` varchar(30) DEFAULT NULL,

`user_id` int(11) DEFAULT NULL,

`ad_id` int(11) DEFAULT NULL,

`click_count` int(11) DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `ad_blacklist` (

`user_id` int(11) DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `ad_stat` (

`date` varchar(30) DEFAULT NULL,

`province` varchar(100) DEFAULT NULL,

`city` varchar(100) DEFAULT NULL,

`ad_id` int(11) DEFAULT NULL,

`click_count` int(11) DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `ad_province_top3` (

`date` varchar(30) DEFAULT NULL,

`province` varchar(100) DEFAULT NULL,

`ad_id` int(11) DEFAULT NULL,

`click_count` int(11) DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `ad_click_trend` (

`date` varchar(30) DEFAULT NULL,

`ad_id` int(11) DEFAULT NULL,

`minute` varchar(30) DEFAULT NULL,

`click_count` int(11) DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=utf8

电商用户行为分析大数据平台

1、用户访问session分析模块:会话(session),用户的基础访问行为

2、页面单跳转化率模块:页面(page),用户的页面访问和页面跳转行为

3、各区域热门商品统计模块:商品(product),用户的商品点击行为

4、广告点击流量实时统计模块:广告(ad,advertisement),用户的广告点击行为

电商网站 / app(移动互联网),用户行为的方方面面

咱们这个项目课程,基本上,内容还是非常丰富的;无论是技术含量,还是业务含量;技术上来说,对实际项目进行过改造和整合,比一般的生产环境中的真实项目的技术含量,还要高;业务上来说,除了对一些数据格式和琐碎的业务细节,进行了必要的简化(对大家学习没有任何意义),表中的字段(需要使用的),脏数据(清洗),业务的真实性和复杂性是非常高的,基本上就是完全的完整的一个企业真实项目;

实际企业中的讲师做过的这个项目,模块会达到几十个(二三十个);各个模块之间,做多了会发现,就是业务上有一些不同;其实技术上,基本都涵盖在咱们这个项目课程中了;技术给掌握了,去做任何用户行为分析类的项目,或者是业务需求,都ok,都没有问题的;本项目课程中,涵盖的4个模块,都是非常经典的4个模块

广告点击流量实时统计模块

网站 / app,是不是通常会给一些第三方的客户,打一些广告;也是一些互联网公司的核心收入来源;广告在网站 / app某个广告位打出去,在用户来使用网站 / app的时候,广告会显示出来;此时,有些用户可能就会去点击那个广告。

广告被点击以后,实际上,我们就是要针对这种用户行为(广告点击行为),实时数据,进行实时的大数据计算和统计。

每次点击一个广告以后,通常来说,网站 / app中都会有埋点(前端的应用中,比如JavaScript Ajax;app中的socket网络请求,往后台发送一条日志数据);日志数据而言,通常,如果要做实时统计的话,那么就会通过某些方式将数据写入到分布式消息队列中(Kafka);

日志写入到后台web服务器(nginx),nginx产生的实时的不断增加 / 更新的本地日志文件,就会被日志监控客户端(比如flume agent),写入到消息队列中(kafka),我们要负责编写实时计算程序,去从消息队列中(kafka)去实时地拉取数据,然后对数据进行实时的计算和统计。

这个模块的意义在于,让产品经理、高管可以实时地掌握到公司打的各种广告的投放效果。以便于后期持续地对公司的广告投放相关的战略和策略,进行调整和优化;以期望获得最好的广告收益。

为动态黑名单实时计算每天各用户对各广告的点击次数

1 首先构建SparkStreaming上下文环境

public static void main(String[] args) {

//构建spark streaming上下文

SparkConf conf = new SparkConf()

.setMaster("local[2]")

.setAppName("AdClickRealTimeStatSpark");

// spark streaming的上下文是构建JavaStreamingContext对象

// 而不是像之前的JavaSparkContext、SQLContext/HiveContext

// 传入的第一个参数,和之前的spark上下文一样,也是SparkConf对象;第二个参数则不太一样

// 第二个参数是spark streaming类型作业比较有特色的一个参数

// 实时处理batch的interval

// spark streaming,每隔一小段时间,会去收集一次数据源(kafka)中的数据,做成一个batch

// 每次都是处理一个batch中的数据

// 通常来说,batch interval,就是指每隔多少时间收集一次数据源中的数据,然后进行处理

// 一遍spark streaming的应用,都是设置数秒到数十秒(很少会超过1分钟)

// 咱们这里项目中,就设置5秒钟的batch interval

// 每隔5秒钟,咱们的spark streaming作业就会收集最近5秒内的数据源接收过来的数据

JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

// 创建针对Kafka数据来源的输入DStream(离线流,代表了一个源源不断的数据来源,抽象)

// 选用kafka direct api(很多好处,包括自己内部自适应调整每次接收数据量的特性,等等)

// 构建kafka参数map

// 主要要放置的就是,你要连接的kafka集群的地址(broker集群的地址列表)

Map kafkaMap = new HashMap<>();

kafkaMap.put("metadata.broker.list", "192.168.25.110:9092,192.168.25.111:9092,192.168.25.112:9092");

//构建topicSet

Set topics = new HashSet<>();

String[] tops = SparkContantsParam.KAFKA_TOPICS.split(",");

for (String topic : tops) {

topics.add(topic);

}

// 基于kafka direct api模式,构建出了针对kafka集群中指定topic的输入DStream

// 两个值,val1,val2;val1没有什么特殊的意义;val2中包含了kafka topic中的一条一条的实时日志数据

JavaPairInputDStream adRealTimeLogDStream = KafkaUtils.createDirectStream(jsc, String.class, String.class,

StringDecoder.class, StringDecoder.class, kafkaMap, topics);

//根据动态黑名单进行数据过滤

JavaPairDStream filterByBlackList =

filterByBlackList(adRealTimeLogDStream);

// 构建完spark streaming上下文之后,记得要进行上下文的启动、等待执行结束、关闭

jsc.start();

jsc.awaitTermination();

jsc.close();

}

首先过滤掉黑名单用户数据

1 使用transformToPair算子处理原始数据的RDD

2 查询出黑名单用户列表,使用rdd.context()生成JavaSparkContext,使用JavaSparkContext将其映射成格式的RDD

3 将原始数据转化为格式的RDD

4 将转化过后的原始数据左外连接黑名单的用户列表

5 过滤掉标记为true的黑名单用户

6 返回过滤后的原始数据

private static JavaPairDStream filterByBlackList(

JavaPairInputDStream adRealTimeLogDStream) {

// 刚刚接受到原始的用户点击行为日志之后

// 根据mysql中的动态黑名单,进行实时的黑名单过滤(黑名单用户的点击行为,直接过滤掉,不要了)

// 使用transform算子(将dstream中的每个batch RDD进行处理,转换为任意的其他RDD,功能很强大)

JavaPairDStream filterDStream = adRealTimeLogDStream

.transformToPair(rdd -> {

//得到所有的黑名单用户

List blacklists = adBlacklistDAO.findAll();

//转化成Tuple2

List> list = new ArrayList<>();

for (AdBlacklist black : blacklists) {

list.add(new Tuple2<>(black.getUserid(), true));

}

JavaSparkContext jsc = new JavaSparkContext(rdd.context());

//黑名单用户RDD

JavaPairRDD blackListRdd = jsc.parallelizePairs(list);

// 将原始数据timestamp province city userid adid rdd映射成>

JavaPairRDD> mapperRDD =

rdd.mapToPair(tuple -> {

String log = tuple._2;

String[] params = log.split(" ");

return new Tuple2<>(Long.valueOf(params[3]), tuple);

});

// 将原始日志数据rdd,与黑名单rdd,进行左外连接

// 如果说原始日志的userid,没有在对应的黑名单中,join不到,左外连接

// 用inner join,内连接,会导致数据丢失

JavaPairRDD, Optional>> joinedRDD =

mapperRDD.leftOuterJoin(blackListRdd);

JavaPairRDD, Optional>> filterRDD =

joinedRDD.filter(tuple -> {

Optional optional = tuple._2._2;

// 如果这个值存在,那么说明原始日志中的userid,join到了某个黑名单用户

if (optional.isPresent() && optional.get()) {

return false;

}

return true;

});

JavaPairRDD resultRDD = filterRDD.mapToPair(tuple -> tuple._2._1);

return resultRDD;

});

return filterDStream;

}

动态生成黑名单

1 过滤原始数据 timestamp province city userid adid,生成格式,并计算出每天每个用户对每支广告的点击量

2 将数据持久化到数据库,相同用户相同广告点击数量不断累积

3 用聚合过后的batch,使用userid查询出每个用户对每支的点击量,如果超过100则为黑名单用户,将其过滤出来

4 将过滤出来的数据使用map取出userid,并使用transform将每个rdd进行全局去重

5 将去重后的用户加入黑名单用户

private static void generateDynamicBlackList(JavaPairDStream filterByBlackList) {

JavaPairDStream dailyUserAdClickCountStream =

filterByBlackList.mapToPair(tuple -> {

// 一条一条的实时日志

// timestamp province city userid adid

// 某个时间点 某个省份 某个城市 某个用户 某个广告

// 计算出每5个秒内的数据中,每天每个用户每个广告的点击量

// 通过对原始实时日志的处理

// 将日志的格式处理成格式

String log = tuple._2;

String[] params = log.split(" ");

Date date = new Date(Long.valueOf(params[0]));

String dateKey = DateUtils.parseDateKey(date);

return new Tuple2<>(dateKey + "_" + params[3] + "_+" + params[4], 1L);

// 针对处理后的日志格式,执行reduceByKey算子即可

// (每个batch中)每天每个用户对每个广告的点击量

}).reduceByKey((s1, s2) -> s1 + s2);

// 到这里为止,获取到了什么数据呢

// dailyUserAdClickCountDStream DStream

// 源源不断的,每个5s的batch中,当天每个用户对每支广告的点击次数

//

dailyUserAdClickCountStream.foreachRDD(rdd -> {

rdd.foreachPartition(iterator -> {

// 对每个分区的数据就去获取一次连接对象

// 每次都是从连接池中获取,而不是每次都创建

// 写数据库操作,性能已经提到最高了

List list = new ArrayList<>();

while (iterator.hasNext()) {

Tuple2 next = iterator.next();

String datekey = next._1.split("_")[0];

String date = DateUtils.parseDateKey(new Date(Long.valueOf(datekey)));

long userid = Long.valueOf(next._1.split("_")[1]);

long adid = Long.valueOf(next._1.split("_")[2]);

AdUserClickCount ad = new AdUserClickCount();

ad.setAdid(adid);

ad.setClickCount(next._2);

ad.setDate(date);

ad.setUserid(userid);

list.add(ad);

}

adUserClickCountDAO.updateBatch(list);

});

return null;

});

// 现在我们在mysql里面,已经有了累计的每天各用户对各广告的点击量

// 遍历每个batch中的所有记录,对每条记录都要去查询一下,这一天这个用户对这个广告的累计点击量是多少

// 从mysql中查询

// 查询出来的结果,如果是100,如果你发现某个用户某天对某个广告的点击量已经大于等于100了

// 那么就判定这个用户就是黑名单用户,就写入mysql的表中,持久化

// 对batch中的数据,去查询mysql中的点击次数,使用哪个dstream呢

// dailyUserAdClickCountDStream

// 为什么用这个batch因为这个batch是聚合过的数据,已经按照yyyyMMdd_userid_adid进行过聚合了

// 比如原始数据可能是一个batch有一万条,聚合过后可能只有五千条

// 所以选用这个聚合后的dstream,既可以满足咱们的需求,而且呢,还可以尽量减少要处理的数据量

JavaPairDStream filterBlackUser =

dailyUserAdClickCountStream.filter(tuple -> {

String date = DateUtils.formatDate(new Date(Long.valueOf(tuple._1.split("_")[0])));

long userId = Long.valueOf(tuple._1.split("_")[1]);

long adid = Long.valueOf(tuple._1.split("_")[2]);

// 从mysql中查询指定日期指定用户对指定广告的点击量

int count = adUserClickCountDAO.findClickCountByMultiKey(date, userId, adid);

// 判断,如果点击量大于等于100,ok,那么不好意思,你就是黑名单用户

// 那么就拉入黑名单,返回true

if (count >= 100) {

return true;

}

// 反之,如果点击量小于100的,那么就暂时不要管它了

return false;

});

//过滤出黑名单用户的userid,并进行全局去重

JavaDStream distinctBlackUserIds =

filterBlackUser.map(tuple -> Long.valueOf(tuple._1.split("_")[1]))

.transform(rdd -> rdd.distinct());

// 到这一步为止,distinctBlackUserIds

// 每一个rdd,只包含了userid,而且还进行了全局的去重,保证每一次过滤出来的黑名单用户都没有重复的

distinctBlackUserIds.foreachRDD(rdd -> {

rdd.foreachPartition(longIterator -> {

List list = new ArrayList<>();

while (longIterator.hasNext()) {

AdBlacklist black = new AdBlacklist();

black.setUserid(longIterator.next());

list.add(black);

}

adBlacklistDAO.insertBatch(list);

// 到此为止,我们其实已经实现了动态黑名单了

// 1、计算出每个batch中的每天每个用户对每个广告的点击量,并持久化到mysql中

// 2、依据上述计算出来的数据,对每个batch中的按date、userid、adid聚合的数据

// 都要遍历一遍,查询一下,对应的累计的点击次数,如果超过了100,那么就认定为黑名单

// 然后对黑名单用户进行去重,去重后,将黑名单用户,持久化插入到mysql中

// 所以说mysql中的ad_blacklist表中的黑名单用户,就是动态地实时地增长的

// 所以说,mysql中的ad_blacklist表,就可以认为是一张动态黑名单

// 3、基于上述计算出来的动态黑名单,在最一开始,就对每个batch中的点击行为

// 根据动态黑名单进行过滤

// 把黑名单中的用户的点击行为,直接过滤掉

// 动态黑名单机制,就完成了

});

return null;

});

}

计算广告流量实时统计结果(yyyyMMdd_province_city_adid,clickCount)

1 首先将原始数据通过mapToPair转化为

2 通过updateStateByKey统计各个batch rdd中每天各省各市各广告的点击次数

3将统计结果循环插入数据库中

private static JavaPairDStream caculateRealTime(JavaPairDStream filterByBlackList) {

// 业务逻辑一

// 广告点击流量实时统计

// 上面的黑名单实际上是广告类的实时系统中,比较常见的一种基础的应用

// 实际上,我们要实现的业务功能,不是黑名单

// 计算每天各省各城市各广告的点击量

// 这份数据,实时不断地更新到mysql中的,J2EE系统,是提供实时报表给用户查看的

// j2ee系统每隔几秒钟,就从mysql中搂一次最新数据,每次都可能不一样

// 设计出来几个维度:日期、省份、城市、广告

// j2ee系统就可以非常的灵活

// 用户可以看到,实时的数据,比如2015-11-01,历史数据

// 2015-12-01,当天,可以看到当天所有的实时数据(动态改变),比如江苏省南京市

// 广告可以进行选择(广告主、广告名称、广告类型来筛选一个出来)

// 拿着date、province、city、adid,去mysql中查询最新的数据

// 等等,基于这几个维度,以及这份动态改变的数据,是可以实现比较灵活的广告点击流量查看的功能的

// date province city userid adid

// date_province_city_adid,作为key;1作为value

// 通过spark,直接统计出来全局的点击次数,在spark集群中保留一份;在mysql中,也保留一份

// 我们要对原始数据进行map,映射成格式

// 然后呢,对上述格式的数据,执行updateStateByKey算子

// spark streaming特有的一种算子,在spark集群内存中,维护一份key的全局状态

JavaPairDStream mappedDStream =

filterByBlackList.mapToPair(tuple -> {

String date = tuple._2.split("_")[0];

String dateKey = DateUtils.parseDateKey(new Date(Long.valueOf(date)));

String province = tuple._2.split("_")[1];

String city = tuple._2.split("_")[2];

String adid = tuple._2.split("_")[4];

return new Tuple2<>(dateKey + "_" + province + "_" + city + "_" + adid, 1L);

});

// 在这个dstream中,就相当于,有每个batch rdd累加的各个key(各天各省份各城市各广告的点击次数)

// 每次计算出最新的值,就在aggregatedDStream中的每个batch rdd中反应出来

JavaPairDStream aggregatedDStream = mappedDStream.

updateStateByKey((Function2, Optional, Optional>) (longs, optional) -> {

// 举例来说

// 对于每个key,都会调用一次这个方法

// 比如key是<20151201_Jiangsu_Nanjing_10001,1>,就会来调用一次这个方法7

// 10个

// values,(1,1,1,1,1,1,1,1,1,1)

// 首先根据optional判断,之前这个key,是否有对应的状态

long clickCount = 0L;

if (optional.isPresent()) {

clickCount = optional.get();

}

// values,代表了,batch rdd中,每个key对应的所有的值

for (Long value : longs) {

clickCount += value;

}

return Optional.of(clickCount);

});

//实时统计出来的广告流量结果,持久化到数据库中,以便j2ee使用

aggregatedDStream.foreachRDD(rdd->{

rdd.foreachPartition(iterator->{

List list = new ArrayList<>();

while (iterator.hasNext()){

Tuple2 next = iterator.next();

//date_province_city_adid

String info = next._1;

Long clickCount = next._2;

AdStat adStat = new AdStat();

adStat.setClickCount(clickCount);

adStat.setDate(info.split("_")[0]);

adStat.setProvince(info.split("_")[1]);

adStat.setCity(info.split("_")[2]);

adStat.setAdid(Long.valueOf(info.split("_")[3]));

list.add(adStat);

}

adStatDAO.updateBatch(list);

});

return null;

});

return aggregatedDStream;

}

计算出各省份每天热点广告Top3

/**

* 计算每天各省份top3热门广告

* 1 使用transform函数

* 2 先将过滤出的数据转化为(yyyyMMdd_province_adid,clickCount)

* 3 reducebyKey统计出各省份各广告的点击次数的RDD

* 4 将该RDD转化为SparkSql

* 5 使用HiveContext利用开窗函数统计出各城市点击量排名前三的数据信息

* 6 将数据持久化到数据库中

*

* @param adRealTimeDStream

*/

private static void caculateDailyProvinceTop3Ad(JavaPairDStream adRealTimeDStream) {

// adRealTimeStatDStream

// 每一个batch rdd,都代表了最新的全量的每天各省份各城市各广告的点击量

//(yyyyMMdd_province_city_adid,clickCount)

JavaDStream rowJavaDStream = adRealTimeDStream.transform(rdd -> {

// 计算出每天各省份各广告的点击量

JavaPairRDD provinceClickCountRDD = rdd.mapToPair(tuple -> {

String[] infos = tuple._1.split("_");

Long clickCount = tuple._2;

return new Tuple2<>(infos[0] + "_" + infos[1] + "_" + Long.valueOf(infos[3]), clickCount);

}).reduceByKey((aLong, aLong2) -> aLong + aLong2);

// 将provinceClickCountRDD转换为DataFrame

// 注册为一张临时表

// 使用Spark SQL,通过开窗函数,获取到各省份的top3热门广告

JavaRDD mapRow = provinceClickCountRDD.map(tuple -> {

String[] infos = tuple._1.split("_");

Long clickCounct = tuple._2;

String dateKey = DateUtils.parseDateKey(new Date(Long.valueOf(infos[0])));

String province = infos[1];

long adid = Long.valueOf(infos[3]);

return RowFactory.create(dateKey, province, adid, clickCounct);

});

StructType schema = DataTypes.createStructType(Arrays.asList(

DataTypes.createStructField("date", DataTypes.StringType, true),

DataTypes.createStructField("province", DataTypes.LongType, true),

DataTypes.createStructField("ad_id", DataTypes.LongType, true),

DataTypes.createStructField("click_count", DataTypes.LongType, true)

));

HiveContext hiveContext = new HiveContext(rdd.context());

DataFrame dataFrame = hiveContext.createDataFrame(mapRow, schema);

dataFrame.registerTempTable("tmp_daily_pro_click_count");

DataFrame dataFrame1 = hiveContext.sql(

"select " +

"date," +

"province," +

"ad_id," +

"click_count " +

"from (" +

"select " +

"date," +

"province," +

"ad_id," +

"click_count," +

"ROW_NUMBER() OVER(partition by province order by click_count desc) rank" +

"from tmp_daily_pro_click_count) t" +

" where rank <=3");

return dataFrame1.javaRDD();

});

rowJavaDStream.foreachRDD(rdd->{

rdd.foreachPartition(iterator->{

List list = new ArrayList<>();

while (iterator.hasNext()){

Row row = iterator.next();

AdProvinceTop3 top3 = new AdProvinceTop3();

top3.setDate(row.getString(0));

top3.setProvince(row.getString(1));

top3.setAdid(row.getLong(2));

top3.setClickCount(row.getLong(3));

list.add(top3);

}

adProvinceTop3DAO.updateBatch(list);

});

return null;

});

}

使用reduceByKeyAndWindow实时统计每天每小时每分钟每个广告的滑动窗口的点击趋势

private static void caculateDailyHourMinAdWindow(JavaPairInputDStream adRealTimeLogDStream) {

//(timestamp province city userid adid )

// 映射成格式

JavaPairDStream mapedDStream =

adRealTimeLogDStream.mapToPair(tuple -> {

String[] infos = tuple._1.split("_");

String dateMin = DateUtils.parseMinutes(DateUtils.formatDateKey(infos[0]));

return new Tuple2<>(dateMin + "_" + infos[4], 1L);

});

// 过来的每个batch rdd,都会被映射成的格式

// 每次出来一个新的batch,都要获取最近1小时内的所有的batch

// 然后根据key进行reduceByKey操作,统计出来最近一小时内的各分钟各广告的点击次数

// 1小时滑动窗口内的广告点击趋势

// 点图 / 折线图

// 每10秒钟统计一次一小时之内的广告点击次数

JavaPairDStream aggrRDD =

mapedDStream.reduceByKeyAndWindow

((aLong, aLong2) -> aLong + aLong2, Durations.minutes(60),

Durations.seconds(10));

// aggrRDD

// 每次都可以拿到,最近1小时内,各分钟(yyyyMMddHHMM)各广告的点击量

// 各广告,在最近1小时内,各分钟的点击量

aggrRDD.foreachRDD(rdd->{

rdd.foreachPartition(iterator->{

List list = new ArrayList<>();

while (iterator.hasNext()){

Tuple2 next = iterator.next();

String dateMin = next._1.split("_")[0];

long adid = Long.valueOf(next._1.split("_")[1]);

Long clickCount = next._2;

String dateKey = DateUtils.formatDate(DateUtils.formatDateKey(dateMin.substring(0,8)));

AdClickTrend adClickTrend = new AdClickTrend();

adClickTrend.setAdid(adid);

adClickTrend.setDate(dateKey);

adClickTrend.setHour(dateMin.substring(8,10));

adClickTrend.setMinute(dateMin.substring(10));

adClickTrend.setClickCount(clickCount);

list.add(adClickTrend);

}

adClickTrendDAO.updateBatch(list);

});

return null;

});

}

实现实时计算程序的HA高可用性

HA高可用性:High Availability,如果有些数据丢失,或者节点挂掉;那么不能让你的实时计算程序挂了;必须做一些数据上的冗余副本,保证你的实时计算程序可以7 * 24小时的运转。

通过一整套方案(3个步骤),开启和实现实时计算程序的HA高可用性,保证一些关键数据都有其冗余副本,不至于因为节点挂掉或者其他原因导致数据丢失。

1、updateStateByKey、window等有状态的操作,自动进行checkpoint,必须设置checkpoint目录

checkpoint目录:容错的文件系统的目录,比如说,常用的是HDFS

SparkStreaming.checkpoint("hdfs://192.168.25.110:9090/checkpoint")

JavaStreamingContext jsc = new JavaStreamingContext

(conf, Durations.seconds(5));

jsc.checkpoint("hdfs://192.168.25.110:9090/checkpoint");

设置完这个基本的checkpoint目录之后,有些会自动进行checkpoint操作的DStream,就实现了HA高可用性;checkpoint,相当于是会把数据保留一份在容错的文件系统中,一旦内存中的数据丢失掉;那么就可以直接从文件系统中读取数据;不需要重新进行计算

2、Driver高可用性

第一次在创建和启动StreamingContext的时候,那么将持续不断地将实时计算程序的元数据(比如说,有些dstream或者job执行到了哪个步骤),如果后面,不幸,因为某些原因导致driver节点挂掉了;那么可以让spark集群帮助我们自动重启driver,然后继续运行时候计算程序,并且是接着之前的作业继续执行;没有中断,没有数据丢失

第一次在创建和启动StreamingContext的时候,将元数据写入容错的文件系统(比如hdfs);spark-submit脚本中加一些参数;保证在driver挂掉之后,spark集群可以自己将driver重新启动起来;而且driver在启动的时候,不会重新创建一个streaming context,而是从容错文件系统(比如hdfs)中读取之前的元数据信息,包括job的执行进度,继续接着之前的进度,继续执行。

使用这种机制,就必须使用cluster模式提交,确保driver运行在某个worker上面;但是这种模式不方便我们调试程序,一会儿还要最终测试整个程序的运行,打印不出log;我们这里仅仅是用我们的代码给大家示范一下:

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {

@Override

public JavaStreamingContext create() {

JavaStreamingContext jssc = new JavaStreamingContext(...);

JavaDStream lines = jssc.socketTextStream(...);

jssc.checkpoint(checkpointDirectory);

return jssc;

}

};

JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

context.start();

context.awaitTermination();

spark-submit

--deploy-mode cluster

--supervise

3、实现RDD高可用性:启动WAL预写日志机制

spark streaming,从原理上来说,是通过receiver来进行数据接收的;接收到的数据,会被划分成一个一个的block;block会被组合成一个batch;针对一个batch,会创建一个rdd;启动一个job来执行我们定义的算子操作。

receiver主要接收到数据,那么就会立即将数据写入一份到容错文件系统(比如hdfs)上的checkpoint目录中的,一份磁盘文件中去;作为数据的冗余副本。

无论你的程序怎么挂掉,或者是数据丢失,那么数据都不肯能会永久性的丢失;因为肯定有副本。

WAL(Write-Ahead Log)预写日志机制

spark.streaming.receiver.writeAheadLog.enable true

\

对实时计算程序进行性能调优

1、并行化数据接收:处理多个topic的数据时比较有效

int numStreams = 5;

List> kafkaStreams = new ArrayList>(numStreams);

for (int i = 0; i < numStreams; i++) {

kafkaStreams.add(KafkaUtils.createStream(...));

}

JavaPairDStream unifiedStream = streamingContext.union(kafkaStreams.get(0),

kafkaStreams.subList(1, kafkaStreams.size()));

unifiedStream.print();

2、spark.streaming.blockInterval:增加block数量,增加每个batch rdd的partition数量,增加处理并行度

receiver从数据源源源不断地获取到数据;首先是会按照block interval,将指定时间间隔的数据,收集为一个block;默认时间是200ms,官方推荐不要小于50ms;接着呢,会将指定batch interval时间间隔内的block,合并为一个batch;创建为一个rdd,然后启动一个job,去处理这个batch rdd中的数据

\

batch rdd,它的partition数量是多少呢一个batch有多少个block,就有多少个partition;就意味着并行度是多少;就意味着每个batch rdd有多少个task会并行计算和处理。

当然是希望可以比默认的task数量和并行度再多一些了;可以手动调节block interval;减少block interval;每个batch可以包含更多的block;有更多的partition;也就有更多的task并行处理每个batch rdd。

初始的rdd过来,直接就是固定的partition数量了

3、inputStream.repartition():重分区,增加每个batch rdd的partition数量

\

有些时候,希望对某些dstream中的rdd进行定制化的分区

对dstream中的rdd进行重分区,去重分区成指定数量的分区,这样也可以提高指定dstream的rdd的计算并行度

4、调节并行度

spark.default.parallelism

reduceByKey(numPartitions)

5、使用Kryo序列化机制:

spark streaming,也是有不少序列化的场景的

提高序列化task发送到executor上执行的性能,如果task很多的时候,task序列化和反序列化的性能开销也比较可观

默认输入数据的存储级别是StorageLevel.MEMORY_AND_DISK_SER_2,receiver接收到数据,默认就会进行持久化操作;首先序列化数据,存储到内存中;如果内存资源不够大,那么就写入磁盘;而且,还会写一份冗余副本到其他executor的block manager中,进行数据冗余。

6、batch interval:每个的处理时间必须小于batch interval

实际上你的spark streaming跑起来以后,其实都是可以在spark ui上观察它的运行情况的;可以看到batch的处理时间;

如果发现batch的处理时间大于batch interval,就必须调节batch interval

尽量不要让batch处理时间大于batch interval

比如你的batch每隔5秒生成一次;你的batch处理时间要达到6秒;就会出现,batch在你的内存中日积月累,一直囤积着,没法及时计算掉,释放内存空间;而且对内存空间的占用越来越大,那么此时会导致内存空间快速消耗

如果发现batch处理时间比batch interval要大,就尽量将batch interval调节大一些

项目总结

1、大数据集群环境的搭建

CentOS 6.4、hadoop-2.5.0-cdh5.3.6、hive-0.13.1-cdh5.3.6、zookeeper-3.4.5-cdh5.3.6、kafka_2.9.2-0.8.1、flume-ng-1.5.0-cdh5.3.6以及日志采集流程、Spark 1.5.1

2、企业级大数据项目的架构搭建

Java、配置管理组件、JDBC辅助组件(内置数据库连接池)、Domain与DAO模型

scala:只适合用于编写一些比较纯粹的一些数据处理程序(比如说一些复杂的数据etl)

真正的讲师本人做过的项目的原型,技术的使用上,要比这个复杂很多

Spring、MyBatis(半自动ORM框架),复杂的代码组件的管理(Spring),复杂的底层数据库CRUD的操作(MyBatis)

用到了一些额外的辅助组件,包括redis(缓存)、kafka(消息队列,spark一边处理数据,一边往kafka中写)

唯一的选择,就是java

scala(不可能),scala会调用很多底层的java代码,造成项目的多编程语言的混编;最终导致整个项目的可维护性、可扩展性极差

3、J2EE与Spark组成的交互式大数据分析平台架构

没有讲解j2ee,但是至少给大家讲解清楚了j2ee与spark组成的大数据平台架构,拓展了大家的知识面

而且,这是根据本次项目课程的原型项目,讲解的

大家要知道,即使是本套项目课程,你做出来的东西,都是要放在j2ee与spark的架构中的(task、spark触发、spark结果如何被展示)

特别是拿出去面试找工作的时候,这个架构要说清楚,j2ee层不是你做的就可以了

4、企业级大数据项目的开发流程

贯穿了整个项目,每个模块,基本上都是按照这个完整的流程来的

数据分析(来源数据的分析)

需求分析(基于上述数据,要实现什么样的需求和功能)

技术方案设计(基于来源数据与需求,以及你所掌握的spark技术,设计方案来实现需求功能)

数据库设计(技术方案设计完了以后,要配合着技术方案,设计数据库中表)

编码实现(基于上述所有的东西,使用你掌握的spark技术,来编码,实现功能)

功能测试(包括本地测试和生产环境测试,spark的client和cluster的说明)

性能调优(spark core、spark sql、spark streaming)

troubleshooting(项目上线以后,要及时解决出现的线上故障与报错)

解决数据倾斜(后期维护过程中,可能会出现的严重的性能问题)

5、一套项目课程,全面涵盖了90%以上的Spark Core、Spark SQL和Spark Streaming,几乎所有的初中高级技术点;全面锻炼了学员的spark大数据项目实战能力;视频至少看一遍(最佳是两遍以上),代码至少两遍(一遍跟着视频敲,一遍脱开视频自己敲);将大数据项目与spark技术融会贯通

6、用户访问session分析模块

用户session分析业务:复杂业务逻辑,session聚合统计、session随机抽取、top10热门品类、top10活跃用户

技术点:数据的过滤与聚合、自定义Accumulator、按时间比例随机抽取算法、二次排序、分组取topN

性能调优方案:普通调优、jvm调优、shuffle调优、算子调优

troubleshooting经验

数据倾斜解决方案:7种方案

7、页面单跳转化率模块

小小的特色,而且主要是互联网行业中非常常见的一些需求

页面单跳转化率计算业务

页面切片生成以及页面流匹配算法

知道,如何去计算网站 / app的页面之间的流转路径的转化率

8、各区域热门商品统计模块

Spark SQL

区域级别的热门商品的统计业务

技术点:Hive与MySQL异构数据源、RDD转换为DataFrame、注册和使用临时表、自定义UDAF聚合函数、自定义get_json_object等普通函数、Spark SQL的高级内置函数(if与case when等)、开窗函数(高端)

Spark SQL数据倾斜解决方案

9、广告点击流量实时统计模块

广告点击流量的实时统计的业务

技术点:动态黑名单机制(动态生成黑名单以及黑名单过滤)、transform、updateStateByKey、transform与Spark SQL整合、window滑动窗口、高性能写数据库

HA方案:高可用性方案,3种

性能调优:常用的性能调优的技巧

上一篇:大数据技术之hadoop环境搭建教程
下一篇:大数据技术之sbt安装教程
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站