Hi.
I'm trying to join a CSV file with an Avro file. First, I read the CSV into a
PCollection:
final PCollection<String> bh = pipeline.readTextFile("/pathcsv/");
Second, I read the Avro file and then apply a transformation:
final PCollection<MyAvro> gp = pipeline.read(
    From.avroFile(inputPath, Avros.specifics(MyAvro.class)));
final PCollection<myAvro2> vp = gp.parallelDo("trans", new MapTrasnf(),
    Avros.records(myAvro2.class));
Before doing the join, I extract the keys:
final PTable<String, String> bhSide = bh.by(new BHExtractor(),
    Writables.strings());
final PTable<String, myAvro2> positionSide = vp.by(new BHExtractorAvro(),
    Writables.strings());
Calling the "by" method on the Avro PCollection throws an exception, and I
don't know why:
Caused by: java.lang.ClassCastException: org.apache.crunch.types.writable.WritableType cannot be cast to org.apache.crunch.types.avro.AvroType
    at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:895)
    at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:136)
    at org.apache.crunch.impl.dist.collect.PCollectionImpl.by(PCollectionImpl.java:270)
    at com.db.myapp.driver.myapp.run(myapp.java:62)
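
One possible reading of the trace, which I have not confirmed: "by" builds the table type through the collection's own type family (Avro for vp, since it was created with Avros.records), and that family casts the key type it is given to an Avro type. The sketch below models only that cast in plain Java. Every class and method name in it is a stand-in made up for illustration, not Crunch's real classes or signatures.

```java
// Simplified model of a type-family mismatch.
// All names here are hypothetical stand-ins, NOT the real Crunch classes.
class TypeFamilyMismatchSketch {

    // Common supertype for the two families.
    interface PType {
        String family();
    }

    // Stand-in for a Writable-based type such as the one from Writables.strings().
    static final class WritableType implements PType {
        public String family() { return "writable"; }
    }

    // Stand-in for an Avro-based type such as the one from Avros.records(...).
    static final class AvroType implements PType {
        public String family() { return "avro"; }
    }

    // Models an Avro-family table factory that assumes both arguments are
    // Avro types and casts them without checking.
    static String avroTableOf(PType key, PType value) {
        AvroType k = (AvroType) key;     // ClassCastException if key is a WritableType
        AvroType v = (AvroType) value;
        return "table<" + k.family() + "," + v.family() + ">";
    }

    // Returns true when mixing a Writable key with an Avro value fails the cast.
    static boolean mixedFamiliesFail() {
        try {
            avroTableOf(new WritableType(), new AvroType());
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("mixed families fail: " + mixedFamiliesFail());
        System.out.println(avroTableOf(new AvroType(), new AvroType()));
    }
}
```

If that reading is right, passing Avros.strings() instead of Writables.strings() as the key type in the second "by" call would keep both types in the Avro family, but I have not verified this against my pipeline.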
--
Regards.
Miguel Ángel