MapReduce

本节介绍 MapReduce 分布式计算模型。它能够实现大数据的分布式处理。

Map Reduce 基本概念

Map 和 Reduce 是一种基本的计算模式。它在 Python 里就有。让我们来看下面的 Python 示例:

>>> list(map(lambda x: x * x, [1, 2, 3]))
[1, 4, 9]

>>> from functools import reduce
>>> reduce(lambda x,y: x + y, [1, 2, 3])
6

如上面的代码所示,map 就是把输入数组(即 [1, 2, 3])里的元素,依次送入 lambda 函数进行计算,并把结果也依次输出。而 reduce,是先把输入数组(即 [1, 2, 3])里的前两个元素(即 1 和 2),送入 lambda 函数进行计算,得到 1 + 2 = 3,然后再在输入数组中,取下一个元素(即 3),然后把这个 3 和前面计算得到的结果 3,又送入 lambda 函数进行计算,这样就得到了 3 + 3 = 6。依次类推,直到整个输入的数组都计算完。所以,reduce 的操作,有点像滚雪球:它把前面滚出的结果,又送入函数,接着滚,直到整个数组都被滚完,然后把最后计算的结果输出。

MapReduce 分布式计算范式

在分布式系统中,Map Reduce 是一种编程范式。它有两个步骤:第一个步骤是 Map,第二个步骤就是 Reduce。

我们给大家举一个例子来说明 Map Reduce。这个例子是单词计数。假设我们有很多的文档。我们想要统计这些文档里各个单词出现的次数。

我们首先分词。如果是英文文档的话,我们就拿空格就能够分出来一个个单词。中文的话,咱们先要用分词的工具把文本分为一个个词。

我们然后统计单词出现的次数。大家看,文档一,文档二,…,我们有很多文档。不同文档存在不同的计算机里。因为一台计算机即有存储,又有 CPU 可以计算,所以咱们首先把文档用存储它的计算机做 Map。用文档一的电脑做文档一的 Map,文档二的电脑做文档二的 Map。这个 Map 的工作,就是各自统计各自的文档中单词出现的次数。所以这个中间结果呢,就是电脑一,它发现词汇一在它存的文档里出现了几次,词汇二出现了几次,词汇三出现了几次,等等。电脑二也计算了,发现词汇一在它存的文档里出现了几次,词汇二出现了几次,词汇三出现了几次,等等。然后呢,我们就做 Reduce,就是把这些电脑的结果再加起来,得到词汇一总共发生了几次。这就算出了所有文档中每个单词出现次数的总和了呀。

示例具体过程

我们下面来具体介绍一下上面例子的 MapReduce 过程。

首先,这是一个单词记数的例子。比如说百度。它有一天突发奇想:能不能统计一下整个中国互联网上的网页一共有多少个单词。百度怎么做呢?它就看看自己的特别大的服务器集群,里面有几万台服务器。每个服务器都存着一部分网页。它就想:那我分别统计每个服务器上网页上的字数,然后把它们加起来,就可以了吧?

然后,它就在每台电脑上,对自己电脑上的文档做 Map 操作,得到一些中间结果,叫“键值对”。Map 做名词时,意思是地图,做动词时,意思是映射。所以,我们现在就是要把文档“映射”到一个中间结果去。这个映射特别简单:就是统计一下这个文档里面每个单词出现的次数。比如 单词 China 出现了三次,它就映射成一个中间结果。这个结果中包括两个字段:第一个字段就是这个单词 China。第二个字段是个数字,代表出现的次数,比如 3。其它机器呢,也会做这个 Map,得到 China 2 等等。这样得到的中间结果,叫做“键值对”。其中第一个字段,就是“键”(key),第二个字段,就是“值”(value)。

然后,我们对这些键值对,做 Group by Key,得到一个 Key 的各种值的列表。通过 Map 得到了这些键值对之后,会做一个特别关键的工作。这个工作有点像我们在学习 Pandas 和 SQL 学过的 Group by。Group by 是做“聚合”的,比如说把男生分成一组,女生分成一组。这就是 Group by sex:根据性别对同学进行聚合。在这里呢,我们就根据键值对的 Key,进行聚合。这样我们就得到了一个 Key 的 Value 的列表。比如 China 这个 Key,我们就得到了 [3,2,…] 的一个它们出现的次数的列表。

最后,我们对每个 Key 的值列表,做 Reduce 操作,把它们聚合为一个值。它是怎么聚合的呢?它就像滚雪球似的,把第一个跟第二个加上,得到一个值,然后呢拿着这个值,再跟第三个加上,…,就这样一直往后滚,直到所有数字都加到一起了。比如如果 China 的值列表是2,3,4。我们就先把二和三加起来等于五,然后五加四等于 9。就这么滚雪球似的往后加。这就是 Reduce。最后就得到了一个数。这个数是不是就是 China 在所有的机器上的总数。

上面这种计算模式,就是 MapReduce。它就是在文件所在的机器上,先给文件 Map 成一些键值对,然后 Group by key,最后把一个 Key 的所有 Value,Reduce 成一个你需要的结果。

我们能够用 MapReduce 的方法,来实现很多复杂的算法。上面只是一个简单的加法的例子。实际上,我们在工作中遇到的很多算法都可以用这种模式来实现。比如说咱们前面在Pandas/SQL 里学过的各种数据处理方法,如聚合、过滤、Join、转换。比如我们要选出表中的某一列,那么就可以先在各个服务器上 Map 出这一列,然后 Reduce 的时候再给它合起来。其它一些机器学习算法也可以用这种方式实现,比如聚类、分类。关键是 Key 怎么设计。

批处理和流处理

上面这种计算模式是对已保存好的大量数据进行一次处理,这常被称为“批处理”的模式。批处理模式对实时性要求稍微低一点。Hadoop 就很擅长做批处理。

和批处理相对的,还有一种计算模式:流处理。流处理要处理的数据,就像一条河流似的,会源源不断地来,而我们最关心的是最新到来的数据。比如我们想要弄一个大屏,上面实时显示过去两分钟内卖了多少双鞋。所以数据源就像一个水龙头,不断地有水流过来,然后我们也不断地对最新收到的数据进行立刻处理。Storm 就是专门做流处理的。

最新的系统都即支持批处理,又支持流处理。比如 Spark、Flink。但它们也还是各有侧重。目前 Flink 很火,因为它是针对流处理开发的,对流处理是强项,同时也支持批处理,所以非常值得学习。

Map Reduce 计算的性能特点

这种 Map Reduce 分布式计算的方式,需要对数据进行网络传输。它首先由很多电脑,对本机的数据执行 Map 操作,然后把得到的中间结果,提供网络送到另一个机器上去做 Reduce 操作。

执行分布式计算时,数据在机器间通过网络传输最花时间。因为我们要计算的数据在一个电脑上存不下,要存到十台电脑里,所有我们需要用一个网络交换机把这十台电脑连到一起。这时,在这些电脑之间,通过网络传输中间结果的速度,就比我们访问本地内存或者磁盘的速度,慢多了。如果有一百台电脑的话,那一个交换机的端口数就不够了,不能用一个交换机把它们全部连在一起。这时,我们需要在一级交换机上面,再加一层交换机。这样传输速度就更慢了。如果一千台电脑的话,很可能你这个电脑在这个楼层,另外一个电脑是在下面的楼层,这样距离就更远,传输速度就更慢了。

网络传输的缓慢就带来一个问题:如果一个电脑在计算的时候需要用到很远很远的电脑上的大量数据,那么这些数据的传输就会成为性能的瓶颈。网络尽管很快,但比我们读本地硬盘,还是慢几百倍,而比我们读本地内存,就慢得更多了。所以这时候我们就能明显的感觉到它的计算慢下来。

为此,我们采用“本地计算”的方式。就是每台电脑首先处理本地的数据,得到中间结果,然后只对这些中间结果进行网络传输。这就是本地计算。我们尽量在本地计算。因为本地这个电脑上是有硬盘的。所以 Map 是在本地 Map。它把一些中间结果放到键值对里,然后做 Group by Key,通过网络把键值对送到负责某一个 Key 的 Reduce 机器去。这样,我们就不需要把原始数据送过去,比如一个 G 的数据。那得十几分钟,太慢了。它就送几个中间结果,比如(China,3),这就快多了。

因此,Map Reduce 让一个电脑在本地执行 Map 操作,尽量避免了数据在机器间的传输。你看,在上面的单词计数的例子中,该统计的,我都本地统计完了,最后只不过是把这个结果送到另一台电脑上去加一下。相比于把整个文件传到另一台电脑上去统计,传输这个统计的结果,所传输的数据量就大大地减少了,所消耗的时间也大大减少了。我用不着把这个电脑的文档送到那个电脑去,让那个电脑去算,而是本地算好,减少节点之间的通信。所以它的速度会快一些。

所以,Map Reduce 在计算上是并行的:同时有很多电脑在同时对本地文档进行计算。这些计算是并行的。然后呢,它的速度还提高了,因为它是把中间结果在网络上传输,避免了原始文档这些大的数据在网络上传输。大家现在在电脑上用 U 盘拷贝 1 G 的文件,还要拷个一分钟呢,对吧?如果你要通过一个网络,即使它是千兆的网络,你也要传很久很久。这个数据如果是一个 P 怎么办?是很慢的。通过 Map Reduce 的方法,就能够加速大数据的处理。这就是Map Reduce 的最基本的概念。

Map Reduce 的具体实现

我们下面介绍 Map Reduce 的具体实现。我们还是使用上述单词计数的例子,但是介绍其具体的实现细节。

第一步是 Map。在具体的实现中,Map 的输出是“键值对”(Key-Value Pair)。比如单词计数的 Map,它就把文档里面的一个个单词啊,转变为“键值对”。 Key 是这个单词,值是数字 1。文档里有几个单词,它就输出几个键值对。举例来说,假设这个文档的文本是 “big document” 这两个单词。 Map 操作特别简单,就是见到 big 这个单词,就拿它的文本作 Key,对应的 Value 也特别简单,就是数字 1,所以就输出 <”big”,1>。然后见到 document 这个单词,它也输出一个 <”document”, 1>。所以这个 Map 的逻辑是超简单的。那么,如果这个文档里还有一个 document 的单词,怎么办呢?它还输出一个 <”document”, 1>。为什么不输出 <”document”, 2> 呢?也可以,但这样的话,我们的 Map 操作的逻辑就要复杂一些。我们先按这个简单的来介绍。所以对于文本“big document document”,我们就会得到三个键值对:<”big”, 1>、<”document”, 1>、<”document”, 1>。所以这个 Map 函数让大家写的话,是不是很简单、很好写?就是用一个 for 循环,不断读这段文本里的单词。见一个单词,就往外塞一个键值对,就可以了嘛。这个代码三四行就出来了,很好写。

第二步是 Group by Key。这是实现 Map Reduce 计算的大数据软件,比如 Hadoop、Spark,给大家实现的一个特别重要的工作。它是 Hadoop 或者 Spark 自己做的,不需要我们编程。它的作用是:收集所有 Map 节点输出的各种键值对,将它们按照它们的“键”进行分组,然后把一组键值对,交给一个做 Reduce 的节点。因此,它就会把所有 Map 节点输出的以 “big” 为“键”的键值对,都分成一组,交给一个做 Reduce 操作的机器。把所有以 “document” 为“键”的键值对,都分成一组,交给一个做 Reduce 操作的机器。注意,这是 Map Reduce 平台做的事,不需要我们编程。也就是说,Map Reduce 平台帮我们做了这样一个合并的工作。合并完之后,所有相同键的键值对,都被交给一台机器了。比如这里有一千台 Map 机器。它们生成的所有以 “big” 为键的键值对,就被交给一台做 Reduce 的机器了。

第三步是 Reduce。负责 Reduce 的机器,拿着 Map Reduce 平台交给它的一个“键”的所有“键值对”后,它就会做 Reduce 的操作。为此,我们要写一个 Reduce 函数。这个函数的输入是这个一个“键值对”:“键”是这些键值对的共同的“键”,值是一个数组,里面是所有这些键值对的值。比如,Map 产生 <”document”, 1> 和 <”document”, 1> 这两个键值对,那么 Reduce 的输入是 <”document”, [1,1]>。因此,我们就可以在 Reduce 的函数里,对这个“键值对”里的值,做 Reduce 操作。比如“求和”:就 1 + 1,把它们加起来,就得到 “document” 这个单词出现的总次数了。所以 Reduce 这个函数也特别好写。

通过上面这种计算范式,我们就能够对大数据进行编程。我们编写 Map 和 Reduce 这两个函数。它们会在各个机器上各自运行。这就是分布式运行。然后,Map Reduce 的平台做了很多事情。它负责把 Map 输出的“键值对”收集起来。然后,它其实还做了一些其它事情,比如对“值”的数组,做了排序。这个也可以用来达到我们想要的一些效果,但是我们的这个计数的应用用不着。

总之,Map 的作用是读本地存储的文件,进行本地处理,产生很多键值对。然后平台做 Group by Key 操作,把一个Key 的所有键值对,从所有 Map 机器收集起来,把它们的值合并到一个数组里,还对数组里的值进行来排序,然后交给负责这个 Key 的 Reduce 机器。这个 Reduce 机器就对这个 Key 的所有值的数组,进行计算,得到一个输出。这就是 Hadoop、Spark 这些 Map Reduce 大数据平台实现数据计算的方式。

代码实现

为了实现 Map-Reduce 算法,程序员的任务是:写 Map 和 Reduce 程序。而 Group by Key 是系统完成的,不用我们写程序。

用 Map-Reduce 方式实现各种算法的关键是:根据算法的需求,选择 Key。比如: 两个矩阵相乘,我们就要把“行”做 Key,以便一行和一列的乘积最后能被 Group 到一起,加起来。类似的,关系代数的投影、选择、Join,也应该设置特定的 Key。比如 Join,就要用把两个表 Join 起来的字段,做为 Key。

开销模型

上述计算中最主要的开销是 Reduce 机器通过网络去取远端 Map 机器的 Map 结果(存在Map的机器上)的网络通信开销。其次是 Map 读本机 Disk 的开销。最后才是 计算开销很小。相对于网络通讯和磁盘读取开销,计算开销是很小的。

所以,要优化 MapReduce 的开销,最主要的是减少通信开销。一个优秀的大数据工程师会根据数据和自己任务的特点,合理规划自己数据的放置位置、分区、副本位置,降低任务开销,降低任务的完成时间。

小结

我们然后介绍 MapReduce 分布式计算模型。它能够实现大数据的分布式处理。我们介绍它的原理和使用。利用 MapReduce,我们可以在计算机集群上,进行并行处理。其中,Map 会利用本机的CPU,对本地数据做Map操作,输出“键值对”,然后大数据计算框架(如 Hadoop,Spark)会对这些“键值对”做 “Group by Key”,即把相同 Key 的键值对,分成一组,输出:键:值List。然后,一组的输出会被交给一个 Reduce 机器。这个机器就会在“键:值List”上做 Reduce操作。我们将介绍一个大文档的单词计数的例子。其中,Map 会简单地输出 “单词:1”,而 Reduce 会把“单词”的 Value 列表中的“1”累加起来。


Index Previous Next