文章目录
  1. 1. NodeManager向ResourceManager汇报资源
  2. 2. ResourceManager对NodeManager注册事件的处理

最近非常好奇的一个问题是:在Yarn中,Container是如何进行轮转的?
因而在最近的一系列文章中,我们将探究Yarn的Container资源管理机制。本篇文章作为基础,分析一下NodeManager在启动时如何向ResourceManager注册资源。

NodeManager向ResourceManager汇报资源

我们首先跟踪NodeManager代码,在NodeManager类的ServiceInit函数中可以看到如下代码:

1
2
3
// StatusUpdater should be added last so that it get started last 
// so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater);

好吧,从注释我们可以看到,nodeStatusUpdater是最后向RM注册的服务。既然找到了开端,那我们就接着往下跟踪。nodeStatusUpdater是一个NodeStatusUpdaterImpl类的对象,我们看看它的ServiceInit以及ServiceStart。在ServiceInit中,nodeStatusUpdater首先从配置文件获取了节点资源相关的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int memoryMb = 
conf.getInt(
YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
float vMemToPMem =
conf.getFloat(
YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem);

int virtualCores =
conf.getInt(
YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);

this.totalResource = Resource.newInstance(memoryMb, virtualCores);

在ServiceStart中,NodeManager通过与RM的代理向ResourceManager注册,代码如下所示:

1
2
3
4
5
6
7
8
9
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications());
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
}
RegisterNodeManagerResponse regNMResponse =
resourceTracker.registerNodeManager(request);
this.rmIdentifier = regNMResponse.getRMIdentifier();

在request的构造中,我们看到有一项是totalResource,可以知道此时NodeManager已经将自身包含的资源向ResourceManager进行注册。接着NodeStatusUpdaterImpl将启动一个心跳线程定时向RM发送心跳信息,并执行RM随心跳信息返回的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
removeOrTrackCompletedContainersFromContext(response
.getContainersToBeRemovedFromNM());

lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanup();
if (!containersToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containersToCleanup,
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
}
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanup();
//Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (!appsToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
}

然而,在这里我们只看到了NodeManager向ResourceManager汇报Container状态并执行ResourceManager要求清理或移除Container的操作。那么,ResourceManager要求NodeManager创建Container的操作逻辑在哪儿呢?

ResourceManager对NodeManager注册事件的处理

在RM的初始化服务过程中注册了NodeEventDispatcher服务,该服务将处理NM的注册事件。RMNodeImpl是Node注册事件的Handler,它使用了一个状态机来处理来自于NM的事件。

1
2
3
4
5
6
//Transitions from NEW state
.addTransition(NodeState.NEW, NodeState.RUNNING,
RMNodeEventType.STARTED, new AddNodeTransition())
.addTransition(NodeState.NEW, NodeState.NEW,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())

Node注册最先由AddNodeTransition处理。这个函数通过

1
2
3
4
5
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeAddedSchedulerEvent(rmNode, containers));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));

将NM注册事件交由CapacityScheduler处理(对应NODE_ADDED事件),并用NODE_USABLE事件将该NM更新到NodeListManager中。在NODE_ADDED事件中,CapacityScheduler将该Node对应的资源更新到clusterResource中:

1
2
3
4
5
6
7
8
9
10
Resources.addTo(clusterResource, nodeManager.getTotalCapability());

// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}

root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));

NodeListManager处理NODE_USABLE事件将该Node加入到updatedNodes中。
OK,资源已经收集好了,接下来就是资源分配了。

文章目录
  1. 1. NodeManager向ResourceManager汇报资源
  2. 2. ResourceManager对NodeManager注册事件的处理