Sorry for the late reply. It was my fault: a refactoring changed the Java package name without changing the Avro schema namespace in the SCHEMA$ field, which caused Avro to fall back to generic records. Works fine now!
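For anyone hitting the same symptom: Avro resolves the runtime class from the schema's fully qualified name (namespace + "." + record name), and when that lookup does not match a loadable generated class it silently falls back to GenericData.Record. The sketch below is an editorial illustration of that lookup rule, not Avro's actual implementation; `com.old.pkg` is a hypothetical pre-rename namespace, and `mapred.jobs.User` is the class name taken from the stack trace in the thread.

```java
// Sketch of the name-matching rule behind the generic-record fallback.
// If the full name derived from the schema does not equal the generated
// class's fully qualified name, specific decoding cannot kick in.
public class NamespaceCheck {

    // The name Avro derives from a record schema: namespace + "." + name.
    static String fullName(String namespace, String name) {
        return namespace + "." + name;
    }

    // True when the schema's full name lines up with the generated class.
    static boolean resolvesToSpecificClass(String namespace, String name,
                                           String generatedClass) {
        return fullName(namespace, name).equals(generatedClass);
    }

    public static void main(String[] args) {
        // After the package rename: SCHEMA$ still carries the old namespace,
        // so the lookup misses and Avro falls back to GenericData.Record.
        System.out.println(resolvesToSpecificClass(
                "com.old.pkg", "User", "mapred.jobs.User"));  // false

        // Once the schema namespace matches the Java package, it resolves.
        System.out.println(resolvesToSpecificClass(
                "mapred.jobs", "User", "mapred.jobs.User"));  // true
    }
}
```

The practical takeaway is simply to keep the `namespace` in the `.avsc`/IDL in lockstep with the Java package whenever generated classes are moved.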
On Tue, Jun 3, 2014 at 1:15 PM, Kristoffer Sjögren <[email protected]> wrote:
> Thanks for the quick answer! My initial test still fails, but I may
> have done something wrong here. I will do a more thorough test asap.
>
> On Mon, Jun 2, 2014 at 2:53 PM, Micah Whitacre <[email protected]> wrote:
>> I don't believe it is a known issue. I modified an AvroParquetPipelineIT [1]
>> to verify the output to a target using a source.
>>
>> @Test
>> public void toAvroParquetFileTargetFromParquet() throws Exception {
>>   GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
>>   savedRecord.put("name", "John Doe");
>>   savedRecord.put("age", 42);
>>   savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
>>   populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
>>
>>   Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class,
>>       tmpDir.getDefaultConfiguration());
>>   PCollection<Person> genericCollection = pipeline.read(
>>       new AvroParquetFileSource<Person>(
>>           new Path(avroFile.getAbsolutePath()), Avros.records(Person.class)));
>>   File outputFile = tmpDir.getFile("output");
>>   Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
>>   pipeline.write(genericCollection, parquetFileTarget);
>>   pipeline.run();
>>
>>   Person person = genericCollection.materialize().iterator().next();
>>
>>   PCollection<Person> persistedCollection = pipeline.read(
>>       new AvroParquetFileSource<Person>(
>>           new Path(outputFile.getAbsolutePath()), Avros.records(Person.class)));
>>   Person persistedPerson = persistedCollection.materialize().iterator().next();
>>
>>   Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
>>
>>   AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
>>
>>   try {
>>     Person readPerson = reader.read();
>>     assertThat(readPerson, is(person));
>>     assertThat(readPerson, is(persistedPerson));
>>   } finally {
>>     reader.close();
>>   }
>> }
>>
>> The test passes without any issues. There have been a number of fixes
>> since the 0.8.0-cdh4.3.0 version. You might try upgrading to the latest
>> version available (0.8.2+71-cdh4.6.0) and see if the problem still exists.
>> If it does, a JUnit/integration test would be helpful to debug this issue.
>>
>> [1] - https://github.com/apache/crunch/blob/1d9b6cf3db6daa1ee6e0fa48dfd5966e821c71a3/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java#L120
>>
>> On Mon, Jun 2, 2014 at 6:53 AM, Kristoffer Sjögren <[email protected]> wrote:
>>>
>>> Hi
>>>
>>> I'm trying to read and write data using the Avro+Parquet combo that
>>> ships with Crunch 0.8.0-cdh4.3.0.
>>>
>>> - The writer job looks like this:
>>>
>>> PCollection<String> lines = ...
>>> PCollection<User> p = lines.parallelDo(new DoFn<String, User>() {
>>>   @Override
>>>   public void process(String input, Emitter<User> emitter) {
>>>     User user = User.newBuilder().setName(input).build();
>>>     emitter.emit(user);
>>>   }
>>> }, Avros.records(User.class));
>>>
>>> AvroParquetFileSourceTarget<User> fout =
>>>     new AvroParquetFileSourceTarget<User>(out, Avros.records(User.class));
>>> pipeline.write(p, fout);
>>>
>>> - The reader job looks like this:
>>>
>>> AvroParquetFileSource<User> file =
>>>     new AvroParquetFileSource<User>(out, Avros.records(User.class));
>>> PCollection<User> users = pipeline.read(file);
>>> // this line fails with a ClassCastException
>>> PCollection<String> lines = users.parallelDo(new DoFn<User, String>() {
>>>   @Override
>>>   public void process(User user, Emitter<String> emitter) {
>>>     emitter.emit(user.getName().toString());
>>>   }
>>> }, Writables.strings());
>>>
>>> However, the reader fails with a java.lang.ClassCastException. Is this
>>> a known issue or am I doing something wrong?
>>>
>>> Cheers,
>>> -Kristoffer
>>>
>>> java.lang.ClassCastException:
>>> org.apache.avro.generic.GenericData$Record cannot be cast to mapred.jobs.User
>>>   at mapred.jobs.ParquetReaderJob$1.process(ParquetReaderJob.java:22)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
>>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
>>>   at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>   at javax.security.auth.Subject.doAs(Subject.java:396)
