Controlling Hadoop MapReduce Job recursion

9th April 2011·3 min read

This post is related to the previous post.

Sometimes you coming along problems that need to be solved in a recursive manner. For example the graph exploration algorithm in my previous post.
You have to chain the jobs and let the next job work on the output of the previous job. And of course you need a breaking condition. This could either be a fixed limit of “how many recursions it should do” or “how many recursion it really does”.
Let me focus at the second breaking condition along with my graph exploration example.

Counter
First off you should know that in Hadoop you have counters, you may see them after a job ran or in the Webinterface of the Jobtracker. “Famous” counters are the “Map input records” or the “Reduce output records”.
The best of all is that we can setup our own counters, just with the use of enums.

How to setup Counter?
The simplest approach is to just define an enum like this:

public enum UpdateCounter {  
  UPDATED  
 }  

Now you can manipulate the counter using:

context.getCounter(UpdateCounter.UPDATED).increment(1);  

“context” is the context object you get from your mapper or your reducer.
This line will obviously increment your update counter by 1.

How to fetch the counter?

This is as easy as setting up an enum. You are submitting a job like this:

Configuration conf = new Configuration();  
Job job = new Job(conf);  
job.setJobName("Graph explorer");  

job.setMapperClass(DatasetImporter.class);  
job.setReducerClass(ExplorationReducer.class);  
// leave out the stuff with paths etc.  
job.waitForCompletion(true);  

Be sure that the job has finished, using waitForCompletion is recommended. Querying the counter during runtime can end in strange results ;)
You can access your counter like this:

long counter = job.getCounters().findCounter(ExplorationReducer.UpdateCounter.UPDATED)  
    .getValue();  

How to get the recursion running?

Now we know how to get the counter. Now setting up a recursion is quite simple. The only thing that you should watch for is already existing paths from older job runs.
Look at this snippet:

// variable to keep track of the recursion depth  
int depth = 0;  
// counter from the previous running import job  
long counter = job.getCounters().findCounter(ExplorationReducer.UpdateCounter.UPDATED)  
    .getValue();  
  
  depth++;  
  while (counter > 0) {  
   // reuse the conf reference with a fresh object  
   conf = new Configuration();  
   // set the depth into the configuration  
   conf.set("recursion.depth", depth + "");  
   job = new Job(conf);  
   job.setJobName("Graph explorer " + depth);  
  
   job.setMapperClass(ExplorationMapper.class);  
   job.setReducerClass(ExplorationReducer.class);  
   job.setJarByClass(ExplorationMapper.class);  
   // always work on the path of the previous depth  
   in = new Path("files/graph-exploration/depth\_" + (depth - 1) + "/");  
   out = new Path("files/graph-exploration/depth\_" + depth);  
  
   SequenceFileInputFormat.addInputPath(job, in);  
   // delete the outputpath if already exists  
   if (fs.exists(out))  
    fs.delete(out, true);  
  
   SequenceFileOutputFormat.setOutputPath(job, out);  
   job.setInputFormatClass(SequenceFileInputFormat.class);  
   job.setOutputFormatClass(SequenceFileOutputFormat.class);  
   job.setOutputKeyClass(LongWritable.class);  
   job.setOutputValueClass(VertexWritable.class);  
   // wait for completion and update the counter  
   job.waitForCompletion(true);  
   depth++;  
   counter = job.getCounters().findCounter(ExplorationReducer.UpdateCounter.UPDATED)  
     .getValue();  
  }  

Note that if you never incremented your counter it will be always 0. Or it could be null of you never used it in your mapper or reducer.

Full sourcecodes can always be found here:
http://code.google.com/p/hama-shortest-paths/


Thomas Jungblut

I'm Thomas Jungblut - welcome to my personal blog. Here you'll find a lot of posts around all the things I'm interested in writing about. Big Data, Bulk Synchronous Parallel, MapReduce, Machine Learning, Clustering, Graph Theory, Natural Language Processing, Computer Science and Open Source in general.

© Thomas Jungblut 2021. Built with Gatsby