This is an example of a custom partitioner for the classic word count program.
Driver Class:
We partition keys based on their first letter. The main implementation uses 26 partitions, one per letter a-z; an alternative implementation (commented out in the partitioner class below) uses 27 partitions, 26 for the letters plus 1 for keys starting with any other character. A small illustration of the first-letter mapping follows the list below. These are the additions to the driver class:
- job.setNumReduceTasks(26);
- job.setPartitionerClass(WordcountPartitioner.class);
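As a quick illustration of the first-letter rule (a standalone sketch of my own, not part of the job; the class name PartitionIllustration is made up):

public class PartitionIllustration {
    public static void main(String[] args) {
        String[] samples = { "apple", "Banana", "zebra" };
        for (String word : samples) {
            // Same rule as the partitioner: 'a' -> 0, 'b' -> 1, ..., 'z' -> 25
            int partition = word.toLowerCase().charAt(0) - 'a';
            System.out.println(word + " -> partition " + partition);
        }
    }
}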
package org.puneetha.customPartitioner;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordcountDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Usage: [input] [output]");
            System.exit(-1);
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("wordcount");
        job.setJarByClass(WordcountDriver.class);

        job.setNumReduceTasks(26);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordcountMapper.class);
        job.setPartitionerClass(WordcountPartitioner.class);
        job.setCombinerClass(WordcountReducer.class);
        job.setReducerClass(WordcountReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path inputFilePath = new Path(args[0]);
        Path outputFilePath = new Path(args[1]);

        /* Accept input recursively */
        FileInputFormat.setInputDirRecursive(job, true);
        FileInputFormat.addInputPath(job, inputFilePath);
        FileOutputFormat.setOutputPath(job, outputFilePath);

        /* Delete the output path if it already exists */
        FileSystem fs = FileSystem.newInstance(getConf());
        if (fs.exists(outputFilePath)) {
            fs.delete(outputFilePath, true);
        }

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        WordcountDriver wordcountDriver = new WordcountDriver();
        int res = ToolRunner.run(wordcountDriver, args);
        System.exit(res);
    }
}
Mapper Class:
package org.puneetha.customPartitioner;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }

    /* Mirrors the default Mapper.run() implementation: setup, map every record, cleanup. */
    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }
}
Partitioner Class:
package org.puneetha.customPartitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom partitioner to ensure that partitioning happens based on the key.
 */
public class WordcountPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String word = key.toString();
        char letter = word.toLowerCase().charAt(0);
        /* Assumes every key starts with a letter a-z, yielding partitions 0-25. */
        return letter - 'a';
    }

    /* Other implementation of the partitioner.
     * If you use this version, set the number of reducers to 27 in the driver class,
     * e.g. job.setNumReduceTasks(27);
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String word = key.toString();
        char letter = word.toLowerCase().charAt(0);
        int partitionNumber = 0;
        switch (letter) {
        case 'a': partitionNumber = 1; break;
        case 'b': partitionNumber = 2; break;
        case 'c': partitionNumber = 3; break;
        case 'd': partitionNumber = 4; break;
        case 'e': partitionNumber = 5; break;
        case 'f': partitionNumber = 6; break;
        case 'g': partitionNumber = 7; break;
        case 'h': partitionNumber = 8; break;
        case 'i': partitionNumber = 9; break;
        case 'j': partitionNumber = 10; break;
        case 'k': partitionNumber = 11; break;
        case 'l': partitionNumber = 12; break;
        case 'm': partitionNumber = 13; break;
        case 'n': partitionNumber = 14; break;
        case 'o': partitionNumber = 15; break;
        case 'p': partitionNumber = 16; break;
        case 'q': partitionNumber = 17; break;
        case 'r': partitionNumber = 18; break;
        case 's': partitionNumber = 19; break;
        case 't': partitionNumber = 20; break;
        case 'u': partitionNumber = 21; break;
        case 'v': partitionNumber = 22; break;
        case 'w': partitionNumber = 23; break;
        case 'x': partitionNumber = 24; break;
        case 'y': partitionNumber = 25; break;
        case 'z': partitionNumber = 26; break;
        default: partitionNumber = 0; break;
        }
        return partitionNumber;
    }
    */
}
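Note that the main getPartition above returns letter - 'a' unchecked, so a key starting with a digit or symbol produces a partition number outside 0-25 and the job fails with an "Illegal partition" error. A more defensive variant (a sketch of my own, not part of the original post; the class name SafeWordcountPartitioner is made up) could look like this:

package org.puneetha.customPartitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/* Hypothetical defensive variant: keys that do not start with a-z go to partition 0,
 * and the result is taken modulo numPartitions so it always matches the reducer count. */
public class SafeWordcountPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String word = key.toString();
        if (word.isEmpty()) {
            return 0;
        }
        char first = Character.toLowerCase(word.charAt(0));
        int partition = (first >= 'a' && first <= 'z') ? (first - 'a') : 0;
        return partition % numPartitions;
    }
}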
Reducer Class:
package org.puneetha.customPartitioner;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable totalWordCount = new IntWritable();

    @Override
    public void reduce(final Text key, final Iterable<IntWritable> values,
            final Context context) throws IOException, InterruptedException {
        int sum = 0;
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            sum += iterator.next().get();
        }
        totalWordCount.set(sum);
        context.write(key, totalWordCount);
    }
}
Run as below:
$ hadoop jar org.puneetha-0.0.1-SNAPSHOT.jar org.puneetha.customPartitioner.WordcountDriver /user/dummyuser/wordcount/input /user/dummyuser/wordcount/output
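With 26 reduce tasks, the output directory should contain a _SUCCESS marker and one part file per reducer, using the standard MapReduce output naming. The listing below is illustrative only; the real hadoop fs -ls output also shows permissions, sizes, and timestamps:

$ hadoop fs -ls /user/dummyuser/wordcount/output
/user/dummyuser/wordcount/output/_SUCCESS
/user/dummyuser/wordcount/output/part-r-00000    (words starting with 'a')
/user/dummyuser/wordcount/output/part-r-00001    (words starting with 'b')
...
/user/dummyuser/wordcount/output/part-r-00025    (words starting with 'z')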
How would you reduce the number of partitions (i.e. avoid creating the part file at all) when no record starts with a particular letter?
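One possible approach (a sketch of my own, not part of the original post): use LazyOutputFormat in the driver. It creates a reducer's output file only when that reducer writes its first record, so reducers whose partition receives no keys leave no empty part files behind.

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// In WordcountDriver.run(), instead of job.setOutputFormatClass(TextOutputFormat.class):
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);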