频道栏目
首页 > 系统 > 虚拟机 > 正文

深入了解SparkSQL运行计划及调优教程

2018-02-09 15:02:23      个评论    来源:CODE男孩的博客  
收藏   我要投稿

1.1运行环境说明

1.1.1硬软件环境

l主机操作系统:Windows 64位,双核4线程,主频2.2G,10G内存

l虚拟软件:VMware Workstation 9.0.0 build-812388

l虚拟机操作系统:CentOS6.5 64位,单核

l虚拟机运行环境:

JDK:1.7.0_55 64位

Hadoop:2.2.0(需要编译为64位)

Scala:2.10.4

Spark:1.1.0(需要编译)

Hive:0.13.1(源代码编译,参见1.2)

1.1.2集群网络环境

本次实验环境只需要hadoop1一台机器即可,网络环境配置如下:

序号

IP地址

机器名

类型

用户名

目录

1

192.168.0.61

hadoop1

NN/DN

hadoop

/app程序所在路径

/app/scala-...

/app/hadoop

/app/complied

1.2编译Hive

1.2.1下载Hive源代码包

这里选择下载的版本为hive-0.13.1,这个版本需要到apache的归档服务器下载,下载地址:http://archive.apache.org/dist/hive/hive-0.13.1/,选择apache-hive-0.13.1-src.tar.gz文件进行下载:

clip_image002

1.2.2上传Hive源代码包

把下载的hive-0.13.0.tar.gz安装包,使用SSH Secure File Transfer工具(参见第2课《Spark编译与部署(上)--基础环境搭建》1.3.1介绍)上传到/home/hadoop/upload目录下。

1.2.3解压缩并移动到编译目录

到上传目录下,用如下命令解压缩hive安装文件:

$cd /home/hadoop/upload

$tar -zxf apache-hive-0.13.1-src.tar.gz

改名并移动到/app/complied目录下:

$sudo mv apache-hive-0.13.1-src /app/complied/hive-0.13.1-src

$ll /app/complied

1.2.4编译Hive

编译Hive源代码的时候,需要从网上下载依赖包,所以整个编译过程机器必须保证在联网状态。编译执行如下脚本:

$cd /app/complied/hive-0.13.1-src/

$export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"

$mvn -Phadoop-2,dist -Dmaven.test.skip=true clean package

clip_image004

在编译过程中可能出现速度慢或者中断,可以再次启动编译,编译程序会在上次的编译中断处继续进行编译,整个编译过程耗时与网速紧密相关,网速较快的情况需要1个小时左右(上图的时间是多次编译后最后成功的界面)。最终编译的结果为$HIVE_HOME/packaging/target/apache-hive-0.13.1-bin.tar.gz

通过如下命令查看最终编译完成整个目录大小,可以看到大小为353.6M左右

$du -s /app/complied/hive-0.13.1-src

clip_image006

【注】已经编译好的Hive包在本系列配套资源/install/6.hive-0.13.1-src.tar.gz,读者直接使用

1.3首次运行hive-console

1.3.1获取Spark源代码

由于首次运行hive-console需要在Spark源代码进行编译,关于Spark源代码的获取可以参考第二课《Spark编译与部署(下)--Spark编译安装》方式进行获取,连接地址为http://spark.apache.org/downloads.html,获取源代码后把Spark源代码移动到/app/complied目录,并命名为spark-1.1.0-hive

1.3.2配置/etc/profile环境变量

第一步使用如下命令打开/etc/profile文件:

$sudo vi /etc/profile

第二步设置如下参数:

export HADOOP_HOME=/app/hadoop/hadoop-2.2.0

export HIVE_HOME=/app/complied/hive-0.13.1-src

export HIVE_DEV_HOME=/app/complied/hive-0.13.1-src

clip_image008

第三步生效配置并验证

$sudo vi /etc/profile

$echo $HIVE_DEV_HOME

1.3.3运行sbt进行编译

运行hive/console不需要启动Spark,需要进入到Spark根目录下使用sbt/sbt hive/console进行首次运行编译,编译以后下次可以直接启动。编译Spark源代码的时候,需要从网上下载依赖包,所以整个编译过程机器必须保证在联网状态。编译命令如下:

$cd /app/complied/spark-1.1.0-hive

$sbt/sbt hive/console

clip_image010

编译时间会很长,在编译过程中可能出现速度慢或者中断,可以再次启动编译,编译程序会在上次的编译中断处继续进行编译,整个编译过程耗时与网速紧密相关。

clip_image012

通过如下命令查看最终编译完成整个目录大小,可以看到大小为267.9M左右

$du -s /app/complied/spark-1.1.0-hive

clip_image014

【注】已经编译好的Spark for hive-console包在本系列配套资源/install/6.spark-1.1.0-hive.tar.gz,可直接使用

1.4使用hive-console

1.4.1启动hive-console

进入到spark根目录下,使用如下命令启动hive-console

$cd /app/complied/spark-1.1.0-hive

$sbt/sbt hive/console

clip_image016

1.4.2辅助命令Help和Tab键

可以使用:help查看帮助内容

scala>:help

clip_image018

可以使用tab键查看所有可使用命令、函数

clip_image020

1.4.3常用操作

首先定义Person类,在该类中定义name、age和state三个列,然后把该类注册为people表并装载数据,最后通过查询到数据存放到query中

scala>case class Person(name:String, age:Int, state:String)

scala>sparkContext.parallelize(Person("Michael",29,"CA")::Person("Andy",30,"NY")::Person("Justin",19,"CA")::Person("Justin",25,"CA")::Nil).registerTempTable("people")

clip_image022

scala>val query= sql("select * from people")

clip_image024

1.4.3.1查看查询的schema

scala>query.printSchema

scala>query.collect()

clip_image026

1.4.3.2查看查询的整个运行计划

scala>query.queryExecution

clip_image028

1.4.4查看查询的Unresolved LogicalPlan

scala>query.queryExecution.logical

clip_image030

1.4.4.1查看查询的Analyzed LogicalPlan

scala>query.queryExecution.analyzed

clip_image032

1.4.4.2查看优化后的LogicalPlan

scala>query.queryExecution.optimizedPlan

clip_image034

1.4.4.3查看物理计划

scala>query.queryExecution.sparkPlan

clip_image036

1.4.4.4查看RDD的转换过程

scala>query.toDebugString

clip_image038

1.4.5不同数据源的运行计划

上面常用操作里介绍了源自RDD的数据,SparkSQL也可以源自多个数据源:jsonFile、parquetFile和Hive等。

1.4.5.1读取Json格式数据

第一步Json测试数据

Json文件支持嵌套表,SparkSQL也可以读入嵌套表,如下面形式的Json数据,可以使用jsonFile读入SparkSQL。该文件可以在配套资源/data/class6中找到,在以下测试中把文件放到/home/hadoop/upload/class6路径中

{

"fullname": "Sean Kelly",

"org": "SK Consulting",

"emailaddrs": [

{"type": "work", "value": "kelly@seankelly.biz"},

{"type": "home", "pref": 1, "value": "kelly@seankelly.tv"}

],

"telephones": [

{"type": "work", "pref": 1, "value": "+1 214 555 1212"},

{"type": "fax", "value": "+1 214 555 1213"},

{"type": "mobile", "value": "+1 214 555 1214"}

],

"addresses": [

{"type": "work", "format": "us",

"value": "1234 Main StnSpringfield, TX 78080-1216"},

{"type": "home", "format": "us",

"value": "5678 Main StnSpringfield, TX 78080-1316"}

],

"urls": [

{"type": "work", "value": "http://seankelly.biz/"},

{"type": "home", "value": "http://seankelly.tv/"}

]

}

第二步读入Json数据

使用jsonFile读入数据并注册成表jsonPerson,然后定义一个查询jsonQuery

scala>jsonFile("/home/hadoop/upload/class6/nestjson.json").registerTempTable("jsonPerson")

scala>val jsonQuery = sql("select * from jsonPerson")

clip_image040

第三步查看jsonQuery的schema

scala>jsonQuery.printSchema

clip_image042

第四步查看jsonQuery的整个运行计划

scala>jsonQuery.queryExecution

clip_image044

1.4.5.2读取Parquet格式数据

Parquet数据放在配套资源/data/class6/wiki_parquet中,在以下测试中把文件放到/home/hadoop/upload/class6路径下

第一步读入Parquet数据

parquet文件读入并注册成表parquetWiki,然后定义一个查询parquetQuery

scala>parquetFile("/home/hadoop/upload/class6/wiki_parquet").registerTempTable("parquetWiki")

scala>val parquetQuery = sql("select * from parquetWiki")

clip_image046

有报错但不影响使用

clip_image048

第二步查询parquetQuery的schema

scala>parquetQuery.printSchema

clip_image050

第三步查询parquetQuery的整个运行计划

scala>parquetQuery.queryExecution

clip_image052

第四步查询取样

scala>parquetQuery.takeSample(false,10,2)

clip_image054

clip_image056

1.4.5.3读取hive内置测试数据

在TestHive类中已经定义了大量的hive0.13的测试数据的表格式,如src、sales等等,在hive-console中可以直接使用;第一次使用的时候,hive-console会装载一次。下面我们使用sales表看看其schema和整个运行计划。

第一步读入测试数据并定义一个查询hiveQuery

scala>val hiveQuery = sql("select * from sales")

clip_image058

第二步查看hiveQuery的schema

scala>hiveQuery.printSchema

clip_image060

第三步查看hiveQuery的整个运行计划

scala>hiveQuery.queryExecution

clip_image062

第四步其他SQL语句的运行计划

scala>val hiveQuery = sql("select * from (select * from src limit 5) a limit 3")

clip_image064

scala>val hiveQuery = sql("select * FROM (select * FROM src) a")

clip_image066

scala>hiveQuery.where('key === 100).queryExecution.toRdd.collect

clip_image068

1.4.6不同查询的运行计划

1.4.6.1聚合查询

scala>sql("select name, age,state as location from people").queryExecution

clip_image070

scala>sql("select name from (select name,state as location from people) a where location='CA'").queryExecution

clip_image072

scala>sql("select sum(age) from people").queryExecution

scala>sql("select sum(age) from people").toDebugString

clip_image074

clip_image076

scala>sql("select state,avg(age) from people group by state").queryExecution

scala>sql("select state,avg(age) from people group by state").toDebugString

clip_image078

clip_image080

 

1.4.6.2Join操作

scala>sql("select a.name,b.name from people a join people b where a.name=b.name").queryExecution

scala>sql("select a.name,b.name from people a join people b where a.name=b.name").toDebugString

clip_image082

clip_image084

1.4.6.3Distinct操作

scala>sql("select distinct a.name,b.name from people a join people b where a.name=b.name").queryExecution

scala>sql("select distinct a.name,b.name from people a join people b where a.name=b.name").toDebugString

clip_image086

clip_image088

1.4.7优化

1.4.7.1CombineFilters

CombineFilters就是合并Filter,在含有多个Filter时发生,如下查询:

sql("select name from (select * from people where age >=19) a where a.age <30").queryExecution

clip_image090

上面的查询,在Optimized的过程中,将age>=19和age<30这两个Filter合并了,合并成((age>=19) && (age<30))。其实上面还做了一个其他的优化,就是project的下推,子查询使用了表的所有列,而主查询使用了列name,在查询数据的时候子查询优化成只查列name。

1.4.7.2PushPredicateThroughProject

PushPredicateThroughProject就是project下推,和上面例子中的project一样

sql("select name from (select name,state as location from people) a where location='CA'").queryExecution

clip_image092

1.4.7.3ConstantFolding:

ConstantFolding是常量叠加,用于表达式。如下面的例子:

sql("select name,1+2 from people").queryExecution

clip_image094

2、SparkSQL调优

Spark是一个快速的内存计算框架,同时是一个并行运算的框架,在计算性能调优的时候,除了要考虑广为人知的木桶原理外,还要考虑平行运算的Amdahl定理。

木桶原理又称短板理论,其核心思想是:一只木桶盛水的多少,并不取决于桶壁上最高的那块木块,而是取决于桶壁上最短的那块。将这个理论应用到系统性能优化上,系统的最终性能取决于系统中性能表现最差的组件。例如,即使系统拥有充足的内存资源和CPU资源,但是如果磁盘I/O性能低下,那么系统的总体性能是取决于当前最慢的磁盘I/O速度,而不是当前最优越的CPU或者内存。在这种情况下,如果需要进一步提升系统性能,优化内存或者CPU资源是毫无用处的。只有提高磁盘I/O性能才能对系统的整体性能进行优化。

clip_image096

Amdahl定理,一个计算机科学界的经验法则,因吉恩·阿姆达尔而得名。它代表了处理器平行运算之后效率提升的能力。并行计算中的加速比是用并行前的执行速度和并行后的执行速度之比来表示的,它表示了在并行化之后的效率提升情况。阿姆达尔定律是固定负载(计算总量不变时)时的量化标准。可用公式:clip_image098来表示。式中clip_image100分别表示问题规模的串行分量(问题中不能并行化的那一部分)和并行分量,p表示处理器数量。当clip_image102时,上式的极限是clip_image104,其中clip_image106。这意味着无论我们如何增大处理器数目,加速比是无法高于这个数的。

SparkSQL作为Spark的一个组件,在调优的时候,也要充分考虑到上面的两个原理,既要考虑如何充分的利用硬件资源,又要考虑如何利用好分布式系统的并行计算。由于测试环境条件有限,本篇不能做出更详尽的实验数据来说明,只能在理论上加以说明。

2.1并行性

SparkSQL在集群中运行,将一个查询任务分解成大量的Task分配给集群中的各个节点来运行。通常情况下,Task的数量是大于集群的并行度。比如前面第六章和第七章查询数据时,shuffle的时候使用了缺省的spark.sql.shuffle.partitions,即200个partition,也就是200个Task:

clip_image108

而实验的集群环境却只能并行3个Task,也就是说同时只能有3个Task保持Running:

clip_image110

这时大家就应该明白了,要跑完这200个Task就要跑200/3=67批次。如何减少运行的批次呢?那就要尽量提高查询任务的并行度。查询任务的并行度由两方面决定:集群的处理能力和集群的有效处理能力。

l对于Spark Standalone集群来说,集群的处理能力是由conf/spark-env中的SPARK_WORKER_INSTANCES参数、SPARK_WORKER_CORES参数决定的;而SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES不能超过物理机器的实际CPU core;

l集群的有效处理能力是指集群中空闲的集群资源,一般是指使用spark-submit或spark-shell时指定的--total-executor-cores,一般情况下,我们不需要指定,这时候,Spark Standalone集群会将所有空闲的core分配给查询,并且在Task轮询运行过程中,Standalone集群会将其他spark应用程序运行完后空闲出来的core也分配给正在运行中的查询。

综上所述,SparkSQL的查询并行度主要和集群的core数量相关,合理配置每个节点的core可以提高集群的并行度,提高查询的效率。

2.2高效的数据格式

高效的数据格式,一方面是加快了数据的读入速度,另一方面可以减少内存的消耗。高效的数据格式包括多个方面:

2.2.1数据本地性

分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值。

下面是Spark webUI监控Stage的一个图:

lPROCESS_LOCAL是指读取缓存在本地节点的数据

lNODE_LOCAL是指读取本地节点硬盘数据

lANY是指读取非本地节点数据

l通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关。

clip_image112

 

2.2.2合适的数据类型

对于要查询的数据,定义合适的数据类型也是非常有必要。对于一个tinyint可以使用的数据列,不需要为了方便定义成int类型,一个tinyint的数据占用了1个byte,而int占用了4个byte。也就是说,一旦将这数据进行缓存的话,内存的消耗将增加数倍。在SparkSQL里,定义合适的数据类型可以节省有限的内存资源。

2.2.3合适的数据列

对于要查询的数据,在写SQL语句的时候,尽量写出要查询的列名,如Select a,b from tbl,而不是使用Select * from tbl;这样不但可以减少磁盘IO,也减少缓存时消耗的内存。

2.2.4优的数据存储格式

在查询的时候,最终还是要读取存储在文件系统中的文件。采用更优的数据存储格式,将有利于数据的读取速度。查看SparkSQL的Stage,可以发现,很多时候,数据读取消耗占有很大的比重。对于sqlContext来说,支持textFiile、SequenceFile、ParquetFile、jsonFile;对于hiveContext来说,支持AvroFile、ORCFile、Parquet File,以及各种压缩。根据自己的业务需求,测试并选择合适的数据存储格式将有利于提高SparkSQL的查询效率。

2.3内存的使用

spark应用程序最纠结的地方就是内存的使用了,也是最能体现“细节是魔鬼”的地方。Spark的内存配置项有不少,其中比较重要的几个是:

lSPARK_WORKER_MEMORY,在conf/spark-env.sh中配置SPARK_WORKER_MEMORY和SPARK_WORKER_INSTANCES,可以充分的利用节点的内存资源,SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY不要超过节点本身具备的内存容量;

lexecutor-memory,在spark-shell或spark-submit提交spark应用程序时申请使用的内存数量;不要超过节点的SPARK_WORKER_MEMORY;

lspark.storage.memoryFraction spark应用程序在所申请的内存资源中可用于cache的比例

lspark.shuffle.memoryFraction spark应用程序在所申请的内存资源中可用于shuffle的比例

在实际使用上,对于后两个参数,可以根据常用查询的内存消耗情况做适当的变更。另外,在SparkSQL使用上,有几点建议:

l对于频繁使用的表或查询才进行缓存,对于只使用一次的表不需要缓存;

l对于join操作,优先缓存较小的表;

l要多注意Stage的监控,多思考如何才能更多的Task使用PROCESS_LOCAL;

l要多注意Storage的监控,多思考如何才能Fraction cached的比例更多

clip_image114

2.4合适的Task

对于SparkSQL,还有一个比较重要的参数,就是shuffle时候的Task数量,通过spark.sql.shuffle.partitions来调节。调节的基础是spark集群的处理能力和要处理的数据量,spark的默认值是200。Task过多,会产生很多的任务启动开销,Task多少,每个Task的处理时间过长,容易straggle。

2.5其他的一些建议

优化的方面的内容很多,但大部分都是细节性的内容,下面就简单地提提:

l想要获取更好的表达式查询速度,可以将spark.sql.codegen设置为Ture;

l对于大数据集的计算结果,不要使用collect() ,collect()就结果返回给driver,很容易撑爆driver的内存;一般直接输出到分布式文件系统中;

l对于Worker倾斜,设置spark.speculation=true将持续不给力的节点去掉;

l对于数据倾斜,采用加入部分中间步骤,如聚合后cache,具体情况具体分析;

l适当的使用序化方案以及压缩方案;

l善于利用集群监控系统,将集群的运行状况维持在一个合理的、平稳的状态;

l善于解决重点矛盾,多观察Stage中的Task,查看最耗时的Task,查找原因并改善;

上一篇:解决win7和虚拟机中文件共享的问题
下一篇:SparkSQL之Spark实战应用教程
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站