首頁(yè) > 新聞 > 專家觀點(diǎn) >

Spark技術(shù)解析及在百度開放云BMR應(yīng)用實(shí)踐

2015-01-14 16:45:33   作者:   來(lái)源:CSDN   評(píng)論:0  點(diǎn)擊:


  2014年,Spark開源生態(tài)系統(tǒng)得到了大幅增長(zhǎng),已成為大數(shù)據(jù)領(lǐng)域最人氣的開源項(xiàng)目之一,活躍在Hortonworks、IBM、Cloudera、MapR和Pivotal等眾多知名大數(shù)據(jù)公司,更擁有Spark SQL、Spark Streaming、MLlib、GraphX等多個(gè)相關(guān)項(xiàng)目。同時(shí)值得一提的是,Spark貢獻(xiàn)者中有一半左右的中國(guó)人。

  短短四年時(shí)間,Spark不僅發(fā)展為Apache基金會(huì)的頂級(jí)開源項(xiàng)目,更通過其高性能內(nèi)存計(jì)算及其豐富的生態(tài)快速贏得幾乎所有大數(shù)據(jù)處理用戶。2015年1月10日,一場(chǎng)基于Spark的高性能應(yīng)用實(shí)踐盛宴由Databricks軟件工程師連城、百度高級(jí)工程師甄鵬、百度架構(gòu)師孫垚光、百度美國(guó)研發(fā)中心高級(jí)架構(gòu)師劉少山四位專家聯(lián)手打造。

  Databricks軟件工程師連城——Spark SQL 1.2的提升和新特性

  談及Spark SQL 1.2的提升和新特性,連城主要總結(jié)了4個(gè)方面——External data source API(外部數(shù)據(jù)源API)、列式內(nèi)存存儲(chǔ)加強(qiáng)(Enhanced in-memory columnar storage)、Parquet支持加強(qiáng)(Enhanced Parquet support)和Hive支持加強(qiáng)(Enhanced Hive support)。

  External data source API

  連城表示,因?yàn)樵谔幚砗芏嗤獠繑?shù)據(jù)源中出現(xiàn)的擴(kuò)展問題,Spark在1.2版本發(fā)布了External data source API。通過External data source API,Spark將不同的外部數(shù)據(jù)源抽象成一個(gè)關(guān)系表格,從而實(shí)現(xiàn)更貼近無(wú)縫的操作。

  External data source API在支持了多種如JSON、Avro、CSV等簡(jiǎn)單格式的同時(shí),還實(shí)現(xiàn)了Parquet、ORC等的智能支持;同時(shí),通過這個(gè)API,開發(fā)者還可以使用JDBC將HBase這樣的外部系統(tǒng)對(duì)接到Spark中。

  連城表示,在1.2版本之前,開發(fā)者其實(shí)已經(jīng)實(shí)現(xiàn)了各種各樣外部數(shù)據(jù)源的支持,因此,對(duì)比更原生的支持一些外部數(shù)據(jù)源,External data source API的意義更在于針對(duì)相應(yīng)數(shù)據(jù)源進(jìn)行的特殊優(yōu)化,主要包括Column pruning(列剪枝)和Pushing predicates to datasources(將predicates貼近數(shù)據(jù)源)兩個(gè)方面:

  Column pruning。主要包括縱橫的兩種剪枝。在列剪枝中,Column pruning可以完全忽視無(wú)需處理的字段,從而顯著地減少IO。同時(shí),在某些條件查詢中,基于Parquet、ORC等智能格式寫入時(shí)記錄的統(tǒng)計(jì)信息(比如最大值、最小值等),掃描可以跳過大段的數(shù)據(jù),從而省略了大量的磁盤掃描負(fù)載。

  Pushing predicates to datasources。在更復(fù)雜的SQL查詢中,讓過濾條件維度盡可能的接近數(shù)據(jù)源,從而減少磁盤和網(wǎng)絡(luò)IO,最終提高整體端到端的性能。

  使用External data source API之前

  使用External data source API之后

  搭載了如Parquet和ORC這樣的智能格式

  連城表示,在Spark 1.2版本中,External data source API并沒有實(shí)現(xiàn)預(yù)期中的功能,在Roadmap中,F(xiàn)irst class分片支持(First class partitioning support with partition pruning)、Data sink(insertion)API、將Hive作為外部數(shù)據(jù)源等。

  Enhanced in-memory columnar storage

  連城表示,不管Shark,還是Spark,內(nèi)存緩存表的支持都是非常重要的一個(gè)特性。他表示,雖然在1.1和之前版本中的列式內(nèi)存表的性能已然不錯(cuò),但是還會(huì)出現(xiàn)一些問題:第一,大數(shù)據(jù)量下緩存超大體積表時(shí)(雖然不推薦,但不缺現(xiàn)實(shí)用例),會(huì)出現(xiàn)OOM等問題;第二,在列式存儲(chǔ)中,像Parquet、ORC這種收集統(tǒng)計(jì)信息然后通過這些信息做partition skipping等操作在之前版本中并沒有完全實(shí)現(xiàn)。這些問題在1.2版本中都得到了解決,本節(jié),連城主要介紹了語(yǔ)義統(tǒng)一、緩存實(shí)體化、基于緩存共享的查詢計(jì)劃、Cache大表時(shí)的OOM問題、表格統(tǒng)計(jì)(Table statistics)等方面。

  緩存實(shí)體化。SQLContext.cacheTable(“tbl”)默認(rèn)使用eager模式,緩存實(shí)體化將自動(dòng)進(jìn)行,不會(huì)再等到表被使用或觸發(fā)時(shí),避免手動(dòng)做“SELECT COUNT(*) FROM src;”。同時(shí),新增了“CACHE [LAZY] TABLE tbl [AS SELECT …]”這樣的DML。

  語(yǔ)義統(tǒng)一。早期時(shí)候,SchemaRDD.cache()和SQLContext.cacheTable(“tbl”)這兩個(gè)語(yǔ)義是不同的。其中,SQLContext.cacheTable會(huì)去建立一些列式存儲(chǔ)格式相關(guān)優(yōu)化,而SchemaRDD.cache()卻以一行一個(gè)對(duì)象的模式進(jìn)行。在1.2版本中,這兩個(gè)操作已被統(tǒng)一,同時(shí)各種cache操作都將得到一個(gè)統(tǒng)一的內(nèi)存表。

  基于緩存共享的查詢計(jì)劃。兩個(gè)得到相同結(jié)果的cache語(yǔ)句將共享同一份緩存數(shù)據(jù)。

  避免Cache大表時(shí)的OOM問題。優(yōu)化內(nèi)存表的建立和訪問,減少開銷,進(jìn)一步提升性能;在緩存大表時(shí),引入batched column buffer builder,將每一列切成多個(gè)batch,從而避免了OOM。

  表格統(tǒng)計(jì)。Table statistics,類似Parquet、ORC使用的技術(shù),在1.2版本中主要實(shí)現(xiàn)了Predicate pushdown(實(shí)現(xiàn)更快的表格掃描)和Auto broadcast join(實(shí)現(xiàn)更快的表格join)。

  最后,連城還詳細(xì)介紹了一些關(guān)于加強(qiáng)Parquet和Hive支持的實(shí)現(xiàn),以及Spark未來(lái)的一些工作。

  百度基礎(chǔ)架構(gòu)部高級(jí)工程師甄鵬——Spark在百度開放云BMR中的實(shí)戰(zhàn)分享

  百度分布式計(jì)算團(tuán)隊(duì)從2011年開始持續(xù)關(guān)注Spark,并于2014年將Spark正式引入百度分布式計(jì)算生態(tài)系統(tǒng)中,在國(guó)內(nèi)率先面向開發(fā)者及企業(yè)用戶推出了支持Spark并兼容開源接口的大數(shù)據(jù)處理產(chǎn)品BMR(Baidu MapReduce)。在甄鵬的分享中,我們主要了解了百度Spark 應(yīng)用現(xiàn)狀、百度開放云BMR和Spark On BMR三個(gè)方面的內(nèi)容。

  Spark在百度

  甄鵬表示,當(dāng)前百度的Spark集群由上千臺(tái)物理主機(jī)(數(shù)萬(wàn)Cores,上百TBMemory)組成,日提交App在數(shù)百,已應(yīng)用于鳳巢、大搜索、直達(dá)號(hào)、百度大數(shù)據(jù)等業(yè)務(wù)。之以選擇Spark,甄鵬總結(jié)了三個(gè)原因:快速高效、API 友好易用和組件豐富。

  快速高效。首先,Spark使用了線程池模式,任務(wù)調(diào)度效率很高;其次,Spark可以最大限度地利用內(nèi)存,多輪迭代任務(wù)執(zhí)行效率高。

  API友好易用。這主要基于兩個(gè)方面:第一,Spark支持多門編程語(yǔ)言,可以滿足不同語(yǔ)言背景的人使用;第二,Spark的表達(dá)能力非常豐富,并且封裝了大量常用操作。

  組件豐富。Spark生態(tài)圈當(dāng)下已比較完善,在官方組件涵蓋SQL、圖計(jì)算、機(jī)器學(xué)習(xí)和實(shí)時(shí)計(jì)算的同時(shí),還有著很多第三方開發(fā)的優(yōu)秀組件,足以應(yīng)對(duì)日常的數(shù)據(jù)處理需求。

  百度開放云BMR

  在BMR介紹中,甄鵬表示,雖然BMR被稱為Baidu MapReduce,但是這個(gè)名稱已經(jīng)不能完全表示出這個(gè)平臺(tái):BMR是百度開放云的數(shù)據(jù)分析服務(wù)產(chǎn)品,基于百度多年大數(shù)據(jù)處理分析經(jīng)驗(yàn),面向企業(yè)和開發(fā)者提供按需部署的Hadoop&Spark集群計(jì)算服務(wù),讓客戶具備海量數(shù)據(jù)分析和挖掘能力,從而提升業(yè)務(wù)競(jìng)爭(zhēng)力。

  如圖所示,BMR基于BCC(百度云服務(wù)器),建立在HDFS和BOS(百度對(duì)象存儲(chǔ))分布式存儲(chǔ)之上,其處理引擎包含了MapReduce和Spark,同時(shí)還使用了HBase數(shù)據(jù)庫(kù)。在此之上,系統(tǒng)集成了Pig、Hive、SQL、Streaming、GraphX、MLLib等專有服務(wù)。在系統(tǒng)的最上層,BMR提供了一個(gè)基于Web的控制臺(tái),以及一個(gè)API形式的SDK。

  在圖片的最右邊,Scheduler在BMR中起到了管理作用,使用它開發(fā)者可以編寫比較復(fù)雜的作業(yè)流。

  Spark On BMR

  類似于通常的云服務(wù),BMR中的Spark同樣隨用隨起,集群空閑即銷毀,幫助用戶節(jié)省預(yù)算。此外,集群創(chuàng)建可以在3到5分鐘內(nèi)完成,包含了完整的Spark+HDFS+YARN堆棧。同時(shí),BMR也提供Long Running模式,并有多種套餐可選。

  完善的報(bào)表服務(wù),全方位監(jiān)控

  在安全上,用戶擁有虛擬的獨(dú)立網(wǎng)絡(luò),在同一用戶全部集群可互聯(lián)的同時(shí),BMR用戶間網(wǎng)絡(luò)被完全隔離。同時(shí),BMR還支持動(dòng)態(tài)擴(kuò)容,節(jié)點(diǎn)規(guī)?蓮椥陨炜s。除此之外,在實(shí)現(xiàn)Spark全組件支持的同時(shí),BMR可無(wú)縫對(duì)接百度的對(duì)象存儲(chǔ)BOS服務(wù),借力百度多年的存儲(chǔ)研發(fā)經(jīng)驗(yàn),保證數(shù)據(jù)存儲(chǔ)的高可靠性。

  百度基礎(chǔ)架構(gòu)部架構(gòu)師孫垚光——百度高性能通用Shuffle服務(wù)

  在2014 Sort Benchmark國(guó)際大賽上,百度成功奪冠,其幕后英雄無(wú)疑卓越的Shuffle機(jī)制,在孫垚光的分享中,我們對(duì)Shuffle的發(fā)展、細(xì)節(jié)和未來(lái)有了一次深度的接觸。

  Shuffle簡(jiǎn)介

  孫垚光表示,簡(jiǎn)單來(lái)說,Shuffle就是按照一定的分組和規(guī)則Map一個(gè)數(shù)據(jù),然后傳入Reduce端。不管對(duì)于MapReduce還是Spark,Shuffle都是一個(gè)非常重要的階段。然而,雖然Shuffle解決的問題相同,但是在Spark和MapReduce中,Shuffle流程(具體時(shí)間和細(xì)節(jié))仍然存在一定的差別:

  Baidu Shuffle發(fā)展歷程

  通過孫垚光了解到,Shuffle在百度的發(fā)展主要包括兩個(gè)階段:跟隨社區(qū)和獨(dú)立發(fā)展。從2008年百度的MapReduce/Hadoop起步開始,百度就開始跟隨社區(qū),使用社區(qū)版本,期間的主要工作包含Bug修復(fù)和性能優(yōu)化兩個(gè)方面(增加內(nèi)存池、減少JVMGC,傳輸Server由Jetty換Netty,及批量傳輸、聚合數(shù)據(jù)等方面)。

  分離了shuffle和Map/Reduce

  在2012年開始,Baidu Shuffle開啟獨(dú)立發(fā)展階段,主要源于下一代離線計(jì)算系統(tǒng)的開發(fā),Shuffle被抽離為獨(dú)立的ShuffleService服務(wù),從而提高了集群資源的利用率。

  截止此時(shí),不管是社區(qū)版本(MapReduce/Spark),還是百度研發(fā)的ShuffleService,它們都是基于磁盤的PULL模式;诖疟P,所有Map的數(shù)據(jù)都會(huì)放到磁盤,雖然Spark號(hào)稱內(nèi)存計(jì)算,但是涉及到Shuffle時(shí)還是會(huì)寫磁盤;赑ULL,所有數(shù)據(jù)在放到Map端的磁盤之后,Reduce在使用時(shí)還需要主動(dòng)的拉出來(lái),因此會(huì)受到兩個(gè)問題影響:首先,業(yè)務(wù)數(shù)據(jù)存儲(chǔ)在Map端的服務(wù)器上,機(jī)器宕機(jī)時(shí)會(huì)不可避免丟失數(shù)據(jù),這一點(diǎn)在大規(guī)模分布式集群中非常致命;其次,更重要的是,Shuffle階段會(huì)產(chǎn)生大量的磁盤尋道(隨機(jī)讀)和數(shù)據(jù)重算(中間數(shù)據(jù)存在本地磁盤),舉個(gè)例子,某任務(wù)有1百萬(wàn)個(gè)Map,1萬(wàn)個(gè)Reduce,如果一次磁盤尋道的時(shí)間是10毫秒,那么集群總共的磁盤尋道時(shí)間= 1000000 ×10000 ×0.01 = 1億秒。

  New Shuffle

  基于這些問題,百度設(shè)計(jì)了基于內(nèi)存的PUSH模式。新模式下,Map輸出的數(shù)據(jù)將不落磁盤,并在內(nèi)存中及時(shí)地Push給遠(yuǎn)端的Shuffle模塊,從而將獲得以下提升:

  New Shuffle的優(yōu)勢(shì)

  New Shuffle架構(gòu)

  如圖所示,藍(lán)色部分為New Shuffle部分,主要包含兩個(gè)部分:數(shù)據(jù)寫入和讀取的API,Map端會(huì)使用這個(gè)接口來(lái)讀取數(shù)據(jù),Reduce會(huì)使用這個(gè)接口來(lái)讀取數(shù)據(jù);其次,最終重要的是,服務(wù)器端使用了典型的主從架構(gòu),用多個(gè)shuffle工作者節(jié)點(diǎn)來(lái)shuffle數(shù)據(jù)。同時(shí),在系統(tǒng)設(shè)計(jì)中,Master非常有利于橫向擴(kuò)展,讓shuffle不會(huì)成為整個(gè)分布式系統(tǒng)的瓶頸。

  讓New Shuffle模塊專注于shuffle,不依賴于外部計(jì)算模塊,從而計(jì)算模塊可以專注于計(jì)算,同時(shí)還避免了磁盤IO。然而New Shuffle帶來(lái)的問題也隨之暴漏,其中影響比較重要的兩個(gè)就是:慢節(jié)點(diǎn)和數(shù)據(jù)重復(fù)。

  慢節(jié)點(diǎn)。以shuffle寫入過程中出現(xiàn)慢節(jié)點(diǎn)為例,通常包含兩個(gè)情況。首先,Shuffle自身慢節(jié)點(diǎn),對(duì)比社區(qū)版本中只會(huì)影響到一個(gè)task,New Shuffle中常常會(huì)影響到一片集群。在這里,百度為每個(gè)Shuffle節(jié)點(diǎn)都配置了一個(gè)從節(jié)點(diǎn),當(dāng)Map檢測(cè)到一個(gè)慢節(jié)點(diǎn)時(shí),系統(tǒng)會(huì)自動(dòng)切換到從節(jié)點(diǎn)。其次,DFS出現(xiàn)慢節(jié)點(diǎn),這個(gè)情況下,Shuffle的從節(jié)點(diǎn)只能起到緩解作用。這種情況下,首先DFS系統(tǒng)會(huì)自動(dòng)檢測(cè)出慢節(jié)點(diǎn),并進(jìn)行替換。比如,傳統(tǒng)的HDFS會(huì)以pipeline的形式進(jìn)行寫入,而DFS則轉(zhuǎn)換為分發(fā)寫。

  在此之外,New Shuffle還需要解決更多問題,比如資源共享和隔離等。同時(shí),基于New Shuffle的機(jī)制,New Shuffle還面臨一些其他挑戰(zhàn),比如Reduce全啟動(dòng)、數(shù)據(jù)過于分散、對(duì)DFS壓力過大、連接數(shù)等等。

  數(shù)據(jù)重復(fù)。如上圖所示,這些問題主要因?yàn)镹ew Shuffle對(duì)上層組件缺少感知,這個(gè)問題的解決主要使用task id和block id進(jìn)行去重。

  New Shuffle展望

  孫垚光表示,New Shuffle使用了通用的Writer和Reader接口,當(dāng)下已經(jīng)支持百度MR和DCE(DAG、C++),同時(shí)即將對(duì)開源Spark提供支持。在未來(lái),New Shuffle無(wú)疑將成為更通用的組件,支持更多的計(jì)算模型。

  百度美國(guó)硅谷研發(fā)中心高級(jí)架構(gòu)師劉少山——Fast big data analytics with Spark on Tachyon

  Tachyon是一個(gè)分布式的內(nèi)存文件系統(tǒng),可以在集群里以訪問內(nèi)存的速度來(lái)訪問存在Tachyon里的文件。Tachyon是架構(gòu)在分布式文件存儲(chǔ)和上層各種計(jì)算框架之間的中間件,主要負(fù)責(zé)將那些不需要落到DFS里的文件,落到分布式內(nèi)存文件系統(tǒng)中,從而達(dá)到共享內(nèi)存,以提高效率。1月10日下午的最后一場(chǎng)分享中,劉少山帶來(lái)了一場(chǎng)Tachyon的深入解析。

  Tachyon和Spark

  劉少山表示,在Spark使用過程中,用戶經(jīng)常困擾于3個(gè)問題:首先,兩個(gè)Spark 實(shí)例通過存儲(chǔ)系統(tǒng)來(lái)共享數(shù)據(jù),這個(gè)過程中對(duì)磁盤的操作會(huì)顯著降低性能;其次,因?yàn)镾park崩潰所造成的數(shù)據(jù)丟失;最后,垃圾回收機(jī)制,如果兩個(gè)Spark實(shí)例需求同樣的數(shù)據(jù),那么這個(gè)數(shù)據(jù)會(huì)被緩存兩次,從而造成很大的內(nèi)存壓力,更降低性能。

  使用Tachyon,存儲(chǔ)可以從Spark中分離處理,讓其更專注于計(jì)算,從而避免了上述的3個(gè)問題。

  Tachyon架構(gòu)

  劉少山從Spark的角度分享了Tachyon的部署。在與Spark搭配使用時(shí),系統(tǒng)會(huì)建立一個(gè)Tachyon的job,通過Tachyon Client來(lái)訪問同一個(gè)機(jī)器上的Tachyon Worker,也就是機(jī)器上的內(nèi)存。而Tachyon Client則會(huì)與Tachyon Master交互,來(lái)清楚每個(gè)分節(jié)點(diǎn)所包含的數(shù)據(jù)。由此可見,在整個(gè)Tachyon 系統(tǒng)中,Master、Client和Worker為最重要的三個(gè)部分。

  Tachyon Master。Master主要部件是Inode和Master Worker Info:Inode會(huì)負(fù)責(zé)系統(tǒng)的監(jiān)視,Master Worker Info則存儲(chǔ)了所有Worker的信息。

  Tachyon Worker。Worker主要負(fù)責(zé)存儲(chǔ),其中Worker Storage是最主要的數(shù)據(jù)結(jié)構(gòu),包含Local data folder和Under File System兩個(gè)部分。其中Local data folder表示存在本地的Tachyon文件,Under File System則負(fù)責(zé)從HDFS中讀取Worker中未發(fā)現(xiàn)的數(shù)據(jù)。

  Tachyon Client。Client為上層用戶提供了一個(gè)透明的機(jī)制,其TachyonFS接口負(fù)責(zé)數(shù)據(jù)請(qǐng)求。每個(gè)Client中有多個(gè)Tachyon File,其中Block In Stream負(fù)責(zé)文件讀。↙ocal Block In Stream負(fù)責(zé)本地機(jī)器讀取,Remote Block In Stream則負(fù)責(zé)讀取遠(yuǎn)程機(jī)器);Block Out Stream主要負(fù)責(zé)將文件寫到本地機(jī)器上。在Client上,Master Client會(huì)與Master交互,Worker Client則與Client交互。

  Tachyon在百度

  為什么要使用Tachyon,劉少山指出,在百度,計(jì)算集群和存儲(chǔ)集群往往不在同一個(gè)地理位置的數(shù)據(jù)中心,在大數(shù)據(jù)分析時(shí),遠(yuǎn)程數(shù)據(jù)讀取將帶來(lái)非常高的延時(shí),特別是ad-hoc查詢。因此,將Tachyon作為一個(gè)傳輸緩存層,百度通常會(huì)將之部署在計(jì)算集群上。首次查詢時(shí),數(shù)據(jù)會(huì)從遠(yuǎn)程存儲(chǔ)取出,而在以后的查詢中,數(shù)據(jù)就會(huì)從本地的Tacnyon上讀取,從而大幅的改善了延時(shí)。

  在百度,Tachyon的部署還處于初始階段,大約部署了50臺(tái)機(jī)器,主要服務(wù)于ad-hoc查詢。

  實(shí)踐中遭遇的挑戰(zhàn)

  通過劉少山了解到,Tachyon的使用過程并不是一帆風(fēng)順,比如:因?yàn)門achyon需求對(duì)Block完全讀取,從而可能造成Blocks并未被緩存;有時(shí)候,雖然scheduler已經(jīng)確認(rèn)了數(shù)據(jù)存在本地,Spark workers仍然從遠(yuǎn)程blocks讀取,而緩存命中率也只有可憐的33%(如果你需要的是2號(hào)block,Tachyon會(huì)分別從1、2、3號(hào)block讀取,從而將block讀取了3份)。因此,劉少山表示,如果要使用好Spark與Tachyon,一定要對(duì)用例和Tachyon進(jìn)行充分的了解。

  分享最后,劉少山還介紹了Hierarchical Storage Feature特性以及百度未來(lái)的工作,其中包括緩存替換策略等。

分享到: 收藏

專題