名扬数据:Hadoop源码分析之心跳机制
一.心跳机制
1. hadoop集群是master/slav模式,master包括Namenod和Jobtrackslave包括Datanod和Tasktrack
2. mast启动的时候,会开一个ipc serv那里,等待slave心跳。
3. slav启动时,会连接master并每隔3秒钟主动向master发送一个“心跳”这个时间可 以通过”heartbeat.recheck.interv属性来设置。将自己的状态信息告诉master然后master也是通过这个心跳的返回值,向slave节点传达指令。
4. 需要指出的namenod与datanod之间的通信,jobtrack与tasktrack之间的通信,都是通过“心跳”完成的
二.DatanodNamenod心跳源码分析
既然“心跳”Datanod主动给Namenod发送的那Datanod怎么样发送的呢?下面贴出Datanode.class中的关键代码:
代码一:
/**
* 循环调用“发送心跳”方法,直到shutdown
* 调用远程Namenod方法
*/
public void offerServic throws Exception {
???
while shouldRun {
try {
long startTime = now;
// heartBeatInterv启动Datanod时根据配置文件设置的心跳间隔时间
if startTime - lastHeartbeat > heartBeatInterv {
lastHeartbeat = startTime;
//Datanod发送心跳
DatanodeCommand[] cmds = namenode.sendHeartbeatdnRegistration,
data.getCapac,
data.getDfsUs,
data.getRemain,
xmitsInProgress.get,
getXceiverCount;
myMetrics.addHeartBeatnow - startTim;
if !processCommandcmd
continue;
}
???
}
} // while shouldRun
} // offerService
需要注意的发送心跳的对象并不是datanod而是一个名为namenod对象,难道在datanod端就直接有个namenod引用吗?其实不然,来看看这个namenod吧:
代码二:
public DatanodeProtocol namenode = null;
namenod其实是一个DatanodeProtocol引用,对hadoop RPC机制分析的文章中我提到过,这是一个Datanod和Namenod通信的协议,其中有许多未实现的接口方法,sendHeartbeat就是其中的一个。下面看看这个namenod对象是怎么被实例化的吧:
代码三:
this.namenode = DatanodeProtocol
RPC.waitForProxiDatanodeProtocol.class,
DatanodeProtocol.versionID,
nameNodeA ddr,
conf;
其实这个namenod并不是Namenod一个对象,而只是一个Datanod端对Namenod代理对象,正是这个代理完成了心跳”代理的底层实现就是RPC机制了
三.TasktrackJobtrack心跳源码分析
同样我从Tasktrack入手,下面贴出Tasktracker.class关键代码:
代码四:
代码一:
State offerServic throws Exception {
long lastHeartbeat = System.currentTimeMilli;
while running && !shuttingDown {
???
// 发送心跳,调用代码二
HeartbeatResponse heartbeatResponse = transmitHeartBeatnow;
???
return State.NORMA L;
}
代码二:
HeartbeatResponse transmitHeartBeatlong now throws IOException {
???
HeartbeatResponse heartbeatResponse = jobClient.heartbeatstatus,
justStarted,
justInited,
askForNewTask,
heartbeatResponseId;
???
return heartbeatResponse;
}
其实我觉得分析到这里大家就可以自己分析了jobClient也是一个协议:
代码五:
InterTrackerProtocol jobClient;
该协议用于定义Tasktrack和Jobtrack通信。同样,也是一个代理对象:
代码六:
this.jobClient = InterTrackerProtocol
UserGroupInformation.getLoginUs.doA
new PrivilegedExceptionA ction<Object> {
public Object run throws IOException {
return RPC.waitForProxiInterTrackerProtocol.class,
InterTrackerProtocol.versionID,
jobTrackA ddr, fConf;
}
};
终于,hadoop底层通信整个系列的源码分析全部完成了可以好好地复习学校的功课了