Hey Christian,

Good to see you again; I hope all is well. This is a complex setup, but the good news is that we had to do it before for HBase 0.96, which also returns non-Writable values in an InputFormat. The code you're going to want to use as your reference is the HFileSource in crunch-hbase:
https://github.com/apache/crunch/blob/master/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java

A few comments:

1) First, ignore the Void key; instead of trying to return a PTable, you're going to return a PCollection<NonWritableType> as the result of this source. The key to doing that is in the Converter implementation that is associated with the Source; look at HBaseValueConverter (which is just a simple pass-through value converter) to see how we do it for HFileSource. The key thing to note in that converter is that applyPTypeTransforms() returns false; this means that when reading/writing data with that Converter, we don't apply the map functions from the PType associated with the source (which is the right thing to do here as well).

2) I'm assuming there is some Hadoop Serialization that supports the non-Writable value type you're using and that Hadoop has to be configured to read; be sure to override the configureSource() method in your Source to add those serializations to the Job configuration (again, see how it's done in HFileSource.configureSource).

3) Given all that, the PType for your non-Writable class that is associated with the source should primarily be concerned with how to serialize it during a shuffle or a read/write from another input format (like Avro, or SequenceFile, or whatever), as we do in HBaseTypes. It won't actually be used for reading/writing from the custom input format.

Hope that helps,

J

On Tue, Jun 10, 2014 at 2:42 AM, Christian Tzolov <[email protected]> wrote:

> Hi all,
>
> I am trying to create a Crunch source for a custom InputFormat that has a
> structure like this: CustomInputFormat<Void, CustomNonWritableClass>
>
> I've tried two implementations with no success. I must be missing
> something, but I'm not sure what.
>
> Implementation 1: Derive PType<Pair<Void, CustomNonWritableClass>> using
> MapWritable as base type
> --------------------------------------------------------------------------
>
> PType<Pair<Void, CustomNonWritableClass>> derivedType = typeFamily.derived(
>     (Class<Pair<Void, CustomNonWritableClass>>) Pair.of(null, null).getClass(),
>     new MapFn<MapWritable, Pair<Void, CustomNonWritableClass>>() {
>       public Pair<Void, CustomNonWritableClass> map(MapWritable input) {...}
>     },
>     new MapFn<Pair<Void, CustomNonWritableClass>, MapWritable>() {
>       public MapWritable map(Pair<Void, CustomNonWritableClass> input) {...}
>     },
>     typeFamily.records(MapWritable.class)
> );
>
> public class CustomDataSource
>     extends FileTableSourceImpl<Void, CustomNonWritableClass> {
>
>   public CustomDataSource() {
>     super(new Path("xsource"),
>         (PTableType<Void, CustomNonWritableClass>) derivedType,
>         FormatBundle.forInput(CustomInputFormat.class));
>   }
>   ...
> }
>
> This implementation fails before submitting the job with the following
> error:
>
> Exception in thread "main" java.lang.ClassCastException:
> org.apache.crunch.types.writable.WritableType cannot be cast to
> org.apache.crunch.types.PTableType
>     at com.xxx.xxx.CustomDataSource.<init>(CustomDataSource.java:...)
>
> Implementation 2: Derive PType<CustomNonWritableClass> using MapWritable
> as base type
> --------------------------------------------------------------------------
>
> public static class MapWritableToCustomNonWritableClass
>     extends MapFn<MapWritable, CustomNonWritableClass> {
>   public CustomNonWritableClass map(MapWritable input) {...}
> }
>
> public static class CustomNonWritableClassToMapWritable
>     extends MapFn<CustomNonWritableClass, MapWritable> {
>   public MapWritable map(CustomNonWritableClass input) {...}
> }
>
> PType<CustomNonWritableClass> derivedType = typeFamily.derived(
>     CustomNonWritableClass.class,
>     new MapWritableToCustomNonWritableClass(),
>     new CustomNonWritableClassToMapWritable(),
>     typeFamily.records(MapWritable.class)
> );
>
> public class CustomDataSource extends FileSourceImpl<CustomNonWritableClass> {
>
>   public CustomDataSource() {
>     super(new Path("xsource"),
>         (PType<CustomNonWritableClass>) derivedType,
>         FormatBundle.forInput(CustomInputFormat.class));
>   }
>   ...
> }
>
> When run, this gets submitted to the cluster but the MR job fails with:
>
> 2014-06-10 10:31:23,653 FATAL [IPC Server handler 2 on 9290]
> org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task:
> attempt_1401786307497_0078_m_000000_0 - exited :
> java.lang.ClassCastException: com.xxx.xxx..CustomNonWritableClass cannot be
> cast to org.apache.hadoop.io.MapWritable
>     at com.xxx.xxx.MapWritabToCustomNonWritableClass.map(MapWritabToCustomNonWritableClass.java:1)
>     at org.apache.crunch.fn.CompositeMapFn.map(CompositeMapFn.java:63)
>     at org.apache.crunch.MapFn.process(MapFn.java:34)
>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>     at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>
> Thanks,
> Christian

-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
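[Editor's illustration of point 1 above: the shape of a pass-through value converter whose applyPTypeTransforms() returns false. This is a self-contained mock, not the real Crunch API: SimpleConverter, CustomValue, and PassThroughConverter are hypothetical stand-ins, and the actual interface (org.apache.crunch.types.Converter, as implemented by HBaseValueConverter) declares several additional methods for key/value classes and iterable input.]

```java
import java.io.Serializable;

// Hypothetical, stripped-down stand-in for Crunch's Converter interface;
// the real org.apache.crunch.types.Converter in crunch-core has more methods.
interface SimpleConverter<K, V, S> extends Serializable {
  S convertInput(K key, V value);
  boolean applyPTypeTransforms();
}

// Stand-in for the non-Writable value type (CustomNonWritableClass in the thread).
class CustomValue {
  final String payload;
  CustomValue(String payload) { this.payload = payload; }
}

// Pass-through converter in the style of HBaseValueConverter: it ignores the
// Void key and hands the value through untouched. Because
// applyPTypeTransforms() returns false, Crunch would NOT run the PType's
// input/output map functions when reading from the custom InputFormat --
// which is exactly what avoids the MapWritable ClassCastException above.
class PassThroughConverter implements SimpleConverter<Void, CustomValue, CustomValue> {
  public CustomValue convertInput(Void key, CustomValue value) {
    return value; // no transformation; the key is dropped
  }
  public boolean applyPTypeTransforms() {
    return false; // PType map functions are skipped on read/write
  }
}

public class ConverterSketch {
  public static void main(String[] args) {
    PassThroughConverter c = new PassThroughConverter();
    CustomValue v = new CustomValue("row-1");
    System.out.println(c.convertInput(null, v).payload); // prints "row-1"
    System.out.println(c.applyPTypeTransforms());        // prints "false"
  }
}
```

The PType from Implementation 2 then only matters when the data is shuffled or written to some other format, per point 3; the converter, not the PType, governs what comes out of the custom InputFormat.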
