Sorry, I deleted a line by accident; there should also be a

//v.setDatehour(l2.getDatehour());

in the "This doesn't work" block of code.

From: [email protected]
To: [email protected]
Subject: RE: Multiple Writes in a single pipeline.
Date: Fri, 22 Aug 2014 19:03:37 +0000




I had to do a lot of bisecting to finally get to where the issue seems to be.
It is going to take some time to put together a separate chunk of code that I
can share, but let me try to talk through the problem; maybe it's obvious.

I have raw text data that gets parsed into Log Avro classes, which then go
through another DoFn to get converted into Visit Avro classes, and I want to
write out both collections of Log and Visit records. The issue seems to happen
in the DoFn that takes in Log and creates Visit. If I set any of the fields in
Visit using fields from Log, I end up writing out no files; if I set the fields
in Visit with literals, I get output.

Here's the code. I thought this might be a detached values issue and tried that
as well, but it didn't seem to help. Originally the code just used "l" directly
in process() and I didn't bother with "l2".

public static PTable<Visit, Pair<Long, Long>> parseVisit(PCollection<Log> logs) {
  return logs.parallelDo("second-parse", new DoFn<Log, Pair<Visit, Pair<Long, Long>>>() {
    private PType<Log> ptype;

    @Override
    public void initialize() {
      this.ptype = Avros.specifics(Log.class);
      this.ptype.initialize(getConfiguration());
    }

    @Override
    public void process(Log l, Emitter<Pair<Visit, Pair<Long, Long>>> emitter) {
      Log l2 = ptype.getDetachedValue(l);
      increment("test", "visitcount");

      // This works
      Visit v = new Visit();
      v.setDate(1234);
      v.setDatehour("123");
      v.setUser("123");
      emitter.emit(Pair.of(v, Pair.of(1L, 1L)));

      // This doesn't work
      //Visit v = new Visit();
      //v.setDate(l2.getDate());
      //v.setUser(l2.getUser());
      //emitter.emit(Pair.of(v, Pair.of(1L, 1L)));
    }
  }, Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));
}
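
The detached-values concern mentioned above can be illustrated outside of
Crunch. What follows is a minimal sketch in plain Java, not Crunch or Avro
code: Rec, copy(), and collect() are hypothetical names made up for
illustration. It assumes a producer that overwrites one mutable object on each
call, and shows why keeping a reference loses earlier values while keeping a
copy does not. As noted above, detaching did not change the behavior in this
particular case.

```java
import java.util.ArrayList;
import java.util.List;

final class DetachDemo {
  /** Hypothetical mutable record standing in for a generated record class. */
  static final class Rec {
    String user;

    Rec(String user) {
      this.user = user;
    }

    /** Returns an independent copy that is safe to retain. */
    Rec copy() {
      return new Rec(user);
    }
  }

  /** Pushes each user through one reused Rec and keeps either the reference or a copy. */
  static List<Rec> collect(String[] users, boolean detach) {
    Rec reused = new Rec(null);
    List<Rec> kept = new ArrayList<>();
    for (String u : users) {
      reused.user = u; // the producer overwrites the same object every time
      kept.add(detach ? reused.copy() : reused);
    }
    return kept;
  }

  public static void main(String[] args) {
    String[] users = {"alice", "bob"};
    // Without a copy, every kept element is the same object holding the last value.
    System.out.println(collect(users, false).get(0).user);
    // With a copy, the first kept element still holds the first value.
    System.out.println(collect(users, true).get(0).user);
  }
}
```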

Thanks! 




From: [email protected]
Date: Fri, 22 Aug 2014 08:57:13 -0700
Subject: Re: Multiple Writes in a single pipeline.
To: [email protected]

Many thanks for taking the time, would like to get this resolved before the 
next release.
J

On Fri, Aug 22, 2014 at 8:55 AM, Danny Morgan <[email protected]> wrote:





Hi Josh,

The Log and Visit classes are all in the same jar. The classloader fix is in
place, but I still get the issue without setting the class loader.

I'll put together the smallest reproduction I can and send out the code.



Danny

From: [email protected]
Date: Fri, 22 Aug 2014 08:42:06 -0700
Subject: Re: Multiple Writes in a single pipeline.


To: [email protected]

Hey Danny,
I wrote the test that I inlined below and it ran successfully for me against 
master and the 0.10 branch, so there must be something more subtle going on 
here-- are the Log and Visit classes created in different jars? I'm assuming 
the classloader fix is in play here and I'm wondering if there is something 
weird there.





import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.test.Employee;
import org.apache.crunch.test.Person;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Rule;
import org.junit.Test;

public class MultiAvroOutputIT {

  @Rule
  public transient TemporaryPath tmpDir = TemporaryPaths.create();

  @Test
  public void testMultiAvroWrite() throws Exception {
    Path person = tmpDir.getPath("person");
    Path employee = tmpDir.getPath("employee");
    Pipeline p = new MRPipeline(MultiAvroOutputIT.class, tmpDir.getDefaultConfiguration());
    PCollection<String> shakes = p.read(From.textFile(tmpDir.copyResourcePath("shakes.txt")));
    shakes.parallelDo(new PersonFn(), Avros.specifics(Person.class))
        .write(To.avroFile(person));
    shakes.parallelDo(new EmployeeFn(), Avros.specifics(Employee.class))
        .write(To.avroFile(employee));
    p.run();

    FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
    System.out.println("Person");
    for (FileStatus fstat : fs.listStatus(person)) {
      System.out.println(fstat.getPath() + ": " + fstat.getLen());
    }
    System.out.println("Employee");
    for (FileStatus fstat : fs.listStatus(employee)) {
      System.out.println(fstat.getPath() + ": " + fstat.getLen());
    }
    p.done();
  }

  static class PersonFn extends MapFn<String, Person> {
    @Override
    public Person map(String input) {
      return new Person();
    }
  }

  static class EmployeeFn extends MapFn<String, Employee> {
    @Override
    public Employee map(String input) {
      return new Employee();
    }
  }
}

On Fri, Aug 22, 2014 at 8:12 AM, Josh Wills <[email protected]> wrote:





That is super-interesting; let me try to replicate it in a test.

J

On Fri, Aug 22, 2014 at 7:26 AM, Danny Morgan <[email protected]> wrote:









This issue looks similar to https://issues.apache.org/jira/browse/CRUNCH-67

It turns out even if I get rid of the reduce phase and do just this:

PTable<String, String> lines = this.read(mySource);
PCollection<Log> parsed = lines.parallelDo("initial-parsing", new myParser(),
    Avros.specifics(Log.class));

PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
    new VisitsExtractor(),
    Avros.tableOf(Avros.specifics(Visit.class),
        Avros.pairs(Avros.longs(), Avros.longs())));

visits.write(To.avroFile(outputPath + "/visits"), WriteMode.OVERWRITE);
parsed.write(To.avroFile(outputPath + "/raw"), WriteMode.OVERWRITE);
this.done();

The plan shows I should be writing to two different targets in a single map
phase; however, only "/raw" has data written out to it, and "/visits" just
contains a _SUCCESS file and no data.







Might this be an issue writing out to two different Avro types in the same 
phase?

Thanks Again,

Danny


From: [email protected]






To: [email protected]
Subject: RE: Multiple Writes in a single pipeline.
Date: Fri, 22 Aug 2014 02:02:20 +0000




Hi Josh,


From: [email protected]
Date: Thu, 21 Aug 2014 17:40:25 -0700
Subject: Re: Multiple Writes in a single pipeline.






To: [email protected]
The two different executions you have are doing different things, however. In 
the first one, Crunch is running a single MapReduce job where the /raw 
directory is written as a mapper side-output, and the /visits directory is 
being written out on the reduce side (or at least, should be-- is there any 
evidence of a failure in the job in the logs? Are bytes being written out from 
the reducer?)








No evidence of any failures in the logs; the single mapper and the reducers
both succeed. The mapper definitely writes to HDFS, but the reducer does not.
Here are the relevant counters from the reducer:







Counter                                 Value
FILE: Number of bytes read              6
FILE: Number of bytes written           91811
FILE: Number of large read operations   0
FILE: Number of read operations         0
FILE: Number of write operations        0
HDFS: Number of bytes read              6205
HDFS: Number of bytes written           0
HDFS: Number of large read operations   0
HDFS: Number of read operations         4
HDFS: Number of write operations        2

I couldn't find anything related on the crunch jira.

For this problem, I think it would be more efficient to write the parsed -> 
/raw output first, call run(), then do the agg -> /visits output followed by 
done(), which would mean that you would only need to parse the raw input once, 
instead of twice.







Would the first option be more efficient if it worked?



A helpful trick for seeing how the Crunch planner is mapping your logic into 
MapReduce jobs is to look at the plan dot file via one of the following 
mechanisms:
1) Instead of calling Pipeline.run(), call Pipeline.runAsync() and then call 
the getPlanDotFile() method on the returned PipelineExecution object. You can 
print the dot file to a file and use a dot file viewer to look at how the DoFns 
are broken up into MR jobs and map/reduce phases.







2) Call MRPipeline.plan() directly, which returns a MRExecutor object that also 
implements PipelineExecution. (The difference being that calling 
MRPipeline.plan will not start the jobs running, whereas calling runAsync will.)
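
As a rough sketch of the "print the dot file to a file" step: the helper below
takes DOT text (for example, the string returned by getPlanDotFile()) and saves
it so that a dot file viewer can open it. PlanDotWriter, its write method, and
the sample graph text are hypothetical names for illustration only. The Crunch
calls appear only in comments because they need a configured pipeline to run.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class PlanDotWriter {
  private PlanDotWriter() {}

  /** Writes DOT text to the target path, replacing any existing file. */
  static Path write(String dot, Path target) throws IOException {
    if (dot == null || dot.isEmpty()) {
      throw new IllegalArgumentException("plan dot text is empty");
    }
    // Create the parent directory if it does not exist yet.
    Path parent = target.toAbsolutePath().getParent();
    if (parent != null) {
      Files.createDirectories(parent);
    }
    return Files.write(target, dot.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) throws IOException {
    // In a real job the text would come from the pipeline, along the lines of:
    //   PipelineExecution exec = pipeline.runAsync();
    //   String dot = exec.getPlanDotFile();
    // Here a hand-written stand-in graph is used instead.
    String dot = "digraph G { \"initial-parsing\" -> \"visits-parsing\"; }";
    Path out = write(dot, Files.createTempFile("crunch-plan", ".dot"));
    System.out.println("wrote plan to " + out);
  }
}
```

With Graphviz installed, a saved file can then be rendered with a command such
as "dot -Tpng plan.dot -o plan.png".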






I ran the two different versions through dot and you're right, they are two
completely different executions. Pretty cool!

Thanks!

                                                                                
  







-- 
Director of Data Science
Cloudera
Twitter: @josh_wills








