SparkQ&A
# Spark QA
# Spark 是什么?
Spark 是基于 map reduce 算法模型实现的分布式计算框架,拥有Hadoop MapReduce 所具有的优点,并且解决了 Hadoop MapReduce 中的诸多缺陷。
# Spark 优化或解决了 Hadoop 哪些不足?
**减少磁盘 I/O:**随着实时大数据应用越来越多,Hadoop 作为离线的高吞吐、低响应框架已不能满足这类需求。HadoopMapReduce 的 map 端将中间输出和结果存储在磁盘中,reduce 端又需要从磁盘读写中间结果,势必造成磁盘 IO 成为瓶颈。Spark 允许将 map 端的中间输出和结果存储在内存中,reduce 端在拉取中间结果时避免了大量的磁盘 I/O。Hadoop Yarn 中的 ApplicationMaster 申请到 Container 后,具体的任务需要利用 NodeManager 从 HDFS 的不同节点下载任务所需的资源(如 Jar 包),这也增加了磁盘 I/O。Spark 将应用程序上传的资源文件缓冲到 Driver 本地文件服务的内存中,当 Executor 执行任务时直接从 Driver 的内存中读取,也节省了大量的磁盘 I/O。
**增加并行度:**由于将中间结果写到磁盘与从磁盘读取中间结果属于不同的环节,Hadoop 将它们简单的通过串行执行衔接起来。Spark 把不同的环节抽象为 Stage,允许多个 Stage 既可以串行执行,又可以并行执行。 避免重新计算:当 Stage 中某个分区的 Task 执行失败后,会重新对此 Stage 调度,但在重新调度的时候会过滤已经执行成功的分区任务,所以不会造成重复计算和资源浪费。
**可选的 Shuffle 排序:**HadoopMapReduce 在 Shuffle 之前有着固定的排序操作,而 Spark 则可以根据不同场景选择在 map 端排序或者 reduce 端排序。
**灵活的内存管理策略:**Spark 将内存分为堆上的存储内存、堆外的存储内存、堆上的执行内存、堆外的执行内存 4 个部分。Spark 既提供了执行内存和存储内存之间是固定边界的实现,又提供了执行内存和存储内存之间是“软”边界的实现。Spark 默认使用“软”边界的实现,执行内存或存储内存中的任意一方在资源不足时都可以借用另一方的内存,最大限度的提高资源的利用率,减少对资源的浪费。Spark 由于对内存使用的偏好,内存资源的多寡和使用率就显得尤为重要,为此 Spark 的内存管理器提供的 Tungsten 实现了一种与操作系统的内存 Page 非常相似的数据结构,用于直接操作操作系统内存,节省了创建的 Java 对象在堆中占用的内存,使得 Spark 对内存的使用效率更加接近硬件。Spark 会给每个 Task 分配一个配套的任务内存管理器,对 Task 粒度的内存进行管理。Task 的内存可以被多个内部的消费者消费,任务内存管理器对每个消费者进行 Task 内存的分配与管理,因此 Spark 对内存有着更细粒度的管理。
# 什么是宽依赖、窄依赖?
窄依赖指的是每一个 parent RDD 的 Partition 最多被子 RDD 的一个 Partition 使用。
宽依赖指的是多个子 RDD 的 Partition 会依赖同一个 parent RDD 的 Partition。
RDD 的每个 Partition,仅仅依赖于父 RDD 中的一个 Partition,这才是窄。
# 窄依赖类型?
OneToOneDependency 与 RangeDependency
https://blog.csdn.net/JavaMoo/article/details/78441208
# 如何区分 action 与 transformation?
如果返回的是 RDD 类型,那么这是 transformation; 如果返回的是其他数据类型,那么这是 action。
1、java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
A: 数据长时间没有进行输出操作,导致问题,可以在数据处理的时候,进行重启。
2、Spark Streaming numRecords must not be negative
A: 当 kafka topic 被删除,但 offset 不会自动进行重置,导致 spark 应用启动的时候,还是从原先记录的 offset 进行消费,但是现在这个时候的 topic 是被新建的,就可能会存在该新的 topic 中的数据 offset 值不比原先的 offset 大,从而导致两者相减为负数的情况,导致的了这个问题。
3、spark streaming assertion failed: Failed to get records for spark-executor-a-group a-topic 7 244723248 after polling for 4096
A:
Increase num.network.threads in kafka/config/server.properties, default is 3
Increase spark.streaming.kafka.consumer.poll.ms value ~! a large one ... without config spark.streaming.kafka.consumer.poll.ms, it's using the spark.network.timeout, which is 120s -- causing some problem
4、Spark 报错:exceeding memory limits. 4.9 G B of 4.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
A: 问题原因:当 executor 的内存使用大于 executor-memory 与 executor.memoryOverhead 的加和时,Yarn 就会杀掉这些 executor
解决办法:根据提示,设置–conf spark.yarn.executor.memoryOverhead=2048,单位是 M。
其实解决办法有三种:
1.添加配置项–conf spark.yarn.executor.memoryOverhead=2048,单位是 M(推荐)
2.增大配置项 executor-memory 的值
3.增大并行度,–executor-cores 3
5、
18/12/26 13:55:08 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from prod-dmp13.fengdai.org/10.211.6.13:36083 is closed
18/12/26 13:55:08 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from prod-dmp13.fengdai.org/10.211.6.13:36083 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1354)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:917)
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
A: spark.network.timeout=200s
http://support.huawei.com/hedex/pages/EDOC1100006861YZH0302P/01/EDOC1100006861YZH0302P/01/resources/zh-cn_topic_0096296861.html
6、
ERROR YarnClusterScheduler: Lost executor 2 on prod-dmp17.fengdai.org: Container marked as failed: container_e09_1545325577121_0461_01_000003 on host: prod-dmp17.fengdai.org. Exit status: 143. Diagnostics: [2018-12-28 13:08:41.906]Container killed on request. Exit code is 143
A:
spark job 运行参数优化 一般 Spark Job 很多问题都是来源于系统资源不够用,通过监控日志等判断是内存资源占用过高等导致的问题,因此尝试通过配置参数的方法来解决。 1、--conf spark.akka.frameSize=100 此参数控制 Spark 中通信消息的最大容量(如 task 的输出结果),默认为 10M,当处理大数据时,task 的输出可能会大于这个值,需要根据实际数据设置一个更高的值。 2、--conf spark.yarn.executor.memoryOverhead=4096 executor 堆外内存设置,如果程序使用了大量的堆外内存,就该增大此配置。
http://www.voidcn.com/article/p-qkzmdghu-yk.html