Why don't you use a composite key for the Flink join (first.join(second).where(0,1).equalTo(2,3).with(...)? This would be more efficient and you can omit the check in the join function.
Best, Fabian 2015-11-08 19:13 GMT+01:00 Philip Lee <[email protected]>: > I want to join two tables with two columns like > > // AND sr_customer_sk = ws_bill_customer_sk > // AND sr_item_sk = ws_item_sk > > val srJoinWs = > storeReturn.join(webSales).where(_._item_sk).equalTo(_._item_sk){ > (storeReturn: StoreReturn, webSales: WebSales, out: > Collector[(Long,Long,Long)]) => > if(storeReturn._customer_sk.equals(webSales._bill_customer_sk)) > > out.collect(storeReturn._item_sk,storeReturn._customer_sk,storeReturn._ticket_number) > else > None > } > > According to the explaination from join phase, I should do like it if I want > to join like the way. Isn't it right? > > But the thing is it does not work in that Type dismatch; expected > TypeInformation[Long], actual(StoreReturn, WebSales, > Collector[(Long,Long,Long)]) => Any > > I tried many ways but it still does not work. > > Any suggestion? > > Best Regards, > > Phil > > >
