首页 > 编程知识 正文

controller注解和restcontroller,nginx模块开发与详解

时间:2023-05-05 17:15:27 阅读:154289 作者:1932

2.1总体架构:

任务调度的两个主要模块: DAGScheduler (负责划分不同阶段)和TaskScheduler (将不同阶段的TaskSet提交给集群中的执行程序),执行程序后的结果为dricutor

2.2计划程序实现:

1 ) org.Apache.spark.scheduler.Dag scheduler

2 ) org.Apache.spark.scheduler.scheduler back end (trait ) )。

分配当前可用的资源。 具体来说,将计算资源(Executor )分配给当前正在等待分配计算资源的Task,用分配的Executor启动Task,完成计算的调度。 org.Apache.spark.scheduler.coarsegrainedschedulerber

3 ) org.Apache.spark.scheduler.task scheduler (trait ) :从DAGScheduler接收不同Stage的任务并将其提交给不同的执行程序

org.Apache.spark.scheduler.scheduler back end # review offers调用场景:

1 )有新任务提交时

2 )任务执行失败

3 )如果计算节点(Executor不可用)

4 )某些任务执行太慢,需要重新分配资源时

调度后退是APP应用程序选择的部署模式(包括local、master、yarn和mesos的部署方法)、调度后退任务调度器(schedulerbackend-taskscheduler ) )

任务时间表逻辑示意图:

客户端端:

1 )生成作业(APP执行操作符,作业封装) )。

2 )调试计划程序划分多个级(stage(shuffermapstage和ResulStage,RDD之间的广泛依赖) (父RDD的分区对应于多个子RDD的分区) )

3 )调试计划器提交给Stage

4 )诊断程序生成需要计算的Partition的任务集

5 )任务计划器提交计算任务

6 )调度程序SchedulableBuilder调度任务

7 )任务计划器为任务分配资源

8 )排程器备份结束将任务提交给执行器执行

4.2通过诊断计划程序进一步了解

在DAGScheduler中划分的不同Stage的每个Partition上运行的Task逻辑完全相同,并封装在适当的TaskSet中

4.2.1创建Dag计划程序和任务计划程序

task scheduler=sc.createtaskscheduler (//创建task scheduler

DAGScheduler=newdagscheduler (this )创建Dag scheduler

这个构造函数的实现是defthis(sc:sparkcontext )=this ) sc,sc.taskScheduler

初始化DAGScheduler时,将创建dagschedulereventprocessactor,这是一种主要用于调用DAGScheduler来处理来自DAGScheduler的各种信息的异步并发机制

DAGSchedulerActorSupervisor只进行事件处理器的制作

4.2.2提交作业

1 ) count (操作操作员触发作业) )。

2 ) sc.runJob(sparkcontext执行job ) )。

3 ) Dag scheduler.run job (Dag scheduler执行job ) )。

4 ) Dag scheduler.submit job (Dag scheduler提交作业) )。

5 ) eventprocessactor.receive (jobsubmitted ) (加速器接收来自DAGScheduler的消息的job submitted ) ) )。

6 ) DAGScheduler.handlejobsubmitted )调用Dag scheduler的方法处理事件

dagScheduler.submitJob () )的执行过程:

1 )创建作业id

2 )制作JobWaiter拦截该Job的执行情况,如果Job的所有Task的执行成功,则Job的执行成功

3 )调试程序将该作业提交给事件处理器

eventProcessActor !JobSubmitted(jobId,rdd,func2,partitions.toArray,allowLocal,callSite,waiter,propertied)

4)dagScheduler.handleJobSubmitted()  //处理该事件

 

4.2.3 Stage的划分

dagScheduler.handleJobSubmitted() 执行中会根据RDD创建finalStage

实现原理:

finalStage=newStage(finalRDD,partitions,size,None,jobId,callSite)//根据jobId和finalRDD创建finalStage

newStage()根据当前Stage的parent stages

val parentStages=getParentStages(rdd,jobId)//创建当前RDD所在Stage的parentStage

val id=newStageId.getAndIncrement() //创建StageId

val stage=newStage(id,rdd,numTasks,shuffleDep,parentStages,jobId,callSite); //创建stage

 

判断当前RDD是否被访问,如果该RDD未被访问,则判断其依赖dep,若为ShuffleDependency,则创建shuffleMapStage,否则将其父RDD压入待访问的队列中

getShuffleMapStage获取ShuffleDependency所依赖的Stage,如果没有则创建新的Stage,否则沿用此依赖的Stage

 

ShuffleMapTask的计算结果会传给Driver端的mapOutputTracker(保存shuffle结果的位置信息),下游的Task可以据此查询结果,实际的结果保存在registerMapOutputs

 

窄依赖(narrow dependency)子RDD的partition唯一对应父RDD的partition,Task可并行执行

宽依赖(wide dependency)子RDD的partition对应父RDD的多个partition,Task不可并行

 

Action触发的Job总是从最后一个RDD开始划分Stage(从后向前划分),当执行到dagScheduler.handleJobSubmitted()时开始划分Stage

 

4.2.4 任务生成

finalStage=newStage(finalRDD,partitions,size,None,jobId,callSite)创建好finalStage后生成ActiveJob

val job=new ActiveJob(jobId,finalStage,func,partitions,callsite,listener,properties);//准备计算该finalStage

handleJobSubmitted调用submitStage提交Stage,只有其parenStage提交完成后才能提交此Stage(即从前向后提交Stage)

 

submitStage的实现原理:
1)val jobId=activeJobForStage(stage) //创建JobId

2)getMissingParentStages(stage)//返回该stage的parent stage

3)若此stage的parent为空则直接提交该Stage submitMissingTasks(stage,jodId.get);否则递归提交该stage的parent stagesubmitStage(parent)

 

submitMissingTasks(stage,jobId.get)向TaskScheduler提交stage的Tasks,其实现原理:

1)取得需要计算的partitons

2)若为finalStage则对应的Task为ResultTask,则会判断该Partition的ResultTask是否结束,如果已经结束则无需计算;若为其他Stage则对应的Task是ShuffleMapTask,只需要判断该Stage是够存在缓存结果,在判断哪些partition需要计算后,会为每个partition生成Task,然后封装成一个TaskSet提交给TaskScheduler

 

4.3 任务调度实现详解

4.3.1 TaskScheduler的创建

sc.createTaskScheduler() //创建TaskScheduler的实现原理

Standalone模式创建TaskScheduler和SchedulerBackend的原理

master match{

case SPARK_REGEX(sparkUrl)=>

val scheduler = new TaskSchedulerImple(sc)//创建TaskScheduler

val masterUrls=sparkUrl.split(“,”).map(“spark://”+_)//得到多个masterUrls Master/Worker模式

val backend=new SparkDeploySchedulerBackend(scheduler,sc,masterUrls)//根据masterUrls创建SchedulerBackend

scheduler.initialize(backend) //初始化schedulerbackend

(backend,scheduler)  //返回值   

}

 

当TaskScheduler创建完成后,会周期性的调用org.apache.spark.scheduler.TaskSetManager.checkSpeculatableTasks检查是否存在推测执行的Task(解决慢任务)

 

4.3.2 Task的提交

Driver端调用栈:

1)TaskSchedulerImpl.submitTasks//封装TaskSet并创建TaskSetManager

val manager=newTaskSetManager(this,taskSet,maxTaskFailures)

manager用于监控Task的执行状态并采取相应的措施,比如失败重试,慢任务的推测性执行

2)SchedulableBuilder.addTaskSetManager()

SchedulableBuilder是Application级别的调度器,支持FIFO和FAIR公平调度策略,从而决定TaskSetManager的调度顺序,然后由TaskSetManager根据就近原则(数据本地性)确定Task运行在哪个Executor上

3)CoarseGrainedSchedulerBackend.reviveOffers  //SchedulerBackend接收TaskSet

4)CoarseGrainedSchedulerBackend.DriverActor.makeOffers
5)TaskSchedulerImpl.resourceOffers //为Task分配Executor资源,输入为Executor的列表(可用资源),输出TaskDescription的二维数组(TaskID,ExecutorID,Task执行环境的依赖关系)

6)CoarseGrainedSchedulerBackend.DriverActor.launchTasksExecutor启动并执行Task

 

4.3.3 任务调度的具体实现

根据schedulingMode的不同创建FIFOSchedulableBuilder or FairSchedulableBuilder,对于FIFO而言rootPool包含一组TaskSetManager;对于FAIR而言rootPool包含一组Pool。这些Pool构成调度树,每个叶子节点即为TaskSetManager

 

FIFO调度策略:对于任意Task而言,从JobID较小的开始调度,若属于同一个Job则从StageID小的开始调度

 

FAIR调度策略:首先挂到rootPool虾米那的pool先确定调度顺序,然后在每个pool内部使用相同的算法确定TaskSetManager的调度顺序,一旦确定调度顺序,TaskScheduler便可以将Task发送给Executor执行

 

4.3.4 Task运行结果的处理

Task在Executor上执行完成后,会通过向Driver发送StatusUpdate的消息来通知Driver任务的状态更新为TaskState.FINISHED,并通知TaskScheduler,TaskScheduler重新分配计算任务给该Executor(资源复用)

TaskSchedulerImpl.statusUpdate根据Task的状态信息若为FINISHED则标记该任务结束并处理任务的计算结果;否则标记该任务结束并处理任务失败的情况

TaskScheduler处理每次结果都由一个Daemon线程池负责,默认的线程池是4个线程

 

Executor回传计算结果给Driver时的策略:

1)若结果大于1GB则直接丢弃

2)若结果较大,则以tid为key存入BlockManager

3)若结果不大,则直接传给Driver

taskResultGetter.enqueueSuccessfulTask(taskSet,tid,state,serializedData)的核心部分是

scheduler.handleSuccessfulTask(taskSetManager,tid,result)负责处理计算结果的调用栈过程:

1)TaskSchedulerImpl.handleSuccessfulTask

2)TaskSetManager.handleSuccessfulTask

3)DAGScheduler.taskEnded

4)DAGScheduler.eventProcessActor

5)DAGScheduler.handleTaskCompletion

对于ResultTask则调用JobWaiter告知调用者任务已结束,当所有Task都执行结束后则标记Job结束;对于ShuffleMapTask则需要将结果保存到Stage,当Stage的所有Task都执行结束后则将整体结果注册到MapOutputTrackerMaster中

 

若Stage中所有Task都返回了,但部分数据为空,则需要重新提交submitStage(stage),若该Stage成功返回,且其不存在parent stage或者parent stage均提交了,则该stage可以提交submitMissiontTasks(stage,jobId),若该stage为finalstage,则整个Job执行完成

taskResultGetter.enqueueFailedTask(taskSetManager,tid,taskState)的核心部分是

scheduler.handleFailedTask(taskSetManager,tid,taskState,reason)

taskSetManager.handleFailedTask(tid,taskState,reason)

TaskSetManager会根据失败的原因采用不同的动作,允许最多重试4次(即会标记重试任务为等待调度),若超过最大重试次数则直接标记失败

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。