Hey Lucy,

The Iterable returned by PCollection.materialize() can't be serialized inside of a DoFn or a record type, even if its contents are serializable (one of many mistakes I made writing Crunch). That said, there are a couple of options:

1) Read the data in from materialize() on the client and copy the results into a collection type that is serializable, like an ArrayList.

2) If you don't want to read the data into the client (and I can totally understand why you wouldn't want to), but you do want to read it inside of a MapReduce job via a DoFn, you can use the ReadableData<T> structure returned by the PCollection.asReadable(boolean) method. The argument indicates whether the PCollection should be materialized before it's read (true), or whether we should try to execute any DoFns needed to filter/transform the PCollection from within the MR job itself (false). In general, "true" is the safer option, but "false" could be faster.

J

On Wed, Apr 29, 2015 at 4:40 PM, Lucy Chen wrote:
> Hi Micah,
>
> Here is the exception that I got. Thanks for your response. If you
> know a solution, please let me know.
>
> Lucy
>
> 15/04/29 23:37:07 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
> Exception in thread "main" org.apache.crunch.CrunchRuntimeException:
> java.io.NotSerializableException:
> org.apache.crunch.materialize.MaterializableIterable
> at org.apache.crunch.impl.mr.MRPipeline.plan(MRPipeline.java:113)
> at org.apache.crunch.impl.mr.MRPipeline.runAsync(MRPipeline.java:132)
> at org.apache.crunch.impl.mr.MRPipeline.run(MRPipeline.java:120)
> at org.apache.crunch.impl.dist.DistributedPipeline.done(DistributedPipeline.java:117)
> at com.apple.rsp.ensemble.ContentModelGenerator.main(ContentModelGenerator.java:105)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
> at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
> Caused by: java.io.NotSerializableException:
> org.apache.crunch.materialize.MaterializableIterable
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
> at java.util.ArrayList.writeObject(ArrayList.java:710)
> at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
> at java.util.ArrayList.writeObject(ArrayList.java:710)
> at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
> at org.apache.crunch.util.DistCache.write(DistCache.java:55)
> at org.apache.crunch.impl.mr.plan.JobPrototype.serialize(JobPrototype.java:246)
> at org.apache.crunch.impl.mr.plan.JobPrototype.build(JobPrototype.java:217)
> at org.apache.crunch.impl.mr.plan.JobPrototype.getCrunchJob(JobPrototype.java:134)
> at org.apache.crunch.impl.mr.plan.MSCRPlanner.plan(MSCRPlanner.java:197)
> at org.apache.crunch.impl.mr.MRPipeline.plan(MRPipeline.java:111)
>
> On Wed, Apr 29, 2015 at 3:52 PM, Micah Whitacre wrote:
>
>> Lucy,
>> Can you provide a stack trace of the exceptions you are getting? The
>> error might be in how Avro is trying to create a Schema for your class
>> using ReflectData [1].
>>
>> [1] - https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/reflect/ReflectData.html
>>
>> On Wed, Apr 29, 2015 at 5:39 PM, Lucy Chen wrote:
>>
>>> Hi,
>>>
>>> I have my own class defined as follows:
>>>
>>> public class A implements java.io.Serializable, Cloneable {
>>>     private String id;
>>>     private String name;
>>>     private Map<String, Float> codes;
>>>
>>>     public A() {
>>>         this(null, null, new HashMap<String, Float>());
>>>     }
>>>
>>>     ............................
>>> }
>>>
>>> and I generate a PCollection<A> DataA using PType<A> PTypeA =
>>> Avros.records(A.class);
>>>
>>> Then when I materialize it with Iterable<A> DataA_mat = DataA.materialize(),
>>> it throws a not-serializable or not-materializable exception. I think it
>>> would fail too if class A had a Set<> member instead of a Map.
>>>
>>> Does anyone know how to define the class appropriately if we want
>>> to materialize DataA here? Or is there another serialization approach
>>> instead of Avros.records so that materialize() will work as well? I can
>>> only make it work for a class without a Set or Map member, which causes
>>> a lot of trouble for me.
>>>
>>> Thanks.
>>>
>>> Lucy

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
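The failure Josh describes and his first option can be illustrated with the JDK alone. This is a minimal sketch and only an analogy. LazyIterable is a hypothetical stand-in for the Iterable returned by materialize(); the real class in the trace is org.apache.crunch.materialize.MaterializableIterable. LookupFn is a hypothetical stand-in for a DoFn that holds that Iterable as a field. Serializing the function fails with the same NotSerializableException type seen in Lucy's stack trace. Copying the values into an ArrayList first lets the function serialize.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MaterializeSerializationSketch {

    // Hypothetical stand-in for the Iterable returned by materialize():
    // the class itself is NOT Serializable, even though its Strings are.
    static final class LazyIterable implements Iterable<String> {
        private final List<String> backing;

        LazyIterable(List<String> backing) {
            this.backing = backing;
        }

        @Override
        public Iterator<String> iterator() {
            return backing.iterator();
        }
    }

    // Hypothetical DoFn-like function object that must survive Java serialization.
    static final class LookupFn implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Iterable<String> lookup;

        LookupFn(Iterable<String> lookup) {
            this.lookup = lookup;
        }
    }

    // Option 1: read the values on the client and copy them into an ArrayList.
    static List<String> copyToList(Iterable<String> values) {
        List<String> copy = new ArrayList<>();
        for (String value : values) {
            copy.add(value);
        }
        return copy;
    }

    // True if obj can be written with ObjectOutputStream, false on NotSerializableException.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Iterable<String> materialized = new LazyIterable(Arrays.asList("a", "b", "c"));
        System.out.println(isSerializable(new LookupFn(materialized)));             // false
        System.out.println(isSerializable(new LookupFn(copyToList(materialized)))); // true
    }
}
```

Copying only makes sense when the materialized data is small enough to hold in client memory and to ship inside the serialized function to every task. Avoiding that cost is the point of the second option.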
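Option 2 in the reply above uses a different design. Instead of the data, the job ships a small serializable description of where the data lives, and the data is read inside the task. The plain-JDK sketch below shows only that idea; it is not Crunch's ReadableData. PathHandle, FilterFn, roundTrip and writeTemp are hypothetical names. A local temp file stands in for the materialized PCollection, and initialize() loosely mirrors the per-task setup hook a DoFn offers. The transient field keeps the loaded data out of the serialized function.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReadInsideTaskSketch {

    // Hypothetical handle: only the location of the data is serialized, never the data.
    static final class PathHandle implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String path;

        PathHandle(String path) {
            this.path = path;
        }

        List<String> read() {
            try {
                return Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Hypothetical DoFn-like function: ships the handle, loads the data once per task.
    static final class FilterFn implements Serializable {
        private static final long serialVersionUID = 1L;
        private final PathHandle handle;
        private transient Set<String> allowed; // skipped by serialization, rebuilt in initialize()

        FilterFn(PathHandle handle) {
            this.handle = handle;
        }

        void initialize() {
            allowed = new HashSet<>(handle.read());
        }

        boolean accept(String value) {
            return allowed.contains(value);
        }
    }

    // Simulates shipping an object to a task: serialize it, then deserialize a copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Writes lines to a temp file that stands in for the materialized data.
    static Path writeTemp(List<String> lines) {
        try {
            Path file = Files.createTempFile("lookup", ".txt");
            file.toFile().deleteOnExit();
            return Files.write(file, lines, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path data = writeTemp(Arrays.asList("a", "b"));
        FilterFn shipped = roundTrip(new FilterFn(new PathHandle(data.toString())));
        shipped.initialize(); // the "task" reads the data itself
        System.out.println(shipped.accept("a")); // true
        System.out.println(shipped.accept("z")); // false
    }
}
```

In Crunch, the ReadableData<T> returned by PCollection.asReadable(boolean) plays the role of the handle. Check its Javadoc for how a DoFn is expected to configure and read it.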
