sparkstreaming 停止任务什么时候会停

原文链接:
 &Streaming提供了高效便捷的流式处理模式,但是在有些场景下,使用默认的配置达不到最优,甚至无法实时处理来自外部的数据,这时候我们就需要对默认的配置进行相关的修改。由于现实中场景和数据量不一样,所以我们无法设置一些通用的配置(要不然&Streaming开发者就不会弄那么多参数,直接写死不得了),我们需要根据数据量,场景的不同设置不一样的配置,这里只是给出建议,这些调优不一定试用于你的程序,一个好的配置是需要慢慢地尝试。
  1、设置合理的批处理时间(batchDuration)。
  在构建StreamingContext的时候,需要我们传进一个参数,用于设置&Streaming批处理的时间间隔。Spark会每隔batchDuration时间去提交一次Job,如果你的Job处理的时间超过了batchDuration的设置,那么会导致后面的作业无法按时提交,随着时间的推移,越来越多的作业被拖延,最后导致整个Streaming作业被阻塞,这就间接地导致无法实时处理数据,这肯定不是我们想要的。
  另外,虽然batchDuration的单位可以达到毫秒级别的,但是经验告诉我们,如果这个值过小将会导致因频繁提交作业从而给整个Streaming带来负担,所以请尽量不要将这个值设置为小于500ms。在很多情况下,设置为500ms性能就很不错了。
  那么,如何设置一个好的值呢?我们可以先将这个值位置为比较大的值(比如10S),如果我们发现作业很快被提交完成,我们可以进一步减小这个值,知道Streaming作业刚好能够及时处理完上一个批处理的数据,那么这个值就是我们要的最优值。
  2、增加Job并行度
  我们需要充分地利用集群的资源,尽可能的将Task分配到不同的节点,一方面可以充分利用集群资源;另一方面还可以及时的处理数据。比如我们使用Streaming接收来自Kafka的数据,我们可以对每个Kafka分区设置一个接收器,这样可以达到负载均衡,及时处理数据(关于如何使用Streaming读取Kafka中的数据,可以参见和)。
  再如类似reduceByKey()和Join函数都可以设置并行度参数。
  3、使用Kryo系列化。
  Spark默认的是使用Java内置的系列化类,虽然可以处理所有自继承java.io.Serializable的类系列化的类,但是其性能不佳,如果这个成为性能瓶颈,可以使用Kryo系列化类,关于如何在Spark中使用Kroy,请参见。使用系列化数据可以很好地改善GC行为。
  4、缓存需要经常使用的数据
  对一些经常使用到的数据,我们可以显式地调用rdd.cache()来缓存数据,这样也可以加快数据的处理,但是我们需要更多的内存资源。
  5、清除不需要的数据
  随着时间的推移,有一些数据是不需要的,但是这些数据是缓存在内存中,会消耗我们宝贵的内存资源,我们可以通过配置spark.cleaner.ttl为一个合理的值;但是这个值不能过小,因为如果后面计算需要用的数据被清除会带来不必要的麻烦。而且,我们还可以配置选项spark.streaming.unpersist为true(默认就是true)来更智能地去持久化(unpersist)RDD。这个配置使系统找出那些不需要经常保有的RDD,然后去持久化它们。这可以减少Spark RDD的内存使用,也可能改善垃圾回收的行为。
  6、设置合理的GC
  GC是程序中最难调的一块,不合理的GC行为会给程序带来很大的影响。在集群环境下,我们可以使用并行Mark-Sweep垃圾回收机制,虽然这个消耗更多的资源,但是我们还是建议开启。可以如下配置:
spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
  更多的关于GC行为的配置,请参考Java垃圾回收相关文章。这里就不详细介绍了。
  7、设置合理的CPU资源数
  很多情况下Streaming程序需要的内存不是很多,但是需要的CPU要很多。在Streaming程序中,CPU资源的使用可以分为两大类:(1)、用于接收数据;(2)、用于处理数据。我们需要设置足够的CPU资源,使得有足够的CPU资源用于接收和处理数据,这样才能及时高效地处理数据。
阅读(...) 评论()spark streaming处理日志落地靠谱吗? - 知乎22被浏览2596分享邀请回答41 条评论分享收藏感谢收起1添加评论分享收藏感谢收起查看更多回答&&&&&&&&&&&
本期内容:
1 解密Spark Streaming运行机制
2 解密Spark Streaming架构
  一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下。
  Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序。如果可以掌握Spark streaming这个复杂的应用程序,那么其他的再复杂的应用程序都不在话下了。这里选择Spark Streaming作为版本定制的切入点也是大势所趋。
  我们知道Spark Core处理的每一步都是基于RDD的,RDD之间有依赖关系。上图中的RDD的DAG显示的是有3个Action,会触发3个job,RDD自下向上依赖,RDD产生job就会具体的执行。从DSteam Graph中可以看到,DStream的逻辑与RDD基本一致,它就是在RDD的基础上加上了时间的依赖。RDD的DAG又可以叫空间维度,也就是说整个Spark Streaming多了一个时间维度,也可以成为时空维度。
  从这个角度来讲,可以将Spark Streaming放在坐标系中。其中Y轴就是对RDD的操作,RDD的依赖关系构成了整个job的逻辑,而X轴就是时间。随着时间的流逝,固定的时间间隔(Batch Interval)就会生成一个job实例,进而在集群中运行。
  对于Spark Streaming来说,当不同的数据来源的数据流进来的时候,基于固定的时间间隔,会形成一系列固定不变的数据集或event集合(例如来自flume和kafka)。而这正好与RDD基于固定的数据集不谋而合,事实上,由DStream基于固定的时间间隔行程的RDD Graph正是基于某一个batch的数据集的。
  从上图中可以看出,在每一个batch上,空间维度的RDD依赖关系都是一样的,不同的是这个五个batch流入的数据规模和内容不一样,所以说生成的是不同的RDD依赖关系的实例,所以说RDD的Graph脱胎于DStream的Graph,也就是说DStream就是RDD的模版,不同的时间间隔,生成不同的RDD Graph实例。
  从Spark Streaming本身出发:
  1.需要RDD DAG的生成模版:DStream Graph
  2需要基于Timeline的job控制器
  3需要inputStreamings和outputStreamings,代表数据的输入和输出
  4具体的job运行在Spark Cluster之上,由于streaming不管集群是否可以消化掉,此时系统容错就至关重要
  5事务处理,我们希望流进来的数据一定会被处理,而且只处理一次。在处理出现崩溃的情况下如何保证Exactly once的事务语意。
  从源码解读DStream
  从这里可以看出,DStream就是Spark Streaming的核心,就想Spark Core的核心是RDD,它也有dependency和compute。更为关键的是下面的代码:
这是一个HashMap,以时间为key,以RDD为value,这也正应证了随着时间流逝,不断的生成RDD,产生依赖关系的job,并通过jobScheduler在集群上运行。再次验证了DStream就是RDD的模版。
  DStream可以说是逻辑级别的,RDD就是物理级别的,DStream所表达的最终都是通过RDD的转化实现的。前者是更高级别的抽象,后者是底层的实现。DStream实际上就是在时间维度上对RDD集合的封装,DStream与RDD的关系就是随着时间流逝不断的产生RDD,对DStream的操作就是在固定时间上操作RDD。
  总结:
  在空间维度上的业务逻辑作用于DStream,随着时间的流逝,每个Batch Interval形成了具体的数据集,产生了RDD,对RDD进行transform操作,进而形成了RDD的依赖关系RDD DAG,形成job。然后jobScheduler根据时间调度,基于RDD的依赖关系,把作业发布到Spark Cluster上去运行,不断的产生Spark作业。
资料来源于:DT_大数据梦工厂(Spark发行版本定制)
更多私密内容,请关注微信公众号:DT_Spark
如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:
阅读(...) 评论()spark(27)
1、为什么引入Backpressure
默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time & batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔。这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置间隔内不能完全处理当前接收速率接收的数据。如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk,
则内存存放不下的数据会溢写至disk, 加大延迟)。Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate
”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力。
2、Backpressure
Spark Streaming Backpressure: 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。通过属性“spark.streaming.backpressure.enabled
”来控制是否启用backpressure机制,默认值false,即不启用。
2.1 Streaming架构如下图所示(详见Streaming数据接收过程文档和Streaming 源码解析)
2.2 BackPressure执行过程如下图所示:
  在原架构的基础上加上一个新的组件RateController,这个组件负责监听“OnBatchCompleted”事件,然后从中抽取processingDelay 及schedulingDelay信息. Estimator依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream将rate通过ReceiverTracker与ReceiverSupervisorImpl转发给BlockGenerator(继承自RateLimiter).
3、BackPressure 源码解析
3.1 RateController类体系
RatenController 继承自StreamingListener. 用于处理BatchCompleted事件。核心代码为:
* A StreamingListener that receives batch completion
updates, and maintains
* an estimate of the speed at which this stream should ingest messages,
* given an estimate computation from a `RateEstimator`
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
extends StreamingListener with Serializable {
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
val newRate = pute(time, elems, workDelay, waitDelay)
newRate.foreach { s =&
rateLimit.set(s.toLong)
publish(getLatestRate())
def getLatestRate(): Long = rateLimit.get()
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
processingEnd &- batchCompleted.batchInfo.processingEndTime
workDelay &- batchCompleted.batchInfo.processingDelay
waitDelay &- batchCompleted.batchInfo.schedulingDelay
elems &- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
3.2 RateController的注册
JobScheduler启动时会抽取在DStreamGraph中注册的所有InputDstream中的rateController,并向ListenerBus注册监听. 此部分代码如下:
def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler has already been started
logDebug(&Starting JobScheduler&)
eventLoop = new EventLoop[JobSchedulerEvent](&JobScheduler&) {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError(&Error in job scheduler&, e)
eventLoop.start()
// attach rate controllers of input streams to receive batch completion updates
inputDStream &- ssc.graph.getInputStreams
rateController &- inputDStream.rateController
} ssc.addStreamingListener(rateController)&/span&
listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
jobGenerator.start()
logInfo(&Started JobScheduler&)
3.3 BackPressure执行过程分析
BackPressure 执行过程分为BatchCompleted事件触发时机和事件处理两个过程
3.3.1 BatchCompleted触发过程
对BatchedCompleted的分析,应该从JobGenerator入手,因为BatchedCompleted是批次处理结束的标志,也就是JobGenerator产生的作业执行完成时触发的,因此进行作业执行分析。
Streaming 应用中JobGenerator每个Batch Interval都会为应用中的每个Output Stream建立一个Job, 该批次中的所有Job组成一个Job Set.使用JobScheduler的submitJobSet进行批量Job提交。此部分代码结构如下所示
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are
// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, &true&)
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
case Success(jobs) =&
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =&
jobScheduler.reportError(&Error generating jobs for time & + time, e)
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
其中,sumitJobSet会创建固定数量的后台线程(具体由“spark.streaming.concurrentJobs”指定),去处理Job Set中的Job. 具体实现逻辑为:
(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo(&No jobs added for time & + jobSet.time)
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job =& jobExecutor.execute(new JobHandler(job)))
logInfo(&Added jobs for time & + jobSet.time)
其中JobHandler用于执行Job及处理Job执行结果信息。当Job执行完成时会产生JobCompleted事件. JobHandler的具体逻辑如下面代码所示:
  当Job执行完成时,向eventLoop发送JobCompleted事件。EventLoop事件处理器接到JobCompleted事件后将调用handleJobCompletion 来处理Job完成事件。handleJobCompletion使用Job执行信息创建StreamingListenerBatchCompleted事件并通过StreamingListenerBus向监听器发送。实现如下:
(job: Job, completedTime: Long) {
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
job.setEndTime(completedTime)
listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
logInfo(&Finished job & + job.id + & from job set of time & + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
logInfo(&Total delay: %.3f s for time %s (execution: %.3f s)&.format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
job.result match {
case Failure(e) =&
reportError(&Error running job & + job, e)
3.3.2、BatchCompleted事件处理过程
StreamingListenerBus将事件转交给具体的StreamingListener,因此BatchCompleted将交由RateController进行处理。RateController接到BatchCompleted事件后将调用onBatchCompleted对事件进行处理。
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
processingEnd &- batchCompleted.batchInfo.processingEndTime
workDelay &- batchCompleted.batchInfo.processingDelay
waitDelay &- batchCompleted.batchInfo.schedulingDelay
elems &- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  onBatchCompleted会从完成的任务中抽取任务的执行延迟和调度延迟,然后用这两个参数用RateEstimator(目前存在唯一实现PIDRateEstimator,proportional-integral-derivative (PID) controller,
)估算出新的rate并发布。代码如下:
* Compute the new rate limit and publish it asynchronously.
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
val newRate = pute(time, elems, workDelay, waitDelay)
newRate.foreach { s =&
rateLimit.set(s.toLong)
publish(getLatestRate())
其中publish()由RateController的子类ReceiverRateController来定义。具体逻辑如下(ReceiverInputDStream中定义):
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
extends RateController(id, estimator) {
override def publish(rate: Long): Unit =
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
publish的功能为新生成的rate 借助ReceiverTracker进行转发。ReceiverTracker将rate包装成UpdateReceiverRateLimit事交ReceiverTrackerEndpoint
def sendRateUpdate(streamUID: Int, newRate: Long):
Unit = synchronized {
if (isTrackerStarted) {
endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
ReceiverTrackerEndpoint接到消息后,其将会从receiverTrackingInfos列表中获取Receiver注册时使用的endpoint(实为ReceiverSupervisorImpl),再将rate包装成UpdateLimit发送至endpoint.其接到信息后,使用updateRate更新BlockGenerators(RateLimiter子类),来计算出一个固定的令牌间隔。
其中RateLimiter的updateRate实现如下:
private[receiver] def updateRate(newRate: Long): Unit =
if (newRate & 0) {
if (maxRateLimit & 0) {
rateLimiter.setRate(newRate.min(maxRateLimit))
rateLimiter.setRate(newRate)
setRate的实现 如下:
public final void setRate(double permitsPerSecond) {
Preconditions.checkArgument(permitsPerSecond & 0.0
&& !Double.isNaN(permitsPerSecond), &rate must be positive&);
synchronized (mutex) {
resync(readSafeMicros());
double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerS
this.stableIntervalMicros = stableIntervalM
doSetRate(permitsPerSecond, stableIntervalMicros);
到此,backpressure反压机制调整rate结束。
4.流量控制点
  当Receiver开始接收数据时,会通过supervisor.pushSingle()方法将接收的数据存入currentBuffer等待BlockGenerator定时将数据取走,包装成block. 在将数据存放入currentBuffer之时,要获取许可(令牌)。如果获取到许可就可以将数据存入buffer, 否则将被阻塞,进而阻塞Receiver从数据源拉取数据。
* Push a single data item into the buffer.
def addData(data: Any): Unit = {
if (state == Active) {
waitToPush()
//获取令牌
synchronized {
if (state == Active) {
currentBuffer += data
throw new SparkException(
&Cannot add data as BlockGenerator has not been started or has been stopped&)
throw new SparkException(
&Cannot add data as BlockGenerator has not been started or has been stopped&)
其令牌投放采用令牌桶机制进行, 原理如下图所示:
  令牌桶机制: 大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数,如果获取到则继续操作,否则阻塞。用完之后不用放回。
  Streaming 数据流被Receiver接收后,按行解析后存入iterator中。然后逐个存入Buffer,在存入buffer时会先获取token,如果没有token存在,则阻塞;如果获取到则将数据存入buffer. 然后等价后续生成block操作。
文/曹振华(简书作者)
原文链接:/p/87e2d66d92bb
著作权归作者所有,转载请联系作者获得授权,并标注“简书作者”。
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:100837次
积分:1589
积分:1589
排名:千里之外
原创:45篇
转载:126篇
(3)(18)(14)(3)(5)(3)(1)(8)(9)(19)(10)(3)(10)(6)(1)(17)(20)(10)(2)(2)(1)(4)(5)
(window.slotbydup = window.slotbydup || []).push({
id: '4740881',
container: s,
size: '200,200',
display: 'inlay-fix'

我要回帖

更多关于 spark streaming sql 的文章

 

随机推荐