Hi Jaswin,

Currently, the DataStream API doesn't support outer joins. As a workaround, you can use the coGroup function [1].
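
To illustrate the idea, here is a minimal plain-Java sketch of the per-key logic such a function could contain. It is not code from your job: the class name OuterJoinSketch, the String payloads and the MATCHED/LEFT_ONLY/RIGHT_ONLY tags are all hypothetical, and a List stands in for Flink's Collector so the snippet runs without Flink on the classpath. In a real job this body would go into CoGroupFunction.coGroup(first, second, out) and be applied via leftStream.coGroup(rightStream).where(...).equalTo(...).window(...).apply(...).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch of the body of a full-outer-join coGroup call.
// Assumption: Strings stand in for CartMessage/PGMessage, and a List
// stands in for Flink's Collector so this runs without Flink.
class OuterJoinSketch {

    // Invoked once per key (e.g. orderId) and window, with all elements
    // from the left and right side that fell into that key and window.
    static List<String> coGroup(Iterable<String> left, Iterable<String> right) {
        List<String> out = new ArrayList<>();
        boolean hasLeft = left.iterator().hasNext();
        boolean hasRight = right.iterator().hasNext();

        if (hasLeft && hasRight) {
            // Both sides present: behaves like the inner join.
            for (String l : left) {
                for (String r : right) {
                    out.add("MATCHED:" + l + "|" + r);
                }
            }
        } else if (hasLeft) {
            // Left message with no partner on the right side.
            for (String l : left) {
                out.add("LEFT_ONLY:" + l);
            }
        } else {
            // Right message with no partner on the left side
            // (emits nothing if both sides are empty).
            for (String r : right) {
                out.add("RIGHT_ONLY:" + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> none = Collections.<String>emptyList();
        System.out.println(coGroup(Arrays.asList("cart-1"), none));
        System.out.println(coGroup(none, Arrays.asList("pg-7")));
        System.out.println(coGroup(Arrays.asList("cart-2"), Arrays.asList("pg-2")));
    }
}
```

Note that a window-based coGroup does not have the same semantics as an interval join: with tumbling windows, two messages that are close in time can still land in different windows and be reported as unmatched, so the window assigner needs to be chosen with that in mind.
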

Hive is also not supported by the DataStream API, though it is supported by the Table API [2].

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html

Regards,
Roman

On Mon, May 11, 2020 at 6:03 PM Jaswin Shah <jaswin.s...@outlook.com> wrote:

> Hi,
> I want to implement the below use case in my application:
> I am doing an interval join between two data streams and then, in the process
> function, catching the discrepant results of the join. The join is done on
> the key orderId. Now, I want to identify all the messages in both data streams
> that are not joined. That is, for a message in the left stream, if I do not
> find any message in the right stream over the defined interval, then that
> message should be caught; likewise, messages in the right stream that have
> no corresponding message in the left stream should be caught.
> I need help with how to achieve this use case. I know this can be done
> with an outer join, but interval joins and tumbling event-time window joins
> only support inner joins as far as I know. I do not want to use the Table/SQL
> API here; I want to work with the DataStream API only.
>
> Currently I am using the implementation below, which works for 90% of the
> cases, but the 10% of cases where a large delay can happen and messages in the
> left or right stream are missing are not handled by it:
>
> /**
>  * Join cart and pg streams on mid and orderId, and the interval specified.
>  *
>  * @param leftStream
>  * @param rightStream
>  * @return
>  */
> public SingleOutputStreamOperator<ResultMessage>
> intervalJoinCartAndPGStreams(DataStream<CartMessage> leftStream,
> DataStream<PGMessage> rightStream, ParameterTool parameter) {
>     // Discrepant results are sent to Kafka from CartPGProcessFunction.
>     return leftStream
>         .keyBy(new CartJoinColumnsSelector())
>         .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
>         .between(Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))),
>                  Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
>         .process(new CartPGProcessFunction());
> }
>
> Secondly, I am unable to find streaming support for writing the
> data streams I am reading from Kafka out to Hive, which I want to batch
> process with Flink.
>
> Please help me resolve these use cases.
>
> Thanks,
> Jaswin