Does anyone have any idea why the following code:
    RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
        FteRecordRaw.class, new FteEntry().getSchema());
    PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
    PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
        records(FteRecordRaw.class));
    fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
    fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
        strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);
would yield this exception:
Exception in thread "main" java.lang.ClassCastException: org.apache.crunch.types.avro.AvroType cannot be cast to org.apache.crunch.types.PGroupedTableType
    at org.apache.crunch.impl.mr.plan.DoNode.toRTNode(DoNode.java:144)
    at org.apache.crunch.impl.mr.plan.JobPrototype.serialize(JobPrototype.java:243)
    at org.apache.crunch.impl.mr.plan.JobPrototype.build(JobPrototype.java:188)
    at org.apache.crunch.impl.mr.plan.JobPrototype.getCrunchJob(JobPrototype.java:134)
    at org.apache.crunch.impl.mr.plan.MSCRPlanner.plan(MSCRPlanner.java:200)
    at org.apache.crunch.impl.mr.MRPipeline.plan(MRPipeline.java:111)
    at org.apache.crunch.impl.mr.MRPipeline.runAsync(MRPipeline.java:132)
    at org.apache.crunch.impl.mr.MRPipeline.run(MRPipeline.java:120)
    at org.apache.crunch.impl.dist.DistributedPipeline.done(DistributedPipeline.java:119)
    at com.videologygroup.crunch.FteWarehouse.run(FteWarehouse.java:212)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
    at com.videologygroup.crunch.FteWarehouse.main(FteWarehouse.java:339)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
But when I change it to this, with the write of fteStrings commented out:
    RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
        FteRecordRaw.class, new FteEntry().getSchema());
    PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
    PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
        records(FteRecordRaw.class));
    //fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
    fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
        strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);
it runs without error (though it no longer writes out the fteStrings collection, which I wanted for debugging purposes)?