Ah, that's a good one. Glad it's working now.
On Wed, Jun 4, 2014 at 7:38 AM, Kristoffer Sjögren <[email protected]> wrote:
> Sorry for the late reply.
>
> It was my fault. A refactoring changed the Java package name without
> changing the Avro schema namespace in the SCHEMA$ field, which caused
> Avro to fall back on generic records. Works fine now!
>
> On Tue, Jun 3, 2014 at 1:15 PM, Kristoffer Sjögren <[email protected]> wrote:
> > Thanks for the quick answer! My initial test still fails, but I may
> > have done something wrong here. I will do a more thorough test asap.
> >
> > On Mon, Jun 2, 2014 at 2:53 PM, Micah Whitacre <[email protected]> wrote:
> >> I don't believe it is a known issue. I modified AvroParquetPipelineIT [1]
> >> to verify the output to a target using a source.
> >>
> >> @Test
> >> public void toAvroParquetFileTargetFromParquet() throws Exception {
> >>   GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
> >>   savedRecord.put("name", "John Doe");
> >>   savedRecord.put("age", 42);
> >>   savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
> >>   populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
> >>
> >>   Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class,
> >>       tmpDir.getDefaultConfiguration());
> >>   PCollection<Person> genericCollection = pipeline.read(
> >>       new AvroParquetFileSource<Person>(
> >>           new Path(avroFile.getAbsolutePath()), Avros.records(Person.class)));
> >>   File outputFile = tmpDir.getFile("output");
> >>   Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
> >>   pipeline.write(genericCollection, parquetFileTarget);
> >>   pipeline.run();
> >>
> >>   Person person = genericCollection.materialize().iterator().next();
> >>
> >>   PCollection<Person> persistedCollection = pipeline.read(
> >>       new AvroParquetFileSource<Person>(
> >>           new Path(outputFile.getAbsolutePath()), Avros.records(Person.class)));
> >>   Person persistedPerson = persistedCollection.materialize().iterator().next();
> >>
> >>   Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
> >>
> >>   AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
> >>
> >>   try {
> >>     Person readPerson = reader.read();
> >>     assertThat(readPerson, is(person));
> >>     assertThat(readPerson, is(persistedPerson));
> >>   } finally {
> >>     reader.close();
> >>   }
> >> }
> >>
> >> The test passes without any issues. There have been a number of fixes
> >> since the 0.8.0-cdh4.3.0 version. You might try upgrading to the latest
> >> version available (0.8.2+71-cdh4.6.0) and see if the problem still exists.
> >> If it does still exist, a junit/integration test would be helpful to
> >> debug this issue.
> >>
> >> [1] - https://github.com/apache/crunch/blob/1d9b6cf3db6daa1ee6e0fa48dfd5966e821c71a3/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java#L120
> >>
> >> On Mon, Jun 2, 2014 at 6:53 AM, Kristoffer Sjögren <[email protected]> wrote:
> >>> Hi
> >>>
> >>> I'm trying to read and write data using the avro+parquet combo that
> >>> ships with crunch 0.8.0-cdh4.3.0.
> >>>
> >>> - The writer job looks like this:
> >>>
> >>> PCollection<String> lines = ...
> >>> PCollection<User> p = lines.parallelDo(new DoFn<String, User>() {
> >>>   @Override
> >>>   public void process(String input, Emitter<User> emitter) {
> >>>     User user = User.newBuilder().setName(input).build();
> >>>     emitter.emit(user);
> >>>   }
> >>> }, Avros.records(User.class));
> >>>
> >>> AvroParquetFileSourceTarget fout =
> >>>     new AvroParquetFileSourceTarget<User>(out, Avros.records(User.class));
> >>> pipeline.write(p, fout);
> >>>
> >>> - The reader job looks like this:
> >>>
> >>> AvroParquetFileSource<User> file =
> >>>     new AvroParquetFileSource<User>(out, Avros.records(User.class));
> >>> PCollection<User> users = pipeline.read(file);
> >>> // this line fails with a ClassCastException
> >>> PCollection<String> lines = users.parallelDo(new DoFn<User, String>() {
> >>>   @Override
> >>>   public void process(User user, Emitter<String> emitter) {
> >>>     emitter.emit(user.getName().toString());
> >>>   }
> >>> }, Writables.strings());
> >>>
> >>> However, the reader fails with a java.lang.ClassCastException. Is this
> >>> a known issue or am I doing something wrong?
> >>>
> >>> Cheers,
> >>> -Kristoffer
> >>>
> >>> java.lang.ClassCastException:
> >>> org.apache.avro.generic.GenericData$Record cannot be cast to mapred.jobs.User
> >>>   at mapred.jobs.ParquetReaderJob$1.process(ParquetReaderJob.java:22)
> >>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
> >>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
> >>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
> >>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
> >>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
> >>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
> >>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
> >>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
> >>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
> >>>   at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
> >>>   at java.security.AccessController.doPrivileged(Native Method)
> >>>   at javax.security.auth.Subject.doAs(Subject.java:396)
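[Editor's note] The root cause described in the thread above can be sketched in plain Java. This is a minimal, hypothetical illustration, not Avro's actual code: `specificClassResolves` is an invented helper standing in for Avro's specific-record class lookup, and `mapred.jobs.User` is taken from the stack trace. The point is that Avro resolves the generated class from the schema's namespace + name; when that lookup fails (e.g. after a package refactor that left the schema namespace unchanged), it silently falls back to GenericData.Record, and the later cast to the generated type throws the ClassCastException shown above.

```java
// Sketch (assumed names): why a schema-namespace / Java-package mismatch
// produces generic records instead of specific ones.
public class SchemaNamespaceCheck {

    // Stand-in for Avro's specific-record resolution: the schema's
    // fully-qualified name must load as a class on the classpath.
    static boolean specificClassResolves(String namespace, String name) {
        try {
            Class.forName(namespace + "." + name);
            return true;
        } catch (ClassNotFoundException e) {
            // This is where Avro would fall back to GenericData.Record.
            return false;
        }
    }

    public static void main(String[] args) {
        // After the refactoring, no class exists at the namespace still
        // recorded in the schema, so resolution fails.
        boolean ok = specificClassResolves("mapred.jobs", "User");
        System.out.println(ok
            ? "specific record"
            : "generic fallback: GenericData$Record cannot be cast to the generated class");
    }
}
```

Checking that each generated class's `SCHEMA$` namespace equals its Java package after a refactor would catch this at build time rather than as a runtime cast failure in a MapReduce task.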
