文章目录

昨天跟同事们谈起TeraSort的一些问题,忽然想到TeraGen。之前对TeraGen有一个大概的概念-TeraSort的数据生成器,生成100byte的Record。既然好奇,这次就索性分析一下TeraGen的源码。

首先来看看TeraGen的启动代码:

1
2
3
4
5
6
7
8
9
10
11
12
setNumberOfRows(job, parseHumanLong(args[0]));
Path outputDir = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputDir);
job.setJobName("TeraGen");
job.setJarByClass(TeraGen.class);
job.setMapperClass(SortGenMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(RangeInputFormat.class);
job.setOutputFormatClass(TeraOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;

TeraGen的输入一共有两个,num rows以及 output dir。从上面初始化代码部分可以看到,TeraGen使用的map类是SortGenMapper,不需要Reduce阶段,InputFormat类是RangeInputFormat,OutputFormat类是TeraOutputFormat。

首先,我们分析Map阶段的输入。根据MapReduce程序的执行流程,在Map阶段之前系统将首先分析输入文件,计算Split等,因而我们先看RangeInputFormat类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public List<InputSplit> getSplits(JobContext job) {
long totalRows = getNumberOfRows(job);
int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
LOG.info("Generating " + totalRows + " using " + numSplits);
List<InputSplit> splits = new ArrayList<InputSplit>();
long currentRow = 0;
for(int split = 0; split < numSplits; ++split) {
long goal =
(long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
splits.add(new RangeInputSplit(currentRow, goal - currentRow));
currentRow = goal;
}
return splits;
}

先看getSplits方法,该方法由Hadoop框架调用,获得split的描述数组,每个split将对应一个map任务。从上面的代码可以看出,numSplits由NUM_MAPS输入项确定,初次之外生成数据总行数也由配置输入。对于每一个Split,函数中计算Split的初始值以及行数。也就是说,每个Map需要产生数据的开始行以及行数都由Split中保存好了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void map(LongWritable row, NullWritable ignored,
Context context) throws IOException, InterruptedException
{

if (rand == null) {
rowId = new Unsigned16(row.get());
rand = Random16.skipAhead(rowId);
checksumCounter = context.getCounter(Counters.CHECKSUM);
}
Random16.nextRand(rand);
GenSort.generateRecord(buffer, rand, rowId);
key.set(buffer, 0, TeraInputFormat.KEY_LENGTH);
value.set(buffer, TeraInputFormat.KEY_LENGTH,
TeraInputFormat.VALUE_LENGTH);
context.write(key, value);
crc32.reset();
crc32.update(buffer, 0,
TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH);
checksum.set(crc32.getValue());
total.add(checksum);
rowId.add(ONE);
}

我们再看看map函数。按照Hadoop的执行流程,框架通过haveNextValue和getCurrentKey从RecordReader中获取输入,并传递给map函数。从TeraGen源码中的RangeRecordReader我们可以知道,getCurrentKey返回的就是当前的Row号。从Map函数可知,map使用rowid生成一个100字节的数据保存在Buffer中,并以10字节为key、90字节为value的形式写入到Context中。
在Hadoop中,当Reduce数设置为0时,map的输出将通过配置输出方式输出,在TeraGen中将使用TeraOutputFormat输出。我们看看这个输出类中的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static class TeraRecordWriter extends RecordWriter<Text,Text> {
private boolean finalSync = false;
private FSDataOutputStream out;

public TeraRecordWriter(FSDataOutputStream out,
JobContext job)
{

finalSync = getFinalSync(job);
this.out = out;
}

public synchronized void write(Text key,
Text value) throws IOException
{

out.write(key.getBytes(), 0, key.getLength());
out.write(value.getBytes(), 0, value.getLength());
}

public void close(TaskAttemptContext context) throws IOException {
if (finalSync) {
out.sync();
}
out.close();
}
}

我们可以看到,最终Map结束以后数据以先Key后Value的形式写入到HDFS。

按照Hadoop的框架,map函数将key/value对输出到Sort缓冲区中,当缓冲区满时spill线程按照key对数据进行排序,并输出到磁盘。至此,TeraGen的整个数据生成过程理清了。

现在还有一个问题,在计算TeraGen的进度时,是否会包括HDFS的操作时间?当任务进度为100%,是最后一个key/value对刚刚写入Context还是所有的数据都已经写入到HDFS?这个问题我们将在之后继续探究。

文章目录