I dug into this a little more, and it seems I can use scrunch's map to produce 
Avros and Strings but not other data types (Put, Pair, Tuple2, etc.): I get an 
exception [1] where it’s trying to create an Avro even though I’m trying to 
map to something else.  I briefly looked into the code and found that 
PCollectionLike hard-codes the TypeFamily to Avros [2].  Sean Griffin dug more 
deeply into the code, and his analysis was:

>From what I can gather from studying the code, I think it would take 
>considerable enhancements to both Crunch and Scrunch to pull this off.  I 
>think you're going to have to resort to parallelDo calls.  Let's see how well 
>I can explain the problem.

map is defined as

  def map[T, To](f: S => T)(implicit pt: PTypeH[T],
      b: CanParallelTransform[T, To]): To = {
    b(this, mapFn(f), pt.get(getTypeFamily()))
  }

In our case T is type Put.  Also of interest: the getTypeFamily() return value 
that is passed into pt.get() is hard-coded to the Avros PTypeFamily in the 
PCollectionLike.scala file, and nothing in the PCollection class that 
implements that trait tells it any differently.  This means there would need 
to be a PTypeH.get method that, when given an Avros PTypeFamily, knows what to 
do with a Put.  But nothing in PTypeH supports generic Writable objects, and 
even if something did, the AvroTypeFamily in underlying Crunch does not 
support Writables.
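
To make the mechanism above concrete, here is a toy Scala model.  It has no 
dependency on Crunch or Scrunch: TypeFamily, PTypeH, Coll, Put and mapWith are 
simplified stand-ins invented for illustration, and the real signatures 
differ.  It only sketches why an implicit that is always handed the Avro 
family cannot yield a Writable-based type, and why passing the type explicitly 
(as parallelDo does) sidesteps that.

```scala
// Toy model only: names echo Scrunch's, but none of this is the real API.
object AvroOnlyFamilyDemo {
  sealed trait TypeFamily
  case object AvroFamily extends TypeFamily
  case object WritableFamily extends TypeFamily

  // Hypothetical stand-in for org.apache.hadoop.hbase.client.Put.
  final class Put

  // Stand-in for the PTypeH type class: given a family, describe the
  // serialization type for T (a String here instead of a real PType).
  trait PTypeH[T] { def get(family: TypeFamily): String }

  object PTypeH {
    implicit val strings: PTypeH[String] = new PTypeH[String] {
      def get(family: TypeFamily): String = s"$family string type"
    }
    // A Put can only be described by the Writable family in this model.
    implicit val puts: PTypeH[Put] = new PTypeH[Put] {
      def get(family: TypeFamily): String = family match {
        case WritableFamily => "Writable type for Put"
        case AvroFamily =>
          throw new IllegalStateException("cannot derive an Avro schema for Put")
      }
    }
  }

  class Coll[S](items: Seq[S]) {
    // Hard-coded, mirroring what the analysis says about PCollectionLike.
    def getTypeFamily(): TypeFamily = AvroFamily

    // map always asks the hard-coded family for the output type.
    def map[T](f: S => T)(implicit pt: PTypeH[T]): (String, Seq[T]) =
      (pt.get(getTypeFamily()), items.map(f))

    // parallelDo-style escape hatch: the caller supplies the type itself.
    def mapWith[T](f: S => T, ptype: String): (String, Seq[T]) =
      (ptype, items.map(f))
  }
}
```

In this model, mapping to String goes through the Avro family without 
trouble, mapping to Put throws as soon as the implicit is asked for an Avro 
type, and mapWith succeeds because the family is never consulted.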



[1]
java.lang.RuntimeException: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:30)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:264)
at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:282)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:170)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:220)
at org.apache.crunch.io.avro.AvroFileReaderFactory$1.next(AvroFileReaderFactory.java:80)
at org.apache.crunch.io.impl.AutoClosingIterator.next(AutoClosingIterator.java:52)
at org.apache.crunch.io.CompositePathIterable$2.next(CompositePathIterable.java:99)
at com.google.common.collect.Iterators$5.next(Iterators.java:554)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)


[2] 
https://github.com/apache/crunch/blob/master/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala#L64

From: <Barry>, Nathan Barry <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Tuesday, March 18, 2014 at 8:12 PM
To: "[email protected]" <[email protected]>
Subject: scrunch exception mapping Avros to HBase Puts

I’m reworking some existing code, migrating the logic from crunch [2] to 
scrunch [1], and I’m getting exceptions when trying to map a scrunch 
PCollection of avros to a PCollection of HBase Puts.  This happens when using 
scala-2.9.3 + scrunch 0.8.0 as well as scala-2.10.3 + scrunch 0.8.2+32, though 
the details in the stack are slightly different.  The same logic using crunch 
in scala does not have the same problem.

In both cases the exception stems from SpecificData.getSchema, but from there 
the 2.9/0.8 stack [3] differs slightly from the 2.10/0.8.2 stack [4].

Has anyone seen anything like this before, or does anyone know of a way to 
work around it?  This is not a showstopper, but the scrunch code is so much 
simpler that I’d prefer to go that route if at all possible.


Thanks in advance, Nathan

[1] concepts.map(concept =>
      conn.getSession().newPut(tableName, entityKey(concept.getId))
        .setColumn(FAMILY, COLUMN, concept).toRawPut)

[2] concepts.parallelDo("convertMyAvroToPut",
      new ConvertToPutFn[MyAvro](timestamp, tableName,
        concept => entityKey(concept.getId),
        (putRequest, concept) => putRequest.setColumn(FAMILY, COLUMN, concept).toRawPut),
      org.apache.crunch.types.writable.Writables.writables(classOf[Put]))

[3] org.apache.avro.AvroTypeException: Map key class not String: byte[]
at org.apache.crunch.scrunch.ScalaSafeReflectData.createSchema(ScalaSafeReflectData.java:105)
at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:430)
at org.apache.avro.reflect.ReflectData$AllowNull.createFieldSchema(ReflectData.java:72)
at org.apache.crunch.scrunch.ScalaSafeReflectData.createSchema(ScalaSafeReflectData.java:170)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:154)
at org.apache.crunch.types.avro.Avros.reflects(Avros.java:239)
at org.apache.crunch.types.avro.Avros.containers(Avros.java:229)
at org.apache.crunch.types.avro.Avros.records(Avros.java:217)
at org.apache.crunch.types.avro.AvroTypeFamily.records(AvroTypeFamily.java:92)
at org.apache.crunch.scrunch.PTypeFamily$class.records(PTypeFamily.scala:40)
at org.apache.crunch.scrunch.Avros$.records(PTypeFamily.scala:122)
at org.apache.crunch.scrunch.PTypeH$$anon$15.get(Conversions.scala:135)
at org.apache.crunch.scrunch.PCollection.map(PCollection.scala:36)


[4] java.lang.ClassCastException: sun.reflect.generics.reflectiveObjects.GenericArrayTypeImpl cannot be cast to java.lang.Class
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:273)
at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:430)
at org.apache.avro.reflect.ReflectData$AllowNull.createFieldSchema(ReflectData.java:72)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:354)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:154)
at org.apache.crunch.types.avro.Avros.reflects(Avros.java:279)
at org.apache.crunch.types.avro.Avros.containers(Avros.java:269)
at org.apache.crunch.types.avro.Avros.records(Avros.java:257)
at org.apache.crunch.types.avro.AvroTypeFamily.records(AvroTypeFamily.java:93)
at org.apache.crunch.scrunch.PTypeFamily$class.records(PTypeFamily.scala:47)
at org.apache.crunch.scrunch.Avros$.records(PTypeFamily.scala:129)
at org.apache.crunch.scrunch.PTypeH$$anon$15.get(Conversions.scala:135)
at org.apache.crunch.scrunch.PCollection.map(PCollection.scala:36)



