Hey Jeff,

Okay, cool -- I think I've managed to create a simple test that replicates
the behavior you're seeing. If I run this test a few different times,
sometimes I'll get the correct output, but other times I'll get an error
because no records are processed. I'm going to investigate further and see
if I can identify the source of the randomness.

import java.util.List;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pair;
import org.apache.crunch.TableSource;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.Rule;
import org.junit.Test;

public class RecordDropIT {
  @Rule
  public TemporaryPath tmpDir = TemporaryPaths.create();

  @Test
  public void testMultiReadCount() throws Exception {
    int numReads = 2;
    MRPipeline p = new MRPipeline(RecordDropIT.class, tmpDir.getDefaultConfiguration());
    Path shakes = tmpDir.copyResourcePath("shakes.txt");
    TableSource<LongWritable, Text> src = From.formattedFile(
        shakes, TextInputFormat.class, LongWritable.class, Text.class);
    // Read the same source multiple times and count the records seen by each read.
    List<Iterable<Integer>> values = Lists.newArrayList();
    for (int i = 0; i < numReads; i++) {
      PCollection<Integer> cnt = p.read(src).parallelDo(
          new LineCountFn<Pair<LongWritable, Text>>(), Writables.ints());
      values.add(cnt.materialize());
    }
    // Every read should print the same (full) record count.
    for (Iterable<Integer> iter : values) {
      System.out.println(Iterables.getOnlyElement(iter));
    }
    p.done();
  }

  /** Counts the records in each input split and emits the total on cleanup. */
  public static class LineCountFn<T> extends DoFn<T, Integer> {

    private int count = 0;

    @Override
    public void process(T input, Emitter<Integer> emitter) {
      count++;
    }

    @Override
    public void cleanup(Emitter<Integer> emitter) {
      emitter.emit(count);
    }
  }
}


On Mon, Jul 27, 2015 at 6:11 PM, Jeff Quinn <[email protected]> wrote:

> Hi Josh,
>
> Thanks so much for your suggestions.
>
> The counts are determined with two methods: I am using a simple pig script
> to count records, and I am also tallying the size in bytes of all HDFS
> output files. Both measures show dropped records / fewer than expected
> output bytes.
>
> To your second point, I will go back and do a sweep for that, but I am
> fairly sure no DoFns are making use of intermediate state values without
> getDetachedValue. Our team is aware of the getDetachedValue gotchas, as I
> think they have bitten us before.
>
> Thanks !
>
> Jeff
>
> On Monday, July 27, 2015, Josh Wills <[email protected]> wrote:
>
>> One more thought -- are any of these DoFns keeping records around as
>> intermediate state values without using PType.getDetachedValue to make
>> copies of them?
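>>
>> For reference, here is a minimal sketch of the risky pattern and the fix
>> (the CollectFirstN fn and its names are made up for illustration; the
>> point is the getDetachedValue call in process()):
>>
>> import java.util.List;
>>
>> import com.google.common.collect.Lists;
>> import org.apache.crunch.DoFn;
>> import org.apache.crunch.Emitter;
>> import org.apache.crunch.types.PType;
>>
>> // Passes records through while buffering the first few it sees.
>> public class CollectFirstN<T> extends DoFn<T, T> {
>>   private final PType<T> ptype;
>>   private final List<T> buffer = Lists.newArrayList();
>>
>>   public CollectFirstN(PType<T> ptype) {
>>     this.ptype = ptype;
>>   }
>>
>>   @Override
>>   public void initialize() {
>>     ptype.initialize(getConfiguration());
>>   }
>>
>>   @Override
>>   public void process(T input, Emitter<T> emitter) {
>>     if (buffer.size() < 10) {
>>       // Wrong: buffer.add(input) -- the framework may reuse and mutate
>>       // the same object instance for the next record.
>>       // Right: deep-copy the record before holding on to it.
>>       buffer.add(ptype.getDetachedValue(input));
>>     }
>>     emitter.emit(input);
>>   }
>> }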
>>
>> J
>>
>> On Mon, Jul 27, 2015 at 5:47 PM, Josh Wills <[email protected]> wrote:
>>
>>> Hey Jeff,
>>>
>>> Are the counts determined by Counters? Or is it the length of the output
>>> files? Or both?
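>>>
>>> (By Counters I mean the MapReduce job counters -- e.g., a sketch of a
>>> pass-through fn that counts what it emits; the group/counter names here
>>> are made up:)
>>>
>>> import org.apache.crunch.DoFn;
>>> import org.apache.crunch.Emitter;
>>>
>>> // Passes records through, bumping a Hadoop counter for each one emitted.
>>> public class CountingPassthroughFn<T> extends DoFn<T, T> {
>>>   @Override
>>>   public void process(T input, Emitter<T> emitter) {
>>>     increment("debug", "records_out");  // visible in the MR job counters
>>>     emitter.emit(input);
>>>   }
>>> }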
>>>
>>> J
>>>
>>> On Mon, Jul 27, 2015 at 5:29 PM, David Ortiz <[email protected]> wrote:
>>>
>>>> Out of curiosity, any reason you went with multiple reads as opposed to
>>>> just performing multiple operations on the same PTable? parallelDo returns
>>>> a new object rather than modifying the initial one, so a single collection
>>>> can start multiple execution flows.
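>>>>
>>>> For example, something like this (a sketch; it assumes plain-text input
>>>> and an already-constructed Pipeline named 'pipeline'):
>>>>
>>>> // Read the input once...
>>>> PCollection<String> lines = pipeline.readTextFile("input.txt");
>>>>
>>>> // ...then branch: each parallelDo returns a new, independent
>>>> // PCollection, so both flows hang off the single read.
>>>> PCollection<Integer> lengths = lines.parallelDo(
>>>>     new DoFn<String, Integer>() {
>>>>       @Override
>>>>       public void process(String input, Emitter<Integer> emitter) {
>>>>         emitter.emit(input.length());
>>>>       }
>>>>     }, Writables.ints());
>>>> PCollection<String> upper = lines.parallelDo(
>>>>     new DoFn<String, String>() {
>>>>       @Override
>>>>       public void process(String input, Emitter<String> emitter) {
>>>>         emitter.emit(input.toUpperCase());
>>>>       }
>>>>     }, Writables.strings());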
>>>>
>>>> On Mon, Jul 27, 2015, 8:11 PM Jeff Quinn <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We have observed and replicated strange behavior in our Crunch
>>>>> application while running on MapReduce via the AWS ElasticMapReduce
>>>>> service. Running a very simple job, which is mostly map-only, we see
>>>>> that an undetermined subset of records is getting dropped. Specifically,
>>>>> we expect 30,136,686 output records but have seen the following counts
>>>>> on different trials (running over the same data with the same binary):
>>>>>
>>>>> 22,177,119 records
>>>>> 26,435,670 records
>>>>> 22,362,986 records
>>>>> 29,798,528 records
>>>>>
>>>>> These are the things about our application that might be unusual
>>>>> and relevant:
>>>>>
>>>>> - We use a custom file input format, via From.formattedFile. It looks
>>>>> like this (basically a carbon copy
>>>>> of org.apache.hadoop.mapreduce.lib.input.TextInputFormat):
>>>>>
>>>>> import org.apache.hadoop.io.LongWritable;
>>>>> import org.apache.hadoop.io.Text;
>>>>> import org.apache.hadoop.mapreduce.InputSplit;
>>>>> import org.apache.hadoop.mapreduce.RecordReader;
>>>>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>>>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>>>>> import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
>>>>>
>>>>> import java.io.IOException;
>>>>>
>>>>> public class ByteOffsetInputFormat extends FileInputFormat<LongWritable, Text> {
>>>>>
>>>>>   @Override
>>>>>   public RecordReader<LongWritable, Text> createRecordReader(
>>>>>       InputSplit split, TaskAttemptContext context) throws IOException,
>>>>>       InterruptedException {
>>>>>     return new LineRecordReader();
>>>>>   }
>>>>> }
>>>>>
>>>>> - We call org.apache.crunch.Pipeline#read using this InputFormat many
>>>>> times; for the job in question it is called ~160 times, as the input is
>>>>> ~100 different files. Each file ranges in size from 100MB to 8GB. This
>>>>> is the only input format our job uses for its input files.
>>>>>
>>>>> - org.apache.crunch.Pipeline#read is sometimes called twice on the same
>>>>> file, and the resulting PTables are processed in different ways.
>>>>>
>>>>> - Only the files on which org.apache.crunch.Pipeline#read has been
>>>>> called more than once during a job have dropped records; all other
>>>>> files consistently do not have dropped records.
>>>>>
>>>>> Curious if any Crunch users have experienced similar behavior before, or 
>>>>> if any of these details about my job raise any red flags.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Jeff Quinn
>>>>>
>>>>> Data Engineer
>>>>>
>>>>> Nuna
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
