Inverted Index – MapReduce program

December 5, 2014

What is an Inverted Index?
In computer science, an inverted index (also referred to as a postings file or inverted file) is an index data structure that stores a mapping from content, such as words or numbers, to its locations in a database file, or in a document or a set of documents.

Input files


$vi document1.txt
Hadoop World

$vi document2.txt
Hadoop World is very interesting
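
For these two files, the index we are after maps each (lower-cased) word to the documents containing it. Before the MapReduce version, here is a minimal single-machine sketch in plain Java that builds the same structure in memory (an illustration only; the class name InvertedIndexDemo is ours, not part of the original program):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class InvertedIndexDemo {
	public static void main(String[] args) {
		/* The two sample documents, held in memory for the demo */
		Map<String, String> docs = new LinkedHashMap<String, String>();
		docs.put("document1.txt", "Hadoop World");
		docs.put("document2.txt", "Hadoop World is very interesting");

		/* word -> sorted set of documents containing it */
		Map<String, Set<String>> index = new TreeMap<String, Set<String>>();
		for (Map.Entry<String, String> doc : docs.entrySet()) {
			for (String word : doc.getValue().toLowerCase().split("\\s+")) {
				if (!index.containsKey(word)) {
					index.put(word, new TreeSet<String>());
				}
				index.get(word).add(doc.getKey());
			}
		}

		/* Prints e.g.: hadoop -> [document1.txt, document2.txt] */
		for (Map.Entry<String, Set<String>> entry : index.entrySet()) {
			System.out.println(entry.getKey() + " -> " + entry.getValue());
		}
	}
}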

Driver Class:


package org.puneetha.InvertedIndex;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordcountDriver extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: [input] [output]");
			return -1;
		}

		Job job = Job.getInstance(getConf());
		job.setJobName("wordcount");
		job.setJarByClass(WordcountDriver.class);
		
		/* Field separator for reducer output*/
		job.getConfiguration().set("mapreduce.output.textoutputformat.separator", " | ");
		
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass(WordcountMapper.class);
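		/* The reducer can double as a combiner here: its output key/value
		   types (Text, Text) match the map output, and joining filenames
		   with " -> " is associative */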
		job.setCombinerClass(WordcountReducer.class);
		job.setReducerClass(WordcountReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		Path inputFilePath = new Path(args[0]);
		Path outputFilePath = new Path(args[1]);

		/* Read input from nested directories recursively */
		FileInputFormat.setInputDirRecursive(job, true);

		FileInputFormat.addInputPath(job, inputFilePath);
		FileOutputFormat.setOutputPath(job, outputFilePath);


		/* Delete the output path if it already exists */
		FileSystem fs = FileSystem.newInstance(getConf());

		if (fs.exists(outputFilePath)) {
			fs.delete(outputFilePath, true);
		}
		
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		WordcountDriver wordcountDriver = new WordcountDriver();
		int res = ToolRunner.run(wordcountDriver, args);
		System.exit(res);
	}
}
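
Because the driver extends Configured and is launched through ToolRunner, Hadoop's generic options are parsed automatically. The wordcount.case.sensitive flag read in the mapper's setup() below can therefore be flipped at submit time without recompiling, for example (jar name and paths are the same placeholders used later in this post):

$ hadoop jar Wordcount-0.0.1-SNAPSHOT.jar org.puneetha.InvertedIndex.WordcountDriver -D wordcount.case.sensitive=true /hadoop/path/to/input /hadoop/path/to/output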

Mapper Class:


package org.puneetha.InvertedIndex;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, Text> {

	private Text word = new Text();
	private Text filename = new Text();

	private boolean caseSensitive = false;

	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		/* Pull the source filename from this record's input split */
		String filenameStr = ((FileSplit) context.getInputSplit()).getPath().getName();
		filename.set(filenameStr);
		
		String line = value.toString();

		if (!caseSensitive) {
			line = line.toLowerCase();
		}

		StringTokenizer tokenizer = new StringTokenizer(line);
		while (tokenizer.hasMoreTokens()) {
			word.set(tokenizer.nextToken());			
			context.write(word, filename);
		}
	}

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		Configuration conf = context.getConfiguration();
		this.caseSensitive = conf.getBoolean("wordcount.case.sensitive",false);
	}
}

Reducer Class:


package org.puneetha.InvertedIndex;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, Text, Text, Text> {

	@Override
	public void reduce(final Text key, final Iterable<Text> values,
			final Context context) throws IOException, InterruptedException {

		StringBuilder stringBuilder = new StringBuilder();

		/* Use one explicit iterator: calling values.iterator() inside the
		   loop only works because Hadoop happens to return the same
		   instance each time, which the Iterable contract does not promise */
		Iterator<Text> iterator = values.iterator();
		while (iterator.hasNext()) {
			stringBuilder.append(iterator.next().toString());

			if (iterator.hasNext()) {
				stringBuilder.append(" -> ");
			}
		}

		context.write(key, new Text(stringBuilder.toString()));
	}

}
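
Note that this reducer lists a document once for every occurrence of the word in it. If each document should appear at most once per word, a small variant can collect the filenames into a set before joining them (a sketch; the class name DedupWordcountReducer is ours, not part of the original program):

package org.puneetha.InvertedIndex;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupWordcountReducer extends Reducer<Text, Text, Text, Text> {

	@Override
	public void reduce(final Text key, final Iterable<Text> values,
			final Context context) throws IOException, InterruptedException {

		/* LinkedHashSet drops duplicate filenames, keeping arrival order */
		Set<String> files = new LinkedHashSet<String>();
		for (Text value : values) {
			files.add(value.toString());
		}

		StringBuilder stringBuilder = new StringBuilder();
		Iterator<String> iterator = files.iterator();
		while (iterator.hasNext()) {
			stringBuilder.append(iterator.next());
			if (iterator.hasNext()) {
				stringBuilder.append(" -> ");
			}
		}

		context.write(key, new Text(stringBuilder.toString()));
	}
}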

If you are using Maven, use this pom.xml to build the job jar, adjusting the Hadoop version to match the CDH release you are using:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.puneetha</groupId>
	<artifactId>Wordcount</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>Wordcount</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<hadoop.version>2.5.0-cdh5.2.0</hadoop.version>
		<log4j.version>1.2.17</log4j.version>
		<maven_jar_plugin.version>2.4</maven_jar_plugin.version>
	</properties>

	<dependencies>
		<!-- Log4j - Logging -->
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>${log4j.version}</version>
		</dependency>

		<!-- Cloudera Core Dependencies -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>${hadoop.version}</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>${maven_jar_plugin.version}</version>
			</plugin>
		</plugins>
	</build>

	<repositories>
		<repository>
			<id>cloudera-repo</id>
			<url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
		</repository>
	</repositories>
</project>
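
Build the jar with Maven first; the maven-jar-plugin writes it to the target/ directory:

$ mvn clean package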

Run as below:


$ hadoop jar Wordcount-0.0.1-SNAPSHOT.jar org.puneetha.InvertedIndex.WordcountDriver /hadoop/path/to/input /hadoop/path/to/output

Check the result as below:


$ hadoop fs -cat /hadoop/path/to/output/*
hadoop | document1.txt -> document2.txt
interesting | document2.txt
is | document2.txt
very | document2.txt
world | document1.txt -> document2.txt
