Wordcount MapReduce program – using Hadoop new API 2

October 25, 2014

Below is the classic wordcount example, written using the new MapReduce API (org.apache.hadoop.mapreduce).
If you are using Maven, you can use the pom.xml given below. Change it according to the Hadoop distribution/version you are using.
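
To build the jar with Maven (assuming the pom.xml below and a standard Maven project layout), run:

$ mvn clean package

The jar is written under target/; with the pom below it is named Wordcount-0.0.1-SNAPSHOT.jar (artifactId plus version).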

Input Text:


$vim input.txt
cat dog apple cat horse orange apple

$hadoop fs -mkdir -p /user/dummyuser/wordcount/input
$hadoop fs -put input.txt /user/dummyuser/wordcount/input/
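
You can verify the upload with a listing:

$ hadoop fs -ls /user/dummyuser/wordcount/input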

Driver Class:


package org.puneetha.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordcountDriver extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: WordcountDriver <input> <output>");
			return -1;
		}

		Job job = Job.getInstance(getConf());
		job.setJobName("wordcount");
		job.setJarByClass(WordcountDriver.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(WordcountMapper.class);
		/* The reducer can double as the combiner here because summing
		 * counts is associative and commutative */
		job.setCombinerClass(WordcountReducer.class);
		job.setReducerClass(WordcountReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		Path inputFilePath = new Path(args[0]);
		Path outputFilePath = new Path(args[1]);

		/* Accept input paths recursively (scan subdirectories). Note that
		 * setInputDirRecursive is not available in some older Hadoop 2.x
		 * releases. */
		FileInputFormat.setInputDirRecursive(job, true);

		FileInputFormat.addInputPath(job, inputFilePath);
		FileOutputFormat.setOutputPath(job, outputFilePath);

		/*
		 * Delete output filepath if already exists
		 */
		FileSystem fs = FileSystem.newInstance(getConf());

		if (fs.exists(outputFilePath)) {
			fs.delete(outputFilePath, true);
		}

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		WordcountDriver wordcountDriver = new WordcountDriver();
		int res = ToolRunner.run(wordcountDriver, args);
		System.exit(res);
	}
}

Mapper Class:


package org.puneetha.wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	/* Reused across map() calls to avoid allocating a new object per record */
	private final static IntWritable one = new IntWritable(1);
	private final Text word = new Text();

	@Override
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String line = value.toString();

		StringTokenizer tokenizer = new StringTokenizer(line);
		while (tokenizer.hasMoreTokens()) {
			word.set(tokenizer.nextToken());
			context.write(word, one);
		}
	}

	/*
	 * Optional: this mirrors the default Mapper.run() implementation
	 * (setup, then map() for every input record, then cleanup); it is
	 * shown here only to make the mapper life cycle explicit.
	 */
	@Override
	public void run(Context context) throws IOException, InterruptedException {
		setup(context);
		while (context.nextKeyValue()) {
			map(context.getCurrentKey(), context.getCurrentValue(), context);
		}
		cleanup(context);
	}

}

Reducer Class:


package org.puneetha.wordcount;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends
		Reducer<Text, IntWritable, Text, IntWritable> {

	/* Reused across reduce() calls to avoid allocating a new object per key */
	private final IntWritable totalWordCount = new IntWritable();

	@Override
	public void reduce(final Text key, final Iterable<IntWritable> values,
			final Context context) throws IOException, InterruptedException {

		int sum = 0;
		Iterator<IntWritable> iterator = values.iterator();

		while (iterator.hasNext()) {
			sum += iterator.next().get();
		}

		totalWordCount.set(sum);
		// context.write(key, new IntWritable(sum));
		context.write(key, totalWordCount);
	}
}

pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>


	<groupId>org.puneetha</groupId>
	<artifactId>Wordcount</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>Wordcount</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<hadoop.version>2.5.0-cdh5.2.0</hadoop.version>
		<log4j.version>1.2.17</log4j.version>
		<maven_jar_plugin.version>2.5</maven_jar_plugin.version>
	</properties>

	<dependencies>
		<!-- Log4j - Logging -->
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>${log4j.version}</version>
		</dependency>


		<!-- Cloudera Core Dependencies -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>${hadoop.version}</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>${maven_jar_plugin.version}</version>
			</plugin>
		</plugins>
	</build>

	<repositories>
		<repository>
			<id>cloudera-repo</id>
			<url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
		</repository>
	</repositories>

</project>

Run as below:


$ hadoop jar Wordcount-0.0.1-SNAPSHOT.jar org.puneetha.wordcount.WordcountDriver /user/dummyuser/wordcount/input /user/dummyuser/wordcount/output
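
Once the job completes, view the result directly from HDFS (the part-file glob covers the default TextOutputFormat naming):

$ hadoop fs -cat /user/dummyuser/wordcount/output/part-r-*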

Expected Output (keys are emitted in sorted order, tab-separated from their counts):


apple	2
cat	2
dog	1
horse	1
orange	1

9 thoughts on “Wordcount MapReduce program – using Hadoop new API 2”

  1. kishorer747

    It is great!! Can you please mention how to change the reducer class for custom output, like Reducer<Text, Text, Text, Text>? I want to do some appending, and both my output key and value have to be Text.

    Please reply.

    1. puneetha Post author

      Let's say:
      Mapper output: Word1, (Doc1,Doc2)
      Desired reducer output: Word1, Doc1 | Doc2
      The reducer code then looks like below:

      import java.io.IOException;
      import java.util.Iterator;
      
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;
      
      public class WordcountReducer extends Reducer<Text, Text, Text, Text> {
      	@Override
      	public void reduce(final Text key, final Iterable<Text> values,
      			final Context context) throws IOException, InterruptedException {
      
      		StringBuilder stringBuilder = new StringBuilder();
      
      		/* Walk the values with an explicit iterator so hasNext() can
      		 * decide whether another delimiter is needed */
      		Iterator<Text> iterator = values.iterator();
      		while (iterator.hasNext()) {
      			stringBuilder.append(iterator.next().toString());
      
      			if (iterator.hasNext()) {
      				stringBuilder.append(" | ");
      			}
      		}
      		context.write(key, new Text(stringBuilder.toString()));
      	}
      }

      If you want to add a delimiter (e.g. " => ") between key and value in the reducer output, you can set the below property in the driver class:
      job.getConfiguration().set("mapreduce.output.textoutputformat.separator", " => ");
      The output then looks like below:
      Word1 => Doc1 | Doc2

  2. Punith

    Hi Puneetha. Nice example for the new API. I am trying this code but getting a runtime error like:
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/log4j/Level
    at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:348)
    at org.apache.hadoop.mapreduce.Job.getInstance(Job.java:72)
    at com.nielsen.WordCount.WordCountDriver.run(WordCountDriver.java:25)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
    at com.nielsen.WordCount.WordCountDriver.main(WordCountDriver.java:62)
    Can you please tell me which libraries need to be added for this code to resolve this error?

    Thanks.

  3. mukesh somvanshi

    Hi ma'am, I have created a jar file using this code, but I have a problem: how do I run this jar file from the Hadoop command prompt?

  4. Miral Godhani

    Hi,

    I want to customize the output format of the map class and the input format of the reduce class.
    I have multiple files and I want an indexed word count. The output should look like this:

    Amezon { textfile1.txt=2, textfile2.txt=2 }
    Total number of occurrences of the word "Amezon": 4

    Can you please help me with what type of changes I need to make.

  5. Swapnil Shinde

    Need Help

    Dear Sir/Madam,

    First of all, I would like to thank you for providing a very good e-resource for Hadoop beginners like me.

    I want to implement the following two jobs using the MapReduce framework.

    Please see if you can provide some help or a reference for it.

    Thank you in advance.

    1. Design a distributed application using MapReduce under Hadoop for finding the maximum number in the first and second columns of every line of a given text file.

    2. Design an application using MapReduce under Hadoop for character counting in a given text file.

  6. prashant joshi

    int sum = 0;
    Iterator<IntWritable> iterator = values.iterator();

    while (iterator.hasNext()) {
    	sum += iterator.next().get();
    }
    In the above code, if we use sum++ instead of sum += iterator.next().get(), it does not give the correct output.
    Please explain. I have spent hours on this but am not able to find the solution.

  7. Manoj

    Can't find the method setInputDirRecursive in org.apache.hadoop.mapreduce.lib.input.FileInputFormat.
    I'm using the API of Hadoop version 2.0.0, CDH 4.3.0.
    Can you please update the method for this version?

