Categories
程式開發

MapReduce簡介及過程詳解


MapReduce是面向大數據並行處理的計算模型、框架和平台,對於大數據開發或者想要接觸大數據開發的開發者來說,是必須要掌握的,它是一種經典大數據計算框架,現在有很多開源項目的內部實現都會直接或間接地借鑒了MR過程的實現。 Hadoop中的MapReduce 是一個離線批處理計算框架。

1)MapReduce是一個基於集群的高性能並行計算平台(Cluster Infrastructure)。它允許用市場上普通的商用服務器構成一個包含數十、數百至數千個節點的分佈和並行計算集群。

2)MapReduce是一個並行計算與運行軟件框架(Software Framework)。它提供了一個龐大但設計精良的並行計算軟件框架,能自動完成計算任務的並行化處理,自動劃分計算數據和計算任務,在集群節點上自動分配和執行任務以及收集計算結果,將數據分佈存儲、數據通信、容錯處理等並行計算涉及到的很多系統底層的複雜細節交由系統負責處理,大大減少了軟件開發人員的負擔。

3)MapReduce是一個並行程序設計模型與方法(Programming Model & Methodology)。它藉助於函數式程序設計語言Lisp的設計思想,提供了一種簡便的並行程序設計方法,用Map和Reduce兩個函數編程實現基本的並行計算任務,提供了抽象的操作和並行編程接口,以簡單方便地完成大規模數據的編程和計算處理。

1 實現流程

MapReduce簡介及過程詳解 1

上圖所示為實現MapReduce操作的總體流程:

1) 用戶程序首先調用的MapReduce庫將輸入文件分成M個數據片段,每個數據片段的大小一般從16MB到64MB(可以通過可選的參數來控制每個數據片段的大小)。然後用戶程序在集群中創建大量的程序副本。

2) 這些程序副本中有一個特殊的master程序副本,其它的都是worker程序,由master分配任務。有M個Map任務和R個Reduce任務將被分配,master將分配空間的worker來執行每一個Map任務或Reduce任務。

3) 被分配了map任務的worker程序(Map worker)讀取相關的輸入數據片段,從輸入的數據片段中解析出鍵值對,然後把鍵值對傳遞給用戶自定義的Map函數,生成並輸出的中間鍵值對,並緩存在內存中。

4) 緩存中的鍵值對通過分區函數分成R個區域,之後周期性的寫入到本地磁盤上。緩存的鍵值對在本地磁盤上的存儲位置將被回傳給master,由master負責把這些存儲位置再傳送給Reduce worker。

5) 當Reduce worker程序被通知到由master程序發來的數據存儲位置信息後,使用RPC從Map worker所在主機的磁盤上讀取這些緩存數據。當Reduce worker讀取了所有的中間數據後,通過對key進行排序後使得具有相同key值的數據聚合在一起。由於許多不同的key值會映射到相同的Reduce任務上,因此必須進行排序。如果中間數據太大無法在內存中完成排序,那麼就要在外部進行排序。

6) Reduce worker程序遍歷排序後的中間數據,對於每一個唯一的中間key值,Reduce worker程序將這個key值和它相關的中間value值的集合傳遞給用戶自定義的Reduce函數。 Reduce函數的輸出被追加到所屬分區的輸出文件。

當所有的Map和Reduce任務都完成之後,master喚醒用戶程序。在這個時候,在用戶程序裡的對MapReduce調用才返回。

2 階段劃分

2.1 提交客戶作業

作業提交通過網絡完成,框架將輸入數據集劃分塊,即輸入分片(input split),以便並行處理。

2.2 執行Map任務

每一個輸入分片都會對應一個map任務,該任務對分片中的每一條記錄運行用戶提供的map函數。格式化輸入負責將每一條記錄轉換成對應的鍵值對錶示法(例如:有一種內建的輸入格式,它能將文件的每一行轉換成值並以文件偏移值為鍵),map函數用鍵值對來作為輸入並產生零個或多個中間鍵值對。

在map任務處理完之後,首先會將這個結果寫入到內存緩存區中(緩衝區的作用就是批量收集Map結果,減少磁盤I/O影響),也就是在內存中會劃分出一片區域來存儲map任務處理完的數據結果,內存緩衝區的大小是有限的,默認是100MB。當寫入的數據所佔大小占整個環形緩衝區的80%之後,就開始把緩衝區中的數據寫入到本地磁盤文件中,即溢寫。在寫出的過程中,map的處理結果仍然會向緩衝區中寫入,這個過程可以看做是一個環狀,所以稱之為環形緩衝區,其實在內存中就是一個數組。

寫出數據之前,每一個鍵會被一個稱作為分區器(partitioner)的組件分配到一個分區(partition)。默認實現是一個哈希分區器:將鍵的哈希值與作業所配置的reduce任務數目進行取模從而得到一個分區號。然後在分區內按照鍵進行排序,排好序之後再將內存中的數據寫入到文件中。因為在內存中已經對分區內的數據進行排序,所以寫出到文件中的數據在分區內是有序的,如果map任務產生的結果比較大,就會不斷地將緩衝區中的數據溢寫到文件中,根據數據量的大小就會產生多個文件,這些文件當中的數據是分區的,且分區內是有序的。

當map任務運行完成後,會將所有輸出的這些小文件合併成一個大的文件,在合併文件的過程中還是會對分區內的文件進行排序。

注意:在執行map任務時,combine操作是可選的,它是一個本地化的reduce操作,是map運算的後續操作,主要是在map計算出中間文件前做一個簡單的合併重複key值的操作,目的是為了減少網絡傳輸數據,提高帶寬的傳輸效率。

2.3 shuffle和排序

MapReduce簡介及過程詳解 2

在MapReduce過程中需要各節點上的同一類數據匯集到某一節點進行計算,把這些分佈在不同節點的數據按照一定的規則聚集到一起的過程稱為Shuffle。 (也有說shuffle是從map輸出結果開始的,按個人理解來)當reduce任務開始的時候,reduce任務就會通過RPC請求map任務所在的節點獲取屬於它的輸出文件。當把所有的數據都拷貝過來後,就會將這些數據合併,map的輸出數據是有序的,因此在reduce端進行合併是採用歸併排序,合併之後就會將相同的鍵的數據分到一組,不同的鍵之間進行排序,最終會形成一個鍵對應一組值這樣一組數據。

2.4 執行reduce任務

當分區數據被合併成一個完整的有序列表後,用戶的reduce代碼就開始被執行。每一個reduce任務都會產生一個單獨的輸出文件,通常存儲在HDFS中。獨立的輸出文件使得reduce任務之間無需協調共享文件的訪問,大大減少了reduce的複雜性並能讓每一個reduce任務運行效率最大化。輸出文件的格式取決於Output format參數,參數在MapReduce的用戶在作業配置中指定。

3 應用場景

1、進行數據統計,比如計算大型網站的瀏覽量;

2、搜素引擎中索引的創建Google最早使用MapReduce就是對每天爬取的幾十億上百億的網頁創建索引,從而產生的MapReduce框架);

3、海量數據中查找出具有某些特徵的數據;

4、複雜數據分析算法的實現(比如聚類,分類算法,推薦算法。這些算法需要大量的訓練數據來訓練算法模型,一些算法集成工具使用MapReduce使程序跑在分佈式系統環境中)。

注意:

通過前面的介紹,我們可以發現MapReduce更適合處理海量數據的分佈式批量離線處理,所以也就限制了它的使用。下面的場景不適合使用MapReduce:

1. 低延遲實時計算,這種場景通常需要秒級甚至毫秒級返回計算結果。 MapReduce式分佈式作業的,需要分發執行任務的程序包並且在計算過程中還要在map階段拆分數據,向Reduce傳輸數據,所以不能在極短時間內返回計算結果。

2. 流式計算,MapReduce自身框架設計決定了它處理輸入的數據必須是靜態的數據,比如說它處理存儲在HDFS上的數據,HDFS上的數據是不能實時變化的。