That was a deeply satisfying bug. Fix is up here: https://issues.apache.org/jira/browse/CRUNCH-553
On Mon, Jul 27, 2015 at 6:29 PM, Jeff Quinn <[email protected]> wrote:

Wow, thanks so much for looking into it. That minimal example seems accurate. Previously, when we dug deeper into which records were dropped, it appeared that entire files were being dropped, not just parts of one file, so that is consistent with what you are seeing.

On Monday, July 27, 2015, Josh Wills <[email protected]> wrote:

Hey Jeff,

Okay, cool -- I think I've managed to create a simple test that replicates the behavior you're seeing. I can run this test a few different times; sometimes I'll get the correct output, but other times I'll get an error because no records are processed. I'm going to investigate further and see if I can identify the source of the randomness.

import java.util.List;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pair;
import org.apache.crunch.TableSource;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.Rule;
import org.junit.Test;

public class RecordDropIT {
  @Rule
  public TemporaryPath tmpDir = TemporaryPaths.create();

  @Test
  public void testMultiReadCount() throws Exception {
    int numReads = 2;
    MRPipeline p = new MRPipeline(RecordDropIT.class, tmpDir.getDefaultConfiguration());
    Path shakes = tmpDir.copyResourcePath("shakes.txt");
    TableSource<LongWritable, Text> src = From.formattedFile(shakes,
        TextInputFormat.class, LongWritable.class, Text.class);
    List<Iterable<Integer>> values = Lists.newArrayList();
    for (int i = 0; i < numReads; i++) {
      // Each pass reads the same source again and materializes a line count.
      PCollection<Integer> cnt = p.read(src).parallelDo(
          new LineCountFn<Pair<LongWritable, Text>>(), Writables.ints());
      values.add(cnt.materialize());
    }
    for (Iterable<Integer> iter : values) {
      System.out.println(Iterables.getOnlyElement(iter));
    }
    p.done();
  }

  public static class LineCountFn<T> extends DoFn<T, Integer> {

    private int count = 0;

    @Override
    public void process(T input, Emitter<Integer> emitter) {
      count++;
    }

    @Override
    public void cleanup(Emitter<Integer> emitter) {
      // Emit a single per-task count once all input has been processed.
      emitter.emit(count);
    }
  }
}

On Mon, Jul 27, 2015 at 6:11 PM, Jeff Quinn <[email protected]> wrote:

Hi Josh,

Thanks so much for your suggestions.

The counts are determined with two methods: I am using a simple Pig script to count records, and I am also tabulating the size in bytes of all HDFS output files. Both measures show dropped records / fewer than expected output bytes.

To your second point, I will go back and do a sweep for that, but I am fairly sure no DoFns are making use of intermediate state values without getDetachedValue. Our team is aware of the getDetachedValue gotchas, as I think it has bitten us before.

Thanks!

Jeff

On Monday, July 27, 2015, Josh Wills <[email protected]> wrote:

One more thought -- are any of these DoFns keeping records around as intermediate state values without using PType.getDetachedValue to make copies of them?

J

On Mon, Jul 27, 2015 at 5:47 PM, Josh Wills <[email protected]> wrote:

Hey Jeff,

Are the counts determined by Counters? Or is it the length of the output files? Or both?

J

On Mon, Jul 27, 2015 at 5:29 PM, David Ortiz <[email protected]> wrote:

Out of curiosity, any reason you went with multiple reads as opposed to just performing multiple operations on the same PTable? parallelDo returns a new object rather than modifying the initial one, so a single collection can start multiple execution flows.
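To make David's read-once suggestion concrete, here is a minimal sketch of the pattern; the DoFns, argument paths, and output targets are invented for illustration and are not from Jeff's actual job:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SingleReadFanOut {
  public static void main(String[] args) throws Exception {
    MRPipeline p = new MRPipeline(SingleReadFanOut.class);
    // Read the source exactly once...
    PTable<LongWritable, Text> lines = p.read(From.formattedFile(
        new Path(args[0]), TextInputFormat.class, LongWritable.class, Text.class));

    // ...then branch: parallelDo returns a new PCollection and leaves
    // 'lines' untouched, so both flows share the single read.
    PCollection<Integer> lengths = lines.parallelDo(
        new DoFn<Pair<LongWritable, Text>, Integer>() {
          @Override
          public void process(Pair<LongWritable, Text> in, Emitter<Integer> em) {
            em.emit(in.second().getLength());  // line length in bytes
          }
        }, Writables.ints());
    PCollection<String> upper = lines.parallelDo(
        new DoFn<Pair<LongWritable, Text>, String>() {
          @Override
          public void process(Pair<LongWritable, Text> in, Emitter<String> em) {
            em.emit(in.second().toString().toUpperCase());
          }
        }, Writables.strings());

    p.writeTextFile(lengths, args[1]);
    p.writeTextFile(upper, args[2]);
    p.done();
  }
}

Reading once and fanning out also avoids the multiple-read code path implicated above, so it should sidestep the dropped-record behavior until the CRUNCH-553 fix is picked up.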
On Mon, Jul 27, 2015, 8:11 PM Jeff Quinn <[email protected]> wrote:

Hello,

We have observed and replicated strange behavior in our Crunch application while running on MapReduce via the AWS ElasticMapReduce service. Running a very simple job which is mostly map only, we see that an undetermined subset of records is getting dropped. Specifically, we expect 30,136,686 output records and have seen the following outputs on different trials (running over the same data with the same binary):

22,177,119 records
26,435,670 records
22,362,986 records
29,798,528 records

These are all the things about our application which might be unusual and relevant:

- We use a custom file input format, via From.formattedFile. It looks like this (basically a carbon copy of org.apache.hadoop.mapreduce.lib.input.TextInputFormat):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import java.io.IOException;

public class ByteOffsetInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    // Delegate record reading to the stock LineRecordReader, just as
    // TextInputFormat does; keys are byte offsets, values are lines.
    return new LineRecordReader();
  }
}

- We call org.apache.crunch.Pipeline#read using this InputFormat many times; for the job in question it is called ~160 times, as the input is ~100 different files. Each file ranges in size from 100MB to 8GB. Our job uses only this input format for all input files.

- For some files, org.apache.crunch.Pipeline#read is called twice on the same file, and the resulting PTables are processed in different ways.

- Only the files on which org.apache.crunch.Pipeline#read has been called more than once during a job have dropped records; all other files consistently do not have dropped records.

Curious if any Crunch users have experienced similar behavior before, or if any of these details about my job raise any red flags.

Thanks!

Jeff Quinn

Data Engineer

Nuna
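For anyone following the getDetachedValue exchange above, here is a hypothetical sketch of the gotcha Josh is asking about; LongestLineFn and its wiring are invented for illustration, not taken from Jeff's job. A DoFn that holds an input record across process() calls must detach it via the collection's PType, because the MapReduce runtime may reuse the underlying Writable object:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.io.Text;

public class LongestLineFn extends DoFn<Text, Text> {
  private final PType<Text> ptype;
  private Text longest;

  public LongestLineFn(PType<Text> ptype) {
    this.ptype = ptype;
  }

  @Override
  public void initialize() {
    // PTypes must be initialized before use inside a DoFn.
    ptype.initialize(getConfiguration());
  }

  @Override
  public void process(Text line, Emitter<Text> emitter) {
    if (longest == null || line.getLength() > longest.getLength()) {
      // Without getDetachedValue, 'longest' would alias the reused input
      // object and silently change on the next call to process().
      longest = ptype.getDetachedValue(line);
    }
  }

  @Override
  public void cleanup(Emitter<Text> emitter) {
    if (longest != null) {
      emitter.emit(longest);
    }
  }
}

It would be wired up along the lines of lines.values().parallelDo(new LongestLineFn(Writables.writables(Text.class)), Writables.writables(Text.class)).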
