// Register the custom map runner class on the job configuration
// (conf is presumably a JobConf; MyMapRunner stands for your own runner class).
conf.setMapRunnerClass(MyMapRunner.class);
A sample implementation of such a MapRunner (here named FindAndExitMapRunner):
/**
 * A map runner that lets a mapper stop the record loop early.
 *
 * <p>Records are fed to the mapper one at a time. If an {@link IOException} is
 * thrown whose message equals {@link #FIND_INDICATE} or {@link #REACH_MAX}
 * (compared case-insensitively), the loop ends quietly; any other
 * {@code IOException} is rethrown. The mapper is closed in every case.
 */
public class FindAndExitMapRunner<K1, V1, K2, V2> extends MapRunner<K1, V1, K2, V2> {

  /** Exception message signalling that the mapper found what it was looking for. */
  public static final String FIND_INDICATE = "Exit after finding at least one data";

  /** Exception message signalling that the mapper reached its line limit. */
  public static final String REACH_MAX = "Exit after reaching max lines";

  private Mapper<K1, V1, K2, V2> mapper;
  private boolean incrProcCount;

  /**
   * Instantiates the job's mapper class and decides whether the processed-record
   * counter is incremented after each record.
   *
   * @param job the job configuration to read the mapper class and skip settings from
   */
  @SuppressWarnings("unchecked")
  public void configure(JobConf job) {
    this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
    // The counter is only maintained when record skipping is switched on
    // and auto-increment is requested.
    boolean skippingEnabled = SkipBadRecords.getMapperMaxSkipRecords(job) > 0;
    this.incrProcCount = skippingEnabled && SkipBadRecords.getAutoIncrMapperProcCount(job);
  }

  /**
   * Reads records from {@code input} and passes each one to the mapper until the
   * input is exhausted or an early-exit exception is thrown.
   *
   * @param input    source of key/value records
   * @param output   collector handed to the mapper
   * @param reporter reporter handed to the mapper and used for the processed-record counter
   * @throws IOException if an I/O error occurs whose message is not one of the
   *                     two early-exit markers
   */
  @Override
  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
      Reporter reporter) throws IOException {
    try {
      // A single key object and a single value object are reused for every record.
      final K1 reusedKey = input.createKey();
      final V1 reusedValue = input.createValue();
      while (input.next(reusedKey, reusedValue)) {
        mapper.map(reusedKey, reusedValue, output, reporter);
        if (!incrProcCount) {
          continue;
        }
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    } catch (IOException e) {
      final String message = e.getMessage();
      final boolean expectedExit =
          FIND_INDICATE.equalsIgnoreCase(message) || REACH_MAX.equalsIgnoreCase(message);
      // Only the two marker messages end the loop silently; everything else propagates.
      if (!expectedExit) {
        throw e;
      }
    } finally {
      mapper.close();
    }
  }

  /**
   * Returns the mapper instance created in {@link #configure(JobConf)}.
   *
   * @return the mapper, or {@code null} if {@code configure} has not been called
   */
  protected Mapper<K1, V1, K2, V2> getMapper() {
    return mapper;
  }
}
Then throw an exception from your mapper when your condition is satisfied:
// Must be an IOException: the runner's run() only catches IOException when
// checking for the early-exit message.
throw new IOException(MyMapRunner.FIND_INDICATE);
No comments:
Post a Comment