I don't believe it is a known issue. I modified an
AvroParquetPipelineIT test[1] to verify reading the output of a target back
through a source:
@Test
public void toAvroParquetFileTargetFromParquet() throws Exception {
  GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
  savedRecord.put("name", "John Doe");
  savedRecord.put("age", 42);
  savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
  populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);

  Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class,
      tmpDir.getDefaultConfiguration());
  PCollection<Person> genericCollection = pipeline.read(
      new AvroParquetFileSource<Person>(
          new Path(avroFile.getAbsolutePath()), Avros.records(Person.class)));

  File outputFile = tmpDir.getFile("output");
  Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
  pipeline.write(genericCollection, parquetFileTarget);
  pipeline.run();

  Person person = genericCollection.materialize().iterator().next();

  PCollection<Person> persistedCollection = pipeline.read(
      new AvroParquetFileSource<Person>(
          new Path(outputFile.getAbsolutePath()), Avros.records(Person.class)));
  Person persistedPerson = persistedCollection.materialize().iterator().next();

  Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
  AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
  try {
    Person readPerson = reader.read();
    assertThat(readPerson, is(person));
    assertThat(readPerson, is(persistedPerson));
  } finally {
    reader.close();
  }
}
The test passes without any issues. There have been a number of fixes
since the 0.8.0-cdh4.3.0 version. You might try upgrading to the latest
available version (0.8.2+71-cdh4.6.0) and see if the problem still exists.
If it does, a JUnit/integration test would be helpful for debugging
this issue.
[1] -
https://github.com/apache/crunch/blob/1d9b6cf3db6daa1ee6e0fa48dfd5966e821c71a3/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java#L120
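For what it's worth, the stack trace below (GenericData$Record cannot be cast to mapred.jobs.User) is the classic symptom of a reader materializing generic Avro records where the job declared specific ones. The following is only a plain-Java illustration of that failure mode, with no Crunch or Avro dependencies; GenericRecordLike and the nested User class are hypothetical stand-ins for GenericData.Record and the generated specific class, not real APIs:

```java
import java.util.HashMap;
import java.util.Map;

public class CastDemo {
    // Stand-in for org.apache.avro.generic.GenericData.Record
    static class GenericRecordLike {
        final Map<String, Object> fields = new HashMap<>();
    }

    // Stand-in for a generated specific class such as mapred.jobs.User
    static class User extends GenericRecordLike {
        String getName() { return (String) fields.get("name"); }
    }

    // A reader configured without the specific class returns the generic
    // form; because of type erasure, the unchecked cast inside this method
    // is deferred to the call site.
    @SuppressWarnings("unchecked")
    static <T> T deserialize(boolean asSpecific) {
        return (T) (asSpecific ? new User() : new GenericRecordLike());
    }

    public static void main(String[] args) {
        User ok = CastDemo.<User>deserialize(true); // specific record: fine
        try {
            // Generic record assigned to a specific type: the compiler's
            // implicit checkcast throws here, not inside deserialize().
            User bad = CastDemo.<User>deserialize(false);
            System.out.println("no exception");
        } catch (ClassCastException e) {
            System.out.println("caught ClassCastException");
        }
    }
}
```

In the real job the same implicit checkcast happens inside the DoFn when the framework hands a GenericData.Record to `process(User user, ...)`, which matches the frame at ParquetReaderJob.java:22 in the trace below.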
On Mon, Jun 2, 2014 at 6:53 AM, Kristoffer Sjögren <[email protected]> wrote:
> Hi
>
> I'm trying to read and write data using the avro+parquet combo that
> ships with crunch 0.8.0-cdh4.3.0.
>
> - The writer job looks like this:
>
> PCollection<String> lines = ...
> PCollection<User> p = lines.parallelDo(new DoFn<String, User>() {
>   @Override
>   public void process(String input, Emitter<User> emitter) {
>     User user = User.newBuilder().setName(input).build();
>     emitter.emit(user);
>   }
> }, Avros.records(User.class));
>
> AvroParquetFileSourceTarget fout =
>     new AvroParquetFileSourceTarget<User>(out, Avros.records(User.class));
> pipeline.write(p, fout);
>
> - The reader job looks like this:
>
> AvroParquetFileSource<User> file =
>     new AvroParquetFileSource<User>(out, Avros.records(User.class));
> PCollection<User> users = pipeline.read(file);
> // this line fails with a ClassCastException
> PCollection<String> lines = users.parallelDo(new DoFn<User, String>() {
>   @Override
>   public void process(User user, Emitter<String> emitter) {
>     emitter.emit(user.getName().toString());
>   }
> }, Writables.strings());
>
>
> However, the reader fails with a java.lang.ClassCastException. Is this
> a known issue or am I doing something wrong?
>
> Cheers,
> -Kristoffer
>
>
> java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to mapred.jobs.User
>     at mapred.jobs.ParquetReaderJob$1.process(ParquetReaderJob.java:22)
>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>     at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>     at org.apache.crunch.MapFn.process(MapFn.java:34)
>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>     at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
>     at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:396)
>