揭底teragen
昨天跟同事们谈起TeraSort的一些问题,忽然想到TeraGen。之前对TeraGen有一个大概的概念-TeraSort的数据生成器,生成100byte的Record。既然好奇,这次就索性分析一下TeraGen的源码。
首先来看看TeraGen的启动代码:1
2
3
4
5
6
7
8
9
10
11
12setNumberOfRows(job, parseHumanLong(args[0]));
Path outputDir = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputDir);
job.setJobName("TeraGen");
job.setJarByClass(TeraGen.class);
job.setMapperClass(SortGenMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(RangeInputFormat.class);
job.setOutputFormatClass(TeraOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
TeraGen的输入一共有两个,num rows以及 output dir。从上面初始化代码部分可以看到,TeraGen使用的map类是SortGenMapper,不需要Reduce阶段,InputFormat类是RangeInputFormat,OutputFormat类是TeraOutputFormat。
首先,我们分析Map阶段的输入。根据MapReduce程序的执行流程,在Map阶段之前系统将首先分析输入文件,计算Split等,因而我们先看RangeInputFormat类:1
2
3
4
5
6
7
8
9
10
11
12
13
14public List<InputSplit> getSplits(JobContext job) {
long totalRows = getNumberOfRows(job);
int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
LOG.info("Generating " + totalRows + " using " + numSplits);
List<InputSplit> splits = new ArrayList<InputSplit>();
long currentRow = 0;
for(int split = 0; split < numSplits; ++split) {
long goal =
(long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
splits.add(new RangeInputSplit(currentRow, goal - currentRow));
currentRow = goal;
}
return splits;
}
先看getSplits方法,该方法由Hadoop框架调用,获得split的描述数组,每个split将对应一个map任务。从上面的代码可以看出,numSplits由NUM_MAPS输入项确定,初次之外生成数据总行数也由配置输入。对于每一个Split,函数中计算Split的初始值以及行数。也就是说,每个Map需要产生数据的开始行以及行数都由Split中保存好了。
1 | public void map(LongWritable row, NullWritable ignored, |
我们再看看map函数。按照Hadoop的执行流程,框架通过haveNextValue和getCurrentKey从RecordReader中获取输入,并传递给map函数。从TeraGen源码中的RangeRecordReader我们可以知道,getCurrentKey返回的就是当前的Row号。从Map函数可知,map使用rowid生成一个100字节的数据保存在Buffer中,并以10字节为key、90字节为value的形式写入到Context中。
在Hadoop中,当Reduce数设置为0时,map的输出将通过配置输出方式输出,在TeraGen中将使用TeraOutputFormat输出。我们看看这个输出类中的实现。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23static class TeraRecordWriter extends RecordWriter<Text,Text> {
private boolean finalSync = false;
private FSDataOutputStream out;
public TeraRecordWriter(FSDataOutputStream out,
JobContext job) {
finalSync = getFinalSync(job);
this.out = out;
}
public synchronized void write(Text key,
Text value) throws IOException {
out.write(key.getBytes(), 0, key.getLength());
out.write(value.getBytes(), 0, value.getLength());
}
public void close(TaskAttemptContext context) throws IOException {
if (finalSync) {
out.sync();
}
out.close();
}
}
我们可以看到,最终Map结束以后数据以先Key后Value的形式写入到HDFS。
按照Hadoop的框架,map函数将key/value对输出到Sort缓冲区中,当缓冲区满时spill线程按照key对数据进行排序,并输出到磁盘。至此,TeraGen的整个数据生成过程理清了。
现在还有一个问题,在计算TeraGen的进度时,是否会包括HDFS的操作时间?当任务进度为100%,是最后一个key/value对刚刚写入Context还是所有的数据都已经写入到HDFS?这个问题我们将在之后继续探究。