Yeah, what I mean is that this some other part of the pipeline logic is
getting conflated with the trivial output of this PCollection and is
causing some sort of problem. My first guess is that there is a bug in
handling PCollections that are simultaneously inputs and outputs of a
pipeline, like fteStrings. If you add a trivial identity transform, e.g.,

fteStrings.filter(FilterFns.ACCEPT_ALL()).write(To.textFile("/tmp/fte/fteRaw"),
WriteMode.OVERWRITE);

Does it work?

On Thu, Feb 19, 2015 at 9:57 AM, David Ortiz <[email protected]>
wrote:

>  There is plenty more pipeline logic around.  That snippet is the
> entirety of that PCollection’s usage though.
>
>
>
> *From:* Josh Wills [mailto:[email protected]]
> *Sent:* Thursday, February 19, 2015 12:55 PM
> *To:* [email protected]
> *Subject:* Re: Weird error writing a collection of Strings
>
>
>
> The likely cause of that exception is trying to write a PGroupedTable out
> to disk directly w/o ungrouping it first, but I don't see any GBK
> operations in the snippet you sent-- I assume there's more pipeline logic
> around?
>
>
>
> J
>
>
>
> On Thu, Feb 19, 2015 at 9:49 AM, David Ortiz <[email protected]>
> wrote:
>
>  Anyone have any idea why the following:
>
>
>
> RecordDeserializer<FteRecordRaw> fteSplit = *new* RecordDeserializer<>(
>
>                            FteRecordRaw.*class*, *new*
> FteEntry().getSchema());
>
> PCollection<String> fteStrings = pipe.read(From.*textFile*(fteIn,
> *strings*()));
>
> PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
>
>                                         *records*(FteRecordRaw.*class*));
>
> fteStrings.write(To.textFile("/*tmp*/*fte*/fteRaw"), WriteMode.OVERWRITE);
>
> fte.parallelDo(*new* RecordToDelimitedString<FteRecordRaw>(),
>
> *strings*()).write(To.*textFile*("/tmp/fte/fteIn"), WriteMode.*OVERWRITE*
> );
>
>
>
> Would yield
>
>
>
> Exception in thread "main" java.lang.ClassCastException:
> org.apache.crunch.types.avro.AvroType cannot be cast to
> org.apache.crunch.types.PGroupedTableType
>
>         at org.apache.crunch.impl.mr.plan.DoNode.toRTNode(DoNode.java:144)
>
>         at
> org.apache.crunch.impl.mr.plan.JobPrototype.serialize(JobPrototype.java:243)
>
>         at
> org.apache.crunch.impl.mr.plan.JobPrototype.build(JobPrototype.java:188)
>
>         at
> org.apache.crunch.impl.mr.plan.JobPrototype.getCrunchJob(JobPrototype.java:134)
>
>         at
> org.apache.crunch.impl.mr.plan.MSCRPlanner.plan(MSCRPlanner.java:200)
>
>         at org.apache.crunch.impl.mr.MRPipeline.plan(MRPipeline.java:111)
>
>         at
> org.apache.crunch.impl.mr.MRPipeline.runAsync(MRPipeline.java:132)
>
>         at org.apache.crunch.impl.mr.MRPipeline.run(MRPipeline.java:120)
>
>         at
> org.apache.crunch.impl.dist.DistributedPipeline.done(DistributedPipeline.java:119)
>
>         at
> com.videologygroup.crunch.FteWarehouse.run(FteWarehouse.java:212)
>
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
>
>         at
> com.videologygroup.crunch.FteWarehouse.main(FteWarehouse.java:339)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:606)
>
>         at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
>
>
>
> But when I change it to
>
>
>
> RecordDeserializer<FteRecordRaw> fteSplit = *new* RecordDeserializer<>(
>
>                            FteRecordRaw.*class*, *new*
> FteEntry().getSchema());
>
> PCollection<String> fteStrings = pipe.read(From.*textFile*(fteIn,
> *strings*()));
>
> PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
>
>                                         *records*(FteRecordRaw.*class*));
>
> //fteStrings.write(To.textFile("/*tmp*/*fte*/fteRaw"),
> WriteMode.OVERWRITE);
>
> fte.parallelDo(*new* RecordToDelimitedString<FteRecordRaw>(),
>
> *strings*()).write(To.*textFile*("/tmp/fte/fteIn"), WriteMode.*OVERWRITE*
> );
>
>
>
> It runs (minus writing out that collection for debugging purposes)?
>
> *This email is intended only for the use of the individual(s) to whom it
> is addressed. If you have received this communication in error, please
> immediately notify the sender and delete the original email.*
>
>
>
>
>
> --
>
> Director of Data Science
>
> Cloudera <http://www.cloudera.com>
>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>   *This email is intended only for the use of the individual(s) to whom
> it is addressed. If you have received this communication in error, please
> immediately notify the sender and delete the original email.*
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Reply via email to