Yes.  Changed it to

fteStrings.filter(FilterFns.<String>ACCEPT_ALL()).write(To.textFile("/tmp/fte/fteRaw"),
 WriteMode.OVERWRITE);

and it is running now.

Thanks!

From: Josh Wills [mailto:[email protected]]
Sent: Thursday, February 19, 2015 1:07 PM
To: [email protected]
Subject: Re: Weird error writing a collection of Strings

Yeah, what I mean is that some other part of the pipeline logic is getting 
conflated with the trivial output of this PCollection and is causing some sort 
of problem. My first guess is that there is a bug in handling PCollections that 
are simultaneously inputs and outputs of a pipeline, like fteStrings. If you 
add a trivial identity transform, e.g.,

fteStrings.filter(FilterFns.ACCEPT_ALL()).write(To.textFile("/tmp/fte/fteRaw"), 
WriteMode.OVERWRITE);

Does it work?

On Thu, Feb 19, 2015 at 9:57 AM, David Ortiz 
<[email protected]<mailto:[email protected]>> wrote:
There is plenty more pipeline logic around.  That snippet is the entirety of 
that PCollection’s usage though.

From: Josh Wills [mailto:[email protected]]
Sent: Thursday, February 19, 2015 12:55 PM
To: [email protected]<mailto:[email protected]>
Subject: Re: Weird error writing a collection of Strings

The likely cause of that exception is trying to write a PGroupedTable out to 
disk directly w/o ungrouping it first, but I don't see any GBK operations in 
the snippet you sent-- I assume there's more pipeline logic around?
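For reference, here's the kind of ungrouping I mean. This is a hypothetical fragment (the table name, key/value types, and output path are made up, not from your pipeline), just to show the shape of the fix when a GBK is involved:

```java
// Hypothetical sketch: a PGroupedTable is the output of a groupByKey()
// and cannot be written to disk directly -- it must be turned back into
// a PTable first, e.g. via ungroup() (or collapsed with a combine/reduce).
PTable<String, Long> counts = /* some keyed PTable from earlier in the pipeline */;
PGroupedTable<String, Long> grouped = counts.groupByKey();

// Writing `grouped` directly triggers the PGroupedTableType cast path;
// ungroup() flattens it back into a PTable<String, Long> that can be written:
grouped.ungroup().write(To.textFile("/tmp/out"), WriteMode.OVERWRITE);
```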

J

On Thu, Feb 19, 2015 at 9:49 AM, David Ortiz 
<[email protected]<mailto:[email protected]>> wrote:
Anyone have any idea why the following:

RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
                           FteRecordRaw.class, new FteEntry().getSchema());
PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
                                        records(FteRecordRaw.class));
fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);

Would yield

Exception in thread "main" java.lang.ClassCastException: 
org.apache.crunch.types.avro.AvroType cannot be cast to 
org.apache.crunch.types.PGroupedTableType
        at org.apache.crunch.impl.mr.plan.DoNode.toRTNode(DoNode.java:144)
        at 
org.apache.crunch.impl.mr.plan.JobPrototype.serialize(JobPrototype.java:243)
        at 
org.apache.crunch.impl.mr.plan.JobPrototype.build(JobPrototype.java:188)
        at 
org.apache.crunch.impl.mr.plan.JobPrototype.getCrunchJob(JobPrototype.java:134)
        at org.apache.crunch.impl.mr.plan.MSCRPlanner.plan(MSCRPlanner.java:200)
        at org.apache.crunch.impl.mr.MRPipeline.plan(MRPipeline.java:111)
        at org.apache.crunch.impl.mr.MRPipeline.runAsync(MRPipeline.java:132)
        at org.apache.crunch.impl.mr.MRPipeline.run(MRPipeline.java:120)
        at 
org.apache.crunch.impl.dist.DistributedPipeline.done(DistributedPipeline.java:119)
        at com.videologygroup.crunch.FteWarehouse.run(FteWarehouse.java:212)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at com.videologygroup.crunch.FteWarehouse.main(FteWarehouse.java:339)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

But when I change it to

RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
                           FteRecordRaw.class, new FteEntry().getSchema());
PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
                                        records(FteRecordRaw.class));
//fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);

It runs (minus writing out that collection for debugging purposes)?
This email is intended only for the use of the individual(s) to whom it is 
addressed. If you have received this communication in error, please immediately 
notify the sender and delete the original email.



--
Director of Data Science
Cloudera<http://www.cloudera.com>
Twitter: @josh_wills<http://twitter.com/josh_wills>
