Rename reducer output part file – using MapReduce code (with new Hadoop API 2)

October 25, 2014

Below is the code to rename the reducer output part files from “part-*” to “customName-*”.

I am using the classic wordcount example (you can check out the basic implementation here).

Driver Class:
In the driver class, two lines do the work:

  • LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); – avoids the creation of empty default part files
  • MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, Text.class, IntWritable.class); – registers a named output so the part files can be given a new name

package org.puneetha.wordcountRenameOutputPartFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordcountDriver extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: [input] [output]");
			return -1;
		}

		Job job = Job.getInstance(getConf());
		job.setJobName("wordcount");
		job.setJarByClass(WordcountDriver.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(WordcountMapper.class);
		/*
		 * Note: the reducer writes through MultipleOutputs rather than
		 * context.write, so it must NOT be reused as a combiner. A
		 * combiner that writes files would emit them from the map phase
		 * (as part names ending in -m-*) and leave the reduce phase with
		 * no input.
		 */
		job.setReducerClass(WordcountReducer.class);

		job.setInputFormatClass(TextInputFormat.class);

		/*
		 * Using MultipleOutputs creates a zero-sized default output file,
		 * e.g. part-r-00000. To prevent this, use LazyOutputFormat.
		 */
		// job.setOutputFormatClass(TextOutputFormat.class);
		LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
		MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, Text.class, IntWritable.class);

		Path inputFilePath = new Path(args[0]);
		Path outputFilePath = new Path(args[1]);

		/* This line is to accept input recursively */
		FileInputFormat.setInputDirRecursive(job, true);

		FileInputFormat.addInputPath(job, inputFilePath);
		FileOutputFormat.setOutputPath(job, outputFilePath);

		/*
		 * Delete output filepath if already exists
		 */
		FileSystem fs = FileSystem.newInstance(getConf());

		if (fs.exists(outputFilePath)) {
			fs.delete(outputFilePath, true);
		}

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		WordcountDriver wordcountDriver = new WordcountDriver();
		int res = ToolRunner.run(wordcountDriver, args);
		System.exit(res);
	}
}
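
A side note on the named output "text" registered above: the reducer in this post passes a baseOutputPath to multipleOutputs.write, so the registered name is never referenced directly, but MultipleOutputs also offers a write overload that targets a named output by name. A minimal sketch of a reducer using that overload (a hypothetical NamedOutputWordcountReducer, not the WordcountReducer used for the output listing below):

package org.puneetha.wordcountRenameOutputPartFile;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class NamedOutputWordcountReducer extends
		Reducer<Text, IntWritable, Text, IntWritable> {

	private MultipleOutputs<Text, IntWritable> multipleOutputs;

	@Override
	public void setup(Context context) {
		multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);
	}

	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}
		/*
		 * Writes to files named text-r-00000 (from the named output
		 * "text" registered in the driver) instead of part-r-00000.
		 */
		multipleOutputs.write("text", key, new IntWritable(sum));
	}

	@Override
	public void cleanup(Context context) throws IOException, InterruptedException {
		multipleOutputs.close();
	}
}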

Mapper Class:
No changes are needed in the Mapper class.


package org.puneetha.wordcountRenameOutputPartFile;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends
		Mapper<LongWritable, Text, Text, IntWritable> {

	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();

	@Override
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String line = value.toString();

		StringTokenizer tokenizer = new StringTokenizer(line);
		while (tokenizer.hasMoreTokens()) {
			word.set(tokenizer.nextToken());
			context.write(word, one);
		}
	}

	/*
	 * This mirrors the default Mapper.run() implementation and is shown
	 * here only for completeness.
	 */
	@Override
	public void run(Context context) throws IOException, InterruptedException {
		setup(context);
		while (context.nextKeyValue()) {
			map(context.getCurrentKey(), context.getCurrentValue(), context);
		}
		cleanup(context);
	}
}

Reducer Class:

  • Instead of context.write, use multipleOutputs.write(key, totalWordCount, generateFileName(key, totalWordCount));
  • The generateFileName method generates the desired output file names

package org.puneetha.wordcountRenameOutputPartFile;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	private MultipleOutputs<Text, IntWritable> multipleOutputs;
	private IntWritable totalWordCount = new IntWritable();

	@Override
	public void reduce(final Text key, final Iterable<IntWritable> values,
			final Context context) throws IOException, InterruptedException {

		int sum = 0;
		Iterator<IntWritable> iterator = values.iterator();

		while (iterator.hasNext()) {
			sum += iterator.next().get();
		}

		totalWordCount.set(sum);
		// context.write(key, totalWordCount);
		multipleOutputs.write(key, totalWordCount, generateFileName(key, totalWordCount));
	}

	String generateFileName(Text key, IntWritable value) {
		return key.toString() + "_" + value.toString();
	}

	@Override
	public void setup(Context context) {
		multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);
	}

	@Override
	public void cleanup(final Context context) throws IOException, InterruptedException {
		multipleOutputs.close();
	}
}
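
Since the baseOutputPath passed to multipleOutputs.write may contain path separators, the same mechanism can also spread output across subdirectories. A hedged sketch of an alternative generateFileName (hypothetical; the output listing below uses the flat version above):

	/*
	 * Hypothetical variant: returns e.g. "apple/part", so records for the
	 * key "apple" land in <output-dir>/apple/part-r-00000.
	 */
	String generateFileName(Text key, IntWritable value) {
		return key.toString() + "/part";
	}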

Run as below:


$ hadoop jar org.puneetha-0.0.1-SNAPSHOT.jar org.puneetha.wordcountRenameOutputPartFile.WordcountDriver /user/dummyuser/wordcount/input /user/dummyuser/wordcount/output

Expected output file names (of the form key_value-r-00000):


apple_2-r-00000
cat_2-r-00000
dog_1-r-00000
horse_1-r-00000
orange_1-r-00000
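
To verify, list the job's output directory (the path matches the run command above):

$ hadoop fs -ls /user/dummyuser/wordcount/output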

Reference: http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html

3 thoughts on “Rename reducer output part file – using MapReduce code (with new Hadoop API 2)”

  1. Ravi theja B

    Amazing code. But how come the file names from the reducer code are coming out as -m-00000 instead of -r-00000? My reducer output is not as I expected; key-related values are not appearing in the files with the specified names. Can you help me?

  2. Sudarshan

    How do I remove the r-00000 extension from the reducer output in MapReduce?
