JAVA IMPLEMENTATION OF MAP REDUCE WORDCOUNT
HADOOP MAP REDUCE
Hadoop MapReduce is a software framework for easily writing applications
which process vast amounts of data (multi-terabyte data-sets)
in-parallel on large clusters (thousands of nodes) of commodity
hardware in a reliable, fault-tolerant manner.
A MapReduce job usually splits the input data-set into independent
chunks which are processed by the map tasks in a completely parallel
manner. The framework sorts the outputs of the maps, which are then
input to the reduce tasks. Typically both the input and the output of
the job are stored in a file-system. The framework takes care of
scheduling tasks, monitoring them, and re-executing failed tasks.
HADOOP MAP REDUCE – WORD COUNT
The purpose of a WordCount program is to count the number of occurrences
of each word in a given file. First, data is input to the mapper in
(key, value) pairs. For our example, the key will be the byte offset
of the line in the input file (so each line of input triggers its own
map call) and the value will be the text present on that line. Once the mapper has
the input, it will perform some operation on it and output data again
in (key, value) pairs. In the WordCount example, the mapper will
simply output each word that occurs on its line as a key and the
integer "1" as the associated value (note that a single mapper
can output multiple (key, value) pairs).
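To make this concrete, here is a minimal standalone sketch (plain
Java, no Hadoop) of what a single mapper emits; the input line
"apple banana apple" is just an assumed example, and the tokenizing
logic mirrors the real mapper shown further below.
MapSketch.java
import java.util.StringTokenizer;
public class MapSketch {
    public static void main(String[] args) {
        // the "value" of one (key, value) input pair: the text of one line
        String line = "apple banana apple";
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            // emit (word, 1) for every token found on the line
            System.out.println("(" + tokenizer.nextToken() + ", 1)");
        }
        // prints: (apple, 1) (banana, 1) (apple, 1)
    }
}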
One of the main things to understand about MapReduce is that a number
of mappers run over a given input file, and these mappers cannot
interact with one another. Suppose we have two different mappers,
let's call them Mapper1 and Mapper2, each working on a different line
of input from a file. If both lines of input have the word "apple",
there is no way for Mapper2 to know that Mapper1's line of input also
has this word. In this setting that's perfectly fine, because in the
Shuffle and Sort phase all the (key, value) pairs output by the
mappers are collected, their keys are compared, and pairs with equal
keys have their values combined into a single list of values. The
keys themselves come out sorted.
So if both Mapper1 and Mapper2 contained the word "apple" in their
lines of text, the (key, value) pair (apple, 1) will occur twice. The
Shuffle and Sort phase will notice this and output the single (key,
value) pair (apple, {1, 1}).
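This grouping can be imitated in a few lines of plain Java; the
sketch below uses a TreeMap so that keys also come out sorted, and
the mapper output it starts from is assumed example data, not
anything produced by Hadoop itself.
ShuffleSortSketch.java
import java.util.*;
public class ShuffleSortSketch {
    public static void main(String[] args) {
        // (key, value) pairs as two mappers might have emitted them
        String[][] mapperOutput = {
            {"apple", "1"}, {"banana", "1"},   // from Mapper1
            {"apple", "1"}, {"cherry", "1"}    // from Mapper2
        };
        // a TreeMap groups equal keys together and keeps all keys sorted
        TreeMap<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (String[] pair : mapperOutput) {
            if (!grouped.containsKey(pair[0])) {
                grouped.put(pair[0], new ArrayList<Integer>());
            }
            grouped.get(pair[0]).add(Integer.parseInt(pair[1]));
        }
        System.out.println(grouped);  // {apple=[1, 1], banana=[1], cherry=[1]}
    }
}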
Each reducer is then given a key and the list of values that were
output by the mappers for that key. Its goal is to perform some
operation and again output data in (key, value) pairs. In the
WordCount example, we will use what is known as the sumReducer. It
gets this name because its job is simply to sum the values in the
list and output the (key, value) pair made of the original key and
this sum of values.
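As a quick sanity check of that description, the following plain-Java
sketch performs one reduce call by hand on the assumed input
(apple, {1, 1}):
SumReducerSketch.java
import java.util.*;
public class SumReducerSketch {
    public static void main(String[] args) {
        // one reduce call: the key "apple" and its list of values {1, 1}
        String key = "apple";
        List<Integer> values = Arrays.asList(1, 1);
        int sum = 0;
        for (int v : values) {
            sum += v;  // add up every 1 that was emitted for this word
        }
        System.out.println("(" + key + ", " + sum + ")");  // prints (apple, 2)
    }
}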
LIST OF JAR FILES NEEDED
activation.jar
hadoop-0.18.0-core.jar
hadoop-0.18.3-eclipse-plugin.jar
jakarta-commons-logging.jar
log4j-1.2.15.jar
mail.jar
WordCount.java
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    // Mapper: for each line of input, emit (word, 1) for every word on the line.
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);  // emit (word, 1)
            }
        }
    }

    // Reducer: sum the list of 1s collected for each word and emit (word, total).
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        System.out.println(startTime);

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setJarByClass(WordCount.class);

        // Types of the (key, value) pairs the job outputs.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        // Using the reducer as a combiner pre-sums counts on the map side.
        conf.setCombinerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path("foo.txt"));
        // The output path is treated as a directory and must not already exist.
        FileOutputFormat.setOutputPath(conf, new Path("bar.txt"));

        JobClient.runJob(conf);

        long endTime = System.currentTimeMillis();
        long totalTime = (endTime - startTime) / 1000;
        System.out.println("Time in seconds: " + totalTime);
    }
}
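To run the job, package the class into a jar (say wordcount.jar; the
name is an assumption, any jar name works) and, on a machine with
Hadoop configured, invoke it as hadoop jar wordcount.jar WordCount.
Since the input and output paths are hardcoded above, foo.txt must
exist on the job's file system and the bar.txt output directory must
not; the word counts will appear in part files (such as part-00000)
inside that directory.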