On Thu, Jun 12, 2014 at 10:58 AM, Christian Tzolov <[email protected]> wrote:
> Hi Josh,
>
> Good to see you too. Thanks for the HFileSource reference. The converter
> did the trick and I can read the data using the custom InputFormat.
> Now I am struggling with the NonWritableType, as I have no control over the
> implementation and the latter provides no means for serialization. There are
> a couple of ideas to explore, but those are not Crunch related.
>
> Thanks again for the helpful information.
>
> Cheers, Chris
>
> P.S. Between your post and today my son was born! Have a drink or two on
> our behalf ;)

Hey, that's awesome! Congratulations!!


> On Tue, Jun 10, 2014 at 5:26 PM, Josh Wills <[email protected]> wrote:
>
>> Hey Christian,
>>
>> Good to see you again; I hope all is well. This is a complex setup, but
>> the good news is that we had to do it before for HBase 0.96, which also
>> returns non-Writable values in an InputFormat. The code you're going to
>> want to use as your reference is the HFileSource in crunch-hbase:
>>
>> https://github.com/apache/crunch/blob/master/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
>>
>> A few comments:
>>
>> 1) First, ignore the Void key; instead of trying to return a PTable,
>> you're going to return a PCollection<NonWritableType> as the result of this
>> source. The key to doing that is in the Converter implementation that is
>> associated with the Source; look at the HBaseValueConverter (which is just
>> a simple pass-through value converter) to see how we do it for HFileSource.
>> The key thing to note in that converter is that the function
>> applyPTypeTransforms() returns false; this means that when reading/writing
>> data using that Converter, we don't apply the map functions from the PType
>> associated with the source (which is the right thing to do here as well).
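The pass-through shape described in point 1 can be sketched as follows. This compiles against a simplified stand-in for Crunch's Converter contract, not the real org.apache.crunch.types.Converter interface (which declares more methods, e.g. convertIterableInput, outputKey/outputValue); the point is only the "drop the key, hand the value through, don't apply PType transforms" behavior:

```java
// Simplified stand-in for Crunch's Converter contract -- NOT the real
// org.apache.crunch.types.Converter interface, which has more methods.
interface SimpleConverter<K, V, T> {
  T convertInput(K key, V value);   // turn an InputFormat (key, value) into one record
  boolean applyPTypeTransforms();   // should the planner run the PType's map fns around I/O?
}

// A pass-through value converter in the style of HBaseValueConverter:
// ignore the key, return the value untouched, and report false from
// applyPTypeTransforms() so the PType's map functions are skipped on
// read/write -- the custom InputFormat already yields the final value type.
class PassThroughValueConverter<K, V> implements SimpleConverter<K, V, V> {
  @Override
  public V convertInput(K key, V value) {
    return value; // the (Void) key is dropped entirely
  }

  @Override
  public boolean applyPTypeTransforms() {
    return false;
  }
}

public class Main {
  public static void main(String[] args) {
    PassThroughValueConverter<Void, String> c = new PassThroughValueConverter<>();
    System.out.println(c.convertInput(null, "record-1"));  // record-1
    System.out.println(c.applyPTypeTransforms());          // false
  }
}
```

The real HFileSource wires its converter into the source so the planner consults applyPTypeTransforms() when it builds the map-side pipeline; that is what keeps the derived PType's map functions out of the read path.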
>> 2) I'm assuming that there is some Hadoop Serialization that supports the
>> non-Writable value type you're using, and that Hadoop has to be configured
>> to read it; be sure to override the configureSource() method in your Source
>> to add those serializations to the Job configuration (again, see how it's
>> done in HFileSource.configureSource).
>>
>> 3) Given all that, the PType for your non-Writable class that is
>> associated with the source should primarily be concerned with how to
>> serialize it during a shuffle or a read/write from another input format
>> (like Avro, or SequenceFile, or whatever), as we do in HBaseTypes. It won't
>> actually be used for reading/writing from the custom input format.
>>
>> Hope that helps.
>>
>> J
>>
>>
>> On Tue, Jun 10, 2014 at 2:42 AM, Christian Tzolov <
>> [email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I am trying to create a Crunch source for a custom InputFormat with a
>>> signature like this: CustomInputFormat<Void, CustomNonWritableClass>
>>>
>>> I've tried two implementations with no success. I must be missing
>>> something, but I'm not sure what.
>>>
>>> Implementation 1: Derive PType<Pair<Void, CustomNonWritableClass>> using
>>> MapWritable as the base type
>>> --------------------------------------------------------------------------------------------------------------------
>>>
>>> PType<Pair<Void, CustomNonWritableClass>> derivedType = typeFamily.derived(
>>>     (Class<Pair<Void, CustomNonWritableClass>>) Pair.of(null, null).getClass(),
>>>     new MapFn<MapWritable, Pair<Void, CustomNonWritableClass>>() {
>>>       public Pair<Void, CustomNonWritableClass> map(MapWritable input) {...}
>>>     },
>>>     new MapFn<Pair<Void, CustomNonWritableClass>, MapWritable>() {
>>>       public MapWritable map(Pair<Void, CustomNonWritableClass> input) {...}
>>>     },
>>>     typeFamily.records(MapWritable.class)
>>> );
>>>
>>> public class CustomDataSource extends FileTableSourceImpl<Void, CustomNonWritableClass> {
>>>
>>>   public CustomDataSource() {
>>>     super(new Path("xsource"),
>>>         (PTableType<Void, CustomNonWritableClass>) derivedType,
>>>         FormatBundle.forInput(CustomInputFormat.class));
>>>   }
>>>   ...
>>> }
>>>
>>> This implementation fails before submitting the job with the following
>>> error:
>>>
>>> Exception in thread "main" java.lang.ClassCastException:
>>> org.apache.crunch.types.writable.WritableType cannot be cast to
>>> org.apache.crunch.types.PTableType
>>>     at com.xxx.xxx.CustomDataSource.<init>(CustomDataSource.java:...)
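That first stack trace follows from the type hierarchy alone: typeFamily.derived(...) hands back a plain PType (concretely a WritableType), and WritableType does not implement PTableType, so the downcast in the constructor has to fail at runtime regardless of the InputFormat. A minimal stand-alone reproduction, using toy stand-ins (generics elided) for the Crunch interfaces:

```java
// Toy stand-ins for the relevant slice of Crunch's type hierarchy:
// PTableType extends PType, and the writable type family's derived
// types are plain PTypes that never implement PTableType.
interface PType {}
interface PTableType extends PType {}
class WritableType implements PType {}  // roughly what derived(...) returns

public class Main {
  public static void main(String[] args) {
    PType derivedType = new WritableType();
    try {
      // Mirrors the "(PTableType<Void, CustomNonWritableClass>) derivedType"
      // cast in the CustomDataSource constructor:
      PTableType tableType = (PTableType) derivedType;
      System.out.println("cast succeeded");
    } catch (ClassCastException e) {
      System.out.println("ClassCastException, as reported");
    }
  }
}
```

No PType produced by derived() can satisfy that cast; a genuine PTableType has to come from the type family's tableOf(...). But per the reply above, the cleaner route here is to drop the Void key entirely and build a PCollection source instead of a table source.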
>>>
>>>
>>> Implementation 2: Derive PType<CustomNonWritableClass> using MapWritable
>>> as the base type
>>> --------------------------------------------------------------------------------------------------------------------
>>>
>>> public static class MapWritableToCustomNonWritableClass extends
>>>     MapFn<MapWritable, CustomNonWritableClass> {
>>>   public CustomNonWritableClass map(MapWritable input) {...}
>>> }
>>>
>>> public static class CustomNonWritableClassToMapWritable extends
>>>     MapFn<CustomNonWritableClass, MapWritable> {
>>>   public MapWritable map(CustomNonWritableClass input) {...}
>>> }
>>>
>>> PType<CustomNonWritableClass> derivedType = typeFamily.derived(
>>>     CustomNonWritableClass.class,
>>>     new MapWritableToCustomNonWritableClass(),
>>>     new CustomNonWritableClassToMapWritable(),
>>>     typeFamily.records(MapWritable.class)
>>> );
>>>
>>> public class CustomDataSource extends FileSourceImpl<CustomNonWritableClass> {
>>>
>>>   public CustomDataSource() {
>>>     super(new Path("xsource"),
>>>         derivedType,
>>>         FormatBundle.forInput(CustomInputFormat.class));
>>>   }
>>>   ...
>>> }
>>>
>>> When run, this gets submitted to the cluster, but the MR job fails with:
>>>
>>> 2014-06-10 10:31:23,653 FATAL [IPC Server handler 2 on 9290]
>>> org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task:
>>> attempt_1401786307497_0078_m_000000_0 - exited :
>>> java.lang.ClassCastException: com.xxx.xxx.CustomNonWritableClass cannot be
>>> cast to org.apache.hadoop.io.MapWritable
>>>     at com.xxx.xxx.MapWritabToCustomNonWritableClass.map(MapWritabToCustomNonWritableClass.java:1)
>>>     at org.apache.crunch.fn.CompositeMapFn.map(CompositeMapFn.java:63)
>>>     at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>>>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>>>     at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>
>>> Thanks,
>>> Christian
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
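A footnote on point 2 of the reply: Hadoop keeps its registered Serialization implementations as a comma-separated list of class names under the "io.serializations" configuration key, so a configureSource() override mostly amounts to appending to that list without clobbering the defaults. A rough sketch of that bookkeeping using plain strings (in a real Source you would go through the Job's Configuration; the custom serialization class name below is made up for illustration):

```java
public class Main {
  // Append a Serialization class to a comma-separated "io.serializations"
  // value, preserving whatever is already registered (e.g. Hadoop's
  // WritableSerialization) and avoiding duplicate entries.
  public static String appendSerialization(String existing, String serializationClass) {
    if (existing == null || existing.isEmpty()) {
      return serializationClass;
    }
    for (String s : existing.split(",")) {
      if (s.trim().equals(serializationClass)) {
        return existing; // already registered; leave the list untouched
      }
    }
    return existing + "," + serializationClass;
  }

  public static void main(String[] args) {
    String defaults = "org.apache.hadoop.io.serializer.WritableSerialization";
    // Hypothetical Serialization for CustomNonWritableClass:
    String custom = "com.example.CustomNonWritableSerialization";
    String merged = appendSerialization(defaults, custom);
    System.out.println(merged);
    // Appending a second time is a no-op:
    System.out.println(appendSerialization(merged, custom).equals(merged));
  }
}
```

Dropping the defaults from this list is a classic mistake: it silently breaks Writable-based shuffles elsewhere in the job, which is why HFileSource.configureSource is worth reading for how it merges rather than replaces the setting.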
