1. ContainerAllocator概述
ContainerAllocator负责与ResourceManager通信,为作业申请资源。作业的每个任务资源需求可描述为四元组<Priority, hostname,capability,containers>,分别表示作业优先级、期望资源所在的host,资源量(当前仅支持内存),container数目,比如:
<10, “node1”, “memory:1G”, 3>//优先级是一个正整数,优先级值越小,优先级越高
<10, “node2”, “memory:2G”, 10>
<2, “*”, “memory:1G”, 20> //*表示这样的资源可来自任意一个节点,即不考虑数据本地性
ContainerAllocator周期性通过心跳与ResourceManager通信,ResourceManager每次会返回已经分配的container列表,完成的container列表等信息。
2. ContainerAllocator工作流程
当用户提交作业之后,MRAppMaster会为之初始化,并创建一系列map task和reduce task,由于reduce task依赖于map task之间结果,所以reduce task会延后调度。在ContainerAllocator中,当map task数目完成一定比例(由mapreduce.job.reduce.slowstart.completedmaps指定,默认是0.05,即5%)且Reduce Task可允许占用的资源(Reduce Task可占用资源比由yarn.app.mapreduce.am.job.reduce.rampup.limit指定)能够折合成整数个任务时,才会调度Reduce
Task。
考虑到Map Task和Reduce Task之间的依赖关系,因此,它们之间的数据结构转移也是不一样的,对于Map Task而言,会依次转移到以下几个数据结构中:
scheduled->assigned->completed
对于Reduce Task而言,则按照以下流程:
pending->scheduled->assigned->completed
其中,pengding表示等待ContainerAllocator发送资源请求,scheduled表示已经将资源请求发送给RM,但还没有收到分配的资源,assigned是已经收到RM分配的资源,completed已经未完成。
Reduce Task之所有多出一个pending,主要是为了根据Map Task情况调整Reduce Task状态(在pengding和scheduled中相互转移)。进一步说,这主要是为了防止Map Task饿死,因为在YARN中不再有map slot和reduce slot的概念(这两个概念从一定程度上减少了作业饿死的可能性),只有内存、CPU等真实的资源,需要由ApplicationMaster控制资源申请的顺序,以防止可能产生的作业饿死。
此外,ContainerAllocator将所有任务划分成三类,分别是failed Map、Map和Reduce,并分别赋予它们优先级5、20和10,也就是说,当三种任务同时有资源需求是,会优先分配给failed map,然后是reduce,最后是map。
总结起来,ContainerAllocator工作流程如下:
步骤1 将所有map task的资源需求一次性发送给RM
步骤2 如果达到了Reduce task调度条件,则开始为Reduce Task申请资源。
步骤3 如果为某个task申请到了资源,则取消其他重复资源的申请。由于在HDFS中,任何一个任务通常有三备份,而对于一个任务而言,考虑到rack和any级别的本地性,它可能会对应7个资源请求,分别是:
<20, “node1”, “memory:1G”, 1>
<20, “node2”, “memory:1G”, 1>
<20, “node3”, “memory:1G”, 1>
<20, “rack1”, “memory:1G”, 1>
<20, “rack2”, “memory:1G”, 1>
<20, “rack3”, “memory:1G”, 1>
<20, “*”, “memory:1G”, 1>
一旦该任务获取了以上任何一种资源,则会取消其他6个的资源申请。
步骤4 如果任务运行失败,则会重新为该任务申请资源。
步骤5 如果一个任务运行速度过慢,则会为其额外申请资源以启动备份任务(如果启动了推测执行功能)。
步骤6 如果一个节点失败的任务数目过多,则会撤销对该节点的所有资源申请请求。
3. ContainerAllocator类图
ContainerAllocator实际上是一接口,它只定义了三个事件:CONTAINER_REQ,,CONTAINER_DEALLOCATE和CONTAINER_FAILED,分别表示请求container,释放container和container运行失败。
ContainerAllocator的实现是RMContainerAllocator,它只接收和处理ContainerAllocator接口中定义的三种事件,它的运行是这三种事件驱动的。
RMContainerAllocator中最核心的框架是维护了一个心跳信息,在RMCommunicator类中实现如下:
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(rmPollInterval);
try {
heartbeat();
} catch (YarnException e) {
LOG.error("Error communicating with RM: " + e.getMessage() , e);
return;
} catch (Exception e) {
LOG.error("ERROR IN CONTACTING RM. ", e);
}
} catch (InterruptedException e) {
LOG.warn("Allocated thread interrupted. Returning.");
return;
}
}
其中,heartbeat()函数定义(在RMContainerAllocator类中)如下:
protected synchronized void heartbeat() throws Exception {
LOG.info("Before Scheduling: " + getStat());
List<Container> allocatedContainers = getResources();
LOG.info("After Scheduling: " + getStat());
if (allocatedContainers.size() > 0) {
LOG.info("Before Assign: " + getStat());
scheduledRequests.assign(allocatedContainers);
LOG.info("After Assign: " + getStat());
}
……
}
其中,getResources()函数用于向RM发送心跳信息,并处理心跳应答。需要注意的是,有些情况下,心跳信息中并不包含新的资源请求信息,即空的心跳信息,这有以下几个作用:
(1)周期性发送心跳,告诉RM自己还活着。
(2)周期性询问RM,以获取新分配的资源和各个container运行状况。
assign()函数是将收到的container分配给某个任务,如果这个container无法分配下去(比如内存空间不够),则是在下次心跳中通知RM释放该container,如果container可以分下去,则会释放对应任务的其他资源请求,同时会向TaskAttempt发送一个TA_ASSIGNED事件,以通知ContainerLauncher启动container。
分享到:
相关推荐
YARN(MRv2)搭建
yarn-v0.23.2.tar.gz 在安装ambari,源码编译的时候下载的文件有问题 手动下载 地址 https://github.com/yarnpkg/yarn/releases/download/v0.23.2/yarn-v0.23.2.tar.gz
脚本使用:vim编辑脚本,按照自己的配置修改主机号,我的是hadoop1、2是NN;hadoop2、3是Spark Master;hadoop3还是RM;hadoop4、5、6是DN、NM、Spark Worker。编辑完成后在满足“前提”的任意一台主机运行均可。 ...
yarn-v1.22.5.tar.gz
hadoop YARN应用开发与核心源码剖析
官网直接安装的不支持vite2+vue3的 主要修复: 1.build或者dev项目时不报错,兼容vite2,vue3; 2.加入deep监听watch,直接在父组件中修改图表中的config参数即可完成图表中的数据变更。 yarn npm cnpm pnpm可通用...
yarn-workspace-plugin-since
详细的描述了yarn的框架,对yarn的实现代码进行了详细的分析
从架构、设计模式和代码对Yarn进行的详细剖析和原理机制的分析
Hadoop的2.0版本的yarn的框架介绍啊 Hadoop yarnYARN 本身框架的优势是扩展性与支持多计算模型。对于扩展性目前主要体现在计算节点规模上,以前 JobTracker-TaskTracker 模型下最多大约在 5000 台机器左右,对于 ...
SPARK2_ON_YARN-2.4.0 jar包下载
这将使git存储库中的.yarn/cache不存在。 它使用并重写历史记录,因此使用后果自负。 它的工作方式是先将带有--mirror标志的--mirror到临时路径中。 然后,它将计算.yarn/cache中文件的git对象ID列表,并将其删除。...
Hadoop技术内幕深入解析YARN架构设计与实现原理
Flink 2018峰会 阿里大牛的技术, 在线教程有github:**,第7个文档 详细的讲解Flink和YARN及kubenete的集成,值得收藏
Yarn框架代码详细分析V0.3.pdf
YARN配置、启动与验证 YARN配置、启动与验证 序号 任务名称 任务一 YARN组件参数配置 任务二 MapReduce组件参数配置 任务三 配置SSH无密钥登录(slave1为主节点) 任务四 分发YARN与MapReduce配置文件 任务五 启动...
赠送jar包:hadoop-yarn-api-2.5.1.jar; 赠送原API文档:hadoop-yarn-api-2.5.1-javadoc.jar; 赠送源代码:hadoop-yarn-api-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-api-2.5.1.pom; 包含翻译后...
npm install -g yarn yarn install 安装失败,使用官方下载的yarn.lock文件
深入解析YARN架构设计与实现原理,深入解析YARN架构设计与实现原理深入解析YARN架构设计与实现原理深入解析YARN架构设计与实现原理
3. 开发环境:yarn/npm run dev; 生产环境: yarn/npm run prd; 三. 接口文档(token无论request或者responed均放在请求头authorization中,文档中不再体现) 1. 新增用户 接口说明:创建一个新user 接口地址:/user/...