For what it's worth, the optimizer may still read the file more than once even 
if there's only one read in your code. It all depends on what else is being done.

Sent from my Verizon Wireless 4G LTE DROID
On Jul 28, 2015 1:34 PM, Everett Anderson <[email protected]> wrote:
Thanks, Josh!!

I'm curious about the fix and didn't fully understand from the description.

What's interesting about the test is that there's only one Pipeline read(), but 
then multiple parallelDo()s on the resulting table, yet you still hit the 
issue. We'd thought it must be due to the multiple reads of the same file.

Would this have happened in other places where multiple operations were 
performed on the same PTable or PCollection, or is it specific to the 
operations performed on objects created directly from a read()?



On Mon, Jul 27, 2015 at 6:49 PM, Josh Wills <[email protected]> wrote:
That was a deeply satisfying bug. Fix is up here: 
https://issues.apache.org/jira/browse/CRUNCH-553

On Mon, Jul 27, 2015 at 6:29 PM, Jeff Quinn <[email protected]> wrote:
Wow, thanks so much for looking into it. That minimal example seems accurate. 
Previously, when we dug deeper into which records were dropped, it appeared 
that entire files were being dropped, not just parts of one file, so that sounds 
consistent with what you are seeing.

On Monday, July 27, 2015, Josh Wills <[email protected]> wrote:
Hey Jeff,

Okay cool-- I think I've managed to create a simple test that replicates the 
behavior you're seeing. I can run this test a few different times, and 
sometimes I'll get the correct output, but other times I'll get an error because 
no records are processed. I'm going to investigate further and see if I can 
identify the source of the randomness.


import java.util.List;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pair;
import org.apache.crunch.TableSource;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.Rule;
import org.junit.Test;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

public class RecordDropIT {
  @Rule
  public TemporaryPath tmpDir = TemporaryPaths.create();

  @Test
  public void testMultiReadCount() throws Exception {
    int numReads = 2;
    MRPipeline p = new MRPipeline(RecordDropIT.class, tmpDir.getDefaultConfiguration());
    Path shakes = tmpDir.copyResourcePath("shakes.txt");
    // Read the same formatted file multiple times within a single pipeline.
    TableSource<LongWritable, Text> src = From.formattedFile(shakes,
        TextInputFormat.class, LongWritable.class, Text.class);
    List<Iterable<Integer>> values = Lists.newArrayList();
    for (int i = 0; i < numReads; i++) {
      PCollection<Integer> cnt = p.read(src).parallelDo(
          new LineCountFn<Pair<LongWritable, Text>>(), Writables.ints());
      values.add(cnt.materialize());
    }
    for (Iterable<Integer> iter : values) {
      System.out.println(Iterables.getOnlyElement(iter));
    }
    p.done();
  }

  // Counts the records seen by each task and emits a single count in cleanup().
  public static class LineCountFn<T> extends DoFn<T, Integer> {

    private int count = 0;

    @Override
    public void process(T input, Emitter<Integer> emitter) {
      count++;
    }

    @Override
    public void cleanup(Emitter<Integer> emitter) {
      emitter.emit(count);
    }
  }
}

On Mon, Jul 27, 2015 at 6:11 PM, Jeff Quinn <[email protected]> wrote:
Hi Josh,

Thanks so much for your suggestions.

The counts are determined with two methods: I am using a simple Pig script to 
count records, and I am also totaling the size in bytes of all HDFS output 
files. Both measures show dropped records / fewer output bytes than expected.

To your second point, I will go back and do a sweep for that, but I am fairly 
sure no DoFns are making use of intermediate state values without 
getDetachedValue. Our team is aware of the getDetachedValue gotchas, as I think 
they have bitten us before.

Thanks !

Jeff


On Monday, July 27, 2015, Josh Wills <[email protected]> wrote:
One more thought-- are any of these DoFns keeping records around as 
intermediate state values without using PType.getDetachedValue to make copies of 
them?
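
To make that concrete, here's a rough sketch of the kind of DoFn I mean. The 
BufferingFn class, its type parameter, and its constructor are made up for 
illustration; the detach-before-buffering pattern is the important part:

// Hypothetical sketch: a DoFn that buffers inputs across process() calls.
// BufferingFn is an illustrative name, not a real class in Crunch.
public static class BufferingFn<T> extends DoFn<T, T> {

  private final PType<T> ptype;
  private transient List<T> buffer;

  public BufferingFn(PType<T> ptype) {
    this.ptype = ptype;
  }

  @Override
  public void initialize() {
    ptype.initialize(getConfiguration());
    buffer = new ArrayList<T>();
  }

  @Override
  public void process(T input, Emitter<T> emitter) {
    // Risky: buffer.add(input) can retain an object that the runtime reuses
    // and mutates for later records. Detach a copy before holding onto it.
    buffer.add(ptype.getDetachedValue(input));
  }

  @Override
  public void cleanup(Emitter<T> emitter) {
    for (T value : buffer) {
      emitter.emit(value);
    }
  }
}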

J

On Mon, Jul 27, 2015 at 5:47 PM, Josh Wills <[email protected]> wrote:
Hey Jeff,

Are the counts determined by Counters? Or is it the length of the output files? 
Or both?

J

On Mon, Jul 27, 2015 at 5:29 PM, David Ortiz <[email protected]> wrote:

Out of curiosity, is there any reason you went with multiple reads as opposed to 
just performing multiple operations on the same PTable? parallelDo returns a new 
object rather than modifying the initial one, so a single collection can start 
multiple execution flows.
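
For example, a single read can fan out into independent branches along these 
lines. ExtractFieldFn and CountBytesFn here are just placeholder DoFns, the 
output paths are made up, and pipeline/src are assumed to be set up as in the 
earlier test:

// Hypothetical sketch: one read() feeding two independent parallelDo branches.
PTable<LongWritable, Text> lines = pipeline.read(src);

// Each parallelDo returns a new PCollection; 'lines' itself is unchanged.
PCollection<String> fields = lines.parallelDo(
    new ExtractFieldFn(), Writables.strings());
PCollection<Integer> sizes = lines.parallelDo(
    new CountBytesFn(), Writables.ints());

pipeline.write(fields, To.textFile("/out/fields"));
pipeline.write(sizes, To.textFile("/out/sizes"));
pipeline.done();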

On Mon, Jul 27, 2015, 8:11 PM Jeff Quinn <[email protected]> wrote:
Hello,

We have observed and replicated strange behavior in our Crunch application 
while running on MapReduce via the AWS Elastic MapReduce service. Running a very 
simple job that is mostly map-only, we see that an undetermined subset of 
records is getting dropped. Specifically, we expect 30,136,686 output records 
but have seen the following counts on different trials (running over the same 
data with the same binary):

22,177,119 records
26,435,670 records
22,362,986 records
29,798,528 records

These are all the things about our application that might be unusual and 
relevant:

- We use a custom file input format, via From.formattedFile. It looks like this 
(basically a carbon copy of 
org.apache.hadoop.mapreduce.lib.input.TextInputFormat):


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import java.io.IOException;

public class ByteOffsetInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException,
      InterruptedException {
    // Standard line reader: keys are byte offsets into the file, values are the lines.
    return new LineRecordReader();
  }
}

- We call org.apache.crunch.Pipeline#read using this InputFormat many times; for 
the job in question it is called ~160 times, as the input is ~100 different 
files. Each file ranges in size from 100 MB to 8 GB. Our job uses only this 
input format for all input files.

- For some files, org.apache.crunch.Pipeline#read is called twice on the same 
file, and the resulting PTables are processed in different ways.

- Only the files on which org.apache.crunch.Pipeline#read has been called more 
than once during a job have dropped records; all other files consistently do not 
have dropped records.

I'm curious if any Crunch users have experienced similar behavior before, or if 
any of these details about my job raise red flags.

Thanks!

Jeff Quinn

Data Engineer

Nuna




--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>


