Thanks for the quick answer! My initial test still fails, but I may have done something wrong here. I will run a more thorough test asap.
On Mon, Jun 2, 2014 at 2:53 PM, Micah Whitacre <[email protected]> wrote:
> I don't believe it is a known issue. I modified an AvroParquetPipelineIT [1]
> to verify the output to a target using a source.
>
> @Test
> public void toAvroParquetFileTargetFromParquet() throws Exception {
>   GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
>   savedRecord.put("name", "John Doe");
>   savedRecord.put("age", 42);
>   savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
>   populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
>
>   Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class,
>       tmpDir.getDefaultConfiguration());
>   PCollection<Person> genericCollection = pipeline.read(
>       new AvroParquetFileSource<Person>(
>           new Path(avroFile.getAbsolutePath()), Avros.records(Person.class)));
>   File outputFile = tmpDir.getFile("output");
>   Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
>   pipeline.write(genericCollection, parquetFileTarget);
>   pipeline.run();
>
>   Person person = genericCollection.materialize().iterator().next();
>
>   PCollection<Person> persistedCollection = pipeline.read(
>       new AvroParquetFileSource<Person>(
>           new Path(outputFile.getAbsolutePath()), Avros.records(Person.class)));
>   Person persistedPerson = persistedCollection.materialize().iterator().next();
>
>   Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
>
>   AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
>
>   try {
>     Person readPerson = reader.read();
>     assertThat(readPerson, is(person));
>     assertThat(readPerson, is(persistedPerson));
>   } finally {
>     reader.close();
>   }
> }
>
> The test passes without any issues. There have been a number of fixes
> since the 0.8.0-cdh4.3.0 version. You might try upgrading to the latest
> version available (0.8.2+71-cdh4.6.0) and see if the problem still exists.
> If it does still exist, a junit/integration test would be helpful to debug
> this issue.
>
> [1] -
> https://github.com/apache/crunch/blob/1d9b6cf3db6daa1ee6e0fa48dfd5966e821c71a3/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java#L120
>
> On Mon, Jun 2, 2014 at 6:53 AM, Kristoffer Sjögren <[email protected]> wrote:
>>
>> Hi
>>
>> I'm trying to read and write data using the avro+parquet combo that
>> ships with crunch 0.8.0-cdh4.3.0.
>>
>> - The writer job looks like this.
>>
>> PCollection<String> lines = ...
>> PCollection<User> p = lines.parallelDo(new DoFn<String, User>() {
>>   @Override
>>   public void process(String input, Emitter<User> emitter) {
>>     User user = User.newBuilder().setName(input).build();
>>     emitter.emit(user);
>>   }
>> }, Avros.records(User.class));
>>
>> AvroParquetFileSourceTarget fout =
>>     new AvroParquetFileSourceTarget<User>(out, Avros.records(User.class));
>> pipeline.write(p, fout);
>>
>> - The reader job looks like this.
>>
>> AvroParquetFileSource<User> file =
>>     new AvroParquetFileSource<User>(out, Avros.records(User.class));
>> PCollection<User> users = pipeline.read(file);
>> // this line fails with a ClassCastException
>> PCollection<String> lines = users.parallelDo(new DoFn<User, String>() {
>>   @Override
>>   public void process(User user, Emitter<String> emitter) {
>>     emitter.emit(user.getName().toString());
>>   }
>> }, Writables.strings());
>>
>> However, the reader fails with a java.lang.ClassCastException. Is this
>> a known issue or am I doing something wrong?
>>
>> Cheers,
>> -Kristoffer
>>
>> java.lang.ClassCastException:
>> org.apache.avro.generic.GenericData$Record cannot be cast to
>> mapred.jobs.User
>>   at mapred.jobs.ParquetReaderJob$1.process(ParquetReaderJob.java:22)
>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
>>   at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
>>   at java.security.AccessController.doPrivileged(Native Method)
>>   at javax.security.auth.Subject.doAs(Subject.java:396)
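For what it's worth, the stack trace above boils down to a generic-vs-specific record mismatch: if the Parquet source materializes Avro GenericData.Record instances, those are not a subtype of the generated User class, so the cast inside the DoFn blows up at runtime. A minimal stand-in sketch of that shape (plain Java, no Avro on the classpath; the *Sim classes are hypothetical stand-ins for the real Avro types, not Crunch code):

```java
// Stand-ins: GenericData.Record and a generated specific class both
// implement a common record interface, but neither extends the other.
interface IndexedRecordSim {}                          // ~ org.apache.avro.generic.IndexedRecord
class GenericRecordSim implements IndexedRecordSim {}  // ~ GenericData.Record
class UserSim implements IndexedRecordSim {}           // ~ the generated mapred.jobs.User

public class CastDemo {
    public static void main(String[] args) {
        // What a generic-only reader actually hands back:
        IndexedRecordSim fromReader = new GenericRecordSim();
        try {
            // Mirrors the failing line in ParquetReaderJob: the runtime type
            // is the generic record, so the downcast cannot succeed.
            UserSim user = (UserSim) fromReader;
            System.out.println("cast succeeded: " + user);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: GenericRecordSim cannot be cast to UserSim");
        }
    }
}
```

So the question is really whether the 0.8.0-cdh4.3.0 source honors the specific reader schema from Avros.records(User.class); the upgrade Micah suggests is the first thing to try.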
