Why don't you use a composite key for the Flink join
(first.join(second).where(0,1).equalTo(2,3).with(...)?
This would be more efficient and you can omit the check in the join
function.

Best, Fabian

2015-11-08 19:13 GMT+01:00 Philip Lee <[email protected]>:

> I want to join two tables with two columns like
>
> //    AND sr_customer_sk      = ws_bill_customer_sk
> //    AND sr_item_sk          = ws_item_sk
>
> val srJoinWs = 
> storeReturn.join(webSales).where(_._item_sk).equalTo(_._item_sk){
>     (storeReturn: StoreReturn, webSales: WebSales, out: 
> Collector[(Long,Long,Long)]) =>
>       if(storeReturn._customer_sk.equals(webSales._bill_customer_sk))
>         
> out.collect(storeReturn._item_sk,storeReturn._customer_sk,storeReturn._ticket_number)
>       else
>         None
> }
>
> According to the explaination from join phase, I should do like it if I want 
> to join like the way. Isn't it right?
>
> But the thing is it does not work in that Type dismatch; expected 
> TypeInformation[Long], actual(StoreReturn, WebSales, 
> Collector[(Long,Long,Long)]) => Any
>
> I tried many ways but it still does not work.
>
> Any suggestion?
>
> Best Regards,
>
> Phil
>
>
>

Reply via email to