Correct. That's what I get for writing code in an email. :) On Mon, Aug 1, 2016 at 2:45 PM, Masf <[email protected]> wrote:
> Hi. > I suppose that it will be "Avros.tableOf(Avros.strings(), Avros.strings())" > instead of "Avros.tableOf(String, String)" > Right? > > Thanks. > Miguel. > > On Mon, Aug 1, 2016 at 9:13 PM, Micah Whitacre <[email protected]> > wrote: > >> Try changing this from: >> *final* PTable<String, String> bhSide = bh.by(*new* BHExtractor(), >> Writables.*strings*()); >> >> to >> >> final PTable<String, String> bhSide = bh.parallelDo(new KeyExtractor(), >> Avros.tableOf(String, String)); >> >> public class KeyExtractor extends MapFn<String, Pair<String, String>>{ >> public Pair<String, String> map(String input){ >> String key = ...; >> return Pair.of(key, input); >> } >> } >> >> This will let you avoid mixing PTypeFamilies. I'm guessing you already >> have most of the code but instead of just emitting the Key you emit the >> pair. >> >> On Mon, Aug 1, 2016 at 2:06 PM, Masf <[email protected]> wrote: >> >>> Hi. >>> Thanks for the reply. >>> As you said, it works when I execute "by" method with "Avros.strings()". >>> However, it fails when I try to build the join >>> >>> *final* JoinStrategy<String, myAvro2, String> strategy = *new* >>> DefaultJoinStrategy<>(); >>> *final* PTable<String, Pair<myAvro2, String>> joined = strategy.join( >>> positionSide, bhSide, JoinType.*LEFT_OUTER_JOIN*); <-- It fails >>> >>> The exception is produced when the job executes the las statement: >>> >>> Caused by: java.lang.ClassCastException: >>> org.apache.crunch.types.writable.WritableType cannot be cast to >>> org.apache.crunch.types.avro.AvroType >>> >>> at >>> org.apache.crunch.types.avro.Avros.createTupleSchema(Avros.java:831) >>> >>> at >>> org.apache.crunch.types.avro.Avros.createTupleSchema(Avros.java:818) >>> >>> at org.apache.crunch.types.avro.Avros.pairs(Avros.java:622) >>> >>> at >>> org.apache.crunch.types.avro.AvroTypeFamily.pairs(AvroTypeFamily.java:116) >>> >>> at >>> org.apache.crunch.lib.join.DefaultJoinStrategy.preJoin(DefaultJoinStrategy.java:84) >>> >>> at >>> org.apache.crunch.lib.join.DefaultJoinStrategy.join(DefaultJoinStrategy.java:73) >>> >>> at >>> org.apache.crunch.lib.join.DefaultJoinStrategy.join(DefaultJoinStrategy.java:52) >>> >>> at com.db.myapp.driver.myapp.run(myapp.java:66) >>> >>> >>> >>> On Mon, Aug 1, 2016 at 7:31 PM, Micah Whitacre <[email protected]> >>> wrote: >>> >>>> You cannot mix PTypeFamilies in a single PType. In this case change: >>>> >>>> *final* PTable<String, myAvro2> positionSide = vp.by(*new* >>>> BHExtractorAvro(), >>>> Writables.*strings*()); >>>> >>>> to >>>> >>>> *final* PTable<String, myAvro2> positionSide = vp.by(*new* >>>> BHExtractorAvro(), >>>> Avros.*strings*()); >>>> >>>> I'm not sure if you'll hit the same problem when you join but in the >>>> code you provided that's the reason for the exception. >>>> >>>> On Mon, Aug 1, 2016 at 12:21 PM, Masf <[email protected]> wrote: >>>> >>>>> Hi. >>>>> >>>>> I'm trying to build a join between a csv and avro. First I get csv to >>>>> pcollection: >>>>> >>>>> *final* PCollection<String> bh = pipeline.readTextFile("/pathcsv/"); >>>>> >>>>> >>>>> Second, I read the avro file and later I do a transformation >>>>> >>>>> *final* PCollection<MyAvro> gp = pipeline.read(From.*avroFile*( >>>>> inputPath,Avros.*specifics*(myAvro.*class*))); >>>>> >>>>> *final* PCollection<myAvro2> vp = gp.parallelDo("trans", *new* >>>>> MapTrasnf(), Avros.*records*(myAvro2.*class*)); >>>>> >>>>> *Before to make the join, I extract keys:* >>>>> >>>>> *final* PTable<String, String> bhSide = bh.by(*new* BHExtractor(), >>>>> Writables.*strings*()); >>>>> >>>>> *final* PTable<String, myAvro2> positionSide = vp.by(*new* >>>>> BHExtractorAvro(), Writables.*strings*()); >>>>> >>>>> Applying "by" method to Avro PCollection returns an exception and I >>>>> don't know why: >>>>> >>>>> Caused by: java.lang.ClassCastException: >>>>> org.apache.crunch.types.writable.WritableType cannot be cast to >>>>> org.apache.crunch.types.avro.AvroType >>>>> >>>>> at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:895) >>>>> >>>>> at >>>>> org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:136) >>>>> >>>>> at org.apache.crunch.impl.dist.collect.PCollectionImpl.by >>>>> (PCollectionImpl.java:270) >>>>> >>>>> at com.db.myapp.driver.myapp.run(myapp.java:62) >>>>> >>>>> >>>>> >>>>> -- >>>>> >>>>> >>>>> Regards. >>>>> Miguel Ángel >>>>> >>>> >>>> >>> >>> >>> -- >>> >>> >>> Saludos. >>> Miguel Ángel >>> >> >> > > > -- > > > Saludos. > Miguel Ángel >
