频道栏目
首页 > 网络 > 云计算 > 正文

问题驱动学习大数据

2018-09-13 15:59:27      个评论      
收藏   我要投稿

如果单机系统能够较好的支撑我们的需求,那么可能分布式系统和大数据技术也就不会出现,至少不会成为主流。

近些年,spark 异军突起发展迅猛,15 年,spark 刚刚起步时,中文资料匮乏,只要应用 spark 做过一些稍微复杂的开发就可以敲开腾讯等著名公司的大门,但现在即使对 spark 的运行、调度机理烂熟于心也未必管用。对于 IT 人员来说,大数据技术在发展,相应的对我们的要求也在提高。

如果能知道大数据发展的方向,知道下一个 spark 是谁,我们就能尽早布局工资小涨一把。但是我们能做到吗,看到这篇文章的亲们有福气了,我可以肯定的告诉你,做不到,反正我做不到!但是当我们了解了一些大数据中间件的部件和原理的时候,我们至少能够判断未来出现的某些组件是否符合我们业务的需要,做到这一点我觉得已经很不错了。

如果你认同这个思路,我们至少应该知道大数据中间件的这些组件是怎么来的。

分布式是怎么来的

我并不打算严谨地讲述历史,因为我做不到,对就是这么怂。我们只是简单地设想几个场景来推演一下。

场景一

我们有一台服务器做存储,但是数据量过大很快就把磁盘挂满了,我们就去买硬盘挂载,但是挂载的硬盘很快也满了,我们就换上更大的硬盘,但是没过多久还是满了。

场景二

我们花了八千万买了当今市场上配置最高的服务器,终于可以在短时间内不用担心存储爆掉了(长期就不好说了),但是因为服务器太高端,系统和接口都是定制化的,非常复杂,需要专人维护,维护价格是每个月十万。

场景三

我们又买了一台,在某一个机房托管。在某一个风和日丽的下午,我们正吃着火锅唱着歌,突然服务器所在的机房着火了,我们赶到的时候服务器已经冶炼成为一块好钢,数据全特么丢了。

场景四

又买了一台,但是太贵了我们不得不把北京六环的一套 80 平的房产卖了,卖了三个亿。我们想尽快挣钱把房子买回来,时间长了可能就要涨到十个亿了。所以我们开始卖数据,提供数据服务。数据很受欢迎,同时访问并发量达到了 1 W +,服务器虽然很强大,但是服务还是出现明显的延迟,服务器读、写、计算都要做,负载太高,随时可能去世。用户开始投诉,维护人员雪中送碳,将维护费用提高到每个月四十万。

以上的场景虽然是假设的,但是还是能够反映单机系统的缺陷:处理能力有限,单点故障且扩展成本高。这些缺陷在数据为王的现在越来越让人难以接受。

相对于在单机系统上一条道走到黑(最终走向结构复杂的巨型机),更流行的解决方案是分布式集群。也就是利用网络将多个小型机(相对于巨型机)连接起来联合提供服务。

相对于巨型机的解决方案,分布式的解决方案有以下优势。

扩展成本低:大家可以想象一下,一条 128 G 的内存条和 32 条 4 G 的内存条哪种更贵,服务器同理; 扩展容量大: 服务器的扩展理论上来说是没有上限的,但是巨型机则不可能; 维护成本低: 小型机的结构相对简单一些,维护起来成本更低; 负载可均衡: 集群间的机器可以分摊流量,防止某一台机器的负载过高; 服务多样化: 各服务器可以承载不同的服务,巨型机像是已经组装好的玩具,而集群像是一堆积木,可以有更多的玩法,但是相对于巨型机,集群的解决方案也有一些关键的缺陷。

这些缺陷全部来自于这样一个事实 —— 任何东西都是不可靠的,网络是不可靠的,服务器也是不可靠的。之所以有这么多种大数据中间件的存在,不仅是因为用户需求的分化,也是针对这些缺陷应对策略的分化。

我们下面分别就分布式存储和分布式计算来展开讨论以下这些应对策略。

分布式存储 - HDFS,HBase 和 Redis

我们上文中提到,服务器和网络(网络是满足某些协议 -- IP 协议,TCP 协议等 -- 的信号传输)都是不稳定的。既然服务器是不稳定的,那么存储在服务器上的数据就应该进行备份。当然在硬件的层面上可以进行 RAID 备份,但是这种方法已经超出了本文的讨论范围。就软件层面上的备份来说,业界的解决方案主要有两种。

全量备份: 假如我们需要双备份存储 1T 的数据,则 A,B 两台服务器上都存储着全量的 1T 数据,A 服务器挂了,B 服务器可以完美顶上。图数据库 Neo4J 采用的就是这种策略。 分片备份: 首先将数据进行分片,比如将 1T 数据分为 3 片,分别是阿片、波片、祠片,如果我们想要双备份存储这 1T 数据,我们可以使用以下的存储策略。在这种策略下,如果 C 服务器挂了,A 和 B 也能够保证数据的全备性。如果将备份数提高,则单点故障发生的概率将会更低。 3.

问题驱动学习大数据(首发在GitChat上)

考虑到全量备份的并没有能够解决单机容量有限的问题,分片备份方案扩展性更强。而分片备份方案,就是 google Google File System 论文的精髓所在,HDFS 也是本方案的一种实现。以下,我们将着重讨论分片备份方案。

分片备份方案解决了单机存储扩展成本和单点故障的问题,但是相应的也引入了其他的问题。这些问题有不同的解决方案,催生了各种的分布式存储方案。

如何进行数据备份(副本更新策略):如果数据有 n 个副本,在我们接到写请求时,几个副本完成数据更新操作后再返回? 如何进行分片(分片策略):如何确定哪些数据组成一个分片,或者一条数据进入哪一个分片。 一个数据分片存储到那几台机器上(分片存储策略):分片存储的策略非常重要,如果策略不完善,我们可能会遇到以下的几个问题。

(1) 一个数据分片几个备份所在的服务器位于同一个机架,机架网络出问题了,导致数据丢失。

(2) 大部分数据被分配到同一台服务器,导致此服务器负载远远大于其他的所有服务器。

(3) 某一台服务器已经挂了,但是数据还在此服务器前排队等待写入,导致大量的超时异常和写入故障。

副本更新

副本更新会引入一致性的问题。如果数据共有 3 个副本,我们设想以下的情景。

请求在 3 个副本全部完成数据更新后再返回:在数据写入后,我们在任意时间任意机器上都能够读取相同的最新数据。 请求在 2 个副本更新完之后就返回,最后一个副本异步更新:请求返回之后,我们立刻进行读取,如果请求被定位到最后一个副本上,我们极有可能读取不到最新的数据,也就是说我们相同的两次请求可能返回不同的结果。但是过了一段时间后(异步更新完成后),这种不一致的现象会消失。

对于第一种情景,我们称为强一致性,第二种我们称之为最终一致性,这两个概念对我们技术选型至关重要。

当然,细分下来一致性模型分为强一致性、最终一致性、因果一致性、“ 读你所写 ” 一致性、会话一致性、单调读一致性、单调写一致性,我以后会尝试写一下,但是本篇不展开描述。

强一致性能够避免脑裂的现象,而且能够保证数据是高可靠的,但是写入效率低,延迟大。最终一致性写入效率高,延迟小,但是会带来脑裂的问题,也不要保证数据的可靠性。在技术选型的时候,我们不得不在强一致性、可用性和分区容忍性之间做出选择,三者不可能同时满足,这就是著名的 CAP(Consistency/Availability/Partition Tolerance)理论。

有些组件为用户提供了可以灵活选择的接口,也就是说如果数据有 n 个副本,可以设置在 i(i<=n)个副本完成更新后返回请求。Kafka 便是如此。

当然,我们这里讨论的也只是副本更新策略的中主从式更新,其他还有同时更新和任意节点更新,后面有机会我们会展开讨论。

分片策略和分片存储策略

在谈及分片策略的时候,我们要考虑到以下的事实:磁盘的顺序读速度要远远大于随机读写。所以常见的大数据存储往往是产生数据块后一次性顺序写入,让落地后的数据是不可变的。

对于如何产生数据块,常用的方法如下:

输入转换成为文件流,按照大小对文件流切分 - HDFS。 将数据暂时缓存的内存中,等缓存到一定的量级后溢出到磁盘 -HBase,ElasticSearch。

问题驱动学习大数据(首发在GitChat上)

以上两种策略解决的是从数据到数据块之间映射的问题,而得到数据块之后,我们同样要确定数据块(分片)和数据节点之间映射的问题,需要注意的是,上图中的数据节点可能是实体的服务器,也可能是虚拟的存储单元。

那么如何建立数据块 / 数据和数据节点之间的映射关系呢?假如一条数据或者一个数据分片有一个唯一的标识 ID,我们考虑以下两种场景。

如果 ID 是整型的,我们可以按照范围存储到对应的节点上,比如A节点存储 ID 1-10,B 节点存储 ID 11-20 等,也可以通过节点与 ID 之间的某种函数关系直接建立两者之间的映射。 如果 ID 是非整型的,我们可以通过某种函数将 ID 转换成为唯一对应的整型数据,然后使用以上的方案。

以上的两种场景可以解决所有的分片存储问题,那么我们展开来看一下这其中存在的一些疑问。

什么函数可以将非整型转换成为唯一对应整型:哈希函数可以做到这一点,而且哈希函数并不唯一,我们这里不详细展开。 什么函数可以建立数据到数据节点之间的映射关系:哈希表可以做到这一点,使用整型值(无论是 ID 还是哈希)%节点数 建立节点与数值之间的映射。

实例如下:

问题驱动学习大数据(首发在GitChat上)

但是这种方式有很大的隐患,第一是极有可能会出现数据倾斜,大量的数据可能会分布在同一个节点上,第二是如果增加或者挂掉一个节点,将会引起大量的数据迁移,这在大数据场景下几乎是一种灾难。为了规避以上的问题,又出现了一致性 hash 算法。将数据按照特征值映射到一个收尾相接的 hash 环上,同时也将节点(按照 IP 或主机名 hash)映射到这个环上。从数据在换上的位置开始,顺时针找到的第一个节点便是数据的存储节点。

实例如下:

问题驱动学习大数据(首发在GitChat上)

采用以上的方案,在添加或者删减节点的时候,迁移的数据量最大只是该节点 “ 上游 ” 的数据量,不会引起全局迁移。但是相应的,增加一个节点只能分担下游一个节点的压力,删减一个节点则压力也会全部传递到下一个节点,并不能做到一方有难八方支援。

为了解决这个问题,通常在实体节点和数据之间再引入一层虚拟节点的概念,数据通过一致性 hash 映射到虚拟节点上,虚拟节点通过某种关系与实体节点映射,虚拟节点数量远大于实体节点。

HDFS,Hbase 和 Redis 存储架构

分布式有很多问题以及从问题引申出的概念,我们只是列举了自己觉得比较重要的几个概念。那么我们下面所要谈及的架构也将围绕这些概念展开。

HDFS 存储架构

为了方(尽)便(快)理(写)解(完),我们简单粗暴一些,先上图一睹为了个快:

问题驱动学习大数据(首发在GitChat上)

字母我们都认识,单词也差不多认识,所以应该不是很难。我们挑几个重要概念解释以下:

Client 用于读写交互的客户端,偏用户侧; Namenode 文件系统的管理节点,维护文件目录树,文件元信息,数据块列表,并负责和 Client 交互; Datanode 提供真实的文件数据存储服务,数据块的载体; Secondary Namenode 一个(粗鄙之语)的命名,容易让人误解为Namenode 的备份,其实只是帮助 Namenode 异步整合元数据文件(fsimage 和 edits);

这些概念本身并不构成逻辑关系,我们先给他们一些形象化的身份帮助理解。Namenode 是校长,Datanode 是班主任,Block 是座位,Client 是学生家长。

家长(Client)把孩子(Data)送给校长(Namenode),校长查询名单 (元数据) 后说某个班级 (Datanode)的某个座位上没人座,你把孩子送过去吧,座位大小固定,如果你的孩子太胖了,就需要把孩子切成几块放过去。

然后家长找到对应的班主任,班主任把孩子块放到对应的座位上。然后汇报给校长说,这几个座位已经有东西了。校长接到班主任的消息后更新列表,孩子来的太快了,班主任们一会儿哔哔一阵,说这个孩子丢了,那个孩子出问题了,校长实在忙不过来,就找了个秘书 (Secondary Namenode) 说我就不一条一条写了,班主任的话我写到一个新的文件里面,你后面把新文件和老文件拿过去帮我合并一下再给我送过来。

以上就是抽象出来的 HDFS 写流程,不是很严谨,但是足够我们对整个架构有整体的了解。

那么 HDFS 是如何做副本更新的呢?

答案是 HDFS 文件再写进去的那一刻就是不变的,而且 HDFS 是三副本写完(学校怕孩子丢了家长找事,会把孩子克隆三份放到三个班级,这样的话即使一个班级出了事故,家长还有孩子用)才会返回响应,所以我们可以说 HDFS 是强一致性的。

但是很少有这么说的,强弱一致性多用在低延迟,随即查询能力强的 kv 分布式存储上,HDFS 这种高延迟的数据存储很少人会去谈一致性的问题。

当然,新版本的 HDFS 有 Namenode 高可用备份机制,还有其他提高便捷性和可用性的措施,校长和班主任的交互也有很多的细节,我们不在这里展开,有机会再详细说一下。

Hbase 和 Redis 存储架构

从上面我们知道,HDFS 是以块为单位进行存储的,当我们要取一条数据的时候,需要扫描至少一个块,实际上很多时候我们不得不扫描所有的块,其成本可想而知。对于那些做统计工作,并不关心某一条数据的个体情况,他们很喜欢这种存储模式,这也就是所谓的批处理。

如果你是家长,你把孩子放到学校,找他的时候需要一个一个班级挨个找,你愿意吗?我们希望说出孩子的名字,能够立刻就能找到孩子,这时,我们需要一种 kv 存储,类似于我们常用的 HashMap 结构。

Hbase 存储结构如下图

问题驱动学习大数据(首发在GitChat上)

我们先来说 Hbase。Hbase 的设计非常巧妙,它的存储建立在 HDFS 之上,也利用 HDFS 数据块的不变性实现了强一致性。既然两者之间有关系,我们先对比一下 Hbase 和 HDFS 的区别之处。

HDFS 适合批量查询, Hbase 适合单点查询。 HDFS 不支持单条数据的更新和删除操作(当然可以通过批量修改覆写实现),Hbase 则支持。

也就是说,Hbase 基于 HDFS,但是实现了场景迁移,从批处理迁移到随机查询,实现迁移的过程也就是架构形成的过程,所以我们基于上面两个不同来推演一下 Hbase 的架构。

关于第一点不同:

为什么 Hbase 适合单点查询,因为它随机查询能力强,返回响应速度快。

为什么响应速度快呢?是因为它在找寻一条数据(一个 key)时需要扫描的数据量少;为什么能少呢?是因为它能够很快的定位 key 在哪些数据块上;为什么能够定位这么快呢?你猜。

我们先自以为是的抛出一个结论:如果说在磁盘上有加快数据定位速度的方法,那无非是数据组织和索引,Kafka,ES,Hbase,ORC,Parquet 都是如此。

所谓的数据组织,就是类似于排序等操作,让数据的排列是有章可循的。Hbase 是怎么做的呢,我们举一个例子,假如 key 1-100 存储到 Hbase,Hbase 按照顺序将它分为十块,分别存在 10 个 HFile(Hbase 是 KV 形式存储的 HDFS,具有 HDFS 的三备份属性)中,在存储的时候它就知道每个块的最大值最小值,将这些信息存储到元信息表中(ROOT 表和 META 表),如果我们查询 key 5 对应的 value 是多少,那么首先找到表,然后找到块(第一块),然后将第一块加载到内存扫描(其实块内也有一些索引),因为之扫描一个块,很快就能够找到想要的数据。

当然,这时最理想化的情况,实际上,我们并不知道数据写入的顺序,那么我们如何保证写入后的数据是有序的呢,Hbase 会将写入的数据缓存到内存中一段时间(MemStore),当缓存的数据量达到一定大小的时候就会按照 key 字典序排列后溢出到 HFile,所以 Hbase 只能保证同一个 HFile 的数据有序,而不能保证整体有序,所以说扫描的时候有可能需要扫描多个块。

以上面例子来说,假如第一个块最大值是 20,最小值是 1,第二个块最大值是 30,最小值是 2,那么我们在找 5 的时候需要将这两个都加载扫描。当然我们还可以使用布隆过滤器帮助快速定位 key 到底在哪个块上。

关于第二点不同:

为什么 HDFS 不支持更新和删除,因为数据块落地之后是不可变的;为什么 HBase 支持,这与 HDFS 的不可变性冲突了吗?

其实并没有,Hbase 并没有直接操作 HDFS,而是标注出哪些数据需要做修改,在查询端将这些标注作为过滤条件,比如说查询已经标注为删除的数据,得不到结果只是因为它被标注成为删除了,HDFS可能并没有删除该数据。

随着更新数据越来越多,这种操作信息也会越来越多,势必造成存储和维护的压力,所以 Hbase 会在后台定时对数据进行合并和分拆,在做融合的过程中将这些操作逻辑加进去,这时候数据才真正被更新,因为融合操作生成了新的 HDFS,所以并没有违背不可变性。

需要注意的是,Hbase 比其他的 KV 数据库更骚的一点是,它有列的概念,也就是一个 key 可以对应多个列。但是列是不需要对齐的,也就是说每个 key 不需要具有完全相同的列。而且列之上有列族的概念,列可以不对齐,但是列族必须要对齐。

Redis 存储结构如下:

问题驱动学习大数据(首发在GitChat上)

关于 Redis(我们这里指 Redis Cluster),它并不是基于 HDFS 的,所以以下的问题需要被重提:

它是如何做到副本更新的; 它是如何进行分片的。

关于副本更新:

我们知道 Redis 是内存数据库,但是也并非与硬盘没有任何关系,否则一旦机器重启数据岂不是全部都丢了,所以 Redis 会将 RDB(数据)和 AOF(指令)持久化文件写入到硬盘保证数据的可靠性(当然也可以不开启持久化)。这两个文件在副本更新的时候起到了重要的作用,在从节点刚刚挂载的时候,主节点会将 RDB 文件发送给副本进行数据初始化,在初始化之后从节点会定时向主节点发送同步请求,主节点会将这段时间内的 AOF 文件发送给从节点,从节点执行 AOF 上的指令,从而达到数据同步。

主从节点均可以接收来自于 Client 的读请求,可以推断出,在某些时间段(从节点从发送同步请求到执行完毕过程中),主和从的数据是不一致的,所以 Redis 是最终一致性的数据库。

关于数据分片:

Redis 集群由 16383 个 slot 组成,这些 slot 按照范围分配到各个节点上,写入的 key 按照 hash 映射到 slot 上,从而对应到节点。我们可以把 slot 看作虚拟节点,这样便可以看出 Redis cluster 采用的是一致性 hash 分片策略。

可以看出,Redis 的架构相对来说是比较简易的,真正有意思的是编码和压缩的策略,如果后续有机会我们再分享吧。

分布式计算 - Spark

分布式存储出现是因为单机存储资源有限且难以扩展,分布式计算出现同样是因为单机计算资源有限且难以扩展。存储资源一般指磁盘容量,而计算资源通常指内存和 CPU。

我们引用中科院的定义来详细看一下分布式计算到底是什么:

分布式计算是一种新的计算方式。所谓分布式计算就是在两个或多个软件互相共享信息,这些软件既可以在同一台计算机上运行,也可以在通过网络连接起来的多台计算机上运行。

分布式计算比起其它算法具有以下几个优点:

1、稀有资源可以共享。

2、通过分布式计算可以在多台计算机上平衡计算负载。

3、可以把程序放在最适合运行它的计算机上。

这个定义真的是精辟,不仅指出了分布式计算的内涵,也同样指出了分布式计算的问题。我们分解一下这个定义。

首先我们标注一些关键词:软件,网络,运行,资源,共享,平衡,负载,适合

我们不看上面的定义,只看这些关键词,便可以勾勒出分布式计算需要的子系统:

1、调度系统:对应关键词 平衡,适合,负载 --> 想要生活的井井有条,最好有人帮你安排好。

2、通信系统:对应关键词 网络,共享 --> 想要沟通一起玩耍,至少有个电话吧。

3、计算系统:对应关键词 软件,资源,运行 --> 想要吃饭,至少有个活厨子吧。

同样,我们可以先列举这些关键词的对立面:网络不可靠,运行出问题,资源不足,共享不畅,分配不平衡,负载过大,不适合 根据这些反向关键词勾勒出分布式系统可能存在的问题:

1、资源调度不合理,导致负载不平衡,不能充分使用集群资源: 对应关键字 资源不足,分配不平衡,负载过大,不适合。

2、计算可靠性不高导致结果错误或者计算失败 : 对应关键字 网络不可靠,运行出问题。

调度问题占了 4 个关键字,占总关键字的 67 %,说明中科院认为调度问题是分布式计算的最关键因素,是的,我们也是这么认为的。所以我们会首先介绍 spark 的调度系统,然后带出其他两个系统,顺便将 spark 解决两个问题的方案带出来。

Spark 框架

Spark 的调度系统从上到下包括 DAGScheduler(Job 级调度器)和TaskScheduler(Task 级调度器)和 SchedulerBackend(调度资源管理器)组成,其关系如下图所示:

问题驱动学习大数据(首发在GitChat上)

为了让大家快速对这个流程有一个直观的认识,我们参照例子解释一下:

学校刚来的校长(DAGScheduler)决定梳理一下学校现状,统计学校男女学生的人数,出一份报表(Job - 输出男女人数报表),决定让秘书先找人按照班级各自统计一下男女人数,每个班产生一个报表,然后让合并男生和女生的数量(两个 Stage - 班级报表,报表合并),校长同时规定了最多派多少组人(Executor - 实际执行工作的环境,对应 Java 虚拟机)去干这件事,每个组最多多少人(Thread - 单个 Executor 中最大的并行工作个体,对应 CPU 核数 - 每个 Thread 只能干单一的活,要不只统计男生数量,要不只统计女生数量)。

秘书拿到指令后,根据班级的数量,班级人数和班级的位置对组进行具体工作的规划(比如说哪组人负责那几个班级最快-离得最近)。

然后秘书到外包公司集团(SchedulerBackend - 专门外包人力的组织)按照规划要人,外包集团会将人力需求分配到各个公司(Worker),如果正好所有人都被派出了,就告诉秘书等一会儿(Spark 程序启动时的 Accept 状态),一旦有人回来了,外包集团就让这个人按照规划去对应的几个班级统计男女人数(Task - 数量等于班级数 * 2 - 因为每个班都要统计男生和女生数量),随着人员逐渐回归,派出的人数也会逐渐跟上规划。

校长会跟踪统计的情况,每当有一个班级的男生或者女生被统计出来了,校长就掏出小本在待办清单上把这个 Task 抹掉,直到所有的都抹掉了,然后去看每个班报表是否都生成了,就认为班级统计(第一个 Stage )完成了,同理,校长也会统计 Stage 数量用来判断 Job 是否完成了。

虽然这个工作使用 Spark 的框架做有点杀鸡用牛刀,但是上面的例子应该会让大家对于其中的概念有一个大概的认识,我们下面再对这些概念给出相对严格的定义。

Job 指一次 action 操作,Spark 对数据的操作包括 transform 操作和 action 操作,action 操作指有数据输出(无论是输出到文件还是输出到控制台)的操作,spark 是懒的,一段程序只有遇到 action 操作才会真正调度执行。Spark 按照 action 操作对应用进行 Job 切分。 Stage 一个 Job 会根据 Shuffle 被拆分为多个 Stage 。所谓的 shuffle 就是宽依赖。所谓的宽依赖就是父 RDD 的分区影响了多个 RDD 的分区。

问题驱动学习大数据(首发在GitChat上)

Task Spark 最小执行单元,一般而言数据每个 partition 对应一个 Task。 Driver 就是代码中的 SparkContext,整个 Spark 作业启动、调度、监控者。 Worker 可运行的物理节点。 Executor 执行 Spark 的处理程序,也就是一个 JVM进程,如果使用 yarn 调度的话则对应 yarn 中的一个 Container。

这些概念的关系在上面的例子中便可以推演出来,官方的图则更加简明一些:

问题驱动学习大数据(首发在GitChat上)

虽然 Spark 有自己的调度机制和调度逻辑,但是也只能实现多个提交的 Job 内做出优化,如果要针对集群资源情况和负载压力做出最佳的分配决策,仅凭 Spark 本身是做不到的。所以在工程应用上,很少会单独部署和使用 Spark ,而是将其与 yarn 结合起来,利用 yarn 资源管理和分配能力给出最优的分配策略。

实际上, MapReduce 和 Storm 也通常会依赖 yarn 的资源管理能力。这也就是上文中第一个问题的答案——不要逞能,专业的事情交给专业的人做。

Spark 的内部通信是非常复杂的,client 与 master 之间, master 与 worker 之间 ,worker 内部之间都有通信,而且整个运作时很快的,资源消耗也是比较大的。所以不会采用阻塞 I/O 的方式在线等待,理所当然的选择了 NIO(发出响应后立刻返回,释放资源,等待回调)模型。实际上,几乎没有哪个大数据在线系统会不采用这种方案。在 1.6 之前,Spark 采用了基于 Actor 模型的 Akka 通信框架,但是因为一些管理和依赖冲突的考虑,在 1.6 以后替换成了 Netty 框架。

Spark 也使用了一系列的超时机制来防止网络异常带来的问题, 通过累加器计数来识别运行时的异常并会使用 Stage 重新提交来解决这些异常。

Spark 性能调优

谈及调优这个问题,往往时没有止境的。我们在这里只是尝试为调优给出一些方向。起到抛砖引玉的作用。

所有脱离业务数据特征的调优都是耍流氓。其实这不只是 Spark 调优的原则,更应该是所有数据作业调优的原则。比如说写 ETL,脱离开业务的 SQL 优化多半都是苍白无力的。假如我们的业务数据碎片化严重,片与片之间互相不依赖,在资源分配的时候就应该尽量提高并行度,多给线程,每个线程分的内存少点。 也就是说,在上面统计学生数量的时候,假如班级特别多,每个班的人数又比较少,就应该多安排点能力一般的人过去。如果我们的业务数据碎片化不严重,每个片处理起来比较困难,在资源分配的时候就应该尽量提高单兵作战能力,也就是说假如班级不多,每个班的人数很多,就应该安排少量能力很强的人过去。当然,我们也可以安排很多能力很强的人去,前提是我们要有钱,而且老板要允许我们这么败家。 要尽量减少 shuffle 的数量。当我们谈 shuffle 的时候,我们到底在谈啥?其实说到底 shuffle 是落盘和网络通信的集合体,就比如说上面查学生的例子,因为后面要合并,所以每个班都要把数量写下来(落到硬盘),否则只需要记在脑子里就行了(内存),而且每个班的报表还要跑腿送到合并负责人那里(网络传输)。 Spark 之所以比 MapReduce 快(MR每个步骤都要排序和落盘),就是因为它减少了落盘,大部分时候数据就是在内存中变来变去,在程序设计的时候,要尽量减少宽依赖。使用 coalesce 替代 repartiton 有时(注意是有时,有时效果完全一样)是一个不错的办法。 在工程中不要使用 groupByKey ,让然随着 Spark 的进化,API 已经越来越傻瓜化,很多人已经完全抛开 RDD 编程了,但是我相信,那些复杂的逻辑如果使用 RDD 编程会更加得心应手。当使用 RDD 编程的时候,一定不要使用 groupByKey ,聚合操作本来就是 shuffle 操作,groupByKey 居然还不在 map 端聚合,把小文件统统拉到 reduce 端聚合,带来很大的网络传输成本。在工程上,请使用 aggregateByKey 或者 combinByKey 自己实现 map 端聚合的逻辑。 编程不要直接上生产,先在 spark shell 上进行简单的逻辑和性能测试后再上生产。 spark shell 是个好东西,好东西就应该发挥它的价值。 合理设置 spark.default.parallism 的值。 task 的数量在初始化的时候等于数据源 partition 的数量,但是在 shuffle 之后等于该参数值设置的数量 ,默认值为 200。当 OOM 出现或者程序运行速度慢的时候都应该看一下这个参数是否合适。值得注意的是,该值也并非越大越好,因为在 shuffle 之后,会产生 该值 * core(每个 Executor 的核数)个临时文件,如果该值太大,临时文件太多也会拖慢运行速度。 避免数据倾斜。所谓数据倾斜,就是说绝大多数的数据被分配到一个 Executor 执行去了,假如一共 10 个执行器,九个在十秒就执行完了,最后一个执行了半小时还没有出结果,最后导致作业进度严重拖慢甚至失败。避免数据倾斜,第一种方法是在业务上重组 key ,通过追加字段等手段让 Key 的分布更加碎片化,避免集中。第二种方法是自己写 Partitioner ,根据业务自己写分区器,不使用默认的 Hash Partitioner。 尽量少使用 collect 。collect 操作会将分布式数据集全部拉回到 Driver,第一速度慢,第二会对 Driver 产生较大压力。 合理使用 cache (persist)。因为 spark 是懒的,每次计算它都会从最早的依赖从头算一遍,比如说 B 依赖 A ,C 依赖 B ,D 也依赖 B。假如不把 B cache 住,算 D 和 C 的时候都会先把 B 算一遍。当然要记得 unpersist。 学会根据业务数据选择垃圾回收器。各种垃圾回收器的追求方向是不一致的,有的追求速度,有的追求吞吐量。而 Spark 的 Executor 本质上是一个 JVM 虚拟机,具有虚拟机的所有特性,垃圾回收是 JVM 一个重要的优化点,所以也是 Spark 一个重要的优化点。

当然,实际使用的时候会出现各种各样的问题,上面提到的方向不可能解决所有的问题,欢迎大家多交流。技术大牛很多都是虐出来的,我希望尽快被虐成大牛。

流处理框架 - Spark Streaming

我们假设一种场景,广告部门想要针对用户做推送,当一个用户进入一家商场的时候,广告部门想在 5S 内关联到这个用户的消费记录,年龄等信息,为这个用户推送她可能感兴趣的广告内容(无论是公众号,短信还是其他媒介),引导她进行消费。

这时当下非常常见的一种场景,但是怎样应对这种场景呢?用 Spark 吗,用 MapReduce 吗?只是启动计算引擎的时间可能都不止 5S ,何谈接受,处理,决策和推送。所以,应对这种低延迟,在线的作业需要一种流处理的引擎和解决方案,Spark 和 MapReduce 更加适合离线批处理的场景。这种流处理引擎更像是一种监听服务,是一个常驻进程,一直在等数据过来,一旦过来立刻处理,处理完了也不歇着,继续等待下一批数据过来。

其实我们这个标题并不十分恰当,Spark Streaming 更像是一种引擎,不能算是一个完整的框架,因为流作业至少需要一个管道,但是 Streaming 是不提供这种管道的,只提供计算能力,所以工程上经常使用 Kafka ,MQ ,Socket 来提供管道能力。Kafka 也是一个有意思的东西,但是超出了我们这次讨论的范围,以后有机会我们也可以玩一下。

当我们提到流框架的时候,我们在谈什么?所谓的数据流,本质上是数据随着时间序列依次进出的过程,流是与时间点绑定的,而批是与时间段绑定的。到这里,对于流处理方案就有了方法论上的分歧,有人认为时间点累计就成了时间段,所以,批是特殊的流,有人认为时间点是时间段无限的分割,所以流是特殊的批,Flink 选择了前者,而 Steaming 则选择了后者。所以对于 Streaming 来说,本质上还是批处理,只不过是批的量级被切分成很小而已。所以Spark Steaming 是将 Spark 引擎做了一层包装,外层提供了一个编程模型。

其架构简图如下:

问题驱动学习大数据(首发在GitChat上)

图中的 input data steam 严格说来不算是 Steaming 的范畴,而是数据管道的范畴(当然 Steaming 也可以通过读取文件构造流),提供了实时输送的能力。管道中的数据流经 Streaming 转换成为 batches of input data ,然后流入 Spark 引擎。 因为 Spark 引擎处理的是 RDD ,所以 Spark Streaming 是将管道流数据封装成了一个 RDD 顺序序列,也就是 DSteam。

在我们拿到 DSteam 后,可以使用 foreachRDD 拿到其中的每一个 RDD ,然后使用 Spark Core 中的 RDD Api进行编程,调度和运行模型也几乎与 Spark 完全一致。当然 DStream 也有专属的 API ,例如windows操作便是针对时间序列的操作。Streaming 通过 chinkpoint机制实现容错,会将每一批数据的执行状态写入到 HDFS 系统中,可见其肯定是有一定延迟的。

Flink 是流处理领域的后起之秀,但是在华为的工作生涯并没有机会能够接触这一技术栈,但是到了互联网金融行业后,我们正准备调研 Flink 的可用性,所以后续有可能会工程应用这一技术,这让我非常兴奋。但是至今为止,我并没有有机会应用这一技术,所以下面关于 Flink 的论断,纯属形而上的个人理解,如有错误希望大家务必指出,提前感谢。

Streaming 是准实时的处理引擎,做不到毫秒级的延迟,而 Flink 则是实时处理引擎。之所以 Flink 能够做到这一点,说到底是因为方法论的区别,Flink 的控制可以精确到每一条记录,但是 Streaming 的控制只能控制到每一个 batch。这从窗口操作便可见端倪,Flink 支持两种窗口模式,一种是按照时间生成的窗口,另一种是按照数据条数生成的窗口,但是 Streming 则只支持时间窗口。时间窗口是无法预知数据量的,有可能一个窗口数据量很大,也有可能一条数据都没有,所以 Streaming 达不到事务级别的支持。

而且 Flink 的内存管理效率要更高一些。至于两者之调度机制和运行原理,本质上则并没有太大的区别,在这里我们就不展开,我也不敢乱说。等到后续我有了大规模的开发经验,再回来补充吧。

上一篇:python2.7默认编码修改为utf-8
下一篇:Zookeeper应用程序解析
相关文章
图文推荐
热门新闻

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站