RDD 弹性分布式数据集
本节介绍 Spark 计算的核心数据元素:RDD(弹性分布式数据集)。我们首先介绍它的 Lineage (血统)机制、Partition 分区存储,然后介绍它的各种操作,包括创建、变换、控制、行动。我们特别介绍 RDD 的 Lazy 评估机制,这让 Spark 可以自动优化操作的执行计划,加快处理速度。最后介绍两个 RDD 编程的实际示例。通过理解 RDD,我们对 Spark 的内部机制就非常理解了。
RDD 弹性分布式数据集
我们首先介绍 RDD 的 Lineage (血统)机制和 Partition 分区存储。
SPARK 的核心数据结构是 RDD。R 是 Resilient(弹性)的英文首字母。第一个 D 是Distributed(分布式),第二个 D 是 Dataset(数据集)。所以RDD 是弹性分布式数据集。
我们首先介绍它为什么是“弹性”的。大家想想为什么它叫“弹性”这个名字?因为它坏了后,还能再生。大家记不记得,咱们这些机器都是些什么机器?它们随时会坏,对吧?但 RDD 这个数据集是有“弹性”的,就是说一旦它坏了,它还能再生。具体来说,就是一旦机器宕了,它能够从头恢复回来。这就是它的弹性。
RDD 为什么能再生呢?因为它有一个“血统”(Lineage)机制。什么叫血统机制?就是那些良种马,它前 100 代都有记录的。RDD 也类似。它记录着自己是怎么从最原始的数据一路计算过来的。因为 Spark 是个基于内存的,所以这个 RDD通常就存在内存里头。所以,如果你的机器断电了,这个机器里的内存里的东西不就没了?这时候,一旦它发现这个 RDD 没了,它就根据这个“血统”记录里的东西,再重新从原始数据开始,一路算过来,就恢复了。这就像我们有一个日记本,记录着我们高中是如何学习的。等我们考上了大学,突然有一天时间反转了,我们回到了高一,怎么办?我们就再按着自己当年这个日记本,从头学过来,又能恢复到现在的你,就是这种感觉。RDD 就是这样具有弹性的。
我们然后介绍它为什么是“分布式”的。RDD 其实是一个很大很大的分布式的数据结构。一个 RDD 很可能在一台电脑上存不下,所以它内部是做了切分(Partition)的。就像我们的硬盘要分成 C 盘、D 盘似的,切出来的每一部分,就叫一个“分区”(Partition)。我们对它操作的时候,看起来我们是对这一个 RDD 操作,但实际上 Spark 会把咱们的操作用 Map Reduce 的方式,分到各个 Partition 上去做。这些 Partition 很可能存在不同的机器上,这样它就实现了在很多计算机上并行地工作。所以 RDD 是分布式的。
RDD 操作
我们现在介绍 RDD 的各种操作,包括创建、变换、控制、动作。
RDD 的主要操作可以分为两种:一种是 Transformation(变换),另一种是 Action(动作)。其中,Transformation 包括 Filter(过滤),join(合并),sort(排序),reduce by key,group by key。Action 包括Reduce,collect,count(计数),first(取第一个结果),take(取出好几个结果),save 到文件里,等等。
Spark 之所以会分“变换”和“动作”这两种操作,是因为它设计了一个很有用的 “Lazy” 操作执行机制。这个机制就是:节点收到“变换”命令后,不会马上执行这些操作,因为它“懒”(lazy)。那么,节点什么时候才开始执行呢?在收到一个“动作”操作后。节点收到一个“动作”命令后,就会把在此之前收到的所有还没有执行的“变换”操作,全部执行一遍,直到完成这个最新的“动作”命令。
所以,你给 Spark 输入完一个“变换”命令(比如 map,reduce by key,sort by)后,回车,它会立刻要你敲下一行。你发现它这么快就让你敲下一个命令,就会觉得很神奇:我这个数据这么大,它不可能这么快就完成这个操作啊。确实不可能。它这么快就回复,只是因为它是 Lazy 的。它实际上并没有完成你要求它做的这个操作。直到你输入一个“动作”操作,比如 take(10),就是你想看 10 条最终结果了,它没办法了,这个时候它才开始拼命做了。这时候你可能要等半个小时,才能出来结果。
这就像陈老师给大家布置作业,有两种指令:一种是“做练习”,另一种是“交”。陈老师布置完“做练习”后,大家不会马上做,但会记着。这样就累积了一些作业。然后等陈老师发出指令“交”的时候,大家才开始猛做:就把前面累积的作业都完成,然后“交”上来。
Spark 的 Lazy 操作执行方式,有什么好处呢?这样的话,它就可以优化这些累积下来的操作的执行顺序,提高处理速度。具体来说,我们很多时候输入的很多命令,先后顺序不是特别好。它就会自动帮你优化这些执行的顺序。
这就像老师想起来了就布置一个作业,慢慢地大家就累积了 10 个作业。但这些作业的布置顺序可能并不是最优的。你可能先做后面布置的一个作业,掌握了它之后,再做前面布置的一个作业,效率反而会更高。这时候,聪明的同学就会把这 10 个作业排出来,整理出一个最优的作业顺序,然后再做,这样就做得更快。这就是“磨刀不误砍柴工”,看起来“懒”(Lazy),其实完成得是更快的。Spark 选择这种 Lazy 的操作执行机制,也是这个原因。
日志分析示例
我们下面举个例子,说明 Spark 的运行机制。这是一个日志分析的 Spark 示例。
我们首先看日志数据输入。首先,日志数据是存在 LoglinesRDD 里的。这个 RDD 里面存了很多的日志。大家工作了以后,会处理很多日志。比如说去银行,它的系统每天产生很多日志,甚至出现日志风暴。因为这个 RDD 特别大,所以这个RDD 用了 4 个 Partition 来存。
第一个操作是“过滤”(Filter)。Filter 是个 Transformation(变换),不是个“动作”(action)。这里我们要它过滤出来“故障”(Error)。所以我们送进去一个 Lambda 函数。这个函数会检查我们过滤的布尔表达式是不是被满足。如果这个 Lambda 函数被满足,就让它通过,否则的话,它就被滤掉了。所以,现在 Warning(警告)什么的都就过滤掉了,只剩下 Error(故障)。
第二个操作是“合并”(Coalesce)。因为我们有 4 个 Partition,其中一个 Partition 过滤后都没有数据了,所以我们把这些 Partition 合并成两个 Partition。
第三个操作是“收集”(Collect)。注意,Filter 和 Coalesce 都是“变换”,所以,它们不会立刻被执行。直到你说你要 Collect 了,就是你要把结果给我报出来了,这个时候他就开始“动作”(action)了。就像打印机似的,他就开始打印了。这时候他就开始干活了。
当要执行的操作众多的时候,这些操作因为数据的依存关系,会构成一个 DAG(有向无环图)。DAG 是图论里面的一个概念,其中 D 是 Directed(有向),A 是 Acyclic(无环),G 是 Graph(图)。为什么大数据的操作可以模型为有向无环图呢?因为我们做大数据处理的时候,有可能要从这里弄点数据过来,变换一下,那里再弄点什么数据过来,再变换一下,然后把这些变换的结果联合起来,再变换一下,然后把结果再和另外一个地方的结果再联合起来,变换一下。这种变换所需要的数据之间的依存关系,就构成了一个有向无环图。
当我们输入了各种“变换”操作后,最后说要执行一个“动作”(Action)时,Spark 就开始优化这些操作的 DAG,然后开始从头执行。所以它其实是执行这个图。因此,它在执行这个图之前,就可以做优化。
Cache 操作
上面是 Spark 自动帮助我们优化操作的执行顺序。但是,当我们要执行多个 Action 时,我们自己还可以利用 Cache(缓存)命令,来缓存一些中间的 RDD,来增加多个 Action 之间的共享,因此提高整体的运算速度。
为了理解这一点,让我们发展一下上面的例子。对上面的例子合并得到的 RDD,我们可以让它 saveAsTextFile,让它输出。saveAsTextFile 是一个 Action,因此,Spark 收到它之后,就会开始从 Filter 开始,执行前面累积的“变换”。
那么,让我们想象这种情况:我们需要基于这个 RDD,还要干些事。比如,统计一下故障的个数,或者再过滤一下,挑出报告 msg1 的故障,并打印出来。这里的“统计个数”和“打印 msg1 的故障”就都是 Action 了。所以,我们就有三个Action。
那么,如果我已经做了 saveAsTextFile 这个 Action,得到了合并后的所有故障的 RDD,而这个 RDD 是可以被后面的 “统计个数” 和 “打印 msg1 的故障” 这两个 Action 重用的,那么,后面这两个 Action 是不是就可以重用它,就别再重新算一次啦?
所以,我们就加一行语句,对合并后,所有故障的 RDD,做一个 cache(缓存)。这样,当它被算出来后,Spark 就会保存它,后面的 Action 就不用再算一遍了。这是很重要的。
我们总结一下。对于一个 Spark 程序来说,它的工作流程是这样的:首先创建一些输入的 RDD,比如 load text file 就是从 Hadoop 的 HDFS 里读文本文件,生成 RDD。然后,输入各种 Lazy 的 Transform,比如 Filter、Map、sort。如果我们发现一个中间生成的 RDD 是需要重用的,就明确的要求 Spark 缓存(Cache)它。最后,我们输入一个 Action,比如让它统计个数,或者收集最后的结果,Spark 就开始优化各种变换的 DAG 图,然后在各个 Partition 分头干活。这就是 Spark 的工作流程。
Spark Lazy 优化示例
RDD 最有意思的是:它是 Lazy(懒)的。为什么说它懒呢?因为它收到你的一个指令之后,不会立刻干。为什么会这么做呢?这就让它有一些优化的可能。为了理解 Spark 基于 Lazy 操作运行机制的 DAG 优化,让我们看下面这个例子。
比如说这里有两个表。一个是用户信息表,列出了我们班上的同学,包括同学的 ID、姓名、实验室。一个是用户参加 Party 的登记表,列出了 Party 的举办时间、参加同学的 ID。比如这几个同学去了 Party A,那个同学去了另外一个 Party B。
然后我想把两个表 Join 起来,并且 Filter 一下,找出 2015 年 1 月 1 日前参加过 Party 的同学所在的实验室,以后就多给这些实验室发传单。
我送进去 Spark 的操作顺序是这样的:
我先把两个表 Join 起来。怎么 Join 呢?就拿用户表里面的用户 ID 和事件表里面的用户 ID,把它们对上,这样把两个表 Join 起来。
我然后进行 Filter 过滤。挑出 2015 年 1 月 1 日前的。
如果 Spark 不 Lazy,收到命令后就立刻按照输入操作的顺序执行,就会产生很大的浪费,做很多无用的 Join。假设用户参加 Party 的登记表里有 3 万条记录,用户表里有 300 个同学。按照我输入的操作顺序执行,首先做 Join,这就需要 Join 3 万次,最后得到近 3 万条记录。然后对这近 3 万条 Join 后的记录进行过滤,最后得到可能 3 千多条。
相反,如果 Spark 是 Lazy 的,那么,它就有优化我们指令的机会,避免上面提到的这些浪费。具体来说,收到第一个 Join 命令之后,它不会立刻执行。它就会等着。等收到第二个 Filter 指令之后,它也还等着。直到我们最后要执行一个所谓的 Action,就是要结果了,比如 head,查看前几行的结果,这时候它不能 Lazy 了,就会起来,看看收到的所有这些指令。看了之后,它就会自动地优化。
它怎么优化呢?它会很聪明地想:如果我们先过滤登记表,然后再去 Join 呢?这样就能减少上面的这些无用的 Join。先过滤登记表的话,我们就能够把 3 万条登记记录,缩小到 3 千多条,然后对这 3千多条记录,和用户表做 Join。这就只需要 Join 3 千多次了。这就是 Spark Lazy 的原因。
因此,Spark 的 Lazy 让它能自动帮助我们进行优化。具体来说,即使我们输入给 Spark 的操作是先 Join 再 Filter,因为 Join 是 Lazy 的“变换”,所以它不会立刻做。等到它收到 Filter,然后接到一个 Action,它就会帮你优化。就是说:虽然同学你先输入 Join,但是 Spark 会帮我们改成先做 Filter。这就是 Spark 帮我们做的优化,这也就是 Lazy 的好处。没有 Lazy 的话,你输入了 Join 以后,他立刻就干,等你输入 Filter 后,他想优化都优化不了。
我们在平时工作生活中,即要自己保持适当的 Lazy,也要给别人的 Lazy 提供条件,以便自己和别人都有优化的时间和空间。比如说同学毕业的时候,一会这个同学找老师签名,一会那个同学找老师写评审意见,一会另一个同学又来说老师那个网站上赶紧帮我点一下。经验丰富的老师就是 Lazy 的。他得到一个同学的请求后,不会立刻做,而是会记着。等所有同学都来找他之后,知道所有同学的材料都准备好了,再一次批处理完成所有这些工作,大家心情就都很愉快,对吧?大家平时也是这样。不要时时刻刻挂在微信上,收到每一个消息都立刻回。每天看一两次微信,统一回一下就行了。有重要的事,别人会给你打电话的。同学们找老师签字、改论文,也要尽量早一点,给老师一点优化的空间,不能说老师一个小时以后就要截止了,你赶紧帮我改。这时最顶级的老师也崩溃了。
总结一下,大家现在学会了,Spark 有两种操作。这些操作构成一个 DAG(有向无环图)。Spark 是 Lazy 的,这样它就会在做 Action 的时候,优化 DAG 进行执行。Spark 是很聪明的。
小结
本节介绍了 Spark 计算的核心数据元素:RDD(弹性分布式数据集)。RDD 具有 Lineage (血统)机制,可以从头恢复。它也通过 Partition 分区存储,因此支持并行计算,提高了处理速度。
我们还介绍了一个 RDD 程序的流程示例:日志处理。由此介绍 RDD 的创建、变换(Transformation,如 map,filter,reduceBykey,join)、控制(如Cache,指定特定 RDD 缓存起来,重用)、行动(Action,如reduce,collect,count,first,take,saveAsTextFile)等。
我们特别介绍了 RDD 的 Lazy 评估机制,即系统会等遇到 Action时,才开始执行前面收到的代码。这是因为 Spark 可以自动进行执行计划的优化,加快处理速度。我们将用一个“先 Filter,再 Join”的例子对此进行说明。
通过理解 RDD,我们对 Spark 的内部机制就非常理解了。
Index | Previous | Next |