您當(dāng)前的位置是:  首頁 > 資訊 > 文章精選 >
 首頁 > 資訊 > 文章精選 >

監(jiān)控系統(tǒng)哪家強(qiáng)?eBay在監(jiān)控系統(tǒng)上的實(shí)踐應(yīng)用

2019-08-22 16:17:06   作者:   來源:CTI論壇   評(píng)論:0  點(diǎn)擊:


  Sherlock.IO 是 eBay 現(xiàn)有的監(jiān)控平臺(tái),每天要處理上百億條日志、事件和指標(biāo)。Flink Streaming job 實(shí)時(shí)處理系統(tǒng)用于處理其中的日志和事件。本文將結(jié)合監(jiān)控系統(tǒng) Flink 的現(xiàn)狀,具體講述 Flink 在監(jiān)控系統(tǒng)上的實(shí)踐和應(yīng)用,希望給同業(yè)人員一些借鑒和啟發(fā)。
  一。 監(jiān)控系統(tǒng) Flink 的現(xiàn)狀
  eBay 的監(jiān)控平臺(tái) Sherlock.IO 每天處理著上百億條日志(log),事件(event)和指標(biāo)(metric)。通過構(gòu)建 Flink Streaming job 實(shí)時(shí)處理系統(tǒng),監(jiān)控團(tuán)隊(duì)能夠及時(shí)將日志和事件的處理結(jié)果反饋給用戶。當(dāng)前,監(jiān)控團(tuán)隊(duì)維護(hù)著 8 個(gè) Flink 集群,最大的集群規(guī)模達(dá)到上千個(gè) TaskManager,總共運(yùn)行著上百個(gè)作業(yè)(job),一些作業(yè)已經(jīng)穩(wěn)定運(yùn)行了半年以上。
  二。 元數(shù)據(jù)驅(qū)動(dòng)
  為了讓用戶和管理員能夠更加快捷地創(chuàng)建Flink作業(yè)并調(diào)整參數(shù),監(jiān)控團(tuán)隊(duì)在 Flink 上搭建了一套元數(shù)據(jù)微服務(wù)(metadata service),該服務(wù)能夠用Json來描述一個(gè)作業(yè)的 DAG,且相同的 DAG 共用同一個(gè)作業(yè),能夠更加方便地創(chuàng)建作業(yè),無需調(diào)用 Flink API。Sherlock.IO 流處理整體的架構(gòu)如圖1所示。
  圖1 Sherlock.IO 流處理整體架構(gòu)
  目前,用這套元數(shù)據(jù)微服務(wù)創(chuàng)建的作業(yè)僅支持以 Kafka 作為數(shù)據(jù)源,只要數(shù)據(jù)接入到 Kafka,用戶就可以定義 Capability 來處理邏輯從而通過 Flink Streaming 處理數(shù)據(jù)。
  1.元數(shù)據(jù)微服務(wù)
  元數(shù)據(jù)微服務(wù)框架如圖 2 所示,最上層是元數(shù)據(jù)微服務(wù)提供的 Restful API, 用戶通過調(diào)用 API 來描述和提交作業(yè)。描述作業(yè)的元數(shù)據(jù)包含三個(gè)部分:Resource,Capability 和 Policy。Flink 適配器(Adaptor)連接了 Flink Streaming API 和元數(shù)據(jù)微服務(wù) API,且會(huì)根據(jù)元數(shù)據(jù)微服務(wù)描述的作業(yè)調(diào)用 Flink Streaming API 來創(chuàng)建作業(yè),從而屏蔽 Flink StreamAPI。
  因此,用戶不用了解 Flink Streaming API 就可以創(chuàng)建 Flink 作業(yè)。未來如果需要遷移到其他的流處理框架,只要增加一個(gè)適配器,就可以將現(xiàn)有的作業(yè)遷移到新的流處理框架上。
  圖2 元數(shù)據(jù)微服務(wù)框架
  Capability
  Capability 定義了作業(yè)的 DAG 以及每個(gè)算子(Operator)所用的 Class,圖 3 是事件處理(eventProcess) Capability,它最終會(huì)生成如圖 4 的 DAG。事件處理 Capability 先從 Kafka 讀出數(shù)據(jù),再寫到 Elasticsearch 中。該 Capability 將該作業(yè)命名為“eventProcess”,并定義其并行度為“5”,其算子為“EventEsIndexSinkCapability”, 其數(shù)據(jù)流為“Source –> sink”。
  圖3 eventESSink Capability

 
  圖4 生成的Flink作業(yè)
  Policy
  每個(gè)命名空間(Namespace)需要定義一個(gè)或多個(gè) Policy,每個(gè) Policy 指定了相應(yīng)的 Capability,即指定了用哪一套 DAG 來運(yùn)行這個(gè) Policy。Policy 還定義了這個(gè)作業(yè)的相關(guān)配置,例如從哪個(gè) Kafka topic 中讀取數(shù)據(jù),寫到 ElasticSearch 的哪個(gè)索引(Index)中,中間是否要跳過某些算子等等。
  其次,Policy 還能作為一個(gè)簡(jiǎn)易的過濾器(Filter),可以通過配置 Jexl 表達(dá)式過濾掉一些不需要的數(shù)據(jù),提高作業(yè)的吞吐量。
  另外,我們還實(shí)現(xiàn)了 Zookeeper 定時(shí)更新的機(jī)制,使得 Policy 修改后不再需要重啟作業(yè),只要是在更新時(shí)間間隔內(nèi),該命名空間的 Policy 修改就會(huì)被自動(dòng)應(yīng)用到作業(yè)上。圖 5 是命名空間為 paas 的 Policy 示例。
  圖5 paas alertESSink Policy
  Resource
  Resource 定義了某個(gè)命名空間所需要的資源,比如 Flink 集群, Kafka broker,ES 集群等等。我們有多個(gè) Flink 集群和 ES 集群,通過 Resource 配置,作業(yè)可以知道某個(gè)命名空間的日志應(yīng)該寫到哪個(gè) ES 集群,并可以判斷該命名空間的數(shù)據(jù)應(yīng)該從哪個(gè) Kafka 集群讀取。
  2.共享作業(yè)
  為了減少作業(yè)數(shù)量,我們可以讓相同的 DAG 復(fù)用同一個(gè)作業(yè)。我們先給不同的 Policy 指定相同的 Capability,在該 Capability 資源足夠的情況下,這些 Policy 就會(huì)被調(diào)度到同一個(gè)作業(yè)上。
  以 SQL 的 Capability 為例,每個(gè) Policy 的 SQL 語句不盡相同,如果為每個(gè) Policy 都創(chuàng)建一個(gè)作業(yè), Job Manager 的開銷就會(huì)很大,且不好管理。因此,我們可以為 SQL Capability 配置 20 個(gè) Slot,每個(gè) Policy 占用一個(gè) Slot。那么該 Capability 生成的作業(yè)就可以運(yùn)行 20 個(gè) Policy。
  作業(yè)運(yùn)行時(shí),從 Source 讀進(jìn)來的數(shù)據(jù)會(huì)被打上相應(yīng) Policy 的標(biāo)簽,并執(zhí)行該 Policy 定義的 SQL 語句,從而實(shí)現(xiàn)不同 Policy 共享同一個(gè)作業(yè),大大減少了作業(yè)的數(shù)量。
  用共享作業(yè)還有一個(gè)好處:如果多個(gè)命名空間的數(shù)據(jù)在一個(gè) Kafka topic 里,那么只要讀一遍數(shù)據(jù)即可,不用每個(gè)命名空間都讀一次 topic 再過濾,這樣就大大提高了處理的效率。
  三。 Flink 作業(yè)的優(yōu)化和監(jiān)控
  了解元數(shù)據(jù)驅(qū)動(dòng)后,讓我們來看看可以通過哪些方法實(shí)現(xiàn) Flink 作業(yè)的而優(yōu)化和監(jiān)控。
  1.Heartbeat
  在 Flink 集群的運(yùn)維過程中,我們很難監(jiān)控作業(yè)的運(yùn)行情況。即使開啟了檢查點(diǎn)(checkpoint),我們也無法確定是否丟失數(shù)據(jù)或丟失了多少數(shù)據(jù)。因此,我們?yōu)槊總(gè)作業(yè)注入了 Heartbeat 以監(jiān)控其運(yùn)行情況。
  Heartbeat 就像 Flink 中用來監(jiān)控延遲的“LatencyMarker”一樣,它會(huì)流過每個(gè)作業(yè)的管道。但與 LatencyMarker 不同的是,當(dāng) Heartbeat 遇到 DAG 的分支時(shí),它會(huì)分裂并流向每個(gè)分支,而不像 LatencyMarker 那樣隨機(jī)流向某一個(gè)分支。另一個(gè)不同點(diǎn)在于 Heartbeat 不是由 Flink 自身產(chǎn)生,而是由元數(shù)據(jù)微服務(wù)定時(shí)產(chǎn)生,而后由每個(gè)作業(yè)消費(fèi)。
  如圖 4 所示,每個(gè)作業(yè)在啟動(dòng)的時(shí)候會(huì)默認(rèn)加一個(gè) Heartbeat 的數(shù)據(jù)源。Heartbeat 流入每個(gè)作業(yè)后,會(huì)隨數(shù)據(jù)流一起經(jīng)過每個(gè)節(jié)點(diǎn),在每個(gè)節(jié)點(diǎn)上打上當(dāng)前節(jié)點(diǎn)的標(biāo)簽,然后跳過該節(jié)點(diǎn)的處理邏輯流向下個(gè)節(jié)點(diǎn)。直到 Heartbeat 流到最后一個(gè)節(jié)點(diǎn)時(shí),它會(huì)以指標(biāo)(Metric)的形式發(fā)送到 Sherlock.IO(eBay 監(jiān)控平臺(tái))。
  該指標(biāo)包含了 Heartbeat 產(chǎn)生的時(shí)間,流入作業(yè)的時(shí)間以及到達(dá)每個(gè)節(jié)點(diǎn)的時(shí)間。通過這個(gè)指標(biāo),我們可以判斷該作業(yè)在讀取 kafka 時(shí)是否延時(shí),以及一條數(shù)據(jù)被整個(gè)管道處理所用的時(shí)間和每個(gè)節(jié)點(diǎn)處理數(shù)據(jù)所用的時(shí)間,進(jìn)而判斷該作業(yè)的性能瓶頸。
  由于 Heartbeat 是定時(shí)發(fā)送的,因此每個(gè)作業(yè)收到的 Heartbeat 個(gè)數(shù)應(yīng)該一致。若最后發(fā)出的指標(biāo)個(gè)數(shù)與期望不一致,則可以進(jìn)一步判斷是否有數(shù)據(jù)丟失。
  圖 6 描述了某 Flink 作業(yè)中的數(shù)據(jù)流以及 Heartbeat 的運(yùn)行狀態(tài):
  圖6 Heartbeat在作業(yè)中的運(yùn)行過程
  2.可用性
  有了 Heartbeat,我們就可以用來定義集群的可用性。首先,我們需要先定義在什么情況下屬于不可用的:
  Flink 作業(yè)重啟
  當(dāng)內(nèi)存不足(OutofMemory)或代碼運(yùn)行錯(cuò)誤時(shí),作業(yè)就可能會(huì)意外重啟。我們認(rèn)為重啟過程中造成的數(shù)據(jù)丟失是不可用的情況之一。因此我們的目標(biāo)之一是讓 Flink 作業(yè)能夠長(zhǎng)時(shí)間穩(wěn)定運(yùn)行。
  Flink 作業(yè)中止
  有時(shí)因?yàn)榛A(chǔ)設(shè)施的問題導(dǎo)致物理機(jī)或者容器沒啟動(dòng)起來,或是在 Flink 作業(yè)發(fā)生重啟時(shí)由于 Slot 不夠而無法啟動(dòng),或者是因?yàn)?Flink 作業(yè)的重啟次數(shù)已經(jīng)超過了最大重啟次數(shù)(rest.retry.max-attempts), Flink 作業(yè)就會(huì)中止。此時(shí)需要人工干預(yù)才能將作業(yè)重新啟動(dòng)起來。
  我們認(rèn)為 Flink 作業(yè)中止時(shí),也是不可用的情況之一。
  Flink 作業(yè)在運(yùn)行中不再處理數(shù)據(jù)
  發(fā)生這種情況,一般是因?yàn)橛龅搅朔磯海˙ackPressure)。造成反壓的原因有很多種,比如上游的流量過大,或者是中間某個(gè)算子的處理能力不夠,或者是下游存儲(chǔ)節(jié)點(diǎn)遇到性能瓶頸等等。雖然短時(shí)間內(nèi)的反壓不會(huì)造成數(shù)據(jù)丟失,但它會(huì)影響數(shù)據(jù)的實(shí)時(shí)性,最明顯的變化是延遲這個(gè)指標(biāo)會(huì)變大。
  我們認(rèn)為反壓發(fā)生時(shí)是不可用的情況之一。
  針對(duì)以上三種情況,我們都可以用 Heartbeat 來監(jiān)控,并計(jì)算可用性。比如第一種情況,如果作業(yè)重啟時(shí)發(fā)生了數(shù)據(jù)丟失,那么相應(yīng)的那段管道的 Heartbeat 也會(huì)丟失,從而我們可以監(jiān)測(cè)出是否有數(shù)據(jù)丟失以及粗粒度地估算數(shù)據(jù)丟了多少。對(duì)于第二種情況,當(dāng)作業(yè)中止時(shí),HeartBeat 也不會(huì)被處理,因此可以很快發(fā)現(xiàn)作業(yè)停止運(yùn)行并讓 on-call 及時(shí)干預(yù)。第三種情況當(dāng)反壓發(fā)生時(shí),HeartBeat 也會(huì)被阻塞在發(fā)生反壓的上游,因此 on-call 也可以很快地發(fā)現(xiàn)反壓發(fā)生并進(jìn)行人工干預(yù)。
  綜上,Heartbeat 可以很快監(jiān)測(cè)出 Flink 作業(yè)的運(yùn)行情況。那么,如何評(píng)估可用性呢?由于 Heartbeat 是定時(shí)發(fā)生的,默認(rèn)情況下我們?cè)O(shè)置每 10 秒發(fā)一次。1 分鐘內(nèi)我們期望每個(gè)作業(yè)的每條管道能夠發(fā)出 6 個(gè)帶有作業(yè)信息的 heartbeat,那么每天就可以收到 8640 個(gè) Heartbeat。
  因此,一個(gè)作業(yè)的可用性可以定義為:
  3.Flink 作業(yè)隔離
  Slot 是 Flink 運(yùn)行作業(yè)的最小單位[1],每個(gè) TaskManager 可以分配一個(gè)至多個(gè) Slot(一般分配的個(gè)數(shù)為該 TaskManager 的 CPU 數(shù))。根據(jù) Flink 作業(yè)的并行度,一個(gè)作業(yè)可以分配到多個(gè) TaskManager 上,而一個(gè) TaskManager 也可能運(yùn)行著多個(gè)作業(yè)。然而,一個(gè) TaskManager 就是一個(gè) JVM,當(dāng)多個(gè)作業(yè)分配到一個(gè) TaskManager 上時(shí),就會(huì)有搶奪資源的情況發(fā)生。
  例如,我一個(gè) TaskManager 分配了 3 個(gè) Slot(3 個(gè) CPU)和 8G 堆內(nèi)存。當(dāng) JobManager 調(diào)度作業(yè)的時(shí)候,有可能將 3 個(gè)不同作業(yè)的線程調(diào)度到該 TaskManager 上,那么這 3 個(gè)作業(yè)就會(huì)同時(shí)搶奪 CPU 和內(nèi)存的資源。當(dāng)其中一個(gè)作業(yè)特別耗 CPU 或內(nèi)存的時(shí)候,就會(huì)影響其他兩個(gè)作業(yè)。
  在這種情況下,我們通過配置 Flink 可以實(shí)現(xiàn)作業(yè)的隔離,如圖 7 所示:
  圖7 Flink 作業(yè)隔離前后的調(diào)度圖
  通過配置:
  通過以上配置,可以限定每個(gè) TaskManager 獨(dú)占 CPU 和內(nèi)存的資源,且不會(huì)多個(gè)作業(yè)搶占,實(shí)現(xiàn)作業(yè)之間的隔離。
  4.反壓
  我們運(yùn)維 Flink 集群的時(shí)候發(fā)現(xiàn),出現(xiàn)最多的問題就是反壓。在 3.2 中提到過,發(fā)生反壓的原因有很多種,但無論什么原因,數(shù)據(jù)最終都會(huì)被積壓在發(fā)生反壓上游的算子的本地緩沖區(qū)(localBuffer)中。
  我們知道,每一個(gè) TaskManager 有一個(gè)本地緩沖池, 每一個(gè)算子數(shù)據(jù)進(jìn)來后會(huì)把數(shù)據(jù)填充到本地緩沖池中,數(shù)據(jù)從這個(gè)算子出去后會(huì)回收這塊內(nèi)存。當(dāng)被反壓后,數(shù)據(jù)發(fā)不出去,本地緩沖池內(nèi)存就無法釋放,導(dǎo)致一直請(qǐng)求緩沖區(qū)(requestBuffer)。
  由于 Heartbeat 只能監(jiān)控出是否發(fā)生了反壓,但無法定位到是哪個(gè)算子出了問題,因此我們定時(shí)地將每個(gè)算子的 StackTrace 打印出來,當(dāng)發(fā)生反壓時(shí),通過 StackTrace 就可以知道是哪個(gè)算子的瓶頸。
  如圖8所示,我們可以清晰地看到發(fā)生反壓的 Flink 作業(yè)及其所在的 Taskmanager。再通過 Thread Dump,我們就可以定位到代碼的問題。
  圖8 發(fā)生反壓的StackTrace (點(diǎn)擊觀看大圖)
  5.其他監(jiān)控手段
  Flink 本身提供了很多有用的指標(biāo)[2]來監(jiān)控 Flink 作業(yè)的運(yùn)行情況,在此基礎(chǔ)上我們還加了一些業(yè)務(wù)上的指標(biāo)。除此之外,我們還使用了以下工具監(jiān)控 Flink 作業(yè)。
  History server
  Flink 的 History server[3]可以查詢已完成作業(yè)的狀態(tài)和指標(biāo)。比如一個(gè)作業(yè)的重啟次數(shù)、它運(yùn)行的時(shí)間。我們常常用它找出運(yùn)行不正常的作業(yè)。比如,我們可以通過 History server 的 attempt 指標(biāo)知道每個(gè)作業(yè)重啟的次數(shù),從而快速去現(xiàn)場(chǎng)找到重啟的原因,避免下次再發(fā)生。
  監(jiān)控作業(yè)和集群
  雖然 Flink 有 HA 的模式,但在極端情況下,例如整個(gè)集群出現(xiàn)問題時(shí),需要 on-call 即時(shí)發(fā)覺并人工干預(yù)。我們?cè)谠獢?shù)據(jù)微服務(wù)中保存了最后一次提交作業(yè)成功的元數(shù)據(jù),它記錄了在每個(gè) Flink 集群上應(yīng)該運(yùn)行哪些作業(yè)。守護(hù)線程(Daemon thread)會(huì)每分鐘去比較這個(gè)元數(shù)據(jù)和 Flink 上運(yùn)行的作業(yè),若發(fā)現(xiàn) JobManager 連不通或者有作業(yè)運(yùn)行不一致則立刻發(fā)出告警(Alert)通知 on-call。
  四。 實(shí)例
  下面介紹幾個(gè)已經(jīng)運(yùn)行在監(jiān)控系統(tǒng)上的 Flink 流處理系統(tǒng)的應(yīng)用:
  1.Event Alerting
  當(dāng)前監(jiān)控團(tuán)隊(duì)是基于 Flink Streaming 做事件告警(Event alerting),我們定義了一個(gè)告警算子 EventAlertingCapability,該 Capability 可以處理每個(gè) Policy 自定義的規(guī)則。如圖 9 定義的一條性能監(jiān)控規(guī)則:
  該規(guī)則的含義是當(dāng)性能檢測(cè)器的應(yīng)用為“r1rover”, 主機(jī)以“r1rover”開頭,且數(shù)值大于 90 時(shí),就觸發(fā)告警。且生成的告警會(huì)發(fā)送到指定的 Kafka topic 中供下游繼續(xù)處理。
  圖9 Single-Threshold1 Policy (點(diǎn)擊查看大圖)
  2.Eventzon
  Eventzon 就像 eBay 的事件中心,它收集了從各個(gè)應(yīng)用,框架,基礎(chǔ)架構(gòu)發(fā)過來的事件,最后通過監(jiān)控團(tuán)隊(duì)的 Flink Streaming 實(shí)時(shí)生成告警。由于各個(gè)事件的數(shù)據(jù)源不同,它們的元數(shù)據(jù)也不同,因此無法用一條統(tǒng)一的規(guī)則來描述它。
  我們專門定義了一套作業(yè)來處理 Eventzon 的事件,它包含了多個(gè) Capability,比如 Filter Capability,用來過濾非法的或者不符合條件的事件; 又比如 Deduplicate Capability,可以用來去除重復(fù)的事件。Eventzon 的所有事件經(jīng)過一整套作業(yè)后,會(huì)生成有效的告警,并根據(jù)通知機(jī)制通過 E-mail、Slack 或 Pagerduty 發(fā)給相關(guān)團(tuán)隊(duì)。
  3.Netmon
  Netmon 的全稱為 Network Monitoring, 即網(wǎng)絡(luò)監(jiān)控,它可以用來監(jiān)控整個(gè) eBay 網(wǎng)絡(luò)設(shè)備的健康狀態(tài)。它的數(shù)據(jù)源來自 eBay 的交換機(jī),路由器等網(wǎng)絡(luò)設(shè)備的日志。Netmon 的作用是根據(jù)這些日志找出一些特定的信息,往往是一些錯(cuò)誤的日志,以此來生成告警。
  eBay 的每一臺(tái)設(shè)備都要“登記造冊(cè)”,每臺(tái)設(shè)備將日志發(fā)過來后,我們通過 EnrichCapability 從“冊(cè)子”中查詢這臺(tái)設(shè)備的信息,并把相關(guān)信息比如 IP 地址,所在的數(shù)據(jù)中心,所在的機(jī)架等填充到日志信息中作為事件保存。當(dāng)設(shè)備產(chǎn)生一些特定的錯(cuò)誤日志時(shí), 它會(huì)被相應(yīng)的規(guī)則匹配然后生成告警,該告警會(huì)被 EventProcess Capability 保存到 Elasticsearch 中實(shí)時(shí)顯示到 Netmon 的監(jiān)控平臺(tái)(dashboard)上。有時(shí)因?yàn)榫W(wǎng)絡(luò)抖動(dòng)導(dǎo)致一些短暫的錯(cuò)誤發(fā)生,但系統(tǒng)過一會(huì)兒就會(huì)自動(dòng)恢復(fù)。
  當(dāng)上述情況發(fā)生時(shí),Netmon 會(huì)有相應(yīng)的規(guī)則將發(fā)生在網(wǎng)絡(luò)抖動(dòng)時(shí)生成的告警標(biāo)記為“已解決”(Resolved)。對(duì)于一些必須人工干預(yù)的告警,運(yùn)維人員可以通過網(wǎng)絡(luò)監(jiān)控平臺(tái)(Netmon dashboard)手動(dòng)點(diǎn)擊“已解決”,完成該告警的生命周期。
  五。 總結(jié)與展望
  eBay 的監(jiān)控團(tuán)隊(duì)希望能根據(jù)用戶提供的指標(biāo)、事件和日志以及相應(yīng)的告警規(guī)則實(shí)時(shí)告警用戶。Flink Streaming 能夠提供低延時(shí)的處理從而能夠達(dá)到我們低延時(shí)的要求,并且它適合比較復(fù)雜的處理邏輯。
  然而在運(yùn)維 Flink 的過程中,我們也發(fā)現(xiàn)了由于作業(yè)重啟等原因?qū)е抡`報(bào)少報(bào)告警的情況發(fā)生,從而誤導(dǎo)客戶。因此今后我們會(huì)在 Flink 的穩(wěn)定性和高可用性上投入更多。我們也希望在監(jiān)控指標(biāo)、日志上能夠集成一些復(fù)雜的 AI 算法,從而能夠生成更加有效精確的告警,成為運(yùn)維人員的一把利器。
【免責(zé)聲明】本文僅代表作者本人觀點(diǎn),與CTI論壇無關(guān)。CTI論壇對(duì)文中陳述、觀點(diǎn)判斷保持中立,不對(duì)所包含內(nèi)容的準(zhǔn)確性、可靠性或完整性提供任何明示或暗示的保證。請(qǐng)讀者僅作參考,并請(qǐng)自行承擔(dān)全部責(zé)任。

專題

CTI論壇會(huì)員企業(yè)