Hi,

I'm trying to read and write data using the Avro+Parquet combo that
ships with Crunch 0.8.0-cdh4.3.0.
- The writer job looks like this.
  PCollection<String> lines = ...
  PCollection<User> p = lines.parallelDo(new DoFn<String, User>() {
    @Override
    public void process(String input, Emitter<User> emitter) {
      User user = User.newBuilder().setName(input).build();
      emitter.emit(user);
    }
  }, Avros.records(User.class));
  AvroParquetFileSourceTarget<User> fout =
      new AvroParquetFileSourceTarget<User>(out, Avros.records(User.class));
  pipeline.write(p, fout);
- The reader job looks like this.
  AvroParquetFileSource<User> file =
      new AvroParquetFileSource<User>(out, Avros.records(User.class));
  PCollection<User> users = pipeline.read(file);
  // this line fails with a ClassCastException
  PCollection<String> lines = users.parallelDo(new DoFn<User, String>() {
    @Override
    public void process(User user, Emitter<String> emitter) {
      emitter.emit(user.getName().toString());
    }
  }, Writables.strings());
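For context, User is the Avro specific record class generated by the
Avro compiler. I'm sketching its schema from memory here (the real one
may have more fields), but it's essentially just:

  {
    "type": "record",
    "name": "User",
    "namespace": "mapred.jobs",
    "fields": [
      {"name": "name", "type": "string"}
    ]
  }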
However, the reader fails with a java.lang.ClassCastException (full
stack trace below). Is this a known issue or am I doing something wrong?
Cheers,
-Kristoffer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to mapred.jobs.User
at mapred.jobs.ParquetReaderJob$1.process(ParquetReaderJob.java:22)
at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
at org.apache.crunch.MapFn.process(MapFn.java:34)
at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)