June 23, 2021
Here is the article.
I like to read Chinese first, and then go back to read English version paper again.
首先简单介绍下MapReduce,MapReduce实际上是一种编程模型,主要是用于处理大规模数据集,其实现核心逻辑实际上是跟分治方法是统一的。在这个编程模型下,用户只需要关心两个函数,一个是map函数,用于处理一个键值对,然后生成一个中间键值对的数据集合。另一个是reduce函数,是用来将map产生的中间键值对数据集根据相同中间键来进行合并操作。这种编程模型自然而然的是可以通过在一个集群上进行并行的处理。整个系统需要做的是划分输入数据、调度作业任务与机器、处理机器故障以及管理机器间的通信等等,但是这些问题并不是用户需要操心的,当一个完备的集群方案确定后,用户完全不用去了解并行分布式系统的操作流程,便可以进行最大化利用分布式集群的功能。
一、编程模型
整个计算需要一个键值对数据集输入,同时输出的也是一个键值对数据集。前面也提到,用户在使用MapReduce的时候只需要定义map和reduce函数。map函数会处理输入键值对然后产生一个中间键值对数据集,然后这个编程模型会将所有中间键相同的数据组合起来,并将它们传递给reduce函数。而reduce函数则是接受某个中间键和该键的一个数据集合,他会根据需求去合并这些数据,从而产生一个更小的数据集,通常情况下reduce函数产生0或者1个输出。中间数据是通过迭代的方式来提供给reduce函数的,这样就可以处理内存无法满足的大量数据。
1.1 例程
考虑一个问题,那就是统计大批量文档中每个词语的出现的次数。用户可以按照下面的伪代码完成MapReduce过程:
这个map函数会将每个word都填上一个1的词频数发射出去,然后reduce函数将同一个word发射出来的数目加起来。对的,事情就是这么简单!!!
1.2 更多例子
这部分主要是介绍MapReduce可以应用到很多有意思的项目去,比如:
分布式查找:这里面的map函数是当某一行满足提供的匹配,就把改行发射出去,然后reduce函数是一个identity函数,仅仅是将提供的中间数据拷贝到输出。
URL统计:这里面的map函数处理网页请求的日志信息,然后输出(url, 1),然后reduce函数为同样的url的值累加起来,输出(url, total_count)。
网络连接图逆向:这里面的map函数为每一个source网页中的一个target链接输出一个(source, target)对,然后reduce函数对每一个给定的目标url,建立一个list,并输出(target, list(source))。
二、实现
2.1 执行流程
这部分主要讲的是google他们用的一个计算集群,具体的集群机器情况,可以看原文介绍,总结就是普通机器,普通内存,普通网络,普通存储,哈哈,不然怎么显示出威力呢。下图是文中介绍的mapreduce执行流程图,这个就比较重要了:
大体上是这样的,M调用会通过自动将输入数据划分成M份的数据集后在多个机器上分布,这些被切分的小份数据会在不同的机器上并行地被处理。而reduce函数则是根据一个parition函数根据中间键将中间数据分成R份。至于分成数量R和partition函数都是由用户自定义。上图就是一个mapreduce操作的执行的整体流程,当一个mapreduce被执行,需要经过以下步骤:
1)用户程序中的MapReduce库首先将输入文件分成M份,每份大小一般是16到64MB,这个可以由用户可选参数控制,然后便会将程序在集群的机器上拷贝。
2)在程序中,有一个程序比较特别,那就是master程序,其他的程序都是worker程序,worker程序用来接收master分配的任务。根据上面分配可知,一共有M个Map任务和R个reduce任务等待分配,master会选择空闲的worker,然后分配map或者reduce任务给他们。
3)当一个worker接收到一个map任务时,便会读取对应的输入。他会从输入文件中解析出一组键值对,并将每一个键值对依次传给用户自定义的map函数,而map函数输出的中间键值对会被保存到内存中去。
4)周期性地,这些被缓存在内存中的键值对数据会由前面提到parition函数分成R份后写入到本地文件中去。这些被存到本地的键值对数据的地址会被回传给master,之后再由master传递给reduce任务的worker。
5)当一个reduce任务的worker被master告知了这些地址,它便会使用远程程序调用方式从那些map任务的worker机器本地磁盘上读取数据。当一个reduce任务的worker读完了所有的中间数据后,他会根据中间键排序,从而使得具有相同的中间键能够被分到一起去。这个排序过程是需要的,因为有时候可能不同的中间键值对数据会被映射到同一个reduce任务的worker。如果中间键值对数据过大很难在内存张排序,外部排序会被使用。
6)reduce的worker会被已排完序的数据中迭代,对每一个唯一的中间key,worker会将这个key和对应的数据集合传递给用户定义的reduce函数。reduce函数的输出会被添加到最终的一个输出文件中去。
7)当所有的map和reduce任务都完成后,master会唤醒用户程序。此时,执行mapreduce的用户程序会得到一个返回结果。
当成功完成后,mapreduce执行的结果输出会存在R个输出文件中,一般情况下,用户并不需要将这R个输出文件合并成一个文件,这些文件经常会被作为输入文件进行另外一个mapreduce过程,或者其他的分布式程序再次被切分成多个文件。
2.2 master数据结构
从刚才的流程看,master在整个流程中起到了至关重要的作用,不仅需要了解任务的情况,还需要了解worker的情况,以及输入输出的情况,这时候高效的数据结构也就显得很重要了。典型情况下,master会保存若干个数据结果,对每一个map任务和reduce任务,它会保存他们的状态(空闲,进行中,已完成)以及对应的worker机器。master作为map任务中间输出到reduce任务的管道,因此对每一个已完成的map任务,master会保存R份中间数据文件的大小和位置,当map任务不断完成时不断更新。这些信息会不停地添加到执行reduce的worker机器上去。
2.3 容错
其实仔细看前面的流程,我们就会有很多很多的疑问,比如说万一master机器挂了呢,岂不是所有任务都跑不了了?比如说某个机器被分配了任务,然后任务出错了,其实不是任务永远都完成不了了?等等,当然不会!!不过说到这里,正好说明下一件事情,其实MapReduce编程的逻辑没什么可以多说的,就是一个简单的分而治之的思想,但是对于分布式系统来说,最难的根本不是这方面,而是对于多机器情况下,任务分配、资源管理、容错控制、一致性保证,这些才是最最关键的,也是最难的,如果真正做到了补充不漏不丢不冲突的分布式系统,那绝对是一个特别特别牛逼的工作。
这个系统也一样,既然是在成百上千台机器上进行的运算,维护的难度可想而知,因此容错是必须的,但是必须是优雅的容错,什么意思呢,如果出现了问题,一个报警告诉管理员,然后程序员坑次坑次跑去重启任务,重启调度这种就显得很low了,文章说了几个容错的方式,一起了解下:
1)worker失败。master会周期性的ping每个机器,如果一个worker在一定时间内没有回应,master会将该机器标记为failed。这时候,这个机器上的所有已完成map任务都会被标记为空闲状态,这样那些map任务就会被重新调度给其他worker去处理。同样,任何在一个失败机器正在执行的map或者reduce任务也会被重置为空闲状态等待重新调度。可能会有疑问,为神马map任务都完成了还需要重新执行,这是因为已完成map任务的输出是存储在失败机器的本地磁盘上,这时候这些数据在机器有问题以后是不能被访问的;而已完成的reduce则不需要重新执行是因为这些输出已经存到了最终输出文件里面去了。当一个map任务在A机器上执行失败后,在B机器上会重新执行,这时候,master会通知所有正在执行reduce任务的worker这种变更,这样还没有从A机器获取数据的reduce任务会从B机器上获取。
2)master失败。对于master的可能性失败,一种比较简单的方法是周期性的将前述master的数据结构写到磁盘里面去,这样万一master挂了,另一个master会从最后写入的数据中恢复出来。
3)失败出现的语义描述。当用户自定义的map和reduce函数是确定性函数的时候,我们分布式实现产生的结果肯定是与无错误情况下整个程序顺序执行的结果一致。在实际过程中,我们依赖map和reduce任务输出的原子提交来实现这一点。每一个进行中的任务,都会将自己的输出写到一个私有的临时文件中去。在mapreduce操作中,一个reduce任务会写一个这样的文件,一个map任务会写R个这样的文件。当一个map任务完成后,这个worker会给master发送消息,并告知这R个临时文件的名字。如果这时候master收到早已完成map任务的消息,即之前已完成过,会忽略这个消息,否则,他会将这R个文件名存到一个数据结构中去。当一个reduce任务完成后,reduce worker会原子性的将临时文件改成最终输出文件。如果同一个reduce任务在多个机器上执行,多次重命名调用会被执行在同一个输出文件上。我们可以依赖文件系统本身提供的原子重命名操作来保证最终的文件系统只包含一个reduce任务的最终输出。
大多数情况下,我们的map和reduce操作都是确定性的。另外,实际上我们的语法实际上与顺序执行时一致的,这样程序员就很容易去推理他们程序的行为。当map或者reduce操作不是确定性的时候,我们同样提供了差一点但是同样合理的语义方法。在非确定性操作出现的时候,一个特定的reduce任务R1的输出会和另一个reduce任务R1的输出一样(因为执行程序不确定性)。但是,一个不同的reduce任务R2则会跟另一个reduce任务R2不一样了就。
2.4 备份任务
一个比较普遍影响MapReduce操作总体时间的原因是因为有一个"掉队者"出现了:有一个机器在处理剩余的一个map或者reduce任务消耗的时间超过正常范围。这种现象产生的原因可能很多,比如网络,比如磁盘IO,比如有新的其他的任务插进来了,等等。这时候,有一个比较简单的机制来处理这个问题,那就是多一个mapreduce操作快接近结束时,master会给这些剩余正在进行中的任务建立备份任务,这样无论是原先的任务完成还是备份的任务完成都会被标记成任务完成状态。这种机制可能会多侵占些资源,但是效果很好。
三、优化
尽管简单的map和reduce函数已经很好地满足了大部分需求,但是还是有一些优化可以极大的提高效率。
3.1 分类函数
MapReduce的用户指定了他们希望得到的reduce任务输出的个数是R个,数据会通过在中间key上执行分类函数进行划分。一般情况下默认的分类函数是哈希函数,这一函数会得到一个相对比较平衡的划分结果。但是,在某些情况下,对key使用自定义的分类函数来划分数据更加有效。比如说,有时候中间key是url,我们想将连接同一个host的数据放到同一个文件中去,这时候如果我们不是对url进行哈希,而是先提取url中的主机数据,再对主机数据进行hash,这时候才能满足更好的满足我们的需求。
3.2 顺序保证
我们保证在一个给定的分类中,中间键值对数据是按照中间key进行升序排列的。这个顺序保证使得对每一个分类输出一个有序结果非常简单,这对于要求输出结果方便查找等情况非常有用。
3.3 连接函数
在某些情况下,每个map任务产生的中间key数据有大量重复,这时候reduce函数就需要进行相互交互和聚合操作。一个很简单的例子就是词频统计,由于每个词频遵循Zipf分布,每个map任务都会产生成百上千个<the, 1>这种记录。这些记录都会通过网络传递给同一个reduce任务,然后再在reduce函数中进行统计得出结果。这时候,我们就允许用户自定义一个combiner函数,这个函数会在数据传递给reduce任务之前进行局部的合并操作,节省了大量的网络传输。这个combiner函数是在每一个map任务机器上执行的,通常情况下,这个函数与reduce函数是同样的代码。他们两之间的区别在于输出是最终输出文件还是中间数据文件。
其实文中还介绍了其他比如说类型优化,跳跃某些有问题的记录的优化,状态信息统计优化,以及本地执行调试的优化,这些大家都可以自己从文章里面看,只是因为他们的重要性不是那么强,就不在这里详述了。同时文中最后部分介绍了这个模型的功能强大,也给出了很多实际使用的例子,实际上可以了解一点,也再次说明一点,大规模分布式系统的逻辑实际上并不是那么复杂,真正复杂之处在于性能的优化,而优化点设计范围很广,技术要求也就很高了,实际上主要是可以分成几个方面:1)资源管理,磁盘的管理,网络的管理,内存的管理,任务的管理,如果在最大化使用硬件本身的同时最大化使用结果;2)容错的管理,分布式的最佳实践是用户感知不到是分布式工作,这时候就需要分布式系统能够保证数据补充不漏不丢,容错容灾能力是考研分布式系统稳定性很重要的一环;3)数据的一致性,其实mapreduce的一致性要求还不是那么严格,而且前面提到了许多操作是原子性操作的要求,这个要求实际上就是保持数据一致的,真正数据一致性的严格可能还是在文件系统,分布式数据库等等方面,这是一个艰难的问题,同时也是效率与准确之间的取舍。
No comments:
Post a Comment