Try changing this from:
*final* PTable<String, String> bhSide = bh.by(*new* BHExtractor(),
Writables.*strings*());

to

final PTable<String, String> bhSide = bh.parallelDo(new KeyExtractor(),
Avros.tableOf(String, String));

public class KeyExtractor extends MapFn<String, Pair<String, String>>{
public Pair<String, String> map(String input){
    String key = ...;
    return Pair.of(key, input);
}
}

This will let you avoid mixing PTypeFamilies.  I'm guessing you already
have most of the code but instead of just emitting the Key you emit the
pair.

On Mon, Aug 1, 2016 at 2:06 PM, Masf <[email protected]> wrote:

> Hi.
> Thanks for the reply.
> As you said, it works when I execute "by" method with "Avros.strings()".
> However, it fails when I try to build the join
>
> *final* JoinStrategy<String, myAvro2, String> strategy = *new*
> DefaultJoinStrategy<>();
> *final* PTable<String, Pair<myAvro2, String>> joined = strategy.join(
> positionSide, bhSide, JoinType.*LEFT_OUTER_JOIN*); <-- It fails
>
> The exception is produced when the job executes the las statement:
>
> Caused by: java.lang.ClassCastException:
> org.apache.crunch.types.writable.WritableType cannot be cast to
> org.apache.crunch.types.avro.AvroType
>
>         at
> org.apache.crunch.types.avro.Avros.createTupleSchema(Avros.java:831)
>
>         at
> org.apache.crunch.types.avro.Avros.createTupleSchema(Avros.java:818)
>
>         at org.apache.crunch.types.avro.Avros.pairs(Avros.java:622)
>
>         at
> org.apache.crunch.types.avro.AvroTypeFamily.pairs(AvroTypeFamily.java:116)
>
>         at
> org.apache.crunch.lib.join.DefaultJoinStrategy.preJoin(DefaultJoinStrategy.java:84)
>
>         at
> org.apache.crunch.lib.join.DefaultJoinStrategy.join(DefaultJoinStrategy.java:73)
>
>         at
> org.apache.crunch.lib.join.DefaultJoinStrategy.join(DefaultJoinStrategy.java:52)
>
>         at com.db.myapp.driver.myapp.run(myapp.java:66)
>
>
>
> On Mon, Aug 1, 2016 at 7:31 PM, Micah Whitacre <[email protected]>
> wrote:
>
>> You cannot mix PTypeFamilies in a single PType.  In this case change:
>>
>> *final* PTable<String, myAvro2> positionSide = vp.by(*new* BHExtractorAvro(),
>> Writables.*strings*());
>>
>> to
>>
>> *final* PTable<String, myAvro2> positionSide = vp.by(*new* BHExtractorAvro(),
>> Avros.*strings*());
>>
>> I'm not sure if you'll hit the same problem when you join but in the code
>> you provided that's the reason for the exception.
>>
>> On Mon, Aug 1, 2016 at 12:21 PM, Masf <[email protected]> wrote:
>>
>>> Hi.
>>>
>>> I'm trying to build a join between a csv and avro. First I get csv to
>>> pcollection:
>>>
>>> *final* PCollection<String> bh = pipeline.readTextFile("/pathcsv/");
>>>
>>>
>>> Second, I read the avro file and later I do a transformation
>>>
>>> *final* PCollection<MyAvro> gp = pipeline.read(From.*avroFile*(inputPath
>>> ,Avros.*specifics*(myAvro.*class*)));
>>>
>>> *final* PCollection<myAvro2> vp = gp.parallelDo("trans", *new*
>>> MapTrasnf(), Avros.*records*(myAvro2.*class*));
>>>
>>> *Before to make the join, I extract keys:*
>>>
>>> *final* PTable<String, String> bhSide = bh.by(*new* BHExtractor(),
>>> Writables.*strings*());
>>>
>>> *final* PTable<String, myAvro2> positionSide = vp.by(*new*
>>> BHExtractorAvro(), Writables.*strings*());
>>>
>>> Applying "by" method to Avro PCollection returns an exception and I
>>> don't know why:
>>>
>>> Caused by: java.lang.ClassCastException:
>>> org.apache.crunch.types.writable.WritableType cannot be cast to
>>> org.apache.crunch.types.avro.AvroType
>>>
>>>         at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:895)
>>>
>>>         at
>>> org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:136)
>>>
>>>         at org.apache.crunch.impl.dist.collect.PCollectionImpl.by
>>> (PCollectionImpl.java:270)
>>>
>>>         at com.db.myapp.driver.myapp.run(myapp.java:62)
>>>
>>>
>>>
>>> --
>>>
>>>
>>> Regards.
>>> Miguel Ángel
>>>
>>
>>
>
>
> --
>
>
> Saludos.
> Miguel Ángel
>

Reply via email to