Hi Abhinav,

sounds like you want to implement a join [1]. You usually want to use a
window and then correlate the data between them only within the timeframe.
You can use global windows if you cannot add a time window, but note that
the state will grow indefinitely.

If one of the sources is small, also consider the broadcast state pattern.
[2]

Note that if you are application is only doing standard relational algebra,
I'd recommend Table API/SQL which will produce faster applications [3].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html#the-broadcast-state-pattern
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html

On Wed, Feb 24, 2021 at 11:14 AM Abhinav Sharma <abhinavandfrie...@gmail.com>
wrote:

> Hi,
>
> How can I co-relate two streams of different types in Flink?
> Scenario: In stream1, I have data in pojo with a field user. In stream2, I
> have data in a different pojo which also contains the field user. (However,
> other than the user field, they have no common field).
>
> Now what I want to do is relate the two streams such that for every event
> in stream1, I want to collect events from stream2 where the user is the
> same. Both stream1 and stream2 are unbounded.
>
> I tried using
> stream1.connect(stream2).process(new CoProcessFunction<Type1, Type2,
> Type2>) {
> private String user;
>
> public void processElement1(Type1 inp, CoProcessFunction<Type1, Type2,
> Type2>.Context ctx, Collector<Type2> out)  {
> user = inp.getUser();
> }
>
> public void processElement2(Type2 inp, CoProcessFunction<Type1, Type2,
> Type2>.Context ctx, Collector<Type2> out)  {
> if (user.equals(inp.getUser())) {
> out.collect(inp);
> }
> }
> });
>
> But this works only and only if both elements occur simultaneously.
>
> How can I collect the cases with history? Is using ListState required?
> Is there some better way to this in Flink?
>
>
> Requesting help,
> Abhinav
>

Reply via email to