What is MapReduce?
MapReduce is a framework for distributed processing. The framework abstracts the distribution away from developers: they write standard Java programs, and the framework distributes them across the cluster, monitors the processing, detects failures, and re-runs failed tasks. The framework can also run the same task on several nodes when one node is running very slowly, and use the result from whichever copy finishes first. This is known as speculative execution.
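Speculative execution can be toggled per job in the driver. A minimal sketch, assuming the classic (MRv1) property names; newer Hadoop releases renamed these to mapreduce.map.speculative and mapreduce.reduce.speculative:

```java
// Hypothetical driver fragment: turn speculative execution off for this job,
// e.g. when map tasks have side effects that must not run twice.
Configuration conf = new Configuration();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
```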
How does it work?
There are two phases to MapReduce. In the map phase, the input key-value pairs are processed and a new set of intermediate key-value pairs is generated. The number of mappers depends on the volume of input data. At the end of the map phase, the values for each intermediate key are combined into a list, and the keys are shuffled to the reducers in sorted order, one reduce call per key. The number of reducers can be configured by the developer. To achieve this distribution of processing, the framework employs two daemons: the JobTracker and the TaskTracker. The JobTracker runs on a master node and distributes individual map tasks and reduce tasks to nodes in the cluster. The TaskTracker manages these individual tasks on each node and reports the results back to the JobTracker. Where possible, a TaskTracker runs a map task against the local slice of data on its own node.
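The map, shuffle, and reduce phases described above can be sketched in plain Java (no Hadoop), using word count as the example; the class and method names here are invented for illustration:

```java
import java.util.*;

public class PhaseSketch {
    // Map -> shuffle -> reduce, condensed into one method for illustration.
    static SortedMap<String, Integer> count(List<String> lines) {
        // Map phase: each input line yields (word, 1) pairs.
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (String line : lines)
            for (String word : line.split("\\s+"))
                intermediate.add(new AbstractMap.SimpleEntry<>(word, 1));

        // Shuffle: group values by key; a TreeMap models the sorted order
        // in which keys are delivered to a reducer.
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : intermediate)
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
                   .add(kv.getValue());

        // Reduce phase: one call per key, summing the grouped values.
        SortedMap<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) sum += v;
            result.put(e.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // Keys come out grouped and sorted: {hadoop=1, hello=2, world=1}
        System.out.println(count(Arrays.asList("hello world", "hello hadoop")));
    }
}
```

In real MapReduce the three steps run on different machines, but the data flow per key is exactly this.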
How to write MapReduce code?
At the minimum, there are three parts to a MapReduce program: a client-side driver, a mapper, and a reducer. The mapper and reducer code run on the Hadoop cluster, whereas the driver runs on the client system. The driver defines the job details such as the location of the data, the input format of the data, the mapper class, the reducer class, the output format, and so on. The default input format is TextInputFormat, in which each line of a file is the value and its byte offset in the file is the key. Both mappers and reducers deal with key-value pairs: the key is an object of type WritableComparable and the value is an object of type Writable, so that both can be serialized across Hadoop nodes. The Hadoop API comes with several WritableComparable classes, such as IntWritable, LongWritable, and Text, that can be used directly in a MapReduce program. The word-count example below puts these pieces together:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HelloWordCount {

    // Mapper: for every line of input, emit (word, 1) for each token.
    public static class HelloWordCountMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final IntWritable one = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String sentence = value.toString();
            StringTokenizer tokens = new StringTokenizer(sentence);
            while (tokens.hasMoreTokens()) {
                String tkn = tokens.nextToken();
                context.write(new Text(tkn), one);
            }
        }
    }

    // Reducer: sum the counts emitted for each word.
    public static class HelloWordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            result.set(count);
            context.write(key, result);
        }
    }

    // Client-side driver: configures the job and submits it.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Arguments: <in> <out>");
            System.exit(2);
        }
        // Run in local (standalone) mode against the local filesystem;
        // remove these two lines to run on a real cluster.
        conf.set("mapred.job.tracker", "local");
        conf.set("fs.default.name", "file:///");

        Job job = new Job(conf, "WordCount");
        job.setJarByClass(HelloWordCount.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(HelloWordCountMapper.class);
        // The reducer doubles as a combiner, since summing counts is
        // associative and commutative.
        job.setCombinerClass(HelloWordCountReducer.class);
        job.setReducerClass(HelloWordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}