Hi. I suppose it should be "Avros.tableOf(Avros.strings(), Avros.strings())" instead of "Avros.tableOf(String, String)", right?
Thanks.
Miguel.

On Mon, Aug 1, 2016 at 9:13 PM, Micah Whitacre <[email protected]> wrote:

> Try changing this from:
>
> final PTable<String, String> bhSide = bh.by(new BHExtractor(),
>     Writables.strings());
>
> to
>
> final PTable<String, String> bhSide = bh.parallelDo(new KeyExtractor(),
>     Avros.tableOf(String, String));
>
> public class KeyExtractor extends MapFn<String, Pair<String, String>> {
>   public Pair<String, String> map(String input) {
>     String key = ...;
>     return Pair.of(key, input);
>   }
> }
>
> This will let you avoid mixing PTypeFamilies. I'm guessing you already
> have most of the code, but instead of just emitting the key you emit the
> pair.
>
> On Mon, Aug 1, 2016 at 2:06 PM, Masf <[email protected]> wrote:
>
>> Hi.
>> Thanks for the reply.
>> As you said, it works when I call the "by" method with "Avros.strings()".
>> However, it fails when I try to build the join:
>>
>> final JoinStrategy<String, myAvro2, String> strategy = new
>>     DefaultJoinStrategy<>();
>> final PTable<String, Pair<myAvro2, String>> joined = strategy.join(
>>     positionSide, bhSide, JoinType.LEFT_OUTER_JOIN);   <-- It fails
>>
>> The exception is thrown when the job executes the last statement:
>>
>> Caused by: java.lang.ClassCastException:
>> org.apache.crunch.types.writable.WritableType cannot be cast to
>> org.apache.crunch.types.avro.AvroType
>>   at org.apache.crunch.types.avro.Avros.createTupleSchema(Avros.java:831)
>>   at org.apache.crunch.types.avro.Avros.createTupleSchema(Avros.java:818)
>>   at org.apache.crunch.types.avro.Avros.pairs(Avros.java:622)
>>   at org.apache.crunch.types.avro.AvroTypeFamily.pairs(AvroTypeFamily.java:116)
>>   at org.apache.crunch.lib.join.DefaultJoinStrategy.preJoin(DefaultJoinStrategy.java:84)
>>   at org.apache.crunch.lib.join.DefaultJoinStrategy.join(DefaultJoinStrategy.java:73)
>>   at org.apache.crunch.lib.join.DefaultJoinStrategy.join(DefaultJoinStrategy.java:52)
>>   at com.db.myapp.driver.myapp.run(myapp.java:66)
>>
>> On Mon, Aug 1, 2016 at 7:31 PM, Micah Whitacre <[email protected]>
>> wrote:
>>
>>> You cannot mix PTypeFamilies in a single PType. In this case change:
>>>
>>> final PTable<String, myAvro2> positionSide = vp.by(new
>>>     BHExtractorAvro(),
>>>     Writables.strings());
>>>
>>> to
>>>
>>> final PTable<String, myAvro2> positionSide = vp.by(new
>>>     BHExtractorAvro(),
>>>     Avros.strings());
>>>
>>> I'm not sure if you'll hit the same problem when you join, but in the
>>> code you provided that's the reason for the exception.
>>>
>>> On Mon, Aug 1, 2016 at 12:21 PM, Masf <[email protected]> wrote:
>>>
>>>> Hi.
>>>>
>>>> I'm trying to build a join between a CSV file and an Avro file. First
>>>> I read the CSV into a PCollection:
>>>>
>>>> final PCollection<String> bh = pipeline.readTextFile("/pathcsv/");
>>>>
>>>> Second, I read the Avro file and then apply a transformation:
>>>>
>>>> final PCollection<MyAvro> gp = pipeline.read(From.avroFile(
>>>>     inputPath, Avros.specifics(myAvro.class)));
>>>>
>>>> final PCollection<myAvro2> vp = gp.parallelDo("trans", new
>>>>     MapTrasnf(), Avros.records(myAvro2.class));
>>>>
>>>> Before making the join, I extract the keys:
>>>>
>>>> final PTable<String, String> bhSide = bh.by(new BHExtractor(),
>>>>     Writables.strings());
>>>>
>>>> final PTable<String, myAvro2> positionSide = vp.by(new
>>>>     BHExtractorAvro(), Writables.strings());
>>>>
>>>> Applying the "by" method to the Avro PCollection throws an exception
>>>> and I don't know why:
>>>>
>>>> Caused by: java.lang.ClassCastException:
>>>> org.apache.crunch.types.writable.WritableType cannot be cast to
>>>> org.apache.crunch.types.avro.AvroType
>>>>   at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:895)
>>>>   at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:136)
>>>>   at org.apache.crunch.impl.dist.collect.PCollectionImpl.by(PCollectionImpl.java:270)
>>>>   at com.db.myapp.driver.myapp.run(myapp.java:62)
>>>>
>>>> --
>>>> Regards.
>>>> Miguel Ángel
>>
>> --
>> Regards.
>> Miguel Ángel

--
Regards.
Miguel Ángel
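
For anyone reading this thread later, here is a small toy model of why both stack traces end in a ClassCastException. It is not Crunch code: PType, AvroType, WritableType and ToyAvros below are hypothetical stand-ins that only mimic what the traces suggest, namely that the Avro-side factory casts every component type to AvroType. It needs only the JDK.

```java
// Toy model only: these types are hypothetical stand-ins, NOT the real Crunch classes.
interface PType<T> {
    String family();
}

final class AvroType<T> implements PType<T> {
    public String family() { return "avro"; }
}

final class WritableType<T> implements PType<T> {
    public String family() { return "writable"; }
}

final class ToyAvros {
    // Assumption taken from the stack traces: the Avro factory casts each
    // component to AvroType, so a component from another family cannot be used.
    static <K, V> String tableOf(PType<K> key, PType<V> value) {
        AvroType<K> k = (AvroType<K>) key;     // ClassCastException for a WritableType
        AvroType<V> v = (AvroType<V>) value;
        return "table<" + k.family() + "," + v.family() + ">";
    }
}

class FamilyDemo {
    // Returns true when the table type can be built, false when the cast fails.
    static boolean builds(PType<String> key, PType<String> value) {
        try {
            ToyAvros.tableOf(key, value);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Both components from the same family: the table type is built.
        System.out.println(builds(new AvroType<>(), new AvroType<>()));     // prints true
        // Writable key with an Avro value: the cast fails, as in the traces.
        System.out.println(builds(new WritableType<>(), new AvroType<>())); // prints false
    }
}
```

In the real pipeline, the equivalent fix is what the thread settles on: build both sides of the join from the same family, for example with Avros.strings() for the keys and Avros.tableOf(Avros.strings(), Avros.strings()) for the CSV side.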
