Hi,

If you mirror the logic of checking the error condition (from the counters) in both the mapper and the reducer, you raise the odds that the job fails as early as possible. A mapper is not guaranteed to see the latest aggregated counter value from all the other mappers, so if a case slips through on the map side, your reducer can still catch it.
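A minimal sketch of this combined check, assuming the new org.apache.hadoop.mapreduce API. The empty-value error test, the "__errors__" key, and the "error_threshold" config name are all illustrative, and the real map output would go out via something like MultipleOutputs so the reducer only sees the failure counts:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class ErrorThresholdSketch {

  // Map side: fail fast if this task alone crosses the threshold,
  // and ship the local error count to the reducer for the global check.
  public static class CountingMapper
      extends Mapper<LongWritable, Text, Text, LongWritable> {
    private long threshold;
    private long errors = 0;

    @Override
    protected void setup(Context ctx) {
      threshold = ctx.getConfiguration().getLong("error_threshold", 10);
    }

    @Override
    protected void map(LongWritable key, Text value, Context ctx)
        throws IOException, InterruptedException {
      if (value.getLength() == 0) {                    // placeholder error test
        ctx.getCounter("app", "error_records").increment(1); // job UI visibility
        if (++errors >= threshold) {                   // local mirror of the check
          throw new RuntimeException("map task hit error threshold");
        }
        return;
      }
      // real output would be written here, e.g. via MultipleOutputs
    }

    @Override
    protected void cleanup(Context ctx)
        throws IOException, InterruptedException {
      // Ship this mapper's error count to the reduce side as a KV pair.
      ctx.write(new Text("__errors__"), new LongWritable(errors));
    }
  }

  // Reduce side: sums the counts from all mappers and fails the job
  // if the job-wide total crosses the threshold.
  public static class ThresholdReducer
      extends Reducer<Text, LongWritable, NullWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> counts, Context ctx)
        throws IOException, InterruptedException {
      long threshold = ctx.getConfiguration().getLong("error_threshold", 10);
      long total = 0;
      for (LongWritable c : counts) {
        total += c.get();
      }
      if (total >= threshold) {
        throw new RuntimeException("job hit error threshold: " + total);
      }
    }
  }
}

With more than one reducer you would need the counts spread across them (e.g. a partitioner tweak) so that each reducer can run the same check.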
Also, you are not limited to using a single reducer, as all reducers will race to fail on this condition; otherwise you keep the same parallelism in processing as you had earlier.

Thanks,
Sudhan S

On Wed, Nov 16, 2011 at 1:33 PM, Mapred Learn <mapred.le...@gmail.com> wrote:

> Thanks Harsh for a descriptive response.
>
> This means that all mappers would finish before we can find out if there
> were errors, right? Even though the first mapper might have reached this
> threshold.
>
> Thanks,
>
> Sent from my iPhone
>
> On Nov 15, 2011, at 9:21 PM, Harsh J <ha...@cloudera.com> wrote:
>
> Ah, so the threshold is job-level, not per task. OK.
>
> One other way I think would be performant, AND still able to use Hadoop
> itself, would be to keep one reducer for this job, and have that reducer
> check whether the counter of total failed records exceeds the threshold.
> A reducer is guaranteed to have gotten the total aggregate of map-side
> counters, since it begins only after all maps complete. The reducer can
> then fail itself to fail the job, or pass through. Your maps may output
> their data directly - the reducer is just there to decide whether the
> mappers were alright. (Perhaps send failed counts as KV to the reducer,
> to avoid looking up Hadoop counters from within tasks -- but this would
> easily apply only to map-only jobs. For MR jobs it may be a bit more
> complicated to add in, but surely still doable with some partitioner and
> comparator tweaks.)
>
> But it is also good to fail if a single map task itself exceeds 10. The
> above ensures the global check, while doing this as well would ensure
> faster failure, depending on the situation.
>
> On 16-Nov-2011, at 1:16 AM, Mapred Learn wrote:
>
> Hi Harsh,
>
> My situation is to kill a job when this threshold is reached. Say the
> threshold is 10, and 2 mappers combined reached this value - how should
> I achieve this?
>
> With what you are saying, I think the job will fail once a single mapper
> reaches that threshold.
>
> Thanks,
>
> On Tue, Nov 15, 2011 at 11:22 AM, Harsh J <ha...@cloudera.com> wrote:
>
>> Mapred,
>>
>> If you fail a task permanently upon encountering a bad situation, you
>> basically end up failing the job as well, automatically. By controlling
>> the number of retries (say, down to 1 or 2 from the default of 4 total
>> attempts), you can also have it fail the job faster.
>>
>> Is killing the job immediately a necessity? Why?
>>
>> I s'pose you could call kill from within the mapper, but I've never
>> seen that as necessary in any situation so far. What's wrong with
>> letting the job auto-die as a result of a failing task?
>>
>> On 16-Nov-2011, at 12:38 AM, Mapred Learn wrote:
>>
>> Thanks David for a step-by-step response, but this makes the error
>> threshold a per-mapper threshold. Is there a way to make it per job, so
>> that all mappers share this value and increment it as a shared counter?
>>
>> On Tue, Nov 15, 2011 at 8:12 AM, David Rosenstrauch <dar...@darose.net>
>> wrote:
>>
>>> On 11/14/2011 06:06 PM, Mapred Learn wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a use case where I want to pass a threshold value to a
>>>> map-reduce job. For eg: error records=10.
>>>>
>>>> I want the map-reduce job to fail if the total count of error_records
>>>> in the job, i.e. across all mappers, is reached.
>>>>
>>>> How can I implement this, considering that each mapper would be
>>>> processing some part of the input data?
>>>>
>>>> Thanks,
>>>> -JJ
>>>
>>> 1) Pass in the threshold value as a configuration value of the M/R
>>> job. (i.e., job.getConfiguration().setInt("error_threshold", 10) )
>>>
>>> 2) Make your mappers implement the Configurable interface. This will
>>> ensure that every mapper gets passed a copy of the config object.
>>>
>>> 3) When you implement the setConf() method in your mapper (which
>>> Configurable will force you to do), retrieve the threshold value from
>>> the config and save it in an instance variable in the mapper. (i.e.,
>>> int errorThreshold = conf.getInt("error_threshold", 10) )
>>>
>>> 4) In the mapper, when an error record occurs, increment a counter and
>>> then check if the counter value exceeds the threshold. If so, throw an
>>> exception. (e.g., if (++numErrors >= errorThreshold) throw new
>>> RuntimeException("Too many errors") )
>>>
>>> The exception will kill the mapper. Hadoop will attempt to re-run it,
>>> but subsequent attempts will also fail for the same reason, and
>>> eventually the entire job will fail.
>>>
>>> HTH,
>>>
>>> DR
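Putting David's four steps above together, a rough sketch, assuming the new org.apache.hadoop.mapreduce API (where the framework injects the job config into Configurable instances when it instantiates the mapper); the empty-value error test and the output record are placeholders:

import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Steps 2-4: a Configurable mapper that reads the per-job threshold
// in setConf() and throws once its own error count reaches it.
public class ThresholdMapper
    extends Mapper<LongWritable, Text, Text, LongWritable>
    implements Configurable {

  private Configuration conf;
  private int errorThreshold;
  private int numErrors = 0;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // Step 1, driver side: job.getConfiguration().setInt("error_threshold", 10);
    this.errorThreshold = conf.getInt("error_threshold", 10);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  protected void map(LongWritable key, Text value, Context ctx)
      throws IOException, InterruptedException {
    if (value.getLength() == 0) {            // placeholder error test
      if (++numErrors >= errorThreshold) {
        // Kills this attempt; retries hit the same data and fail too,
        // so the whole job eventually fails.
        throw new RuntimeException("Too many errors: " + numErrors);
      }
      return;
    }
    ctx.write(value, new LongWritable(1));   // placeholder output
  }
}

As the thread notes, this makes the threshold per map task; making it job-wide is what the single-reducer (or counter-mirroring) approaches further up are for.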