Thanks, David, for the step-by-step response, but this makes the error threshold a per-mapper threshold. Is there a way to make it per job, so that all mappers share the value and increment it as a shared counter?
On Tue, Nov 15, 2011 at 8:12 AM, David Rosenstrauch <dar...@darose.net> wrote:

> On 11/14/2011 06:06 PM, Mapred Learn wrote:
>
>> Hi,
>>
>> I have a use case where I want to pass a threshold value to a map-reduce
>> job. For eg: error records=10.
>>
>> I want map-reduce job to fail if total count of error_records in the job
>> i.e. all mappers, is reached.
>>
>> How can I implement this considering that each mapper would be processing
>> some part of the input data ?
>>
>> Thanks,
>> -JJ
>
> 1) Pass in the threshold value as a configuration value of the M/R job.
> (i.e., job.getConfiguration().setInt("error_threshold", 10) )
>
> 2) Make your mappers implement the Configurable interface. This will
> ensure that every mapper gets passed a copy of the config object.
>
> 3) When you implement the setConf() method in your mapper (which
> Configurable will force you to do), retrieve the threshold value from the
> config and save it in an instance variable in the mapper. (i.e., int
> errorThreshold = conf.getInt("error_threshold") )
>
> 4) In the mapper, when an error record occurs, increment a counter and
> then check if the counter value exceeds the threshold. If so, throw an
> exception. (e.g., if (++numErrors >= errorThreshold) throw new
> RuntimeException("Too many errors") )
>
> The exception will kill the mapper. Hadoop will attempt to re-run it, but
> subsequent attempts will also fail for the same reason, and eventually the
> entire job will fail.
>
> HTH,
>
> DR
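For anyone reading this later, here is roughly what steps 1-4 above look like in code, as I understand them. It is only a sketch: the class name ThresholdMapper, the key/value types, and the isErrorRecord() check are placeholders of mine, not from David's mail; only the "error_threshold" key and the counter/exception logic come from his steps.

// Step 1, in the driver:
// job.getConfiguration().setInt("error_threshold", 10);

import java.io.IOException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ThresholdMapper extends Mapper<LongWritable, Text, Text, Text>
    implements Configurable {

  private Configuration conf;
  private int errorThreshold;
  private int numErrors = 0;

  // Steps 2/3: Hadoop creates the mapper via ReflectionUtils, which calls
  // setConf() on anything implementing Configurable, so each mapper instance
  // gets a copy of the job config and can read the threshold from it.
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    this.errorThreshold = conf.getInt("error_threshold", 10);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  // Step 4: count bad records in this mapper and throw once the threshold
  // is reached; repeated task-attempt failures eventually fail the job.
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    if (isErrorRecord(value)) {
      if (++numErrors >= errorThreshold) {
        throw new RuntimeException("Too many errors");
      }
      return;
    }
    context.write(new Text("ok"), value);
  }

  // Placeholder: whatever marks a record as bad in your data.
  private boolean isErrorRecord(Text value) {
    return value.toString().trim().isEmpty();
  }
}

As written, numErrors is local to each mapper (each task attempt), which is exactly the limitation I'm asking about above.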