2.1总体架构:
任务调度的两个主要模块: DAGScheduler (负责划分不同阶段)和TaskScheduler (将不同阶段的TaskSet提交给集群中的执行程序),执行程序后的结果为dricutor
2.2计划程序实现:
1 ) org.Apache.spark.scheduler.Dag scheduler
2 ) org.Apache.spark.scheduler.scheduler back end (trait ) )。
分配当前可用的资源。 具体来说,将计算资源(Executor )分配给当前正在等待分配计算资源的Task,用分配的Executor启动Task,完成计算的调度。 org.Apache.spark.scheduler.coarsegrainedschedulerber
3 ) org.Apache.spark.scheduler.task scheduler (trait ) :从DAGScheduler接收不同Stage的任务并将其提交给不同的执行程序
org.Apache.spark.scheduler.scheduler back end # review offers调用场景:
1 )有新任务提交时
2 )任务执行失败
3 )如果计算节点(Executor不可用)
4 )某些任务执行太慢,需要重新分配资源时
调度后退是APP应用程序选择的部署模式(包括local、master、yarn和mesos的部署方法)、调度后退任务调度器(schedulerbackend-taskscheduler ) )
任务时间表逻辑示意图:
客户端端:
1 )生成作业(APP执行操作符,作业封装) )。
2 )调试计划程序划分多个级(stage(shuffermapstage和ResulStage,RDD之间的广泛依赖) (父RDD的分区对应于多个子RDD的分区) )
3 )调试计划器提交给Stage
4 )诊断程序生成需要计算的Partition的任务集
5 )任务计划器提交计算任务
6 )调度程序SchedulableBuilder调度任务
7 )任务计划器为任务分配资源
8 )排程器备份结束将任务提交给执行器执行
4.2通过诊断计划程序进一步了解
在DAGScheduler中划分的不同Stage的每个Partition上运行的Task逻辑完全相同,并封装在适当的TaskSet中
4.2.1创建Dag计划程序和任务计划程序
task scheduler=sc.createtaskscheduler (//创建task scheduler
DAGScheduler=newdagscheduler (this )创建Dag scheduler
这个构造函数的实现是defthis(sc:sparkcontext )=this ) sc,sc.taskScheduler
初始化DAGScheduler时,将创建dagschedulereventprocessactor,这是一种主要用于调用DAGScheduler来处理来自DAGScheduler的各种信息的异步并发机制
DAGSchedulerActorSupervisor只进行事件处理器的制作
4.2.2提交作业
1 ) count (操作操作员触发作业) )。
2 ) sc.runJob(sparkcontext执行job ) )。
3 ) Dag scheduler.run job (Dag scheduler执行job ) )。
4 ) Dag scheduler.submit job (Dag scheduler提交作业) )。
5 ) eventprocessactor.receive (jobsubmitted ) (加速器接收来自DAGScheduler的消息的job submitted ) ) )。
6 ) DAGScheduler.handlejobsubmitted )调用Dag scheduler的方法处理事件
dagScheduler.submitJob () )的执行过程:
1 )创建作业id
2 )制作JobWaiter拦截该Job的执行情况,如果Job的所有Task的执行成功,则Job的执行成功
3 )调试程序将该作业提交给事件处理器
eventProcessActor !JobSubmitted(jobId,rdd,func2,partitions.toArray,allowLocal,callSite,waiter,propertied)
4)dagScheduler.handleJobSubmitted() //处理该事件
4.2.3 Stage的划分
dagScheduler.handleJobSubmitted() 执行中会根据RDD创建finalStage
实现原理:
finalStage=newStage(finalRDD,partitions,size,None,jobId,callSite)//根据jobId和finalRDD创建finalStage
newStage()根据当前Stage的parent stages
val parentStages=getParentStages(rdd,jobId)//创建当前RDD所在Stage的parentStage
val id=newStageId.getAndIncrement() //创建StageId
val stage=newStage(id,rdd,numTasks,shuffleDep,parentStages,jobId,callSite); //创建stage
判断当前RDD是否被访问,如果该RDD未被访问,则判断其依赖dep,若为ShuffleDependency,则创建shuffleMapStage,否则将其父RDD压入待访问的队列中
getShuffleMapStage获取ShuffleDependency所依赖的Stage,如果没有则创建新的Stage,否则沿用此依赖的Stage
ShuffleMapTask的计算结果会传给Driver端的mapOutputTracker(保存shuffle结果的位置信息),下游的Task可以据此查询结果,实际的结果保存在registerMapOutputs
窄依赖(narrow dependency)子RDD的partition唯一对应父RDD的partition,Task可并行执行
宽依赖(wide dependency)子RDD的partition对应父RDD的多个partition,Task不可并行
Action触发的Job总是从最后一个RDD开始划分Stage(从后向前划分),当执行到dagScheduler.handleJobSubmitted()时开始划分Stage
4.2.4 任务生成
finalStage=newStage(finalRDD,partitions,size,None,jobId,callSite)创建好finalStage后生成ActiveJob
val job=new ActiveJob(jobId,finalStage,func,partitions,callsite,listener,properties);//准备计算该finalStage
handleJobSubmitted调用submitStage提交Stage,只有其parenStage提交完成后才能提交此Stage(即从前向后提交Stage)
submitStage的实现原理:
1)val jobId=activeJobForStage(stage) //创建JobId
2)getMissingParentStages(stage)//返回该stage的parent stage
3)若此stage的parent为空则直接提交该Stage submitMissingTasks(stage,jodId.get);否则递归提交该stage的parent stagesubmitStage(parent)
submitMissingTasks(stage,jobId.get)向TaskScheduler提交stage的Tasks,其实现原理:
1)取得需要计算的partitons
2)若为finalStage则对应的Task为ResultTask,则会判断该Partition的ResultTask是否结束,如果已经结束则无需计算;若为其他Stage则对应的Task是ShuffleMapTask,只需要判断该Stage是够存在缓存结果,在判断哪些partition需要计算后,会为每个partition生成Task,然后封装成一个TaskSet提交给TaskScheduler
4.3 任务调度实现详解
4.3.1 TaskScheduler的创建
sc.createTaskScheduler() //创建TaskScheduler的实现原理
Standalone模式创建TaskScheduler和SchedulerBackend的原理
master match{
case SPARK_REGEX(sparkUrl)=>
val scheduler = new TaskSchedulerImple(sc)//创建TaskScheduler
val masterUrls=sparkUrl.split(“,”).map(“spark://”+_)//得到多个masterUrls Master/Worker模式
val backend=new SparkDeploySchedulerBackend(scheduler,sc,masterUrls)//根据masterUrls创建SchedulerBackend
scheduler.initialize(backend) //初始化schedulerbackend
(backend,scheduler) //返回值
}
当TaskScheduler创建完成后,会周期性的调用org.apache.spark.scheduler.TaskSetManager.checkSpeculatableTasks检查是否存在推测执行的Task(解决慢任务)
4.3.2 Task的提交
Driver端调用栈:
1)TaskSchedulerImpl.submitTasks//封装TaskSet并创建TaskSetManager
val manager=newTaskSetManager(this,taskSet,maxTaskFailures)
manager用于监控Task的执行状态并采取相应的措施,比如失败重试,慢任务的推测性执行
2)SchedulableBuilder.addTaskSetManager()
SchedulableBuilder是Application级别的调度器,支持FIFO和FAIR公平调度策略,从而决定TaskSetManager的调度顺序,然后由TaskSetManager根据就近原则(数据本地性)确定Task运行在哪个Executor上
3)CoarseGrainedSchedulerBackend.reviveOffers //SchedulerBackend接收TaskSet
4)CoarseGrainedSchedulerBackend.DriverActor.makeOffers
5)TaskSchedulerImpl.resourceOffers //为Task分配Executor资源,输入为Executor的列表(可用资源),输出TaskDescription的二维数组(TaskID,ExecutorID,Task执行环境的依赖关系)
6)CoarseGrainedSchedulerBackend.DriverActor.launchTasksExecutor启动并执行Task
4.3.3 任务调度的具体实现
根据schedulingMode的不同创建FIFOSchedulableBuilder or FairSchedulableBuilder,对于FIFO而言rootPool包含一组TaskSetManager;对于FAIR而言rootPool包含一组Pool。这些Pool构成调度树,每个叶子节点即为TaskSetManager
FIFO调度策略:对于任意Task而言,从JobID较小的开始调度,若属于同一个Job则从StageID小的开始调度
FAIR调度策略:首先挂到rootPool虾米那的pool先确定调度顺序,然后在每个pool内部使用相同的算法确定TaskSetManager的调度顺序,一旦确定调度顺序,TaskScheduler便可以将Task发送给Executor执行
4.3.4 Task运行结果的处理
Task在Executor上执行完成后,会通过向Driver发送StatusUpdate的消息来通知Driver任务的状态更新为TaskState.FINISHED,并通知TaskScheduler,TaskScheduler重新分配计算任务给该Executor(资源复用)
TaskSchedulerImpl.statusUpdate根据Task的状态信息若为FINISHED则标记该任务结束并处理任务的计算结果;否则标记该任务结束并处理任务失败的情况
TaskScheduler处理每次结果都由一个Daemon线程池负责,默认的线程池是4个线程
Executor回传计算结果给Driver时的策略:
1)若结果大于1GB则直接丢弃
2)若结果较大,则以tid为key存入BlockManager
3)若结果不大,则直接传给Driver
taskResultGetter.enqueueSuccessfulTask(taskSet,tid,state,serializedData)的核心部分是
scheduler.handleSuccessfulTask(taskSetManager,tid,result)负责处理计算结果的调用栈过程:
1)TaskSchedulerImpl.handleSuccessfulTask
2)TaskSetManager.handleSuccessfulTask
3)DAGScheduler.taskEnded
4)DAGScheduler.eventProcessActor
5)DAGScheduler.handleTaskCompletion
对于ResultTask则调用JobWaiter告知调用者任务已结束,当所有Task都执行结束后则标记Job结束;对于ShuffleMapTask则需要将结果保存到Stage,当Stage的所有Task都执行结束后则将整体结果注册到MapOutputTrackerMaster中
若Stage中所有Task都返回了,但部分数据为空,则需要重新提交submitStage(stage),若该Stage成功返回,且其不存在parent stage或者parent stage均提交了,则该stage可以提交submitMissiontTasks(stage,jobId),若该stage为finalstage,则整个Job执行完成
taskResultGetter.enqueueFailedTask(taskSetManager,tid,taskState)的核心部分是
scheduler.handleFailedTask(taskSetManager,tid,taskState,reason)
taskSetManager.handleFailedTask(tid,taskState,reason)
TaskSetManager会根据失败的原因采用不同的动作,允许最多重试4次(即会标记重试任务为等待调度),若超过最大重试次数则直接标记失败