The likely cause of that exception is trying to write a PGroupedTable out to disk directly without ungrouping it first, but I don't see any GBK (groupByKey) operations in the snippet you sent -- I assume there's more pipeline logic around it?
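For reference, the shape that usually triggers that cast is something like the following -- a hypothetical sketch, not taken from your snippet; `table` and the output path are made up for illustration:

```java
// Hypothetical sketch: writing a grouped collection directly vs. ungrouping it.
// PGroupedTable<K, V> is what groupByKey() returns; the MR planner expects to
// serialize it through a PGroupedTableType, which is where the cast happens.
PGroupedTable<String, Long> grouped = table.groupByKey();

// Writing the grouped table straight out can trip the planner:
// grouped.write(To.textFile("/some/output/path"));

// Ungrouping first hands the planner an ordinary PTable to serialize:
grouped.ungroup().write(To.textFile("/some/output/path"));
```

That's the pattern I'd grep the rest of the pipeline for, anyway.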
J

On Thu, Feb 19, 2015 at 9:49 AM, David Ortiz <[email protected]> wrote:

> Anyone have any idea why the following:
>
>     RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
>             FteRecordRaw.class, new FteEntry().getSchema());
>     PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
>     PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
>             records(FteRecordRaw.class));
>     fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
>     fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
>             strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);
>
> Would yield
>
>     Exception in thread "main" java.lang.ClassCastException:
>     org.apache.crunch.types.avro.AvroType cannot be cast to
>     org.apache.crunch.types.PGroupedTableType
>         at org.apache.crunch.impl.mr.plan.DoNode.toRTNode(DoNode.java:144)
>         at org.apache.crunch.impl.mr.plan.JobPrototype.serialize(JobPrototype.java:243)
>         at org.apache.crunch.impl.mr.plan.JobPrototype.build(JobPrototype.java:188)
>         at org.apache.crunch.impl.mr.plan.JobPrototype.getCrunchJob(JobPrototype.java:134)
>         at org.apache.crunch.impl.mr.plan.MSCRPlanner.plan(MSCRPlanner.java:200)
>         at org.apache.crunch.impl.mr.MRPipeline.plan(MRPipeline.java:111)
>         at org.apache.crunch.impl.mr.MRPipeline.runAsync(MRPipeline.java:132)
>         at org.apache.crunch.impl.mr.MRPipeline.run(MRPipeline.java:120)
>         at org.apache.crunch.impl.dist.DistributedPipeline.done(DistributedPipeline.java:119)
>         at com.videologygroup.crunch.FteWarehouse.run(FteWarehouse.java:212)
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
>         at com.videologygroup.crunch.FteWarehouse.main(FteWarehouse.java:339)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
>
> But when I change it to
>
>     RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
>             FteRecordRaw.class, new FteEntry().getSchema());
>     PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
>     PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
>             records(FteRecordRaw.class));
>     //fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
>     fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
>             strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);
>
> It runs (minus writing out that collection for debugging purposes)?

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
