Zeros Tech Zeros Tech
首页
架构
大数据
数据库
  • 面试

    • Java面试
    • 大数据面试
    • 架构面试
语言
运维
关于
  • 网站
  • 资源
  • Vue资源
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

迹_Jason

全栈工程师
首页
架构
大数据
数据库
  • 面试

    • Java面试
    • 大数据面试
    • 架构面试
语言
运维
关于
  • 网站
  • 资源
  • Vue资源
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • clickhouse

  • deep_learning

  • delta

  • doris

  • es

  • flink

  • hadoop

  • hbase

  • hive

  • kafka

  • kerberos

  • kudu

  • kylin

  • Livy

  • phoneix

  • ranger

  • spark

    • Spark 安装
    • spark_基础概念
    • spark_配置
    • Spark_SQL
    • Spark_Structured
    • spark入门
      • 拓展
      • 特点
        • Spark Core
        • Spark SQL
        • Spark Streaming
        • Mlib
        • Graphx
        • Cluster Manager
        • 应用场景
      • Partition、Executor、Job、Stage、Task
      • 使用
        • 启动
        • 启动参数
      • DataSet
        • 缓存
      • Streaming
        • StreamingContext
        • DStream
        • Transformation
        • Output Operation
        • updateStateByKey
        • foreachRDD
        • Window Operation
        • checkPoint
        • cache和persist
      • Broadcast
      • RDD
        • RDD创建
        • RDD API
        • RDD操作
        • Shuffle 操作
        • 共享变量
      • Spark && Yarn
        • Spark任务提交到Yarn
      • Spark Streaming+Kafka
      • 日志
        • 日志输出的级别
        • Spark 进化史
    • Spark性能调优
    • SparkQ&A
    • Structured Streaming和 Streaming
  • tidb

  • time_series

  • zeppelin

  • 大数据
  • spark
迹_Jason
2021-07-18

spark入门

[TOC]

# Spark入门

# 拓展

Spark结构化数据处理:Spark SQL、DataFrame和Dataset (opens new window)

# 特点

  • Spark是快速
    • 比Hadoop快速
    • Spark扩充了流行的MapReduce计算模式
    • Spark是基于内存运算
  • Spark是通用
    • 容纳了分布式系统拥有的功能
    • 批处理、迭代式计算、交互查询、流处理
    • 降低维护成本
  • Spark是开放
    • 支持Kafka和Hadoop

# Spark Core

  • 包含Spark基础功能,包含任务调度、内存管理、容错机制
  • 内部定义类RDDs(弹性分布式数据集)
  • 提供了很多API来创建和操作RDDs
  • 为其他组件提供服务

# Spark SQL

  • Spark处理结构化数据的库
  • 企业中处理报表数据

# Spark Streaming

  • 操作实时流API

# Mlib

  • 一个机器学习功能包
  • 支持集群横向拓展

# Graphx

  • 处理图的库,并进行图的并行计算
  • 继承了RDD API

# Cluster Manager

  • 集群管理
  • 常见集成Hadoop Yarn,Apache Mesos

# 应用场景

  • 对时效性比较高的场景
  • 机器学习

# Partition、Executor、Job、Stage、Task

  • Partition是作为数据的分区,Partition之间数据互相隔离
  • Executor是执行器,每个节点都存在多个Executor,每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task
  • Job,spark action的触发会生成一个job, Job会提交给DAGScheduler,分解成Stage
  • Stage,DAGScheduler 根据shuffle将job划分为不同的stage,同一个stage中包含多个task,这些tasks有相同的 shuffle dependencies。
  • Task,被送到executor上的工作单元,task简单的说每一个partition上数据处理单元,每个Stage中的Task值小于等于Partition。当数据量不是很大的时候,就会出现小于Partition的情况出现

每一次 task 只能处理一个 partition 的数据

数据集合被Partition分成若干部分,把Partition比作砖堆,Executor好比就是搬运工,负责搬运这一堆堆砖。Spark中存在action和shuffle,当遇到action的操作时候,就会创建Job,每个Job由若干Stage组成,划分Stage的是Shuffle数,Task作为这些行为的执行者,其值与Partition有关

Executor推荐值 = concurrentJobs * Partition

concurrentJobs 调整数据处理的并发度,数据消费还是有序的,其值需要小于等于cpu核数

文章 (opens new window)

#

# 使用

# 启动

启动master ./sbin/start-master.sh
启动work ./sbin/spark-class
提交作业 ./bin/spark-submit
1
2
3

打包程序、提交任务到集群 (opens new window) maven方式打包 (opens new window)

../bin/spark-submit --master yarn-client --jars ../lib/kafka_2.10-0.8.2.1.jar --class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic
1

shell方式

./spark-shell --master local[2]
1
  • 在使用shell的时候,SparkContext实例是已经创建了的,不用再编写代码创建
  • 添加代码的时候,注意不要忘了import 导入相应的包
  • local[K],K为worker线程数,理想情况下这个值设为机器的core数量

# 启动参数

spark参数调优 (opens new window)

# DataSet

在2.0之后,RDD被替换为DataSet方式,其性能优于RDD

# 缓存

频繁重复调用的时候,通过使用缓存的方式,可以提高数据的处理的性能

# Streaming

Spark Streaming 接受到实时数据流,吧数据按照指定的时间段切成一片片小的数据块,然后把小的数据块传给Spark Engine处理。

# StreamingContext

  • 一旦Context被启动,新的流处理计算就不能被添加处理的
  • 一旦Context被stop,就不能再restart
  • 一个JVM只能有一个StreamingContext
  • 调用stop()方法时候,会通知sparkContext,如果只想停止StreamingContext,就在调用stop(false)
  • 一个SparkContext可以创建多个StreamingContext

# 累加器

累加器(Accumulators)是通过一个组合和交替的操作,只能“加”的变量,因此,它能够有效地支持并行。他们常被用于实现计数器(例如在MapReduce中)或求和。Spark原生支持数值类型的计数器,程序开发人员可以增加对新类型的支持。

val accum = sc.longAccumulator("My Accumulator")
accum.add(1)
accum.value
1
2
3

# DStream

DStream代表了一连串的,不间断的数据流。

对DStream操作算子,比如Map/flatMap,其实底层会被翻译为对DStream中的每个RDD都做相同的操作,因为一个DStream是由不同批次的RDD所构成的。

# Input DStream

Input DStream是从源头产生的DStream。

每一个Input DStream都有一个Receivers(除了文件系统的数据),将数据保存到内存中。

如果存在Receivers的时候,在启动Spark程序时,不能使用local/local[1],因为一个线程被Receivers占用,没有线程去执行任务

# Transformation

与RDD操作类似,允许从Input DStream中来的数据被修改为新的DStream

# Output Operation

将DStream输出到文件系统,或者其他数据库

# updateStateByKey

带状态的算子操作

在使用带状态的算子操作时,必须要设置StramingContext.checkpoint()

updateStateByKey (opens new window)

# foreachRDD

// 遍历RDD数据
dstream.foreachRDD { rdd =>
// 遍历分区
  rdd.foreachPartition {partitionOfRecords =>
    val connection = createNewConnection()
    // 遍历每个分区下的数据,record的是一条数据
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
1
2
3
4
5
6
7
8
9
10

foreachRDD (opens new window)

# Window Operation

定时的进行一个时间段内的数据处理 window length : 窗口长度

sliding interval:窗口的间隔

这两个参数和我们的batch size有关系:倍数

e.g. 每个多久试计算摸个范围内的数据:每隔10秒计算前10分钟的wc

window Operation (opens new window)

# checkPoint

【没有checkPoint的时候】如果出现executor发生宕机,恢复数据需要根据RDD的依赖关系从头到尾计算一遍(依赖链)。

【使用Checkpoint】checkpoint操作会每隔一段时间去记录RDD的中间计算结果,将其保存到高可用的地方(HDFS),这样就提高的数据恢复的效率。

【checkPoint与cache的区别】 checkPoint会将数据保存高可用的环境,保证数据的不丢失。cache会将数据保存到缓存到内存中,一旦executor宕机,会导致数据的丢失

【什么时候该启用checkpoint呢?】 满足以下任一条件:

  • 使用了中间状态[stateful]转换 - 如果 application 中使用了updateStateByKey或reduceByKeyAndWindow等 stateful 操作,必须提供 checkpoint 目录来允许定时的 RDD checkpoint
  • 希望能从意外中恢复 driver

Spark Streaming 会 checkpoint 两种类型的数据。

  1. Metadata(元数据) checkpointing - 保存定义了 Streaming 计算逻辑至类似 HDFS 的支持容错的存储系统。用来恢复 driver,元数据包括: 配置 - 用于创建该 streaming application 的所有配置 DStream 操作 - DStream 一些列的操作 未完成的 batches - 那些提交了 job 但尚未执行或未完成的 batches
  2. Data checkpointing - 保存已生成的RDDs至可靠的存储。这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖链随着时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链

总之,metadata checkpointing 主要用来恢复 driver;而 RDD数据的 checkpointing 对于stateful 转换操作是必要的。

【局限性】当数据被保存到HDFS之类的高可用文件系统,保存的是类checkPoint对象序列化后的数据,在Spark Streaming程序重新编译后,再去反序列化会报错,必须新建一个StreamingContext

# cache和persist

RDD 可以使用 persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。

在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据.这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法.

cache调用的就是persist方法,默认存储级别只存储在内存中。

cache和persist的区别:cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。

MEMORY_ONLY 默认选项,RDD的(分区)数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不足,某些分区的数据将不会被缓存,需要在使用的时候根据血统重新计算。

MYMORY_AND_DISK RDD的数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不中,某些分区的数据会被存储至磁盘,使用的时候从磁盘读取。

MEMORY_ONLY_SER RDD的数据(Java对象)序列化之后存储于JVM的内存中(一个分区的数据为内存中的一个字节数组),相比于MEMORY_ONLY能够有效节约内存空间(特别是使用一个快速序列化工具的情况下),但读取数据时需要更多的CPU开销;如果内存空间不足,处理方式与MEMORY_ONLY相同。

MEMORY_AND_DISK_SER 相比于MEMORY_ONLY_SER,在内存空间不足的情况下,将序列化之后的数据存储于磁盘。

DISK_ONLY 仅仅使用磁盘存储RDD的数据(未经序列化)。 MEMORY_ONLY_2,MEMORY_AND_DISK_2, 以MEMORY_ONLY_2为例,MEMORY_ONLY_2相比于MEMORY_ONLY存储数据的方式是相同的,不同的是会将数据备份到集群中两个不同的节点,其余情况类似。

OFF_HEAP(experimental) RDD的数据序例化之后存储至Tachyon。相比于MEMORY_ONLY_SER,OFF_HEAP能够减少垃圾回收开销、使得Spark Executor更“小”更“轻”的同时可以共享内存

# Broadcast

Broadcast意为广播,向集群excutor广播一个只读变量。RDD不会被直接广播,需要通过collect之后,才能进行广播操作。

数据会被块管理器,切分为多个块,保存到当前任务的存储空间中,块大小没有人为4MB,支持使用压缩方式,默认是snappy压缩方式。

当Executor第一次使用广播变量时,会从Driver或者是就近的Executor中获取所需的广播变量,并将其数据加载到自己的本地,等下次读取的时候,就直接本地读取。

  • Broadcast

拓展阅读:

  • Spark2.2 广播变量broadcast原理及源码剖析 (opens new window)

# RDD

  • 弹性分布式数据集
  • spark计算操作对象

# RDD创建

Spark中创建RDD的创建方式大概可以分为三种:(1)、从集合中创建RDD;(2)、从外部存储创建RDD;(3)、从其他RDD创建。

  • 集合中创建RDD Spark主要提供了两中函数:parallelize和makeRDD
// 第一个参数为数据集合,第二个是分片数
sc.parallelize(Array(1,2,3,4),4)

1
2
3

e.g.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

先加载哪个分片是不确定的,随机

并行集合中一个很重要参数是 partitions(分区)的数量,它可用来切割 dataset(数据集)。Spark 将在集群中的每一个分区上运行一个任务。通常您希望群集中的每一个 CPU 计算 2-4 个分区。一般情况下,Spark 会尝试根据您的群集情况来自动的设置的分区的数量。当然,您也可以将分区数作为第二个参数传递到 parallelize (e.g. sc.parallelize(data, 10)) 方法中来手动的设置它。注意: 代码中的一些地方会使用 term slices (a synonym for partitions) 以保持向后兼容.

# RDD API

Spark RDD API详解(一) Map和Reduce (opens new window)

spark rdd详解二(transformation与action操作) (opens new window)

# RDD操作

RDD支持两种操作:transformation和action。

transformation 是将一种DataSet转换为另一种DataSet

action 对DataSet进行一些操作,得出一个结果

Spark 中所有的 transformations 都是 lazy(懒加载的), 因此它不会立刻计算出结果. 相反, 他们只记得应用于一些基本数据集的转换 (例如. 文件). 只有当需要返回结果给驱动程序时,transformations 才开始计算. 这种设计使 Spark 的运行更高效. 例如, 我们可以了解到,map 所创建的数据集将被用在 reduce 中,并且只有 reduce 的计算结果返回给驱动程序,而不是映射一个更大的数据集.

默认情况下,每次你在 RDD 运行一个 action 的时, 每个 transformed RDD 都会被重新计算。但是,您也可用 persist (或 cache) 方法将 RDD persist(持久化)到内存中;在这种情况下,Spark 为了下次查询时可以更快地访问,会把数据保存在集群上。此外,还支持持续持久化 RDDs 到磁盘,或复制到多个结点。

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
//  在 reduce 之前,这将导致 lineLengths 在第一次计算之后就被保存在 memory 中。
lineLengths.persist()
val totalLength = lineLengths.reduce((a, b) => a + b)
1
2
3
4
5

# Shuffle 操作

shuffle 是spark 重新分配数据的一种机制,使得这些数据可以跨不同的区域进行分组。这通常涉及在 executors 和 机器之间拷贝数据,这使得 shuffle 成为一个复杂的、代价高的操作。

尽管每个分区新 shuffle 的数据集将是确定的,分区本身的顺序也是这样,但是这些数据的顺序是不确定的。如果希望 shuffle 后的数据是有序的,可以使用:

  • mapPartitions 对每个 partition 分区进行排序,例如, .sorted
  • repartitionAndSortWithinPartitions 在分区的同时对分区进行高效的排序.
  • sortBy 对 RDD 进行全局的排序

触发的 shuffle 操作包括 repartition 操作,如 repartition 和 coalesce, ‘ByKey 操作 (除了 counting 之外) 像 groupByKey 和 reduceByKey, 和 join 操作, 像 cogroup 和 join.

# 共享变量

# Spark && Yarn

# Spark任务提交到Yarn

  • 集群模式执行 SparkPi 任务,指定资源使用,指定eventLog目录
spark-submit  --class org.apache.spark.examples.SparkPi \
--master yarn \
--conf spark.eventLog.dir=hdfs://dbmtimehadoop/tmp/spark2 \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
$SPARK_HOME/examples/jars/spark-examples*.jar \
10
1
2
3
4
5
6
7
8
9
10

numExecutors是执行的execute实例数,一个partition一个execute

# Spark Streaming+Kafka

Spark踩坑记——Spark Streaming+Kafka (opens new window)

# 日志

# 日志输出的级别

import org.apache.log4j.{Level, Logger}

object Example {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]) {
    ......
  }
}
1
2
3
4
5
6
7
8
9

日志管理 (opens new window)

livy 作为提交方式,自定义日志文件

{
  "name": "DMP EXPER ODS",
  "file": "/dmp/zyztest2/storage-ods-1.8.0-SNAPSHOT-jar-with-dependencies.jar",
  "className": "com.tairanchina.csp.dmp.components.storage.ods.ODSMain",
  "args": ["-c","http://10.200.23.146:8089/management-admin/","-p","exper","-l","feature_kerberos","--krb5ConfPath","/etc/krb5.conf","--principal","testdcpods","--keyTab","/etc/security/keytabs/testdcpods-group.keytab"],
"conf":{"spark.yarn.maxAppAttempts":1,"spark.driver.extraJavaOptions":"-Dlog4j.configuration=log4j.properties.template","spark.executor.extraJavaOptions":"-Dlog4j.configuration=log4j.properties.template"},
  "driverMemory": "1024M",
  "executorMemory": "1024M",
  "executorCores": 1,
  "numExecutors": 6
}
1
2
3
4
5
6
7
8
9
10
11

# Spark 进化史

  • [1.2] 添加Write Ahead Logs特性,防止数据的丢失
编辑 (opens new window)
上次更新: 2021/07/21, 18:22:30
Spark_Structured
Spark性能调优

← Spark_Structured Spark性能调优→

最近更新
01
权限
12-17
02
SpringGateway
12-17
03
Spock
12-17
更多文章>
Theme by Vdoing | Copyright © 2021-2021 迹_Jason | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×