五月婷婷丁香性爱|j久久一级免费片|久久美女福利视频|中文观看在线观看|加勒比四区三区二|亚洲裸女视频网站|超碰97AV在线69网站免费观看|有码在线免费视频|久久青青日本视频|亚洲国产AAAA

[快速入門]Mapreduce框架
MapReduce服務(wù)定價

MapReduce服務(wù) 定價與計費 價格計算器 1對1咨詢 計費項 購買MRS集群的費用包含兩個部分: 1、MRS服務(wù)管理費用 2、IaaS基礎(chǔ)設(shè)施資源費用(彈性云服務(wù)器,云硬盤,彈性IP/帶寬等) MRS服務(wù)管理費用詳情,請參見產(chǎn)品價格詳情。您可以通過MRS提供的價格計算器,選

MapReduce服務(wù)入門

購買 文檔 專家咨詢 4步快速使用MapReduce服務(wù) 4步快速使用MapReduce服務(wù) 上傳數(shù)據(jù) 如果您沒有購買集群,請上傳程序和數(shù)據(jù)到對象存儲服務(wù)(OBS)中;如果您已經(jīng)購買了集群,請上傳程序和數(shù)據(jù)到HDFS。 購買集群 進(jìn)入MapReduce管理控制臺,單擊“購買集群”并配

MapReduce服務(wù)學(xué)習(xí)與資源

MapReduce服務(wù) MRS 資源 MapReduce服務(wù) MRS 資源 提供Hadoop、HBase、Hudi、Spark、Flink等開源大數(shù)據(jù)組件,支持湖倉一體、靈活的彈性控制能力。 提供Hadoop、HBase、Hudi、Spark、Flink等開源大數(shù)據(jù)組件,支持湖倉一體、靈活的彈性控制能力。

云數(shù)據(jù)遷移 CDM

景 支持近20種常用數(shù)據(jù)源,滿足數(shù)據(jù)在云上和云下的不同遷移場景 遷移效率高 基于分布式計算框架進(jìn)行數(shù)據(jù)任務(wù)執(zhí)行和數(shù)據(jù)傳輸優(yōu)化,并針對特定數(shù)據(jù)源寫入做了專項優(yōu)化,遷移效率高 基于分布式計算框架進(jìn)行數(shù)據(jù)任務(wù)執(zhí)行和數(shù)據(jù)傳輸優(yōu)化,并針對特定數(shù)據(jù)源寫入做了專項優(yōu)化,遷移效率高 簡單易用 免

華為云數(shù)據(jù)湖探索服務(wù) DLI

統(tǒng)一的管理機(jī)制 使用統(tǒng)一的IAM管理用戶(無需單獨創(chuàng)建DLI用戶),支持IAM細(xì)粒度授權(quán) 搭配使用 MapReduce服務(wù) MRS 基因行業(yè) ????基因數(shù)據(jù)處理 現(xiàn)在基因行業(yè)有很多基于Spark分布式框架的第三方分析庫,如ADAM、Hail等 優(yōu)勢 支持自定義鏡像 支持基于基礎(chǔ)鏡像打包ADA

數(shù)據(jù)治理中心

服務(wù),對象存儲等30+同構(gòu)/異構(gòu)數(shù)據(jù)源,基于分布式計算框架,穩(wěn)定高效地對海量數(shù)據(jù)進(jìn)行遷移,幫助用戶快速完成數(shù)據(jù)入湖開發(fā) 統(tǒng)一集成工具 入湖統(tǒng)一配置:支持全量遷移+增量集成以及實時同步任務(wù) 高可靠高穩(wěn)定:分布式數(shù)據(jù)遷移同步框架,計算資源Serverless化 全域數(shù)據(jù)類型 支持豐富多樣的數(shù)據(jù)源,快速實現(xiàn)數(shù)據(jù)自由流動

對象存儲服務(wù) OBS功能-BigData Pro

存算分離資源利用率更高 存算分離資源利用率更高 OBS具備海量數(shù)據(jù)存儲能力,結(jié)合華為云MapReduce服務(wù),為美圖提供BigData Pro大數(shù)據(jù)解決方案 OBS具備海量數(shù)據(jù)存儲能力,結(jié)合華為云MapReduce服務(wù),為美圖提供BigData Pro大數(shù)據(jù)解決方案 典型業(yè)務(wù)場景-大數(shù)據(jù)離線分析

華為云Astro企業(yè)應(yīng)用

支持領(lǐng)域驅(qū)動設(shè)計的建模,以及復(fù)雜關(guān)系建模,生成架構(gòu)規(guī)范的代碼框架,降低開發(fā)門檻 安全可靠,內(nèi)置業(yè)界領(lǐng)先的華為10+安全防護(hù)能力 內(nèi)置業(yè)界優(yōu)秀開發(fā)規(guī)范實踐,內(nèi)置編碼規(guī)范、API規(guī)范、數(shù)據(jù)規(guī)范,提升企業(yè)研發(fā)質(zhì)量 框架組件:提供標(biāo)準(zhǔn)框架能力,開發(fā)人員聚焦業(yè)務(wù)開發(fā) 框架組件:提供標(biāo)準(zhǔn)框架能力,開發(fā)人員聚焦業(yè)務(wù)開發(fā) 中

數(shù)據(jù)接入服務(wù) DIS

支持百萬并發(fā)消息,端到端時延低至毫秒級 生態(tài)完善 支持對接各分析平臺,數(shù)據(jù)傳輸、計算一站式處理 建議搭配使用 數(shù)據(jù)湖探索 DLI MapReduce 服務(wù) 實時文件傳輸??? 實時檢測客戶應(yīng)用系統(tǒng)中產(chǎn)生的日志文件,并采集上傳到云上,進(jìn)行離線分析、存儲查詢及機(jī)器學(xué)習(xí),可應(yīng)用于日志分析、用戶畫像、營銷推薦等場景。

[相關(guān)產(chǎn)品]Mapreduce框架
云框架-軟件系統(tǒng)框架

使用云框架的兩個常見原因: 在表格中安排數(shù)據(jù),從而用來呈現(xiàn)數(shù)據(jù)間的關(guān)系;或者在ap上組織圖形和文本,也就是用于app布局。1.增大系統(tǒng)容量。我們的業(yè)務(wù)量越來越大,而要能應(yīng)對越來越大的業(yè)務(wù)量,普通框架的性能已經(jīng)無法滿足了,我們需要專業(yè)的框架才能應(yīng)對大規(guī)模的應(yīng)用場景。所以,我們需要垂

應(yīng)用使能框架專業(yè)服務(wù)

業(yè)務(wù)一站式服務(wù)。(4)提供個人協(xié)同服務(wù)、團(tuán)隊協(xié)同服務(wù)、會議協(xié)同服務(wù)服務(wù)、業(yè)務(wù)協(xié)同服務(wù)等協(xié)同辦公服務(wù)。2、開發(fā)框架使能服務(wù):(1)提供移動框架服務(wù)、能力開放服務(wù)、微服務(wù)框架服務(wù)、中間件服務(wù)、通用基礎(chǔ)服務(wù)等通用基礎(chǔ)服務(wù)。(2)提供事件監(jiān)控域服務(wù)、指標(biāo)體系域服務(wù)、專題分析域服務(wù)、協(xié)同辦

UbuntuPHP5.6運行環(huán)境(ThinkPHP框架)

ThinkPHP是一個快速、簡單的基于MVC和面向?qū)ο蟮妮p量級PHP開發(fā)框架。ThinkPHP是一個快速、簡單的基于MVC和面向?qū)ο蟮妮p量級PHP開發(fā)框架,遵循Apache2開源協(xié)議發(fā)布,從誕生以來一直秉承簡潔實用的設(shè)計原則,在保持出色的性能和至簡的代碼的同時,尤其注重開發(fā)體驗和

北交聯(lián)合云分布式服務(wù)框架系統(tǒng)

北交聯(lián)合云分布式服務(wù)框架幫助企業(yè)構(gòu)建 SOA 化的服務(wù)平臺,同時快速積累企業(yè)軟件資產(chǎn),提升企業(yè)的業(yè)務(wù)能力。不僅是企業(yè)架構(gòu)、開放服務(wù)、集中運維平臺,更是從開發(fā)到運維的全生命周期的服務(wù)技術(shù)平臺,幫助企業(yè)留下軟件資產(chǎn),保證服務(wù)能力、保證業(yè)務(wù)共享和業(yè)務(wù)的靈活性。產(chǎn)品功能服務(wù)發(fā)布及管理原有

hadoop生態(tài)組件

rk:spark是個開源的數(shù)據(jù) 分析集群計算框架,最初由加州大學(xué)伯克利分校AMPLab,建立于HDFS之上。spark與hadoop一樣,用于構(gòu)建大規(guī)模,延遲低的數(shù)據(jù)分析應(yīng)用。spark采用Scala語言實現(xiàn),使用Scala作為應(yīng)用框架。spark采用基于內(nèi)存的分布式數(shù)據(jù)集,優(yōu)化

智慧城市運營中心(IOC)共享交換模塊解決方案

3、統(tǒng)一建設(shè),資源整合 4、穩(wěn)定可靠,安全運行 5、面向服務(wù),規(guī)范管理國家電子政務(wù)總體框架由服務(wù)與應(yīng)用系統(tǒng)、信息資源、基礎(chǔ)設(shè)施、法律法規(guī)與標(biāo)準(zhǔn)化體系、管理體制等部分組成。政務(wù)信息資源目錄體系與交換體系是國家電子政務(wù)總體框架中的重要組成部分,是電子政務(wù)的基礎(chǔ)設(shè)施之一。政務(wù)信息資源目錄與交換體系在

科藍(lán)分布式批量任務(wù)調(diào)度平臺配套服務(wù)

志。 3. 支持多種定時觸發(fā)策略,CRON、固定頻率、固定延遲、API、手動觸發(fā)等; 4. 支持多種調(diào)度模式,單機(jī)、廣播、Map、MapReduce; 5. 可視化配置任務(wù)組-工作流,可拖拉拽的方式完成任務(wù)組的編排;支持簡單判斷邏輯及嵌套子任務(wù)組;支持上下游任務(wù)間的參數(shù)傳遞; 6

大數(shù)據(jù)集群搭建

CDH大數(shù)據(jù)集群安裝,包括HDFS、MapReduce、Hive、Hbase、Zookeeper、Sqoop,Kafka簡化了大數(shù)據(jù)平臺的安裝、使用難度。1. CDH簡介      在商業(yè)應(yīng)用中,對于企業(yè)成百上千的機(jī)器集群進(jìn)行安裝hadoop一系列組件費時費力,而且hadoop各

科藍(lán)分布式批量任務(wù)調(diào)度平臺

看日志。3. 支持多種定時觸發(fā)策略,CRON、固定頻率、固定延遲、API、手動觸發(fā)等;4. 支持多種調(diào)度模式,單機(jī)、廣播、Map、MapReduce;5. 可視化配置任務(wù)組-工作流,可拖拉拽的方式完成任務(wù)組的編排;支持簡單判斷邏輯及嵌套子任務(wù)組;支持上下游任務(wù)間的參數(shù)傳遞;6. 支持多種執(zhí)行器,Spring

[相似文章]Mapreduce框架
MapReduce工作原理_MapReduce是什么意思_MapReduce流程_MRS_華為云

站式運維能力。 MapReduce相關(guān)精選推薦 MapReduce服務(wù) MapReduce服務(wù)入門 MapReduce服務(wù)定價 MapReduce服務(wù)學(xué)習(xí)與資源 MapReduce 使用Mapreduce MapReduce Action 使用MapReduce 查看更多 收起

深圳MES系統(tǒng)_MES框架_MES集成

深圳MES系統(tǒng) 深圳MES系統(tǒng) 聚焦行業(yè)化,深度專業(yè)化的MES系統(tǒng),滿足規(guī)上企業(yè)、中大型企業(yè)、專精特新企業(yè)所需的“一站式”數(shù)字化工廠解決方案。 聚焦行業(yè)化,深度專業(yè)化的MES系統(tǒng),滿足規(guī)上企業(yè)、中大型企業(yè)、專精特新企業(yè)所需的“一站式”數(shù)字化工廠解決方案。 歐軟云MES立即購買 免費試用

MapReduce服務(wù)_什么是MapReduce服務(wù)_什么是HBase

使用Hive客戶端創(chuàng)建外部表 MapReduce服務(wù) MRS 03:44 MapReduce服務(wù) MRS 安裝及使用MRS客戶端 MapReduce服務(wù) MRS 03:22 MapReduce服務(wù) MRS 使用HBase客戶端創(chuàng)建表 MapReduce服務(wù) MRS 04:20 MapReduce服務(wù) MRS

MapReduce服務(wù)_什么是Yarn_如何使用Yarn

會處理此應(yīng)用的資源申請。其優(yōu)先級從高到低依次為:本地資源的申請、同機(jī)架的申請,任意機(jī)器的申請。 Yarn原理 新的Hadoop MapReduce框架被命名為MRv2或Yarn。Yarn主要包括ResourceManager、ApplicationMaster與NodeManager三個部分。

MRS備份恢復(fù)_MapReduce備份_數(shù)據(jù)備份

MRS精選文章推薦 大數(shù)據(jù)分析是什么_使用MapReduce_創(chuàng)建MRS服務(wù) 什么是Manager_Manager的功能_MRS運維管理 MapReduce工作原理_MapReduce是什么意思_MapReduce流程 MapReduce服務(wù)_什么是HetuEngine_如何使用HetuEngine

OA協(xié)同辦公_OA數(shù)據(jù)庫_OA框架

OA協(xié)同辦公 OA協(xié)同辦公 浩辰云圖CAD網(wǎng)頁版軟件,包括后端各種API能力+前端JS SDK,專門為企業(yè)量身定制的CAD在線看圖產(chǎn)品,無需安裝任何插件,可無縫集成到各類BS系統(tǒng)、APP軟件、微信小程序中,實現(xiàn)企業(yè)圖紙管理、協(xié)同辦公等私有化。 浩辰云圖CAD網(wǎng)頁版軟件,包括后端各種API能力+前端JS

MapReduce服務(wù)_如何使用MapReduce服務(wù)_MRS集群客戶端安裝與使用

使用Hive客戶端創(chuàng)建外部表 MapReduce服務(wù) MRS 03:44 MapReduce服務(wù) MRS 安裝及使用MRS客戶端 MapReduce服務(wù) MRS 03:22 MapReduce服務(wù) MRS 使用HBase客戶端創(chuàng)建表 MapReduce服務(wù) MRS 04:20 MapReduce服務(wù) MRS

MapReduce服務(wù)_什么是HetuEngine_如何使用HetuEngine

個文件最大值、日志歸檔的最大保留數(shù)目等。 MRS精選文章推薦 大數(shù)據(jù)分析是什么_使用MapReduce_創(chuàng)建MRS服務(wù) MapReduce工作原理_MapReduce是什么意思_MapReduce流程 ECS-服務(wù)器-云服務(wù)器-華為ECS-彈性云服務(wù)器試用 免費云服務(wù)器_個人免費

MapReduce服務(wù)_什么是Hue_如何使用Hue

MRS精選文章推薦 MRS優(yōu)勢_什么是MRS_MRS功能 MapReduce工作原理_MapReduce是什么意思_MapReduce流程 MapReduce服務(wù)_什么是HetuEngine_如何使用HetuEngine MRS備份恢復(fù)_MapReduce備份_數(shù)據(jù)備份 怎樣選擇彈性云服務(wù)器_ECS哪家強_華為ECS

Mapreduce框架

簡介

Spark是基于內(nèi)存的分布式計算框架。在迭代計算的場景下,數(shù)據(jù)處理過程中的數(shù)據(jù)可以存儲在內(nèi)存中,提供了比MapReduce高10到100倍的計算能力。Spark可以使用HDFS作為底層存儲,使用戶能夠快速地從MapReduce切換到Spark計算平臺上去。Spark提供一站式數(shù)據(jù)分析能力,包括小批量流式處理、離線批處理、SQL查詢、數(shù)據(jù)挖掘等,用戶可以在同一個應(yīng)用中無縫結(jié)合使用這些能力。Spark2x的開源新特性請參考Spark2x開源新特性說明。

更多關(guān)于Spark2x組件操作指導(dǎo),請參考使用Spark/Spark2x。

Spark的特點如下:

  • 通過分布式內(nèi)存計算和DAG(無回路有向圖)執(zhí)行引擎提升數(shù)據(jù)處理能力,比MapReduce性能高10倍到100倍。
  • 提供多種語言開發(fā)接口(Scala/Java/Python),并且提供幾十種高度抽象算子,可以很方便構(gòu)建分布式的數(shù)據(jù)處理應(yīng)用。
  • 結(jié)合SQL、Streaming等形成數(shù)據(jù)處理棧,提供一站式數(shù)據(jù)處理能力。
  • 支持契合Hadoop生態(tài)環(huán)境,Spark應(yīng)用可以運行在Standalone、Mesos或者YARN上,能夠接入HDFS、HBase、Hive等多種數(shù)據(jù)源,支持MapReduce程序平滑轉(zhuǎn)接。

結(jié)構(gòu)

Spark的架構(gòu)如圖1所示,各模塊的說明如表1所示。

圖1 Spark架構(gòu)
表1 基本概念說明

模塊

說明

Cluster Manager

集群管理器,管理集群中的資源。Spark支持多種集群管理器,Spark自帶的Standalone集群管理器、Mesos或YARN。Spark集群默認(rèn)采用YARN模式。

Application

Spark應(yīng)用,由一個Driver Program和多個Executor組成。

Deploy Mode

部署模式,分為cluster和client模式。cluster模式下,Driver會在集群內(nèi)的節(jié)點運行;而在client模式下,Driver在客戶端運行(集群外)。

Driver Program

是Spark應(yīng)用程序的主進(jìn)程,運行Application的main()函數(shù)并創(chuàng)建SparkContext。負(fù)責(zé)應(yīng)用程序的解析、生成Stage并調(diào)度Task到Executor上。通常SparkContext代表Driver Program。

Executor

在Work Node上啟動的進(jìn)程,用來執(zhí)行Task,管理并處理應(yīng)用中使用到的數(shù)據(jù)。一個Spark應(yīng)用一般包含多個Executor,每個Executor接收Driver的命令,并執(zhí)行一到多個Task。

Worker Node

集群中負(fù)責(zé)啟動并管理Executor以及資源的節(jié)點。

Job

一個Action算子(比如collect算子)對應(yīng)一個Job,由并行計算的多個Task組成。

Stage

每個Job由多個Stage組成,每個Stage是一個Task集合,由DAG分割而成。

Task

承載業(yè)務(wù)邏輯的運算單元,是Spark平臺上可執(zhí)行的最小工作單元。一個應(yīng)用根據(jù)執(zhí)行計劃以及計算量分為多個Task。

Spark原理

Spark的應(yīng)用運行架構(gòu)如圖2所示,運行流程如下所示:

  1. 應(yīng)用程序(Application)是作為一個進(jìn)程的集合運行在集群上的,由Driver進(jìn)行協(xié)調(diào)。
  2. 在運行一個應(yīng)用時,Driver會去連接集群管理器(Standalone、Mesos、YARN)申請運行Executor資源,并啟動ExecutorBackend。然后由集群管理器在不同的應(yīng)用之間調(diào)度資源。Driver同時會啟動應(yīng)用程序DAG調(diào)度、Stage劃分、Task生成。
  3. 然后Spark會把應(yīng)用的代碼(傳遞給SparkContext的JAR或者Python定義的代碼)發(fā)送到Executor上。
  4. 所有的Task執(zhí)行完成后,用戶的應(yīng)用程序運行結(jié)束。
圖2 Spark應(yīng)用運行架構(gòu)

Spark采用Master和Worker的模式,如圖3所示。用戶在Spark客戶端提交應(yīng)用程序,調(diào)度器將Job分解為多個Task發(fā)送到各個Worker中執(zhí)行,各個Worker將計算的結(jié)果上報給Driver(即Master),Driver聚合結(jié)果返回給客戶端。

圖3 Spark的Master和Worker

在此結(jié)構(gòu)中,有幾個說明點:

  • 應(yīng)用之間是獨立的。

    每個應(yīng)用有自己的executor進(jìn)程,Executor啟動多個線程,并行地執(zhí)行任務(wù)。無論是在調(diào)度方面,或者是executor方面。各個Driver獨立調(diào)度自己的任務(wù);不同的應(yīng)用任務(wù)運行在不同的JVM上,即不同的Executor。

  • 不同Spark應(yīng)用之間是不共享數(shù)據(jù)的,除非把數(shù)據(jù)存儲在外部的存儲系統(tǒng)上(比如HDFS)。
  • 因為Driver程序在集群上調(diào)度任務(wù),所以Driver程序需要和worker節(jié)點比較近,比如在一個相同的局部網(wǎng)絡(luò)內(nèi)。

Spark on YARN有兩種部署模式:

  • YARN-Cluster模式下,Spark的Driver會運行在YARN集群內(nèi)的ApplicationMaster進(jìn)程中,ApplicationMaster已經(jīng)啟動之后,提交任務(wù)的客戶端退出也不會影響任務(wù)的運行。
  • YARN-Client模式下,Driver啟動在客戶端進(jìn)程內(nèi),ApplicationMaster進(jìn)程只用來向YARN集群申請資源。

Spark Streaming原理

Spark Streaming是一種構(gòu)建在Spark上的實時計算框架,擴(kuò)展了Spark處理大規(guī)模流式數(shù)據(jù)的能力。當(dāng)前Spark支持兩種數(shù)據(jù)處理方式:Direct Streaming和Receiver方式。

Direct Streaming計算流程

Direct Streaming方式主要通過采用Direct API對數(shù)據(jù)進(jìn)行處理。以Kafka Direct接口為例,與啟動一個Receiver來連續(xù)不斷地從Kafka中接收數(shù)據(jù)并寫入到WAL中相比,Direct API簡單地給出每個batch區(qū)間需要讀取的偏移量位置。然后,每個batch的Job被運行,而對應(yīng)偏移量的數(shù)據(jù)在Kafka中已準(zhǔn)備好。這些偏移量信息也被可靠地存儲在checkpoint文件中,應(yīng)用失敗重啟時可以直接讀取偏移量信息。

圖4 Direct Kafka接口數(shù)據(jù)傳輸

需要注意的是,Spark Streaming可以在失敗后重新從Kafka中讀取并處理數(shù)據(jù)段。然而,由于語義僅被處理一次,重新處理的結(jié)果和沒有失敗處理的結(jié)果是一致的。

因此,Direct API消除了需要使用WAL和Receivers的情況,且確保每個Kafka記錄僅被接收一次,這種接收更加高效。使得Spark Streaming和Kafka可以很好地整合在一起。總體來說,這些特性使得流處理管道擁有高容錯性、高效性及易用性,因此推薦使用Direct Streaming方式處理數(shù)據(jù)。

Receiver計算流程

在一個Spark Streaming應(yīng)用開始時(也就是Driver開始時),相關(guān)的StreamingContext(所有流功能的基礎(chǔ))使用SparkContext啟動Receiver成為長駐運行任務(wù)。這些Receiver接收并保存流數(shù)據(jù)到Spark內(nèi)存中以供處理。用戶傳送數(shù)據(jù)的生命周期如圖5所示:

圖5 數(shù)據(jù)傳輸生命周期
  1. 接收數(shù)據(jù)(藍(lán)色箭頭)

    Receiver將數(shù)據(jù)流分成一系列小塊,存儲到Executor內(nèi)存中。另外,在啟用預(yù)寫日志(Write-ahead Log,簡稱WAL)以后,數(shù)據(jù)同時還寫入到容錯文件系統(tǒng)的預(yù)寫日志中。

  2. 通知Driver(綠色箭頭)

    接收塊中的元數(shù)據(jù)(Metadata)被發(fā)送到Driver的StreamingContext。這個元數(shù)據(jù)包括:

    • 定位其在Executor內(nèi)存中數(shù)據(jù)位置的塊Reference ID。
    • 若啟用了WAL,還包括塊數(shù)據(jù)在日志中的偏移信息。
  3. 處理數(shù)據(jù)(紅色箭頭)

    對每個批次的數(shù)據(jù),StreamingContext使用Block信息產(chǎn)生RDD及其Job。StreamingContext通過運行任務(wù)處理Executor內(nèi)存中的Block來執(zhí)行Job。

  4. 周期性地設(shè)置檢查點(橙色箭頭)
  5. 為了容錯的需要,StreamingContext會周期性地設(shè)置檢查點,并保存到外部文件系統(tǒng)中。

容錯性

Spark及其RDD允許無縫地處理集群中任何Worker節(jié)點的故障。鑒于Spark Streaming建立于Spark之上,因此其Worker節(jié)點也具備了同樣的容錯能力。然而,由于Spark Streaming的長正常運行需求,其應(yīng)用程序必須也具備從Driver進(jìn)程(協(xié)調(diào)各個Worker的主要應(yīng)用進(jìn)程)故障中恢復(fù)的能力。使Spark Driver能夠容錯是件很棘手的事情,因為可能是任意計算模式實現(xiàn)的任意用戶程序。不過Spark Streaming應(yīng)用程序在計算上有一個內(nèi)在的結(jié)構(gòu):在每批次數(shù)據(jù)周期性地執(zhí)行同樣的Spark計算。這種結(jié)構(gòu)允許把應(yīng)用的狀態(tài)(亦稱Checkpoint)周期性地保存到可靠的存儲空間中,并在Driver重新啟動時恢復(fù)該狀態(tài)。

對于文件這樣的源數(shù)據(jù),這個Driver恢復(fù)機(jī)制足以做到零數(shù)據(jù)丟失,因為所有的數(shù)據(jù)都保存在了像HDFS這樣的容錯文件系統(tǒng)中。但對于像Kafka和Flume等其他數(shù)據(jù)源,有些接收到的數(shù)據(jù)還只緩存在內(nèi)存中,尚未被處理,就有可能會丟失。這是由于Spark應(yīng)用的分布操作方式引起的。當(dāng)Driver進(jìn)程失敗時,所有在Cluster Manager中運行的Executor,連同在內(nèi)存中的所有數(shù)據(jù),也同時被終止。為了避免這種數(shù)據(jù)損失,Spark Streaming引進(jìn)了WAL功能。

WAL通常被用于 數(shù)據(jù)庫 和文件系統(tǒng)中,用來保證任何數(shù)據(jù)操作的持久性,即先將操作記入一個持久的日志,再對數(shù)據(jù)施加這個操作。若施加操作的過程中執(zhí)行失敗了,則通過讀取日志并重新施加前面指定的操作,系統(tǒng)就得到了恢復(fù)。下面介紹了如何利用這樣的概念保證接收到的數(shù)據(jù)的持久性。

Kafka數(shù)據(jù)源使用Receiver來接收數(shù)據(jù),是Executor中的長運行任務(wù),負(fù)責(zé)從數(shù)據(jù)源接收數(shù)據(jù),并且在數(shù)據(jù)源支持時還負(fù)責(zé)確認(rèn)收到數(shù)據(jù)的結(jié)果(收到的數(shù)據(jù)被保存在Executor的內(nèi)存中,然后Driver在Executor中運行來處理任務(wù))。

當(dāng)啟用了預(yù)寫日志以后,所有收到的數(shù)據(jù)同時還保存到了容錯文件系統(tǒng)的日志文件中。此時即使Spark Streaming失敗,這些接收到的數(shù)據(jù)也不會丟失。另外,接收數(shù)據(jù)的正確性只在數(shù)據(jù)被預(yù)寫到日志以后Receiver才會確認(rèn),已經(jīng)緩存但還沒有保存的數(shù)據(jù)可以在Driver重新啟動之后由數(shù)據(jù)源再發(fā)送一次。這兩個機(jī)制確保了零數(shù)據(jù)丟失,即所有的數(shù)據(jù)或者從日志中恢復(fù),或者由數(shù)據(jù)源重發(fā)。

如果需要啟用預(yù)寫日志功能,可以通過如下動作實現(xiàn):

  • 通過“streamingContext.checkpoint”(path-to-directory)設(shè)置checkpoint的目錄,這個目錄是一個HDFS的文件路徑,既用作保存流的checkpoint,又用作保存預(yù)寫日志。
  • 設(shè)置SparkConf的屬性“spark.streaming.receiver.writeAheadLog.enable”“true”(默認(rèn)值是“false”)。

在WAL被啟用以后,所有Receiver都獲得了能夠從可靠收到的數(shù)據(jù)中恢復(fù)的優(yōu)勢。建議緩存RDD時不采取多備份選項,因為用于預(yù)寫日志的容錯文件系統(tǒng)很可能也復(fù)制了數(shù)據(jù)。

在啟用了預(yù)寫日志以后,數(shù)據(jù)接收吞吐率會有降低。由于所有數(shù)據(jù)都被寫入容錯文件系統(tǒng),文件系統(tǒng)的寫入吞吐率和用于 數(shù)據(jù)復(fù)制 的網(wǎng)絡(luò)帶寬,可能就是潛在的瓶頸了。在此情況下,需要創(chuàng)建更多的Receiver增加數(shù)據(jù)接收的并行度,或使用更好的硬件以增加容錯文件系統(tǒng)的吞吐率。

恢復(fù)流程

當(dāng)一個失敗的Driver重啟時,按如下流程啟動:
圖6 計算恢復(fù)流程
  1. 恢復(fù)計算(橙色箭頭)

    使用checkpoint信息重啟Driver,重新構(gòu)造SparkContext并重啟Receiver。

  2. 恢復(fù)元數(shù)據(jù)塊(綠色箭頭)

    為了保證能夠繼續(xù)下去所必備的全部元數(shù)據(jù)塊都被恢復(fù)。

  3. 未完成作業(yè)的重新形成(紅色箭頭)

    由于失敗而沒有處理完成的批處理,將使用恢復(fù)的元數(shù)據(jù)再次產(chǎn)生RDD和對應(yīng)的作業(yè)。

  4. 讀取保存在日志中的塊數(shù)據(jù)(藍(lán)色箭頭)

    在這些作業(yè)執(zhí)行時,塊數(shù)據(jù)直接從預(yù)寫日志中讀出。這將恢復(fù)在日志中可靠地保存的所有必要數(shù)據(jù)。

  5. 重發(fā)尚未確認(rèn)的數(shù)據(jù)(紫色箭頭)

    失敗時沒有保存到日志中的緩存數(shù)據(jù)將由數(shù)據(jù)源再次發(fā)送。因為Receiver尚未對其確認(rèn)。

因此通過預(yù)寫日志和可靠的Receiver,Spark Streaming就可以保證沒有輸入數(shù)據(jù)會由于Driver的失敗而丟失。

SparkSQL和DataSet原理

SparkSQL

圖7 SparkSQL和DataSet

Spark SQL是Spark中用于結(jié)構(gòu)化數(shù)據(jù)處理的模塊。在Spark應(yīng)用中,可以無縫地使用SQL語句亦或是DataSet API對結(jié)構(gòu)化數(shù)據(jù)進(jìn)行查詢。

Spark SQL以及DataSet還提供了一種通用的訪問多數(shù)據(jù)源的方式,可訪問的數(shù)據(jù)源包括Hive、 CS V、Parquet、ORC、JSON和JDBC數(shù)據(jù)源,這些不同的數(shù)據(jù)源之間也可以實現(xiàn)互相操作。Spark SQL復(fù)用了Hive的前端處理邏輯和元數(shù)據(jù)處理模塊,使用Spark SQL可以直接對已有的Hive數(shù)據(jù)進(jìn)行查詢。

另外,SparkSQL還提供了諸如API、CLI、JDBC等諸多接口,對客戶端提供多樣接入形式。

Spark SQL Native DDL/DML

Spark1.5將很多DDL/DML命令下壓到Hive執(zhí)行,造成了與Hive的耦合,且在一定程度上不夠靈活(比如報錯不符合預(yù)期、結(jié)果與預(yù)期不一致等)。

Spark2x實現(xiàn)了命令的本地化,使用Spark SQL Native DDL/DML取代Hive執(zhí)行DDL/DML命令。一方面實現(xiàn)和Hive的解耦,另一方面可以對命令進(jìn)行定制化。

DataSet

DataSet是一個由特定域的對象組成的強類型集合,可通過功能或關(guān)系操作并行轉(zhuǎn)換其中的對象。 每個Dataset還有一個非類型視圖,即由多個列組成的DataSet,稱為DataFrame。

DataFrame是一個由多個列組成的結(jié)構(gòu)化的分布式數(shù)據(jù)集合,等同于關(guān)系數(shù)據(jù)庫中的一張表,或者是R/Python中的data frame。DataFrame是Spark SQL中的最基本的概念,可以通過多種方式創(chuàng)建,例如結(jié)構(gòu)化的數(shù)據(jù)集、Hive表、外部數(shù)據(jù)庫或者是RDD。

可用于DataSet的操作分為Transformation和Action。

  • Transformation操作可生成新的DataSet。

    如map、filter、select和aggregate (groupBy)。

  • Action操作可觸發(fā)計算及返回記結(jié)果。

    如count、show或向文件系統(tǒng)寫數(shù)據(jù)。

通常使用兩種方法創(chuàng)建一個DataSet:

  • 最常見的方法是通過使用SparkSession上的read函數(shù)將Spark指向存儲系統(tǒng)上的某些文件。
    val people = spark.read.parquet("...").as[Person]  // Scala
    DataSet<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class));//Java
  • 還可通過已存在的DataSet上可用的transformation操作來創(chuàng)建數(shù)據(jù)集。 例如,在已存在的DataSet上應(yīng)用map操作來創(chuàng)建新的DataSet:
    val names = people.map(_.name)  // 使用Scala語言,且names為一個Dataset
    Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING)); // Java

CLI和JD BCS erver

除了API編程接口之外,Spark SQL還對外提供CLI/JDBC接口:

  • spark-shell和spark-sql腳本均可以提供CLI,以便于調(diào)試。
  • JDBCServer提供JDBC接口,外部可直接通過發(fā)送JDBC請求來完成結(jié)構(gòu)化數(shù)據(jù)的計算和解析。

SparkSession原理

SparkSession是Spark2x編程的統(tǒng)一API,也可看作是讀取數(shù)據(jù)的統(tǒng)一入口。SparkSession提供了一個統(tǒng)一的入口點來執(zhí)行以前分散在多個類中的許多操作,并且還為那些較舊的類提供了訪問器方法,以實現(xiàn)最大的兼容性。

使用構(gòu)建器模式創(chuàng)建SparkSession。如果存在SparkSession,構(gòu)建器將自動重用現(xiàn)有的SparkSession;如果不存在則會創(chuàng)建一個SparkSession。 在I/O期間,在構(gòu)建器中設(shè)置的配置項將自動同步到Spark和Hadoop。

import org.apache.spark.sql.SparkSessionval sparkSession = SparkSession.builder  .master("local")  .appName("my-spark-app")  .config("spark.some.config.option", "config-value")  .getOrCreate()
  • SparkSession可以用于對數(shù)據(jù)執(zhí)行SQL查詢,將結(jié)果返回為DataFrame。
    sparkSession.sql("select * from person").show
  • SparkSession可以用于設(shè)置運行時的配置項,這些配置項可以在SQL中使用變量替換。
    sparkSession.conf.set("spark.some.config", "abcd")sparkSession.conf.get("spark.some.config")sparkSession.sql("select ${spark.some.config}")
  • SparkSession包括一個“catalog”方法,其中包含使用Metastore(即數(shù)據(jù)目錄)的方法。方法返回值為數(shù)據(jù)集,可以使用相同的Dataset API來運行。
    val tables = sparkSession.catalog.listTables()val columns = sparkSession.catalog.listColumns("myTable")
  • 底層SparkContext可以通過SparkSession的SparkContext API訪問。
    val sparkContext = sparkSession.sparkContext

Structured Streaming原理

Structured Streaming是構(gòu)建在Spark SQL引擎上的流式數(shù)據(jù)處理引擎,用戶可以使用Scala、Java、Python或R中的Dataset/DataFrame API進(jìn)行流數(shù)據(jù)聚合運算、按事件時間窗口計算、流流Join等操作。當(dāng)流數(shù)據(jù)連續(xù)不斷地產(chǎn)生時,Spark SQL將會增量的、持續(xù)不斷地處理這些數(shù)據(jù)并將結(jié)果更新到結(jié)果集中。同時,系統(tǒng)通過checkpoint和Write Ahead Logs確保端到端的完全一次性容錯保證。

Structured Streaming的核心是將流式的數(shù)據(jù)看成一張不斷增加的數(shù)據(jù)庫表,這種流式的數(shù)據(jù)處理模型類似于數(shù)據(jù)塊處理模型,可以把靜態(tài)數(shù)據(jù)庫表的一些查詢操作應(yīng)用在流式計算中,Spark執(zhí)行標(biāo)準(zhǔn)的SQL查詢,從不斷增加的無邊界表中獲取數(shù)據(jù)。
圖8 Structured Streaming無邊界表

每一條查詢的操作都會產(chǎn)生一個結(jié)果集Result Table。每一個觸發(fā)間隔,當(dāng)新的數(shù)據(jù)新增到表中,都會最終更新Result Table。無論何時結(jié)果集發(fā)生了更新,都能將變化的結(jié)果寫入一個外部的存儲系統(tǒng)。

圖9 Structured Streaming數(shù)據(jù)處理模型

Structured Streaming在OutPut階段可以定義不同的存儲方式,有如下3種:

  • Complete Mode:整個更新的結(jié)果集都會寫入外部存儲。整張表的寫入操作將由外部存儲系統(tǒng)的連接器完成。
  • Append Mode:當(dāng)時間間隔觸發(fā)時,只有在Result Table中新增加的數(shù)據(jù)行會被寫入外部存儲。這種方式只適用于結(jié)果集中已經(jīng)存在的內(nèi)容不希望發(fā)生改變的情況下,如果已經(jīng)存在的數(shù)據(jù)會被更新,不適合使用此種方式。
  • Update Mode:當(dāng)時間間隔觸發(fā)時,只有在Result Table中被更新的數(shù)據(jù)才會被寫入外部存儲系統(tǒng)。注意,和Complete Mode方式的不同之處是不更新的結(jié)果集不會寫入外部存儲。

基本概念

  • RDD

    即彈性分布數(shù)據(jù)集(Resilient Distributed Dataset),是Spark的核心概念。指的是一個只讀的,可分區(qū)的分布式數(shù)據(jù)集,這個數(shù)據(jù)集的全部或部分可以緩存在內(nèi)存中,在多次計算間重用。

    RDD的生成:

    • 從HDFS輸入創(chuàng)建,或從與Hadoop兼容的其他存儲系統(tǒng)中輸入創(chuàng)建。
    • 從父RDD轉(zhuǎn)換得到新RDD。
    • 從數(shù)據(jù)集合轉(zhuǎn)換而來,通過編碼實現(xiàn)。

    RDD的存儲:

    • 用戶可以選擇不同的存儲級別緩存RDD以便重用(RDD有11種存儲級別)。
    • 當(dāng)前RDD默認(rèn)是存儲于內(nèi)存,但當(dāng)內(nèi)存不足時,RDD會溢出到磁盤中。
  • Dependency(RDD的依賴)

    RDD的依賴分別為:窄依賴和寬依賴。

    圖10 RDD的依賴
    • 窄依賴:指父RDD的每一個分區(qū)最多被一個子RDD的分區(qū)所用。
    • 寬依賴:指子RDD的分區(qū)依賴于父RDD的所有分區(qū)。

    窄依賴對優(yōu)化很有利。邏輯上,每個RDD的算子都是一個fork/join(此join非上文的join算子,而是指同步多個并行任務(wù)的barrier):把計算fork到每個分區(qū),算完后join,然后fork/join下一個RDD的算子。如果直接翻譯到物理實現(xiàn),是很不經(jīng)濟(jì)的:一是每一個RDD(即使是中間結(jié)果)都需要物化到內(nèi)存或存儲中,費時費空間;二是join作為全局的barrier,是很昂貴的,會被最慢的那個節(jié)點拖死。如果子RDD的分區(qū)到父RDD的分區(qū)是窄依賴,就可以實施經(jīng)典的fusion優(yōu)化,把兩個fork/join合為一個;如果連續(xù)的變換算子序列都是窄依賴,就可以把很多個fork/join并為一個,不但減少了大量的全局barrier,而且無需物化很多中間結(jié)果RDD,這將極大地提升性能。Spark把這個叫做流水線(pipeline)優(yōu)化。

  • Transformation和Action(RDD的操作)

    對RDD的操作包含Transformation(返回值還是一個RDD)和Action(返回值不是一個RDD)兩種。RDD的操作流程如圖11所示。其中Transformation操作是Lazy的,也就是說從一個RDD轉(zhuǎn)換生成另一個RDD的操作不是馬上執(zhí)行,Spark在遇到Transformations操作時只會記錄需要這樣的操作,并不會去執(zhí)行,需要等到有Actions操作的時候才會真正啟動計算過程進(jìn)行計算。Actions操作會返回結(jié)果或把RDD數(shù)據(jù)寫到存儲系統(tǒng)中。Actions是觸發(fā)Spark啟動計算的動因。

    圖11 RDD操作示例

    RDD看起來與Scala集合類型沒有太大差別,但數(shù)據(jù)和運行模型大相迥異。

    val file = sc.textFile("hdfs://...")val errors = file.filter(_.contains("ERROR"))errors.cache()errors.count()
    1. textFile算子從HDFS讀取日志文件,返回file(作為RDD)。
    2. filter算子篩出帶“ERROR”的行,賦給errors(新RDD)。filter算子是一個Transformation操作。
    3. cache算子緩存下來以備未來使用。
    4. count算子返回errors的行數(shù)。count算子是一個Action操作。
    Transformation操作可以分為如下幾種類型:
    • 視RDD的元素為簡單元素。

      輸入輸出一對一,且結(jié)果RDD的分區(qū)結(jié)構(gòu)不變,主要是map。

      輸入輸出一對多,且結(jié)果RDD的分區(qū)結(jié)構(gòu)不變,如flatMap(map后由一個元素變?yōu)橐粋€包含多個元素的序列,然后展平為一個個的元素)。

      輸入輸出一對一,但結(jié)果RDD的分區(qū)結(jié)構(gòu)發(fā)生了變化,如union(兩個RDD合為一個,分區(qū)數(shù)變?yōu)閮蓚€RDD分區(qū)數(shù)之和)、coalesce(分區(qū)減少)。

      從輸入中選擇部分元素的算子,如filter、distinct(去除重復(fù)元素)、subtract(本RDD有、其他RDD無的元素留下來)和sample(采樣)。

    • 視RDD的元素為Key-Value對。

      對單個RDD做一對一運算,如mapValues(保持源RDD的分區(qū)方式,這與map不同);

      對單個RDD重排,如sort、partitionBy(實現(xiàn)一致性的分區(qū)劃分,這個對數(shù)據(jù)本地性優(yōu)化很重要);

      對單個RDD基于key進(jìn)行重組和reduce,如groupByKey、reduceByKey;

      對兩個RDD基于key進(jìn)行join和重組,如join、cogroup。

      后三種操作都涉及重排,稱為shuffle類操作。

    Action操作可以分為如下幾種:

    • 生成標(biāo)量,如count(返回RDD中元素的個數(shù))、reduce、fold/aggregate(返回幾個標(biāo)量)、take(返回前幾個元素)。
    • 生成Scala集合類型,如collect(把RDD中的所有元素導(dǎo)入Scala集合類型)、lookup(查找對應(yīng)key的所有值)。
    • 寫入存儲,如與前文textFile對應(yīng)的saveAsTextFile。
    • 還有一個檢查點算子checkpoint。當(dāng)Lineage特別長時(這在圖計算中時常發(fā)生),出錯時重新執(zhí)行整個序列要很長時間,可以主動調(diào)用checkpoint把當(dāng)前數(shù)據(jù)寫入穩(wěn)定存儲,作為檢查點。
  • Shuffle

    Shuffle是MapReduce框架中的一個特定的phase,介于Map phase和Reduce phase之間,當(dāng)Map的輸出結(jié)果要被Reduce使用時,每一條輸出結(jié)果需要按key哈希,并且分發(fā)到對應(yīng)的Reducer上去,這個過程就是shuffle。由于shuffle涉及到了磁盤的讀寫和網(wǎng)絡(luò)的傳輸,因此shuffle性能的高低直接影響到了整個程序的運行效率。

    下圖清晰地描述了MapReduce算法的整個流程。

    圖12 算法流程

    概念上shuffle就是一個溝通數(shù)據(jù)連接的橋梁,實際上shuffle這一部分是如何實現(xiàn)的呢,下面就以Spark為例講解shuffle在Spark中的實現(xiàn)。

    Shuffle操作將一個Spark的Job分成多個Stage,前面的stages會包括一個或多個ShuffleMapTasks,最后一個stage會包括一個或多個ResultTask。

  • Spark Application的結(jié)構(gòu)

    Spark Application的結(jié)構(gòu)可分為兩部分:初始化SparkContext和主體程序。

    • 初始化SparkContext:構(gòu)建Spark Application的運行環(huán)境。

      構(gòu)建SparkContext對象,如:

      new SparkContext(master, appName, [SparkHome], [jars])

      參數(shù)介紹:

      master:連接字符串,連接方式有l(wèi)ocal、yarn-cluster、yarn-client等。

      appName:構(gòu)建的Application名稱。

      SparkHome:集群中安裝Spark的目錄。

      jars:應(yīng)用程序代碼和依賴包。

    • 主體程序:處理數(shù)據(jù)

    提交Application的描述請參見:https://archive.apache.org/dist/spark/docs/3.1.1/submitting-applications.html

  • Spark shell命令

    Spark基本shell命令,支持提交Spark應(yīng)用。命令為:

    ./bin/spark-submit \  --class <main-class> \  --master <master-url> \  ... # other options  <application-jar> \  [application-arguments]

    參數(shù)解釋:

    --class:Spark應(yīng)用的類名。

    --master:Spark用于所連接的master,如yarn-client,yarn-cluster等。

    application-jar:Spark應(yīng)用的jar包的路徑。

    application-arguments:提交Spark應(yīng)用的所需要的參數(shù)(可以為空)。

  • Spark JobHistory Server

    用于監(jiān)控正在運行的或者歷史的Spark作業(yè)在Spark框架各個階段的細(xì)節(jié)以及提供日志顯示,幫助用戶更細(xì)粒度地去開發(fā)、配置和調(diào)優(yōu)作業(yè)。

Mapreduce框架常見問題

更多常見問題 >>
  • MapReduce是Hadoop的核心,是Google提出的一個軟件架構(gòu),用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運算。概念“Map(映射)”和“Reduce(化簡)”,及他們的主要思想,都是從函數(shù)式編程語言借來的,還有從矢量編程語言借來的特性。

  • 框架管理器協(xié)同TBE為神經(jīng)網(wǎng)絡(luò)生成可執(zhí)行的離線模型。在神經(jīng)網(wǎng)絡(luò)執(zhí)行之前,框架管理器與昇騰AI處理器緊密結(jié)合生成硬件匹配的高性能離線模型,并拉通了流程編排器和運行管理器使得離線模型和昇騰AI處理器進(jìn)行深度融合。

  • MapReduce服務(wù)(MRS)打造了高可靠、高安全、易使用的運行維護(hù)平臺,對外提供大容量的數(shù)據(jù)存儲和分析能力,可解決各大企業(yè)的數(shù)據(jù)存儲和處理需求

  • MapReduce服務(wù)(MapReduce Service)提供租戶完全可控的企業(yè)級大數(shù)據(jù)集群云服務(wù),輕松運行Hadoop、Spark、HBase、KafKa、Storm等大數(shù)據(jù)組件。

  • GaussDB是華為自主創(chuàng)新研發(fā)的分布式關(guān)系型數(shù)據(jù)庫。具備企業(yè)級復(fù)雜事務(wù)混合負(fù)載能力,同時支持分布式事務(wù),同城跨AZ部署,數(shù)據(jù)0丟失,支持1000+節(jié)點的擴(kuò)展能力,PB級海量存儲。

  • 學(xué)完本課程后,您將能夠:描述深度學(xué)習(xí)框架是什么;列舉主流深度學(xué)習(xí)框架有哪些;了解Pytorch的特點;了解TensorFlow的特點;區(qū)別TensorFlow 1.X與2.X版本;掌握TensorFlow 2的基本語法與常用模塊;掌握MNIST手寫體數(shù)字識別實驗的流程。

更多相關(guān)專題