IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark知识点整理——SparkCore部分 -> 正文阅读

[大数据]Spark知识点整理——SparkCore部分

1.分布式计算引擎的发展

  • (1) 第一代:MapReduce
    • 应用:传统的离线计算
    • 缺点:基于磁盘存储,读写性能差,灵活性差
  • (2) 第二代:Tez、Storm
    • 应用:离线计算
    • 缺点:Tez基于MR做了颗粒度拆分,提高了灵活性,但还是基于磁盘,读写性能没有改善
    • Storm应用于实时计算
  • (3) 第三代:Spark
    • 离线+实时:lambda架构
    • 目前在工作中主要使用的离线计算引擎
  • (4) 第四代:Flink
    • 所有计算全部都是通过实时来实现的:Kappa架构
    • 目前在工作中主要使用实时计算引擎

2.Spark简介

  • (1) 定义:Spark是一个高速的、统一化的分布式分析计算引擎和机器学习算法库
  • (2) 功能:
    • ① 实现离线数据批处理:SparkCore
      • 代替MapReduce
    • ② 实现交互式数据分析:SparkSQL
      • 代替Hive实现SQL分布式计算
    • ③ 实现实时数据处理:SparkStreaming / StructStreaming
      • 代替Storm/Flink
    • ④ 实现机器学习的开发:Spark ML lib
      • 代替Python中机器学习库
    • ⑤ 图计算:SparkGraphx
  • (3) 特点:
    • :所有处理和计算积极使用内存来实现
    • 通用性强:功能全面,适用于大多数大数据分析计算场景
    • 易于使用:接口丰富(Java/Python/SQL/DSL/R/Scala)
    • 随处运行:数据源接口丰富,可运行在各种资源的平台上
  • (4) 名词介绍
    • ①SparkCore:基于代码的离线批处理计算,类似于MR
    • ②SparkSQL
      • 1)离线:基于SQL实现离线批处理计算,类似于Hive
      • 2)实时:StructStreaming,使用SQL实现实时数据计算
    • ③Streaming:基于SparkCore实现实时数据计算
    • ④MLlib:机器学习算法库
    • ⑤Graphx:图计算,数据结构中的图的计算
  • (5) 使用场景
    • ①利用SparkCore和SparkSQL进行离线分析计算
    • ②利用SparkStreming和StructStreming实时分析计算(目前工作中实时主要用Flink代替Spark)

3.Spark与MapReduce对比

在这里插入图片描述

  • 问题:Spark为什么比MR快?
    • ①Spark积极使用内存式计算(RDD);
    • ②Spark使用有向无环图(DAG)执行计划,灵活性高;
    • ③Spark中的Task是线程级别:节省进程的开销,只要启动一次,直到程序结束

4.Spark的基本组成

  • (1) 基本机构组成
    在这里插入图片描述
    分布式主从架构:
    • ①主节点:master,管理节点
      • 1)管理从节点
      • 2)接受客户端请求
      • 3)管理资源
    • ②从节点:worker,计算节点
      • 1)分配资源给Execuor
      • 2)运行Executor进程,实现Task的运行
  • (2) 程序组成:任何一个程序都由两种进程构成:Driver和Executor
    • Driver进程(初始化进程或者Task管理进程)
      • 1)向主节点申请Executor资源,让主节点在从节点上根据需求配置启动对应的Executor
      • 2)解析代码逻辑:将代码中的逻辑转换为Task
        • 如果遇到RDD中的数据的使用:构建一个Job,触发Task运行
      • 3)将Task分配给Executor去运行
      • 4)监控每个Executor运行的Task状态
    • Executor进程(执行进程)
      • 1)运行在Worker上,使用Worker分配的资源等待运行Task
      • 2)所有Executor启动成功以后会向Driver进行注册
      • 3)Executor收到分配Task任务,运行Task
  • (3) 各组成部分之间的关系:
    • ①Master负责管理所有Worker资源
    • ②Driver会向Master请求启动Executor
    • ③Worker上负责运行Executor进程
    • ④Driver负责解析、分配、监控所有Task线程在Executor中运行

5.Spark中的task是怎么得到的?

  • step1:先读取数据:将数据变成一个RDD【分布式的集合】对象
  • step2:转换处理数据:将RDD不断调用转换函数进行处理,得到新的RDD
  • step3:将结果进行保存
    • Job:触发job,当用到RDD中的数据的时候,根据RDD的来源转换关系,构建DAG
    • Stage:根据Job中使用RDD的依赖关系,通过回溯算法将一个job中的逻辑关系,构建Stage,得到DAG图
      • 1)Stage的划分:按照是否产生shuffle,如果产生shuffle,就划分一个新的stage
      • 2)执行每个Stage:从Stage编号最小的开始进行执行
        • 将每个Stage转换为一个Task集合:Task集合中Task的个数由RDD的分区数据决定
    • 问题:为什么要根据shuffle划分Stage?
      答: 同一个Stage中的所有Task可以在内存中并行运行
    • Task:每个Stage会对应一定个数的Task,Task个数由Stage中RDD的分区个数决定

6.Sparkcore开发

  • (1) Sparkcore代码模板
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @Todo: SparkCore模板
 * @auther: Cjp
 * @date: 2021/7/12 16:51
 */
object SparkCoreSimpleMode {
  def main(args: Array[String]): Unit = {
    //todo:1.构建SparkContext对象
    val sc:SparkContext = {
      //构建SparkConf配置管理对象,类似于Hadoop中的configuration对象
      val conf = new SparkConf()
        .setMaster("")//指定运行模式
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))//指定运行程序的名称
        .set("key","value")//指定额外的属性
      //返回SparkContext的对象
      SparkContext.getOrCreate(conf)
    }
    //调整日志级别
    sc.setLogLevel("WARN")
    //todo:2.实现数据的转换处理
    //step1:读取数据
    //step2:处理数据
    //step3:保存结果

    //todo:3.释放SparkContext对象
    sc.stop()
  }
}
  • (2)Wordcount案例
    经典案例Wordcount——Spark版(Sparkcore/SparkDSL/SparkSQL)https://blog.csdn.net/m0_56919489/article/details/118823291
  • (3) jar包提交运行
    注:在IDEA上开发代码只是为了测试代码逻辑,正式生产和工作中程序还是要打包放到集群上进行测试和使用的
    • 基本参数选项
      • – master:指定spark程序运行环境
        • – local:本地
        • – spark://node1:7077:Standalone集群
        • – yarn:YARN集群
      • – deploy-mode:决定了Driver进程运行在本地还是某一台Worker节点上
      • – name:指定程序的名称
      • – class:指定运行哪个类
      • – jars:指定第三方jar包
      • – conf:指定配置
    • 资源选项
      • driver选项
        • – dirver-memory:分配给Driver的内存,默认分配1GB
        • – driver-cores:分配给Driver运行的CPU核数,默认分配1核
        • – supervise:故障自动重启
      • executor选项
        • – executor-memory:分配给每个Executor的内存数,默认为1G,所有集群模式都通用的选项
        • – executor-cores:分配给每个Executor的核心数,YARN集合和Standalone集群通用的选项
        • YARN:默认每个Executor分配1个core
          • – num-executors NUM:YARN模式下用于指定Executor的个数,默认启动2个
        • Standalone:默认分配所有可用的核数给每个Executor
          • – total-executor-cores NUM:Standalone模式下用于指定所有Executor所用的总CPU核数(总核数 / 每个Executor核数 = Executor的个数)
    • 示例
//(1) 本地提交
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master local[2] \
--class 
cn.test.sparkcore.SparkCoreWordCount \
~/spark-WordCount.jar \
/example/WordCount.data \
/output/WordCount
//(2) Standalone集群提交
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master spark://node1:7077 \
--class cn.test.sparkcore.SparkCoreWordCount \
hdfs://node1:8020/spark/apps/spark-WordCount.jar \
/example/WordCount.data \
/output/WordCount
//(3) 指定资源提交
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master spark://node1:7077 \
--class cn.test.sparkcore.SparkCoreWordCount \
--driver-memory 512M \
--executor-memory 512M \
--executor-cores 1 \
--total-executor-cores 2 \
hdfs://node1:8020/spark/apps/spark-WordCount.jar \
/example/WordCount.data \
/output/WordCount
//(4) Spark on YARN(client模式)
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master spark://node1:7077 \
--deploy-mode client \
--class cn.test.sparkcore.SparkCoreWordCount \
hdfs://node1:8020/spark/apps/spark-WordCount.jar \
/example/WordCount.data \
/output/WordCount
//(5) Spark on YARN(cluster模式)
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master spark://node1:7077 \
--deploy-mode cluster \
--class bigdata.itcast.cn.spark.core.wordcount.SparkCoreWordCount \
hdfs://node1:8020/spark/apps/spark-wc.jar \
/example/wc.data \
/output/wc

7.Deploymode

  • (1) 功能:决定了driver进程运行的位置
  • (2) 选项
    • ① client:默认选项,driver进程运行在客户端上
      • 问题:如果一直使用client模式,所有Spark程序的driver都运行在同一台机器,导致机器的负载比较高,Driver故障率就比较高,性能比较差
    • ② cluster:driver进程运行在Worker节点上
  • (3) Spark on YARN的client模式和cluster模式区别
    • ① client:driver和APPMaster是共存的
      • APPMaster:申请资源,运行Executor,运行在NodeManager上
      • Driver:管理所有Executor、调度Job执行,解析、分配、监控Task,运行在Client上
      • Executors:运行JVM进程,其中执行Task任务和缓存数据
    • ② cluster:driver和APPMaster合并了,整体由统一的进程来实现
      • Driver Program(AppMaster):既进行资源申请,又进行Job调度
      • Executors:运行JVM进程,其中执行Task任务和缓存数据
    • 问题:为什么要在YARN上运行SparkCore程序?
      答:为了避免同一套物理资源由多套资源管理平台管理,导致管理混乱,一般在工作中选择使用一种公共的资源管理平台来实现,这个公共的平台就是YARN,工作中一般将所有分布式计算程序全部提交在YARN上来实现运行

8.Spark运行的基本流程

在这里插入图片描述

  • step1:Driver及Executor的启动

    • (1) 程序运行启动Driver,默认启动客户端那台机器上
    • (2) Driver向Master申请资源
    • (3) 根据提交的申请,Master会在Worker上启动Executor进程
    • (4) 所有Executor会向Driver反向注册
      在这里插入图片描述
  • step2:Main方法的执行

    • (1)Driver开始解析Main方法开始的代码
    • (2)代码中出现RDD数据的使用,就会触发job
    • (3)job触发后,通过回溯算法对RDD的转换构建DAG图
      • ①回溯算法:倒推
      • ②每遇到一个Shuffle就划分一个Stage
        在这里插入图片描述
  • step3:Task的解析分配

    • (1)根据DAG从编号最小的Stage开始,转换为Task,
    • (2)每一个Stage会转换为一个TaskSet集合,TaskSet集合中会有多个Task
    • (3)Driver将TaskSet中的Task分配调度到Executor中执行
      在这里插入图片描述

9.RDD(Resilient Distributed Datasets)

  • (1) 概述:
    • ① 定义:弹性分布式数据集,具有高容错性能实现数据分布式存储的数据集合,其本质就是数据集合,拥有多个分区的数据集合

    • ② 作用:存储计算过程中产生的数据

      • 程序读取的任何数据都会进入定义的第一个RDD1中
      • 当数据开始转换处理时,RDD也会随之转换得到RDDN
    • ③ 特点

      • 核心是RDD之间的血链关系,每个RDD都会记录自己的来源
      • 优点:
        • a.分布式:RDD是有多个分区来实现分布式存储,能实现并行式的数据处理
        • b.容错能力强:通过将 RDD 间转移操作构建成有向无环图来实现的,RDD之间存在血链关系,即RDD依赖(宽依赖,窄依赖)
          • 宽依赖(Wide Dependency,也叫shuffle依赖):父 RDD的一个分区数据给了子 RDD的多个分区,涉及shuffle
            在这里插入图片描述

          • 窄依赖(Narrow Dependency):父 RDD的一个分区数据只给了子 RDD的一个分区
            在这里插入图片描述

    • 面试题:为什么要设计宽窄依赖?

      • 1)从性能的角度来考虑:
        有些不得已需要经过shuffle就用宽依赖,如果不需要经过shuffle就能完成使用窄依赖
      • 2)从数据血脉恢复角度来说
        如果宽依赖:子RDD某个分区的数据丢失,必须重新计算整个父RDD的所有分区;
        如果窄依赖:子RDD某个分区的数据丢失,只需要计算父RDD对应分区的数据即可
    • ④ RDD的数据,怎么保证安全性?
      答:血链机制:通过依赖关系来恢复RDD的数据

    • RDD的容错机制

      • 1、persist机制
        • 功能:将RDD缓存在Executor中内存或者磁盘中,避免RDD的重复构建,导致性能比较差的问题,做了缓存以后,第一次构建以后,下一次可以直接从缓存中读取
        • 应用场景
          • RDD会被使用多次【大于2次】
          • 如果RDD是经过非常复杂的过程得到的,使用了2次
        • 函数
          • cache/persist:设置缓存
          • unpersist:释放缓存
        • 缓存级别
          • 默认级别:MEMORY_ONLY
          • 常用级别:MEMORY_AND_DISK_2、
            MEMORY_AND_DISK_SER_2
  • 示例:
//只缓存在磁盘中,2表示缓存两份
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

//只缓存在内存中,2表示缓存两份,ser表示序列化以后再缓存
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

//优先缓存内存,如果内存不足,缓存磁盘
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

//堆外内存
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

//如果缓存使用结束,明确不再需要这个缓存,必须手动释放这个缓存
 wcRdd.unpersist()
  • 2、checkpoint机制
    • 功能:将RDD的数据持久化存储在HDFS上,以后读取RDD的数据,可以直接从HDFS上进行读取,不需要重新构建,数据更加安全
    • 应用:非常重要的RDD或者数据量非常大,内存空间比较小
    • 函数
      • 设置检查点目录:sc.setCheckpointDir
      • 设置检查点:rdd.checkpoint
//设置检查点:RDD数据存储的位置
sc.setCheckpointDir("datas/sparkcore/chk")
//将RDD的数据存储在HDFS中
    wcRdd.checkpoint()
  • 3、 persist与checkpoint的区别

    • a.生命周期
      • a)persist:数据自动删除:Task结束,遇到unpersist
      • b)checkpoint:只能手动删除
    • b.存储内容
      • a)persist:将整个RDD对象缓存在内存中,包含了RDD的所有内容(血脉关系和数据)
      • b)checkpoint:只存储了RDD的数据
  • (2) RDD的五大特性

    • ①A list of partitions
      每个RDD由多个分区组成
    • ②A function for computing each split
      对RDD的处理转换本质上都是RDD每个分区的转换操作(RDD是immutable,不可变,所有RDD的转换返回的是一个新的RDD)
    • ③A list of dependencies on other RDDs
      每个RDD都会记录与父RDD的依赖关系
    • ④Optionally, a Partitioner for key-value RDDs
      可选:对于二元组KV结构的RDD,在经过Shuffle的时候,可以自定义指定分区器
      Spark自带的分区器:Hash分区、范围分区
    • ⑤Optionally, a list of preferred locations to compute each split on
      可选:在Task分配计算时,可以计算最优路径解
  • (3) RDD的创建

    • ①并行化一个已经存在的集合
//新建一个1-10的集合
val seq: immutable.Seq[Int] = 1.to(10)
//将1-10的集合并行化成为一个RDD
val rdd1: RDD[Int] = sc.parallelize(seq)

sc.parallize / sc.makeRDD:【允许构建最优路径】:并行化一个集合变成一个RDD

  • ②读取外部数据创建RDD
//读取外部文件系统
val rdd2: RDD[String] = sc.textFile("datas/wordcount")
//调用Hadoop中任意一种输入类,将输入类的结果构建RDD
val rdd3 = sc.newAPIHadoopRDD("调用Hadoop中哪个输入类""返回Key类型""返回Value类型")

sc.textFile/sc.wholeTextFile/sc.newAPIHadoopRDD:读取外部数据源

  • (4) RDD分区
    RDD的分区个数决定了Task的个数,决定了并行度
    • ① 指定分区个数:在构建RDD时,传递分区个数参数
//并行化集合指定分区个数
val seq: Seq[Int] = 1.to(10)
val seqRdd: RDD[Int] = sc.parallelize(seq,numSlices = 3)
//读取外部系统指定分区个数
val inputRdd = 
sc.textFile("datas/wordcount",minPartitions = 3)
  • ② 制定分区原则:
    RDD的分区个数一般为Executor的CPU核数的2 ~ 3倍
    举例:10个Executor,每个Executor分配4coreCPU,RDD的分区数的最大范围:80 ~ 120分区

  • ③ 分区个数查看:

//获取当前RDD的分区个数
val partitions = seqRdd.getNumPartitions 
//获取当前数据对应的分区编号
TaskContext.getPartitionId()
  • ④常见的分区个数的规则
    • 1、默认:线程数,如果底层读取的是分布式系统
      自定义:指定分区个数
    • 2、分布式系统:实现1:1的分区关系
      • a.读HDFS文件:文件的一个块就自动对应RDD的一个分区
      • b.读Hbase表:表的一个region自动对应RDD的一个分区
        工作中:一般读取的都是分布式系统,一般都用默认的即可
    • 3、问题:如果RDD是通过另外一个RDD得到的,分区个数是多少?
      答:子RDD分区个数沿用父RDD的分区个数
    • 4、小文件过多,分区数太多怎么办
      答:合并读取:wholeTextFile
      示例:
//小文件读取
//有100个分区,文件中每一条数据作为一个元素
val rdd3: RDD[String] = sc.textFile("datas/ratings100")
//每个文件作为一个KV对,存储在RDD中
val rdd4: RDD[(String, String)] =
 sc.wholeTextFiles("datas/ratings100")
rdd4.take(1).foreach(tuple =>
 println(tuple._1+"---------"+tuple._2))
  • (5) RDD算子(即RDD函数)
    • 转换算子(Transformation)

      • 1、功能:实现RDD的转换操作
      • 2、特点:是lazy模式的,返回值是一个新的RDD,不会产生job和触发Task的运行
      • 3、常见算子
        • Map:构建二元组(k、v对映射)list,实现对RDD中每个元素的处理,将处理的结果放入新的RDD中进行返回
        • Flatmap:扁平化list,实现对RDD中的每个元素进行扁平化处理,将RDD中的每个集合的元素合并到一个RDD中返回
        • Filter:过滤,实现对RDD中元素的过滤,符合条件的元素会放入一个新的RDD中返回
        • Reducebykey:分组聚合
        • Sortbysortbykey:特例,经过shuffle的转换算子,使用RangePartition,做数据采样实现分区间的有序,会读取数据,触发Job去读取数据
          在这里插入图片描述
    • 触发算子(Action)

      • 1、功能:实现RDD数据的读取操作
      • 2、特点:触发job的运行,构建Task,实现物理上的RDD的数据读取、转换,返回值不为RDD类型,为普通类型或者为空
      • 3、常见算子
        • foreach:打印输出,没有返回值
        • collect:将RDD每个分区的数据放入一个数组中,返回数组
        • reduce:聚合函数,返回聚合后的结果
        • fold:聚合函数,返回聚合后的结果
        • count:统计元素的个数返回
        • Take:取RDD集合中的数据(通过单个线程从RDD的每个分区中取想要的数据)放入一个数组中,返回Driver中,返回值是数组
        • saveAsTextFile:没有返回值,保存数据到外部文件系统
          在这里插入图片描述

10.RDD常用算子

  • (1) 基本函数
    map、flatMap、filter、take、top、foreach
  • (2) 分区操作函数
    • ①函数:mapPartitions,foreachPartition
    • ②功能:对RDD的每个分区调用参数函数进行处理
    • ③场景:基于分区的资源构建
  • (3) 重分区函数
    • ①函数:repartition、coalesce
    • ②功能:调整RDD的分区个数,返回一个新的RDD
    • ③关系
      两者都用来改变RDD的partition的数量,repartition的底层调用的是coalesce方法
    • ④区别
      • repartition:用于调大分区个数,必须经过shuffle
      • coalesce(分区个数,是否经过shuffle:默认false,不shuffle):一般用于调小分区个数,如果为true,就等于repartition效果
  • (4) 聚合函数
    • ①函数
      • reduce:分区内聚合和分区间聚合逻辑一致,没有初始值
      • fold:分区内聚合和分区间聚合逻辑一致,有初始值
      • aggregate:分区内聚合和分区间聚合逻辑可以自定义,有初始值
    • ②功能:实现分布式的聚合
  • (5) PairRDD函数
    • ①特征:xxxByKey,只有二元组类型的RDD才能调用的函数
      分类:
    • 聚合函数
      • reduceByKey:用法与reduce是一致的,按照Key分组,对Value进行reduce聚合
        按照Key分组 + reduce聚合
      • aggregateByKey:用法与aggregate是一致的,按照Key分组,对Value进行aggregate聚合
        按照Key分组 + aggregate聚合
        注:reduceByKey和foldByKey,aggreageByKey底层使用的都是CombinerByKey:先分区内聚合,再分区间聚合
    • 分组函数
      • groupByKey:按照key进行分组
      • 注意:能用reduceByKey就不要使用groupByKey+reduce
        • reduceByKey:分组聚合,对每个分区内部,先分组聚合,再分区间分组聚合
        • groupByKey + reduce:将所有数据放到一起再做分组
        • reduceByKey的性能要高于groupByKey
    • 排序函数
      • sortByKey:按照Key实现排序
  • (6) 关联函数:Join
    rdd1:KV
    rdd2:KW
    rdd1.join(rdd2) = RDD:[K,(V,W)]

11.Spark读写外部数据

  • (1) 原始数据读取
    HDFS、Hbase
  • (2) 数据处理
    SparkCore
  • (3) 结果存储
    HDFS、Hbase、MySQL
  • (4) SparkCore的数据源接口
    • ①parallelize / makeRDD:将一个Scala中的集合转换为一个RDD对象
    • ②textFile / wholeTextFiles:读取外部文件系统的数据转换为一个RDD对象
    • ③newAPIHadoopRDD / newAPIHadoopFile:调用Hadoop的InputFormat来读取数据转换为一个RDD对象

12.SparkCore中的共享变量

  • (1) 共享变量
    在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量
  • (2) 广播变量(Broadcast Variables)
    • ①功能:
      • 可以将Driver中的一个变量通过广播的形式发送给Executor,将这个变量放在Executor中
      • 减少数据在网络中的IO,提高性能
      • 比如做Wordcount,过滤符号,只保留单词,统计每个单词出现的次数
        • a.常规:直接定义一个符号的集合,在每个Task中做过滤
          在这里插入图片描述

        • b.广播变量
          在这里插入图片描述

代码:

//符号的集合
val list = List("!",",","%","#")
//实现广播:构建一个广播变量
val broad = sc.broadcast(list)

val rsRdd: RDD[(String, Int)] = inputRdd
      //过滤空行
      .filter(line => line != null && line.trim.length > 0)
      //取出每个单词
      .flatMap(line => line.trim.split(" "))
      //将符号过滤掉
      .filter(word => {
          //从各自的广播变量中将数据获取出来
          val value = broad.value
          !value.contains(word) && word.trim.length >0
       })
      //转换为二元组
      .map(word => (word,1))
      //分组聚合
      .reduceByKey((temp,item) => temp+item)
  • (3) 累加器(Accumulators)
    • ①功能:
      • 实现分布式计数:一般用于结果计数或者日志调试
        • a.每个分区内部计数
        • b.再将每个分区的结果进行累加
      • 比如做Wordcount,过滤符号,只保留单词,统计每个单词出现的次数,以及统计符号的个数
        在这里插入图片描述

代码:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @ClassName WordCountShareVariable
  * @Description TODO 实现自定义开发Wordcount程序,使用广播变量,过滤单词,并使用累加器统计符号的个数
  * @Create By     Frank
  */
object WordCountShareVariable {
  def main(args: Array[String]): Unit = {
    /**
      * step1:初始化Driver对象,SparkContext
      */
    //构建一个配置管理对象,类似于以前学的Configuration对象
    val conf = new SparkConf()
      //以当前的类名作为程序的名称
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      //配置本地模式运行
      .setMaster("local[2]")
//      .set("fs.defaultFS","hdfs://node1:8020")
    //构建SparkContext对象
    val sc = new SparkContext(conf)

    //调整日志级别
    sc.setLogLevel("WARN")

    /**
      * step2:读取、转换、保存
      */
    //todo:1-读取数据
    val inputRdd = sc.textFile("datas/filter/datas.input")
    //    println(s"count = ${inputRdd.count()}")
    //符号的集合
    val list = List("!",",","%","#")
    //实现广播:构建一个广播变量
    val broad = sc.broadcast(list)
    //构建一个累加器,统计所有符号的个数
    val acccnt = sc.longAccumulator("acccnt")
    //todo:2-处理数据
    val rsRdd: RDD[(String, Int)] = inputRdd
      //过滤空行
      .filter(line => line != null && line.trim.length > 0)
      //取出每个单词
      .flatMap(line => line.trim.split(" "))
      //将符号过滤掉
      .filter(word => {
          //从各自的广播变量中将数据获取出来
          val value = broad.value
          //符号出现一次,就累加计数一次
          if(value.contains(word)) acccnt.add(1L)
          !value.contains(word) && word.trim.length >0
       })
      //转换为二元组
      .map(word => (word,1))
      //分组聚合
      .reduceByKey((temp,item) => temp+item)

    //todo:3-保存结果
    rsRdd.foreach(println)
    println(s"符号的个数:${acccnt.value}")

    /**
      * step3:释放资源
      */

    Thread.sleep(10000000L)
    sc.stop()
  }
}

13.Spark shuffle

  • (1) 对比MapReduce的shuffle
    在这里插入图片描述

Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等
在这里插入图片描述

Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步
在这里插入图片描述

  • (2) Shuffle的功能
    • ① 实现全局排序、分组、分区
    • ② Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。
    • ③ Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等。
    • ④ Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步
    • ⑤ Stage划分为两种类型
      • ShuffleMapStage,在Spark 1个Job中,除了最后一个Stage之外,其他所有的Stage都是此类型
        • a.将Shuffle数据写入到本地磁盘,ShuffleWriter
        • b.在此Stage中,所有的Task称为:ShuffleMapTask
      • ResultStage,在Spark的1个Job中,最后一个Stage,对结果RDD进行操作
        • a.会读取前一个Stage中数据,ShuffleReader
        • b.在此Stage中,所有的Task任务称为ResultTask
  • (3)Spark Shuffle的发展
    • ① Spark在1.1以前的版本一直是采用Hash Shuffle的实现的方式
    • ② 到1.1版本时参考HadoopMapReduce的实现开始引入Sort Shuffle
    • ③ 在1.5版本时开始Tungsten钨丝计划,引入UnSafe Shuffle优化内存及CPU的使用
    • ④ 在1.6中将Tungsten统一到Sort Shuffle中,实现自我感知选择最佳Shuffle方式
    • ⑤ 到的2.0版本,Hash Shuffle已被删除,所有Shuffle方式全部统一到Sort Shuffle一个实现中

14.总结

  • (1) 基本概念
    • Application:用户开发的Spark应用程序,每个Spark程序包含了一个Driver进程和多个Executor进程在集群中
      • 每个程序的Driver和Executor都是独立的,不是共享的
    • Application jar:将开发好的程序打成jar包,提交集群运行,jar包中不能包含hadoop和spark的依赖jar包
    • Job:触发真正执行程序的单元,由触发函数来触发job的构建
      • 一个Application中可以有多个job
    • Stage:将一个Job中根据是否产生宽依赖来划分Stage【阶段:逻辑计划】
      • 算法:回溯算法
      • 对于整个程序来说,Stage是Application全局编号的每个阶段,为了实现不同job之间stage的结果共享
    • TaskSet:每个Stage会转换为一个TaskSet【物理计划】,Task的集合
      • 每个TaskSet中可以多个Task:Stage中的RDD的最大分区数
    • Task:物理任务,每个分区对应一个Task任务来执行
      • Task由Driver中的组件进行分配运行在Executor中,使用Executor中的资源来源
      • 每个Task使用1Core来完成运行
    • Driver:初始化进程,负责运行main方法,创建SparkContext对象
      • 申请资源:启动Executor
      • 解析、调度、监控Task
    • Executor:执行进程,运行在Worker节点上,使用Worker分配的资源负责执行Task
    • ClusterManager:分布式资源管理平台的主节点
      • Standalone:Master
      • YARN:RM
    • Worker Node:分布式资源管理平台的从节点
      • Standalone:Worker
      • YARN:NM
    • ? deploymode:决定了driver进程运行的位置,
      client【客户端】、cluster【Worker节点上】
  • (2) Spark内核调度流程(Job调度流程)
    • Step1:集群启动
      • 主——Master:管理节点
        • a.接受客户端请求
        • b.管理从节点Worker节点
        • c.资源管理
      • 从——Worker:计算节点
        • 使用自己所在节点的资源运行Executor进程:给每个Executor分配一定的资源
    • Step2:提交程序
      • Driver进程
        • a.由spark-submit脚本客户端进行启动
        • b.向主节点申请Executor资源,让主节点在从节点上根据需求配置启动对应的Executor
        • c.解析代码逻辑:将代码中的逻辑转换为Task
          • 如果遇到RDD中的数据的使用:构建一个Job,触发Task的运行
        • d.将Task分配给Executor去运行
        • e.监控每个Executor运行的Task状态
      • Executor进程
        • a.运行在Worker上,使用Worker分配的资源等待运行Task
        • b.所有Executor启动成功以后会向Driver进行注册
        • c.Executor收到分配Task任务,运行Task
    • Step3:运行程序
      • 1)Driver开始解析Main方法开始的代码
      • 2)代码中出现RDD数据的使用,就会触发job
      • 3)job触发后,DAGScheduler组件通过回溯算法对RDD的转换构建DAG图,以Shuffle划分Stage
        • DAGScheduler:专门负责构建DAG,对象封装在SparkContext
          • 根据DAG从编号最小的Stage开始,转换为TaskSet
          • 每一个Stage会转换为一个TaskSet集合,TaskSet集合中会有多个Task
          • TaskScheduler:负责调度TaskSet中Task,对象封装在SparkContext,将每个Task提交给TaskManager
            • TaskManager:负责将每个Task分配给Executor运行
          • Driver将TaskSet中的Task分配调度到Executor中执行
  • (3) Spark并行度
    • 资源并行度:Executor的个数怎么决定?
      • 原则:充分利用当前机器所有资源
      • 假设:机器10台,每台16core,32GB,现在只运行一个程序,让这个程序得到所有资源
      • CPU核数:每个Executor至少给定2Core,保证Executor中可以并行
      • 内存大小:给CPU核数的2倍
    • 数据并行度:Task的个数怎么决定?
      • 原则:Task个数由分区数决定,建议Task个数为整个Executor使用的CPU核数的2 ~ 3倍
      • 假设:启动了10个Executor,每个Executor2Core4GB
        • a.总CPU:20个
        • b. 建议:Task和分区数 = 40 ~ 60
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-17 11:59:21  更:2021-07-17 11:59:25 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 5:18:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码