Wow, thanks so much for looking into it. That minimal example
seems accurate. Previously, when we dug deeper into which records were
dropped, it appeared that entire files were being dropped, not just parts of
one file, so that sounds consistent with what you are seeing.

On Monday, July 27, 2015, Josh Wills <[email protected]> wrote:

> Hey Jeff,
>
> Okay cool-- I think I've managed to create a simple test that replicates
> the behavior you're seeing. I can run this test a few different times, and
> sometimes I'll get the correct output, but other times I'll get an error
> b/c no records are processed. I'm going to investigate further and see if I
> can identify the source of the randomness.
>
> import com.google.common.collect.Iterables;
> import com.google.common.collect.Lists;
>
> import java.util.List;
>
> import org.apache.crunch.DoFn;
> import org.apache.crunch.Emitter;
> import org.apache.crunch.PCollection;
> import org.apache.crunch.Pair;
> import org.apache.crunch.TableSource;
> import org.apache.crunch.impl.mr.MRPipeline;
> import org.apache.crunch.io.From;
> import org.apache.crunch.test.TemporaryPath;
> import org.apache.crunch.test.TemporaryPaths;
> import org.apache.crunch.types.writable.Writables;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.junit.Rule;
> import org.junit.Test;
>
> public class RecordDropIT {
>   @Rule
>   public TemporaryPath tmpDir = TemporaryPaths.create();
>
>   @Test
>   public void testMultiReadCount() throws Exception {
>     int numReads = 2;
>     MRPipeline p = new MRPipeline(RecordDropIT.class, tmpDir.getDefaultConfiguration());
>     Path shakes = tmpDir.copyResourcePath("shakes.txt");
>     TableSource<LongWritable, Text> src = From.formattedFile(
>         shakes, TextInputFormat.class, LongWritable.class, Text.class);
>     // Read the same source multiple times; each read counts the lines it sees.
>     List<Iterable<Integer>> values = Lists.newArrayList();
>     for (int i = 0; i < numReads; i++) {
>       PCollection<Integer> cnt = p.read(src).parallelDo(
>           new LineCountFn<Pair<LongWritable, Text>>(), Writables.ints());
>       values.add(cnt.materialize());
>     }
>     for (Iterable<Integer> iter : values) {
>       System.out.println(Iterables.getOnlyElement(iter));
>     }
>     p.done();
>   }
>
>   // Counts every input it sees and emits the total from cleanup().
>   public static class LineCountFn<T> extends DoFn<T, Integer> {
>
>     private int count = 0;
>
>     @Override
>     public void process(T input, Emitter<Integer> emitter) {
>       count++;
>     }
>
>     @Override
>     public void cleanup(Emitter<Integer> emitter) {
>       emitter.emit(count);
>     }
>   }
> }
>
>
> On Mon, Jul 27, 2015 at 6:11 PM, Jeff Quinn <[email protected]> wrote:
>
>> Hi Josh,
>>
>> Thanks so much for your suggestions.
>>
>> The counts are determined with two methods: I am using a simple Pig
>> script to count records, and I am also tallying the size in bytes of
>> all HDFS output files. Both measures show dropped records / fewer output
>> bytes than expected.
>>
>> To your second point, I will go back and do a sweep for that, but I am
>> fairly sure no DoFns are making use of intermediate state values without
>> getDetachedValue. Our team is aware of the getDetachedValue gotchas, as I
>> think it has bitten us before.
>>
>> Thanks !
>>
>> Jeff
>>
>> On Monday, July 27, 2015, Josh Wills <[email protected]> wrote:
>>
>>> One more thought-- are any of these DoFns keeping records around as
>>> intermediate state values w/o using PType.getDetachedValue to make copies
>>> of them?
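>>>
>>> For example, a DoFn that buffers inputs between process() calls would need
>>> a pattern roughly like this sketch to be safe, since the runtime may reuse
>>> the underlying objects (class and field names here are illustrative, and
>>> the usual org.apache.crunch, org.apache.crunch.types.PType, java.util.List,
>>> and Guava Lists imports are assumed):
>>>
>>> public static class BufferingFn<T> extends DoFn<T, T> {
>>>   private final PType<T> ptype;
>>>   private transient List<T> buffer;
>>>
>>>   public BufferingFn(PType<T> ptype) {
>>>     this.ptype = ptype;
>>>   }
>>>
>>>   @Override
>>>   public void initialize() {
>>>     // Some PTypes (e.g. Avro) must be initialized before getDetachedValue.
>>>     ptype.initialize(getConfiguration());
>>>     buffer = Lists.newArrayList();
>>>   }
>>>
>>>   @Override
>>>   public void process(T input, Emitter<T> emitter) {
>>>     // Detach a copy before keeping a reference; the input object may be reused.
>>>     buffer.add(ptype.getDetachedValue(input));
>>>   }
>>>
>>>   @Override
>>>   public void cleanup(Emitter<T> emitter) {
>>>     for (T value : buffer) {
>>>       emitter.emit(value);
>>>     }
>>>   }
>>> }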
>>>
>>> J
>>>
>>> On Mon, Jul 27, 2015 at 5:47 PM, Josh Wills <[email protected]> wrote:
>>>
>>>> Hey Jeff,
>>>>
>>>> Are the counts determined by Counters? Or is it the length of the
>>>> output files? Or both?
>>>>
>>>> J
>>>>
>>>> On Mon, Jul 27, 2015 at 5:29 PM, David Ortiz <[email protected]> wrote:
>>>>
>>>>> Out of curiosity, any reason you went with multiple reads as opposed
>>>>> to just performing multiple operations on the same PTable? parallelDo
>>>>> returns a new object rather than modifying the initial one, so a single
>>>>> collection can start multiple execution flows.
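>>>>>
>>>>> As a rough sketch (assuming a pipeline and a TableSource<LongWritable,
>>>>> Text> named src, as in the test further up in this thread):
>>>>>
>>>>> PTable<LongWritable, Text> lines = pipeline.read(src);
>>>>>
>>>>> // Each derived collection is a new object; `lines` is never modified,
>>>>> // so both flows hang off a single read of the source.
>>>>> PCollection<Text> flowA = lines.values();
>>>>> PCollection<LongWritable> flowB = lines.keys();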
>>>>>
>>>>> On Mon, Jul 27, 2015, 8:11 PM Jeff Quinn <[email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We have observed and replicated strange behavior in our Crunch
>>>>>> application while running on MapReduce via the AWS ElasticMapReduce
>>>>>> service. Running a very simple job that is mostly map-only, we see that
>>>>>> an undetermined subset of records is getting dropped. Specifically, we
>>>>>> expect 30,136,686 output records but have seen the following counts on
>>>>>> different trials (running over the same data with the same binary):
>>>>>>
>>>>>> 22,177,119 records
>>>>>> 26,435,670 records
>>>>>> 22,362,986 records
>>>>>> 29,798,528 records
>>>>>>
>>>>>> These are all the things about our application which might be unusual
>>>>>> and relevant:
>>>>>>
>>>>>> - We use a custom file input format, via From.formattedFile. It looks
>>>>>> like this (basically a carbon copy
>>>>>> of org.apache.hadoop.mapreduce.lib.input.TextInputFormat):
>>>>>>
>>>>>> import org.apache.hadoop.io.LongWritable;
>>>>>> import org.apache.hadoop.io.Text;
>>>>>> import org.apache.hadoop.mapreduce.InputSplit;
>>>>>> import org.apache.hadoop.mapreduce.RecordReader;
>>>>>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>>>>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>>>>>> import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
>>>>>>
>>>>>> import java.io.IOException;
>>>>>>
>>>>>> public class ByteOffsetInputFormat extends FileInputFormat<LongWritable, Text> {
>>>>>>
>>>>>>   @Override
>>>>>>   public RecordReader<LongWritable, Text> createRecordReader(
>>>>>>       InputSplit split, TaskAttemptContext context) throws IOException,
>>>>>>       InterruptedException {
>>>>>>     return new LineRecordReader();
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> - We call org.apache.crunch.Pipeline#read using this InputFormat many
>>>>>> times; for the job in question it is called ~160 times, as the input is
>>>>>> ~100 different files. Each file ranges in size from 100MB to 8GB. Our job
>>>>>> uses only this input format for all input files.
>>>>>>
>>>>>> - For some files, org.apache.crunch.Pipeline#read is called twice on the
>>>>>> same file, and the resulting PTables are processed in different ways (see
>>>>>> the sketch after this list).
>>>>>>
>>>>>> - It is only the data from the files on which
>>>>>> org.apache.crunch.Pipeline#read has been called more than once during a
>>>>>> job that have dropped records; all other files consistently do not have
>>>>>> dropped records.
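>>>>>>
>>>>>> Roughly, the double-read pattern looks like this (variable names here
>>>>>> are illustrative, not our actual job code):
>>>>>>
>>>>>> TableSource<LongWritable, Text> src = From.formattedFile(
>>>>>>     inputPath, ByteOffsetInputFormat.class, LongWritable.class, Text.class);
>>>>>>
>>>>>> PTable<LongWritable, Text> first = pipeline.read(src);
>>>>>> PTable<LongWritable, Text> second = pipeline.read(src);
>>>>>> // first and second then feed different downstream parallelDo chains.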
>>>>>>
>>>>>> Curious if any Crunch users have experienced similar behavior before, or 
>>>>>> if any of these details about my job raise any red flags.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> Jeff Quinn
>>>>>>
>>>>>> Data Engineer
>>>>>>
>>>>>> Nuna
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Director of Data Science
>>>> Cloudera <http://www.cloudera.com>
>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>
>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>
>>
>>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>

