This fall I am introducing Google's MapReduce programming model, as implemented by the Hadoop framework, in my Information Retrieval class. I settled on two helpful technologies.
First, Cloudera offers a virtual machine based on Ubuntu with Hadoop pre-installed and configured. It can run in either local/standalone mode or pseudo-distributed mode. I have run it in both VMware Player and VirtualBox on my Mac.
Second, since the rest of my class emphasizes Python, I looked at the Python interface options as an alternative to the normally Java-based Hadoop. I came across Dumbo, which not only provides a Python interface but also significantly streamlines the code compared to the Java interface. It is also easy to switch from local/standalone mode, where debugging is easier, to distributed mode for deployment. This makes Dumbo very attractive in educational settings.
Dumbo is super simple to install in the Cloudera VM:
wget -O ez_setup.py http://bit.ly/ezsetup
sudo python ez_setup.py dumbo
The Dumbo documentation is somewhat limited, but here is the main reference (be sure to check out the blog link on that page for additional info).
For example, here is the canonical WordCount example written in Python/Dumbo:
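def mapper(key, value):
    # Emit (word, 1) for each whitespace-separated word in the input line.
    for word in value.split():
        yield word, 1

def reducer(key, values):
    # Sum the counts emitted for this word.
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    dumbo.run(mapper, reducer)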
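To make it clearer what the framework does with these two functions, here is a minimal sketch that imitates the map, shuffle/sort, and reduce phases in plain Python, with no Hadoop or Dumbo involved. The `run_local` helper and the sample lines are hypothetical and only for illustration. They are not part of Dumbo, and using the line number as the mapper key is an assumption (the mapper ignores the key anyway).

```python
from itertools import groupby
from operator import itemgetter


def mapper(key, value):
    # Emit (word, 1) for every whitespace-separated token in the line.
    for word in value.split():
        yield word, 1


def reducer(key, values):
    # Sum all the counts collected for one word.
    yield key, sum(values)


def run_local(mapper, reducer, lines):
    """Hypothetical helper: simulate map, shuffle/sort, and reduce in memory."""
    # Map phase: call the mapper once per input line.
    mapped = [pair for offset, line in enumerate(lines)
              for pair in mapper(offset, line)]
    # Shuffle/sort phase: bring pairs with equal keys next to each other.
    mapped.sort(key=itemgetter(0))
    # Reduce phase: call the reducer once per distinct key.
    results = []
    for key, group in groupby(mapped, key=itemgetter(0)):
        results.extend(reducer(key, (count for _, count in group)))
    return results


if __name__ == "__main__":
    sample = ["the quick brown fox", "the lazy dog", "the fox"]
    for word, count in run_local(mapper, reducer, sample):
        print(word, count)
```

The mapper and reducer here are the same functions as in the Dumbo script. Only the driver differs, which is why a script like this is convenient for classroom debugging before handing the job to a cluster.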
And here is that example written in the standard Hadoop Java interface:
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
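One detail in the Java version has no counterpart in the short Python script: `job.setCombinerClass(IntSumReducer.class)` registers the reducer as a combiner too, so counts are pre-summed on each map task before they are sent to the reducers. The sketch below is a rough, plain-Python illustration of why that is safe for word counting. The `combine` function and the two map-task outputs are made up for the example and do not use any Hadoop or Dumbo API.

```python
from collections import defaultdict


def combine(pairs):
    """Sum counts per word; used here both as the combiner and as the reducer."""
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())


# Hypothetical output of two separate map tasks.
map_task_1 = [("the", 1), ("fox", 1), ("the", 1)]
map_task_2 = [("the", 1), ("dog", 1)]

# Without a combiner, every pair from every map task reaches the reducers.
uncombined = map_task_1 + map_task_2

# With a combiner, each map task pre-sums its own pairs first.
combined = combine(map_task_1) + combine(map_task_2)

# Summing is associative and commutative, so the final totals are identical.
assert combine(uncombined) == combine(combined)

if __name__ == "__main__":
    print(len(uncombined), "pairs without a combiner")
    print(len(combined), "pairs with a combiner")
    print(combine(combined))
```

The saving is small on this toy input, but on real text, where common words repeat many times within one map task's input, pre-summing can cut the amount of intermediate data considerably.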