Wednesday, June 10, 2015

Skip Hadoop mapper when condition is satisfied

If conf is an instance of JobConf, register the custom map runner:

conf.setMapRunnerClass(FindAndExitMapRunner.class);

A sample implementation of such a map runner:

public class FindAndExitMapRunner<K1, V1, K2, V2> extends MapRunner<K1, V1, K2, V2> {
private Mapper<K1, V1, K2, V2> mapper;
private boolean incrProcCount;
public static final String FIND_INDICATE = "Exit after finding at least one data";
public static final String REACH_MAX = "Exit after reaching max lines";
 
@SuppressWarnings("unchecked")
public void configure(JobConf job) {
this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
//increment processed counter only if skipping feature is enabled
this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
SkipBadRecords.getAutoIncrMapperProcCount(job);
}
@Override
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
Reporter reporter) throws IOException {
   try {
    // allocate key & value instances that are re-used for all entries
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
    // map pair to output
    mapper.map(key, value, output, reporter);
    if(incrProcCount) {
    reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
    SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
    }
    }
    } catch (IOException e) {
    // break if exception is thrown
    if (!FIND_INDICATE.equalsIgnoreCase(e.getMessage()) && 
    !REACH_MAX.equalsIgnoreCase(e.getMessage())) {
    // re-throw except if it is not what we expect.
    throw e;
    }
} finally {
    mapper.close();
    }
}

protected Mapper<K1, V1, K2, V2> getMapper() {
return mapper;
}
}

Then, in your mapper, throw an IOException carrying the sentinel message when your condition is satisfied (note: it must be an IOException — Mapper.map is only declared to throw IOException, and run() above only catches IOException):

throw new IOException(FindAndExitMapRunner.FIND_INDICATE);