Anyone have any idea why the following:

RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
                           FteRecordRaw.class, new FteEntry().getSchema());
PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
                                        records(FteRecordRaw.class));
fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);

Would yield

Exception in thread "main" java.lang.ClassCastException: 
org.apache.crunch.types.avro.AvroType cannot be cast to 
org.apache.crunch.types.PGroupedTableType
        at org.apache.crunch.impl.mr.plan.DoNode.toRTNode(DoNode.java:144)
        at 
org.apache.crunch.impl.mr.plan.JobPrototype.serialize(JobPrototype.java:243)
        at 
org.apache.crunch.impl.mr.plan.JobPrototype.build(JobPrototype.java:188)
        at 
org.apache.crunch.impl.mr.plan.JobPrototype.getCrunchJob(JobPrototype.java:134)
        at org.apache.crunch.impl.mr.plan.MSCRPlanner.plan(MSCRPlanner.java:200)
        at org.apache.crunch.impl.mr.MRPipeline.plan(MRPipeline.java:111)
        at org.apache.crunch.impl.mr.MRPipeline.runAsync(MRPipeline.java:132)
        at org.apache.crunch.impl.mr.MRPipeline.run(MRPipeline.java:120)
        at 
org.apache.crunch.impl.dist.DistributedPipeline.done(DistributedPipeline.java:119)
        at com.videologygroup.crunch.FteWarehouse.run(FteWarehouse.java:212)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at com.videologygroup.crunch.FteWarehouse.main(FteWarehouse.java:339)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

But when I change it to

RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
                           FteRecordRaw.class, new FteEntry().getSchema());
PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
                                        records(FteRecordRaw.class));
//fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);

It runs (minus writing out that collection for debugging purposes)?
This email is intended only for the use of the individual(s) to whom it is 
addressed. If you have received this communication in error, please immediately 
notify the sender and delete the original email.

Reply via email to