That was a deeply satisfying bug. Fix is up here:
https://issues.apache.org/jira/browse/CRUNCH-553

On Mon, Jul 27, 2015 at 6:29 PM, Jeff Quinn <[email protected]> wrote:

> Wow, thanks so much for looking into it. That minimal example
> seems accurate. Previously when we dug deeper into which records were
> dropped it appeared entire files were being dropped, not just parts of one
> file, so that sounds consistent with what you are seeing.
>
> On Monday, July 27, 2015, Josh Wills <[email protected]> wrote:
>
>> Hey Jeff,
>>
>> Okay cool-- I think I've managed to create a simple test that replicates
>> the behavior you're seeing. I can run this test a few different times, and
>> sometimes I'll get the correct output, but other times I'll get an error
>> b/c no records are processed. I'm going to investigate further and see if I
>> can identify the source of the randomness.
>>
>> public class RecordDropIT {
>>   @Rule
>>   public TemporaryPath tmpDir = TemporaryPaths.create();
>>
>>   @Test
>>   public void testMultiReadCount() throws Exception {
>>     int numReads = 2;
>>     MRPipeline p = new MRPipeline(RecordDropIT.class,
>>         tmpDir.getDefaultConfiguration());
>>     Path shakes = tmpDir.copyResourcePath("shakes.txt");
>>     TableSource<LongWritable, Text> src = From.formattedFile(shakes,
>>         TextInputFormat.class, LongWritable.class, Text.class);
>>     List<Iterable<Integer>> values = Lists.newArrayList();
>>     for (int i = 0; i < numReads; i++) {
>>       PCollection<Integer> cnt = p.read(src).parallelDo(
>>           new LineCountFn<Pair<LongWritable, Text>>(), Writables.ints());
>>       values.add(cnt.materialize());
>>     }
>>     for (Iterable<Integer> iter : values) {
>>       System.out.println(Iterables.getOnlyElement(iter));
>>     }
>>     p.done();
>>   }
>>
>>   public static class LineCountFn<T> extends DoFn<T, Integer> {
>>
>>     private int count = 0;
>>
>>     @Override
>>     public void process(T input, Emitter<Integer> emitter) {
>>       count++;
>>     }
>>
>>     @Override
>>     public void cleanup(Emitter<Integer> emitter) {
>>       emitter.emit(count);
>>     }
>>   }
>> }
>>
>>
>> On Mon, Jul 27, 2015 at 6:11 PM, Jeff Quinn <[email protected]> wrote:
>>
>>> Hi Josh,
>>>
>>> Thanks so much for your suggestions.
>>>
>>> The counts are determined with two methods: I am using a simple Pig
>>> script to count records, and I am also tabulating the size in bytes of
>>> all HDFS output files. Both measures show dropped records / fewer than
>>> expected output bytes.
>>>
>>> To your second point I will go back and do a sweep for that, but I am
>>> fairly sure no DoFns are making use of intermediate state values without
>>> getDetachedValue. Our team is aware of the getDetachedValue gotchas as I
>>> think it has bitten us before.
>>>
>>> Thanks !
>>>
>>> Jeff
>>>
>>>
>>> On Monday, July 27, 2015, Josh Wills <[email protected]> wrote:
>>>
>>>> One more thought-- are any of these DoFns keeping records around as
>>>> intermediate state values w/o using PType.getDetachedValue to make copies
>>>> of them?
>>>>
>>>> J
>>>>
>>>> On Mon, Jul 27, 2015 at 5:47 PM, Josh Wills <[email protected]>
>>>> wrote:
>>>>
>>>>> Hey Jeff,
>>>>>
>>>>> Are the counts determined by Counters? Or is it the length of the
>>>>> output files? Or both?
>>>>>
>>>>> J
>>>>>
>>>>> On Mon, Jul 27, 2015 at 5:29 PM, David Ortiz <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Out of curiosity, any reason you went with multiple reads as opposed
>>>>>> to just performing multiple operations on the same PTable? parallelDo
>>>>>> returns a new object rather than modifying the initial one, so a single
>>>>>> collection can start multiple execution flows.
>>>>>>
>>>>>> On Mon, Jul 27, 2015, 8:11 PM Jeff Quinn <[email protected]> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> We have observed and replicated strange behavior with our Crunch
>>>>>>> application while running on MapReduce via the AWS Elastic MapReduce
>>>>>>> service. Running a very simple job that is mostly map-only, we see
>>>>>>> that an undetermined subset of records is getting dropped.
>>>>>>> Specifically, we expect 30,136,686 output records and have seen the
>>>>>>> following output counts on different trials (running over the same
>>>>>>> data with the same binary):
>>>>>>>
>>>>>>> 22,177,119 records
>>>>>>> 26,435,670 records
>>>>>>> 22,362,986 records
>>>>>>> 29,798,528 records
>>>>>>>
>>>>>>> These are all the things about our application which might be
>>>>>>> unusual and relevant:
>>>>>>>
>>>>>>> - We use a custom file input format, via From.formattedFile. It
>>>>>>> looks like this (basically a carbon copy
>>>>>>> of org.apache.hadoop.mapreduce.lib.input.TextInputFormat):
>>>>>>>
>>>>>>> import org.apache.hadoop.io.LongWritable;
>>>>>>> import org.apache.hadoop.io.Text;
>>>>>>> import org.apache.hadoop.mapreduce.InputSplit;
>>>>>>> import org.apache.hadoop.mapreduce.RecordReader;
>>>>>>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>>>>>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>>>>>>> import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
>>>>>>>
>>>>>>> import java.io.IOException;
>>>>>>>
>>>>>>> public class ByteOffsetInputFormat
>>>>>>>     extends FileInputFormat<LongWritable, Text> {
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public RecordReader<LongWritable, Text> createRecordReader(
>>>>>>>       InputSplit split, TaskAttemptContext context) throws IOException,
>>>>>>>       InterruptedException {
>>>>>>>     return new LineRecordReader();
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> - We call org.apache.crunch.Pipeline#read using this InputFormat many
>>>>>>> times. For the job in question it is called ~160 times, as the input is
>>>>>>> ~100 different files. Each file ranges in size from 100MB to 8GB. Our
>>>>>>> job uses only this input format for all input files.
>>>>>>>
>>>>>>> - For some files org.apache.crunch.Pipeline#read is called twice on
>>>>>>> the same file, and the resulting PTables are processed in different
>>>>>>> ways.
>>>>>>>
>>>>>>> - Only the files on which org.apache.crunch.Pipeline#read has been
>>>>>>> called more than once during a job have dropped records. All other
>>>>>>> files consistently have no dropped records.
>>>>>>>
>>>>>>> Curious if any Crunch users have experienced similar behavior before, 
>>>>>>> or if any of these details about my job raise any red flags.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Jeff Quinn
>>>>>>>
>>>>>>> Data Engineer
>>>>>>>
>>>>>>> Nuna
>>>>>>>
>>>>>>>
>>>>>>> *DISCLAIMER:* The contents of this email, including any
>>>>>>> attachments, may contain information that is confidential, proprietary
>>>>>>> in nature, protected health information (PHI), or otherwise protected
>>>>>>> by law from disclosure, and is solely for the use of the intended
>>>>>>> recipient(s). If you are not the intended recipient, you are hereby
>>>>>>> notified that any use, disclosure or copying of this email, including
>>>>>>> any attachments, is unauthorized and strictly prohibited. If you have
>>>>>>> received this email in error, please notify the sender of this email.
>>>>>>> Please delete this and all copies of this email from your system. Any
>>>>>>> opinions either expressed or implied in this email and all attachments,
>>>>>>> are those of its author only, and do not necessarily reflect those of
>>>>>>> Nuna Health, Inc.
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Director of Data Science
>>>>> Cloudera <http://www.cloudera.com>
>>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>>
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
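
For readers who have not hit the getDetachedValue gotcha mentioned earlier in the thread: it comes from the framework reusing one mutable object for every record, so a DoFn that holds on to the reference without copying it ends up with many references to the same, last-written value. Below is a minimal plain-Java sketch of that aliasing effect. It is not Crunch code and does not reproduce the bug fixed in CRUNCH-553. MutableRecord, bufferWithoutCopy and bufferWithCopy are hypothetical names made up for illustration, and copy() only stands in for the kind of copy PType.getDetachedValue is described as making.

```java
import java.util.ArrayList;
import java.util.List;

class DetachedValueSketch {

  /** Hypothetical mutable record, standing in for a reused value object. */
  static final class MutableRecord {
    private String value = "";

    void set(String v) {
      this.value = v;
    }

    String get() {
      return value;
    }

    /** Stand-in for a detached copy: a new object with the same contents. */
    MutableRecord copy() {
      MutableRecord r = new MutableRecord();
      r.set(value);
      return r;
    }
  }

  /** Buffers the reused instance itself, so every slot aliases one object. */
  static List<String> bufferWithoutCopy(String[] lines) {
    MutableRecord reused = new MutableRecord();
    List<MutableRecord> buffer = new ArrayList<>();
    for (String line : lines) {
      reused.set(line);   // the "framework" overwrites the same object
      buffer.add(reused); // only the reference is stored
    }
    return values(buffer);
  }

  /** Buffers a copy per record, so earlier values survive later overwrites. */
  static List<String> bufferWithCopy(String[] lines) {
    MutableRecord reused = new MutableRecord();
    List<MutableRecord> buffer = new ArrayList<>();
    for (String line : lines) {
      reused.set(line);
      buffer.add(reused.copy());
    }
    return values(buffer);
  }

  private static List<String> values(List<MutableRecord> records) {
    List<String> out = new ArrayList<>();
    for (MutableRecord r : records) {
      out.add(r.get());
    }
    return out;
  }

  public static void main(String[] args) {
    String[] lines = {"a", "b", "c"};
    System.out.println(bufferWithoutCopy(lines)); // prints [c, c, c]
    System.out.println(bufferWithCopy(lines));    // prints [a, b, c]
  }
}
```

Note that the record count is the same in both cases; what changes is the content. That is one reason this gotcha tends to show up as wrong values rather than missing records.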
