麻绳先生

做一些记录性的工作

spark第一次笔记

hadoop历史

2011年发布1.0版本,2012年发布稳定版,2013发布2.x版本(Yarn)

缺点

  • 基于数据集的计算,所以面向数据。从存储介质中获取数据,然后进行计算,最后将结果存储到介质中。所以主要应用于一次性计算,不适合数据挖掘和机器学习这些迭代计算和图形挖掘计算。
  • mapreduce基于文件存储介质的操作,性能差
  • mapreduce和hadoop紧密耦合

Yarn版本

ResourceManager、ApplicationMaster、Driver、NodeManager

spark历史

2013年6月正式发布,spark基于hadoop1.x架构思想设计思想。spark计算基于内存,并且基于Scala语法开发,所以天生适合迭代式计算。

HDFS-Yarn-Spark

Spark下载地址

http://spark.apache.org

重要角色

Driver(驱动器)

Executer(执行器)

Spark-Yarn运行模式简图

Spark-Yarn运行模式简图

Standalone模式

RDD

RDD(Resilient Distributed Dataset)称为弹性分布式数据集,是Spark中最基本的数据或计算抽象。代码中是一个抽象类,代表一个不可变、可分区、里面的元素可进行并行计算的集合。

RDD属性

  • 一组分区Partition,即数据集的基本组成单位
  • 一个计算每个分区的函数
  • RDD之间的依赖关系
  • 一个Partitioner,即RDD的分片函数
  • 一个列表,存储存取每个Partiton的优先位置preferred location

大数据窍门:移动数据不如移动计算

RDD特点

RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必须的信息,RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。

Spark中所有RDD方法都称为算子,共分为两大类,转化算子和行动算子。

缓存

如果应用程序多次使用同一个RDD,可以将该RDD缓存起来。

RDD的创建

Spark中创建RDD的方法有三种:从集合中创建、从外部存储创建、从其他RDD创建。

集合创建

1
2
3
//前者调用后者
sc.makeRDD(List(1, 2, 3, 4))
sc.parallelize(Array(1, 2, 3, 4))

外部存储创建

1
2
3
//默认情况下,可以读取项目路径,也可以读取其他HDFS路径
val value:RDD[String] = sc.textField("in")
//涉及hadoop读取文件的分片规则

RDD的转换

Value类型

map(function)

返回一个新的RDD,该RDD由每一个输入元素经过function函数转换后组成。例如通过一个数组RDD得到一个每个元素被乘以2的新的RDD。

mapPartitions

mapPartitions效率优于map算子,减少了发送到执行器执行交互次数。但是可能会出现内存溢出OOM。一次处理一个分区的数据。

mapPartitionsWithIndex(func)

作用是,func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) -> Iterator[U]。

Driver和Executor关系

Driver就是创建Spark上下文对象的应用程序,Executor就是用于提取任务并执行。

flatMap
glom

将每一个分区形成一个数组,形成新的RDD类型为RDD[Array[T]]。

groupBy & filter

分组,按照传入函数的返回值进行分组,将相同的key对应的值放入一个迭代器。

sample(withRepalcement, fraction, seed)

以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示抽出的数据是否放回,true表示有放回,seed用于指定随机数生成器种子。

distinct
coalesce
repartition & sortBy

双Value类型交互

union
cartesian

Key-Value类型

partitionBy
groupBykey
reduceBykey
aggregateBykey
foldByKey & combinByKey
sortByKey & mapValueByKey

方法不同,底层实现和代码的执行位置很不同,
是的Driver执行还是在Executor执行?

RDD的依赖

窄依赖

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖被比喻为独生子女。

宽依赖

宽依赖指的是每一个父RDD的Partition可以被多个子RDD的Partition使用。

DAG

任务划分

宽依赖放在不同的stage,窄依赖放在同一个stage。

  1. Application:初始化一个SparkContext即生成一个Application;
  2. Job:一个Action算子就会生成一个Job;
  3. Stage:根据RDD依赖关系,将Job划分成不同的Stage,一个宽依赖就划分一个新的Stage;
  4. Task:Stage是一个TaskSet,将Stage划分的结果发送到不同Executor执行即为一个Task;
  5. 上述四者每一层都是一对n的关系;

RDD的缓存

通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中。但是并不是这两个方法调用时立刻缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,供后面重用。

此外还有checkpoint功能,通过函数setCheckpointDir(dir)设置。

一般在血缘关系比较长的时候使用。

RDD分区器

Spark三大数据结构

  • RDD:分布式数据集
  • 广播变量:分布式只读共享变量 broadcast,调优策略
  • 累加器:分布式只写共享变量,LongAccumulator

Spark中的数量

  • Executor:默认有2个;可以通过提交参数设置;
  • partition:默认情况,读取文件采用Hadoop的切片规则,如果读取内存中的数据,可以根据特定的算法设定,可以通过其他算子进行改变;多个阶段的场合,下一个阶段分区的数量受到上一个阶段分区的影响;
  • Stage:ResultStage + Shuffle依赖的数量,划分阶段的目的就是为了任务执行的等待,因为Shuffle的过程需要落盘;
  • Task:原则上一个分区就是一个任务,但实际中,可以动态调整;