Terasort output replication


最近一直在关注TeraSort。
有时候你非常关注某些东西是却会忽略其他一些很基本的东西。比如TeraSort,我一直很关注如何Map更快、Shuffle更快、Reduce更快,回来却发现有个空白竟然没关注到,着实吓了一跳。

回归正题。某一天我一抬头,看到了TeraSort的网络性能图,
两头空白
为什么会是两头空白?好吧,Hadoop为Map任务做了Locality Based处理,Map阶段没有网络流量我能理解。然而,Reduce阶段呢?至少TeraSort会将输出数据写入到HDFS的,说好的3备份直接的数据传递产生的网络流量呢?
是不是输出文件的3备份除了问题呢?
Hadoop提供了hadoop fsck /FILE-PATH -files -locations -blocks查看文件块的保存地点及路径。通过查看,果然,TeraSort的输出文件块备份为1。

从代码中找找确认一下。

1
2
job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
TeraOutputFormat.setFinalSync(job, true);

从上面代码可以看到,TeraSort确实更新了dfs.replication设置,而

1
2
3
4
5
public static int getOutputReplication(JobContext job) {
return job.getConfiguration().getInt(OUTPUT_REPLICATION, 1);
}
......
static String OUTPUT_REPLICATION = "mapreduce.terasort.output.replication";

如果mapreduce.terasort.output.replication设置成3从命令行传入,期待的网络流量出来了。
找回reduce流量
应该是TeraSort觉得输出数据不重要不需要多备份吧。

有时候,太关注一些事情时,会让你忽略掉其他的很多信息。

聊聊chroot


这里有一篇很棒的文章关于chroot如何使用。

其实在Linux的源码(2.6.32)中,最后一部分是使用到了Chroot的。我们知道,Linux内核在启动过程中,完成了内存、cpu等等最基础资源的的初始化之后将要挂载Initrd,然后执行Initrd中的一系列命令。当Initrd中的任务执行完之后,Linux将要挂载硬盘文件系统,作为启动后的工作文件系统。这个过程中就使用到了chroot。

当然chroot对应的root并不是文件系统中的/root目录。我的理解是change root inode。大家知道,对Linux来说,无论是磁盘中的文件、目录都对应着一个inode。那么,文件系统中的数据是如何访问的呢?在Linux中,系统使用一个root inode保存当前文件系统的入口,当然默认情况对应的是文件系统的根目录。当用户需要访问文件系统中的某一个文件时,例如对应的路径是/a/b/c/xx时,Linux对文件路径的解析是从root inode找到a目录对应的inode,再从a目录下对应的inodes中找到目录b对应的inode,层层寻找最终找到xx文件对应的inode。从上面这个过程我们可以看到,root inode作为整个文件访问的入口的作用。

当我们使用chroot将root inode替换之后,文件系统的入口变成了整个磁盘文件系统的某一个目录时,系统中的文件访问路径自然也有了相应的局限,局限到一个某个目录中,这也就达到了chroot的种种错误隔离之类的效果。

这里说说我之前的一个疑问。我们之前做了一个多OS系统,每个OS使用宿主OS的一个目录,为了兼容整个应用执行的环境,我们需要将目标OS的文件访问路径进行chroot。其实这个场景在Linux Container中也非常常见。那么问题来了,对于某个系统来说,我们能用符号链接将宿主系统的/bin、/lib、/sbin之类的目录链接到目标系统的工作目录下以共享使用么?
答案是不行的。因为符号链接对于Linux来说是一个特殊的文件,里边保存了目标文件的的路径。那么,既然你整个文件系统的路径都已经变了,符号链接中的目录自然也找不到了。
这个问题其实使用硬链接就可以解决。硬链接相当于将目标目录的inode挂在本目录下。那么,既然inode都是一样的,系统根据该inode可以找到bin中的文件,访问路径也就没问题了。

最近非常好奇的一个问题是:在Yarn中,Container是如何进行轮转的?
因而在最近的一系列文章中,我们将探究Yarn的Container资源管理机制。本篇文章作为基础,分析一下NodeManager在启动时如何向ResourceManager注册资源。

NodeManager向ResourceManager汇报资源

我们首先跟踪NodeManager代码,在NodeManager类的ServiceInit函数中可以看到如下代码:

1
2
3
// StatusUpdater should be added last so that it get started last 
// so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater);

好吧,从注释我们可以看到,nodeStatusUpdater是最后向RM注册的服务。既然找到了开端,那我们就接着往下跟踪。nodeStatusUpdater是一个NodeStatusUpdaterImpl类的对象,我们看看它的ServiceInit以及ServiceStart。在ServiceInit中,nodeStatusUpdater首先从配置文件获取了节点资源相关的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int memoryMb = 
conf.getInt(
YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
float vMemToPMem =
conf.getFloat(
YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem);

int virtualCores =
conf.getInt(
YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);

this.totalResource = Resource.newInstance(memoryMb, virtualCores);

在ServiceStart中,NodeManager通过与RM的代理向ResourceManager注册,代码如下所示:

1
2
3
4
5
6
7
8
9
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications());
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
}
RegisterNodeManagerResponse regNMResponse =
resourceTracker.registerNodeManager(request);
this.rmIdentifier = regNMResponse.getRMIdentifier();

在request的构造中,我们看到有一项是totalResource,可以知道此时NodeManager已经将自身包含的资源向ResourceManager进行注册。接着NodeStatusUpdaterImpl将启动一个心跳线程定时向RM发送心跳信息,并执行RM随心跳信息返回的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
removeOrTrackCompletedContainersFromContext(response
.getContainersToBeRemovedFromNM());

lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanup();
if (!containersToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containersToCleanup,
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
}
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanup();
//Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (!appsToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
}

然而,在这里我们只看到了NodeManager向ResourceManager汇报Container状态并执行ResourceManager要求清理或移除Container的操作。那么,ResourceManager要求NodeManager创建Container的操作逻辑在哪儿呢?

ResourceManager对NodeManager注册事件的处理

在RM的初始化服务过程中注册了NodeEventDispatcher服务,该服务将处理NM的注册事件。RMNodeImpl是Node注册事件的Handler,它使用了一个状态机来处理来自于NM的事件。

1
2
3
4
5
6
//Transitions from NEW state
.addTransition(NodeState.NEW, NodeState.RUNNING,
RMNodeEventType.STARTED, new AddNodeTransition())
.addTransition(NodeState.NEW, NodeState.NEW,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())

Node注册最先由AddNodeTransition处理。这个函数通过

1
2
3
4
5
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeAddedSchedulerEvent(rmNode, containers));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));

将NM注册事件交由CapacityScheduler处理(对应NODE_ADDED事件),并用NODE_USABLE事件将该NM更新到NodeListManager中。在NODE_ADDED事件中,CapacityScheduler将该Node对应的资源更新到clusterResource中:

1
2
3
4
5
6
7
8
9
10
Resources.addTo(clusterResource, nodeManager.getTotalCapability());

// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}

root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));

NodeListManager处理NODE_USABLE事件将该Node加入到updatedNodes中。
OK,资源已经收集好了,接下来就是资源分配了。

昨天跟同事们谈起TeraSort的一些问题,忽然想到TeraGen。之前对TeraGen有一个大概的概念-TeraSort的数据生成器,生成100byte的Record。既然好奇,这次就索性分析一下TeraGen的源码。

首先来看看TeraGen的启动代码:

1
2
3
4
5
6
7
8
9
10
11
12
setNumberOfRows(job, parseHumanLong(args[0]));
Path outputDir = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputDir);
job.setJobName("TeraGen");
job.setJarByClass(TeraGen.class);
job.setMapperClass(SortGenMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(RangeInputFormat.class);
job.setOutputFormatClass(TeraOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;

TeraGen的输入一共有两个,num rows以及 output dir。从上面初始化代码部分可以看到,TeraGen使用的map类是SortGenMapper,不需要Reduce阶段,InputFormat类是RangeInputFormat,OutputFormat类是TeraOutputFormat。

首先,我们分析Map阶段的输入。根据MapReduce程序的执行流程,在Map阶段之前系统将首先分析输入文件,计算Split等,因而我们先看RangeInputFormat类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public List<InputSplit> getSplits(JobContext job) {
long totalRows = getNumberOfRows(job);
int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
LOG.info("Generating " + totalRows + " using " + numSplits);
List<InputSplit> splits = new ArrayList<InputSplit>();
long currentRow = 0;
for(int split = 0; split < numSplits; ++split) {
long goal =
(long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
splits.add(new RangeInputSplit(currentRow, goal - currentRow));
currentRow = goal;
}
return splits;
}

先看getSplits方法,该方法由Hadoop框架调用,获得split的描述数组,每个split将对应一个map任务。从上面的代码可以看出,numSplits由NUM_MAPS输入项确定,初次之外生成数据总行数也由配置输入。对于每一个Split,函数中计算Split的初始值以及行数。也就是说,每个Map需要产生数据的开始行以及行数都由Split中保存好了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void map(LongWritable row, NullWritable ignored,
Context context) throws IOException, InterruptedException
{

if (rand == null) {
rowId = new Unsigned16(row.get());
rand = Random16.skipAhead(rowId);
checksumCounter = context.getCounter(Counters.CHECKSUM);
}
Random16.nextRand(rand);
GenSort.generateRecord(buffer, rand, rowId);
key.set(buffer, 0, TeraInputFormat.KEY_LENGTH);
value.set(buffer, TeraInputFormat.KEY_LENGTH,
TeraInputFormat.VALUE_LENGTH);
context.write(key, value);
crc32.reset();
crc32.update(buffer, 0,
TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH);
checksum.set(crc32.getValue());
total.add(checksum);
rowId.add(ONE);
}

我们再看看map函数。按照Hadoop的执行流程,框架通过haveNextValue和getCurrentKey从RecordReader中获取输入,并传递给map函数。从TeraGen源码中的RangeRecordReader我们可以知道,getCurrentKey返回的就是当前的Row号。从Map函数可知,map使用rowid生成一个100字节的数据保存在Buffer中,并以10字节为key、90字节为value的形式写入到Context中。
在Hadoop中,当Reduce数设置为0时,map的输出将通过配置输出方式输出,在TeraGen中将使用TeraOutputFormat输出。我们看看这个输出类中的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static class TeraRecordWriter extends RecordWriter<Text,Text> {
private boolean finalSync = false;
private FSDataOutputStream out;

public TeraRecordWriter(FSDataOutputStream out,
JobContext job)
{

finalSync = getFinalSync(job);
this.out = out;
}

public synchronized void write(Text key,
Text value) throws IOException
{

out.write(key.getBytes(), 0, key.getLength());
out.write(value.getBytes(), 0, value.getLength());
}

public void close(TaskAttemptContext context) throws IOException {
if (finalSync) {
out.sync();
}
out.close();
}
}

我们可以看到,最终Map结束以后数据以先Key后Value的形式写入到HDFS。

按照Hadoop的框架,map函数将key/value对输出到Sort缓冲区中,当缓冲区满时spill线程按照key对数据进行排序,并输出到磁盘。至此,TeraGen的整个数据生成过程理清了。

现在还有一个问题,在计算TeraGen的进度时,是否会包括HDFS的操作时间?当任务进度为100%,是最后一个key/value对刚刚写入Context还是所有的数据都已经写入到HDFS?这个问题我们将在之后继续探究。

最近忙的一件事是收到来自同事的需求,探索Hadoop的网络支撑环境。于是我决定先看看Hadoop对于网络拓扑的支持情况。

Hadoop Rack Awareness

在Apache的Hadoop文档中有一段对Rack Awareness的介绍。Hadoop会根据Rack的情况进行容错部署,例如至少将一个备份块放置在不同的Rack中。根据Hadoop文档描述,Hadoop中可以使用Java类以及外部脚本两种方式来指定Host到Rack的映射关系。在这里我选择了使用Script的方式。
Hadoop对Script based的host到rack映射实现在org.apache.hadoop.net.ScriptBasedMapping类中。我们摘录了其中的一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public List<String> resolve(List<String> names) {
List<String> m = new ArrayList<String>(names.size());

if (names.isEmpty()) {
return m;
}

if (scriptName == null) {
for (String name : names) {
m.add(NetworkTopology.DEFAULT_RACK);
}
return m;
}

String output = runResolveCommand(names, scriptName);
if (output != null) {
StringTokenizer allSwitchInfo = new StringTokenizer(output);
while (allSwitchInfo.hasMoreTokens()) {
String switchInfo = allSwitchInfo.nextToken();
m.add(switchInfo);
}

从代码中我们可以看出,如果没有指定外部的映射脚本(net.topology.script.file.name)的话,Hadoop将所有的机器设置在DEFAULT_RACK(/default-rack)下;否则,则将以names作为输入获得映射脚本的输出。Hadoop在调用映射脚本时以IP作为输入,输出/rack$n。
在我的简单集群中,机器一个分成两个Rack。对于Rack的映射可以简单通过IP地址进行映射,从而得到所属的Rack id。我们将映射Shell脚本放置在Hadoop根目录中,在core-site.xml配置net.topology.script.file.name路径。配置更新到集群所有节点。如果在Shell脚本中加入打印信息,我们可以获得Hadoop集群执行过程中的一些调用信息。通过hadoop dfsadmin -report中报告节点信息上我们可以查看到Hadoop集群节点的Rack信息,同时在namenode的log中也可以看到节点的详细注册信息。

ScriptBasedMapping调用

我们可以追究一下ScriptBasedMapping的调用关系。通过追踪Hadoop源码我们可以看到,在DataNodeManager的构造函数中有以下代码:

1
2
3
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);

yarn的RackResolver中也有一段相同的代码。
而我们看conf.getClass的原型可以知道,这段代码首先获得net.topology.node.switch.mapping.impl对应的类,这个类必须实现 DNSToSwitchMapping接口,如果这个类没有配置,这使用默认类ScriptBasedMapping。下面是Hadoop源码中对getClass的描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** 
* Get the value of the <code>name</code> property as a <code>Class</code>
* implementing the interface specified by <code>xface</code>.
*
* If no such property is specified, then <code>defaultValue</code> is
* returned.
*
* An exception is thrown if the returned class does not implement the named
* interface.
*
* @param name the class name.
* @param defaultValue default value.
* @param xface the interface implemented by the named class.
* @return property value as a <code>Class</code>,
* or <code>defaultValue</code>.
*/
public <U> Class<? extends U> getClass(String name,
Class<? extends U> defaultValue,
Class<U> xface) {

通过上面的代码分析我们可以得到以下两点:
1、Rackawareness在Hdfs以及Yarn中使用,用于数据及Container的分配指导;
2、Rackawareness首先判断java类实现是否存在,如果不存在再使用ScriptBasedMapping。

最近的实验中出现了iotop中观测到flush线程io百分比达到了99.99%。这个现象挺出乎意料的,因为在我的印象中flush进程是不应该有io操作的。所以我首先怀疑的是iotop中对io百分比的计算。

iotop中对IO百分比的计算

还是回到iotop的代码。在ui.py中,我们可以发现如下代码

1
2
def delay2percent(delay): # delay in ns, duration in s
return '%.2f %%' % min(99.99, delay / (duration * 10000000.0))

可见进程的io百分比的计算来自于delay占统计时间(在iotop中为1s)的比值。而delay来自于进程TaskStats的blkio_delay_total,内核中具体的定义如下:

1
2
3
4
5
6
7
/* Following four fields atomically updated using task->delays->lock */

/* Delay waiting for synchronous block I/O to complete
* does not account for delays in I/O submission
*/

__u64 blkio_count;
__u64 blkio_delay_total;

内核注释写得很明确了,统计的是同步block I/O的等待时间,并不包括I/O提交延迟。
另一个问题是blkio_delay_total是什么时候被统计的?从内核代码中看,blkio_delay_total是由两个函数delayacct_blkio_start组合使用进行统计的。一个典型也是与本问题相关的使用是io_schedule。

1
2
3
4
5
6
7
8
9
10
11
12
void __sched io_schedule(void)
{

struct rq *rq = raw_rq();

delayacct_blkio_start();
atomic_inc(&rq->nr_iowait);
current->in_iowait = 1;
schedule();
current->in_iowait = 0;
atomic_dec(&rq->nr_iowait);
delayacct_blkio_end();
}

从上面代码看,在线程io调度之前开始统计,在线程调度回来之后结束统计。因而io百分比表示的是因IO被调度出去的时间占墙钟时间的比例。
因而,flush线程这么大io百分比肯定是执行了io操作。

flush线程的io操作

按照我的理解,flush线程的工作应该是将page cache中的dirty页传递到bio层,期间不会涉及到io操作。不过iotop的结果显然被狠狠地打脸了。跟踪一下代码看看。
flush线程在mm/Backing-dev.c文件中被创建:

1
2
kthread_run(bdi_start_fn, wb, "flush-%s",
dev_name(bdi->dev))

线程函数穿越VFS接口最终到了ext4的ext4_da_writepages函数,这个函数负责最终的脏页write-back操作。整个代码过程看起来都很正常,期间还遇到了ext4的日志线程相关(jdb2)处理:

1
2
3
4
5
handle = ext4_journal_start(inode, needed_blocks);
......
ext4_journal_stop(handle);
......
jbd2_journal_force_commit_nested(sbi->s_journal);

这也很好理解,更新操作自然涉及到文件系统日志。然而,到以下几行代码时,我有点儿犯迷糊了

1
2
3
4
5
6
if (!mpd.io_done && mpd.next_page != mpd.first_page) {
if (mpage_da_map_blocks(&mpd) == 0)
mpage_da_submit_io(&mpd);
mpd.io_done = 1;
ret = MPAGE_DA_EXTENT_TAIL;
}

按常理而言,如果io_done未全部完成,继续提交就好了,这个map_blocks有些诡异。跟进去发现,这个mpage_da_map_blocks还涉及到了向磁盘申请Block相关的操作,预感这应该是问题所在了。

文件系统太复杂了,遇到了不太理解的问题,看看有没有哪位填友能帮我答疑解惑。万能的搜索引擎一搜,发现了kai_ding的文章ext4的延迟分配。好文共分享,我决定将这篇文章转载到我的博客中。
从kai_ding的文章中可以看到,ext4提供延迟分配机制:页面在写回时再分配物理磁盘块与之对应。mpage_da_map_blocks()负责分配磁盘页并建立映射关系。
这也就解释了为什么flush线程会有这么大的io百分比。但是99.99%的io占比实在是太令人发指了,相当于flush没有时间干正事了。接下来是如何想办法如何解决它。

本文转自kai_ding博客,好文跟大家分享
Ext4文件系统在应用程序调用write的时候并不为缓存页面分配对应的物理磁盘块,当文件的缓存页面真正要被刷新至磁盘中时,ext4会为所有未分配物理磁盘块的页面缓存分配尽量连续的磁盘块。
Linux文件系统Vfs层总是将应用程序的写入请求分割成页面(默认大小4KB)为单位,对于每个页面,VFS会检查其是否已经为其创建了buffer_head结构,如果没有创建,则为其创建buffer_head,否则检查每个buffer_head的状态,如该buffer_head是否已经与物理磁盘块建立映射等,这些功能是由write_begin()函数实现的,该函数是由VFS提供的一个接口,具体文件系统负责实现该接口,如ext3文件系统的ext3_write_begin()。而ext4如果采用了delay allocation特性的话,其实现的函数为ext4_da_write_begin()。
ext4_da_write_begin()会检查页面所有的buffer_head的状态,如buffer_head是否已经建立映射等,对于没有建立映射的buffer_head,需要将其与物理磁盘块建立映射关系,调用的函数是ext4_da_get_block_prep(),该函数又调用了ext4_map_blocks()来建立逻辑块和物理磁盘块的映射关系,如果启用extent特性的话,那么该函数又调用了ext4_ext_map_blocks(handle, inode, map, 0)来建立这种映射关系,之所以列举该函数的参数是要特别注意其最后一个参数0,这是一个标志位,调用者ext4_map_blocks()将该标志位设置为0,告诉被调用者,如果没有建立映射关系,那么此刻无需真正地分配物理磁盘块。这在ext4_ext_map_blocks函数中可以看到会有对该标志位的判断。就不再详细列举代码了。因为在ext4_map_blocks()中并没有建立映射关系,因此其向ext4_da_get_block_prep()返回0,表示没有映射,在ext4_da_get_block_prep()函数中判断如果返回值为0,那么为当前block设置标志位BH_New,BH_Mapped,BH_Delay(表示该块在写入的时候再进行延迟分配)。
1
2

以上我们确认了在ext4中延迟分配的前半部分,即应用程序将数据写入文件时只是简单地将数据以页面为单位写入页面缓存中,而并不真正地为其分配物理磁盘块。接下来我们要弄明白的是,ext4何时会为缓存中未分配物理磁盘块的缓存分配磁盘空间。

当刷新线程开始将脏的缓存页面写回至物理磁盘时,根据我们之前的描述,回写线程会以文件为单位进行回写,即对脏inode链表上的所有脏inode依次回写。对于每个脏inode(也即每个脏的文件),回写线程会将inode上的所有脏页面进行回写,这时候就需要判断每个脏页面的状态了。

回写线程中实现的时候将一个文件的脏页面分多次进行回写,每个回写一部分脏页面。关于回写机制可参考Linux的“脏页面回写机制”。回写脏页面最终调用到函数writepages,与writebegin一样,它也是VFS提供的一个虚拟接口,由具体文件系统负责相应的实现。对于采用延迟分配的ext4文件系统来说,该函数的具体实现是ext4_da_writepages()。该函数中实现的时候有三个要点:

要将逻辑上连续的脏且尚未建立磁盘块映射到物理页面形成一个extent,以便可采用ext4的mblock分配策略提升文件连续性,这也是我们后面要介绍的内容;
对1中连续页面形成的extent,为其进行磁盘块分配,分配采用了ext4的mblock allocation策略。
提交2中的extent至bio层完成脏页面的写入,此时已经为尚未映射的缓存页面分配了物理磁盘块。

3

上图描述了延迟分配的核心思想:等到刷新脏缓存页面时再建立脏页面与物理磁盘块之间的联系,而且,分配之前将逻辑上连续的文件块映射至物理上连续的磁盘块。在ext4_da_writepages()函数中调用了三个非常重要的函数来完成上述功能:write_cache_pages_da()负责将逻辑上连续的文件块合并成一个extent;mpage_da_map_blocks()负责为合并后的extent建立映射关系;mpage_da_submit_io()负责提交上面映射的extent。

今天将HiBench移到了Arm64平台,想看看Arm平台Hadoop的表现。毕竟是Java+Python平台,整个移植过程非常顺利。点杆执行,然后喝咖啡去了。回来一看,report下的monitor.log及monitor.html都没有生成。大呼又踩到坑了。

好吧,习惯了修修补补,让我们看看又有哪些问题。
HiBench的监控脚本最终会走到monitor.py, __main__函数走下发现输出被丢了,难怪被坑得无影无踪。

1
2
3
os.close(0)
os.close(1)
os.close(2)

把这几行注释掉,Bug现出了原形:Int类型转换Exception,“Enjoy~ 无法转换成Int”。
忽然想起公司同事有用SSH Banner做标记的情形,跟“在此撒泡尿,这段时间这个服务器归我”差不多。这个倒霉的服务器就被标上了

1
Enjoy~

仔细看了一眼发现原来HiBench使用SSH登录的时间戳作为记录时间,默认SSH登录时返回的就是登录时间,没想到一个Banner被我给撞上了。
好吧,向HiBench git上提交一个issue,patch就让他们自己打吧,我太懒了。

最近在忙着优化Hadoop集群。遇到的问题是Disk利用率一直没有达到物理带宽,其他资源也没有出现达到瓶颈的迹象。这让我颇为不爽,决定一探究竟。
首先使用的工具是iotop。熟悉iotop的同学都知道,iotop执行时在终端顶端出现Total/Actual DISK READ/WRITE四个值。Total与Actual的区别也让我费了一段时间,我的理解是:Total值是bio层从file cache到ioschedule传递的数据量;Actual是ioschedule到disk的之间的数据量。
在跟另外一个运行良好的Hadoop集群对比之后我发现这个问题集群的Actual Disk数据有所差距。于是我决定对Syscall层、file cache层的数据通量做一个全面的分析。这就要求对iotop的源码做一些小的修改。

iotop的工作原理

分析iotop的源码过程让我深刻了解了TaskStats机制。内核Document对TaskStats的描述是:

TaskStats
从上面的描述来看,TaskStats是一个用于获得每个进程执行数据的Netlink接口。iotop的源码中也是通过Netlink向TaskStats请求每个进程read_bytes、write_bytes等获得进程的I/O数据从而累加起来得到Total Disk相关数据的(Actual Disk数据则是从/proc/vmstat中的pgpgin及pgpgout获得)。
于是我分析了内核对于TaskStats的定义,看看是否有对I/O系统调用次数及系统调用数据量累积相关统计。运气还不错,内核对TaskStats的定义如下:
TaskStats-Define
从上面定义我们看到了read/write_char及read/write_syscalls几个统计变量,从内核的描述可以看出这就是对系统调用请求数据量以及系统调用次数的统计。这让我们想让iotop输出这两个信息的希望成为了可能。

对iotop稍作修改

接下来我们看看iotop对于从TaskStats请求返回的数据的处理方式。在代码中我看到了一个数据结构来记录返回数据偏移对应的内容相得情况。这就非常明了了,iotop请求返回的就是TaskStats的结构。于是我们照葫芦画瓢,在结构中添加了几项,获取想要的i/o信息。修改后结构如下:

code-add
剩下的修改就是修改一些累计方式以及显示方式了,这就没有必要详述了。

最终效果

修改之后的最终效果当然就是newiotop能够显示read/write syscall的数目及syscall请求的数据量了。如下图所示:

ui

如果有同学需要使用这些代码我很乐意分享给大家,请不要犹豫邮件、微博等跟我联系

互联网公司的工作是技术驱动的。互联网公司的高速运转给我们提供了很多深入的机会,每天的工作都能接触到错综复杂的技术,当你越是深入、越是踏实时,你接触的技术就越是细、越是复杂。
一直就打算写博客。以前没毕业时也写过一些,终究没有坚持下来。一个重要的原因当时接触的东西比较简单,不够深入就不好意思分享。想法虽然简单,却也情有可原。
博客,是跟自己在说话。无须矫情,也无法虚假。
希望我的思考能够帮助其他人。

博客文章的三个功能

审视技术的扎实程度

1
2
写作促进思考。通过文字的表达带动自己对技术的审视-故事是否完整、脉络是否清晰等。
一个完整的技术故事,对于技术的提升大有裨益。

希望能够帮助他人

1
技术是开放的。我自己深受其害的坑不希望其他人再踩一次。我也非常感激之前让我跳过很多坑的博主们。

备忘

1
除非填坑过程坎坷,很多技术细节很容易忘记。博客就像一条走过的路,偶尔回首总能温故知新。