`

YARN/MRv2 MRAppMaster深入剖析—作业生命周期

 
阅读更多

本节分析一个作业从开始运行到运行结束,所经历的整个过程,期间涉及到的各种事件和状态变化。

在正式讲解作业生命周期之前,先要了解MRAppMaster中作业表示方式,每个作业由若干干Map Task和Reduce Task组成,每个Task进一步由若干个TaskAttempt组成,Job、Task和TaskAttempt的生命周期均由一个状态机表示,具体可参考https://issues.apache.org/jira/browse/MAPREDUCE-279(附件中的图yarn-state-machine.job.png,yarn-state-machine.task.png和yarn-state-machine.task-attempt.png)

作业的创建入口在MRAppMaster类中,如下所示:

public class MRAppMaster extends CompositeService {

  public void start() {

    ...

    job = createJob(getConfig());//创建Job

    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);

    jobEventDispatcher.handle(initJobEvent);//发送JOB_INI,创建MapTask,ReduceTask

    startJobs();//启动作业,这是后续一切动作的触发之源

    ...

  }

protected Job createJob(Configuration conf) {

  Job newJob =

    new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),

      taskAttemptListener, jobTokenSecretManager, fsTokens, clock,

      completedTasksFromPreviousRun, metrics, committer, newApiCommitter,

      currentUser.getUserName(), appSubmitTime, amInfos, context);

  ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);

  dispatcher.register(JobFinishEvent.Type.class,

    createJobFinishEventHandler());

  return newJob;

  }

}

(1)作业/任务初始化

JobImpl会接收到.JOB_INIT事件,然后触发作业状态从NEW变为INITED,并触发函数InitTransition(),该函数会创建MapTask和

ReduceTask,代码如下:

public static class InitTransition

  implements MultipleArcTransition<JobImpl, JobEvent, JobState> {

  ...

  createMapTasks(job, inputLength, taskSplitMetaInfo);

  createReduceTasks(job);

  ...

}

其中,createMapTasks函数实现如下:

private void createMapTasks(JobImpl job, long inputLength,

  TaskSplitMetaInfo[] splits) {

  for (int i=0; i < job.numMapTasks; ++i) {

    TaskImpl task =

      new MapTaskImpl(job.jobId, i,

      job.eventHandler,

      job.remoteJobConfFile,

      job.conf, splits[i],

      job.taskAttemptListener,

job.committer, job.jobToken, job.fsTokens,

job.clock, job.completedTasksFromPreviousRun,

job.applicationAttemptId.getAttemptId(),

job.metrics, job.appContext);

job.addTask(task);

}

}

(2)作业启动

public class MRAppMaster extends CompositeService {

protected void startJobs() {

JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);

dispatcher.getEventHandler().handle(startJobEvent);

}

}

JobImpl会接收到.JOB_START事件,会触发作业状态从INITED变为RUNNING,并触发函数StartTransition(),进而触发Map Task和Reduce Task开始调度:

public static class StartTransition

implements SingleArcTransition<JobImpl, JobEvent> {

public void transition(JobImpl job, JobEvent event) {

job.scheduleTasks(job.mapTasks);

job.scheduleTasks(job.reduceTasks);

}

}

这之后,所有Map Task和Reduce Task各自负责各自的状态变化,ContainerAllocator模块会首先为Map Task申请资源,然后是Reduce Task,一旦一个Task获取到了资源,则会创建一个运行实例TaskAttempt,如果该实例运行成功,则Task运行成功,否则,Task还会启动下一个运行实例TaskAttempt,直到一个TaskAttempt运行成功或者达到尝试次数上限。当所有Task运行成功后,Job运行成功。一个运行成功的任务所经历的状态变化如下(不包含失败或者被杀死情况):

【总结】

本文分析只是起到抛砖引入的作用,读者如果感兴趣,可以自行更深入的研究以下内容:

(1)Job、Task和TaskAttempt状态机设计(分别在JobImpl、TaskImpl和TaskAttemptImpl中)

(2)在以下几种场景下,以上三个状态机的涉及到的变化:

1) kill job

2) kill task attempt

3) fail task attempt

4) container failed

5) lose node

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics