Hey Adrienne!

On Wed, Oct 12, 2016 at 4:10 PM, Adrienne Kole <adrienneko...@gmail.com> wrote:
> Hi,
>
> I have two streams that are partitioned by a key field. I want to join
> those streams on their key fields within windows. This is an example I saw on
> the Flink website:
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
>
> val firstKeyed = firstInput.keyBy("userId")
> val secondKeyed = secondInput.keyBy("id")
>
> val result: DataStream[(MyType, AnotherType)] =
>    firstKeyed.join(secondKeyed)
>    onWindow(Time.of(5, SECONDS))

This does not work. I could not find this example in the Flink docs.
Do you remember where you found it? It would make sense to remove it.
:-)

You have to go with the other approach you described
(join-where-equalTo-window-apply). It would make sense to provide a
join API directly on keyed streams, though. If you like, you can open a
JIRA issue for it (you would need to tell me your JIRA ID so I can add
you as a contributor).

> val firstInput: KeyedStream[MyType] = ...
> val secondInput: KeyedStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>    firstInput.join(secondInput).where(..).equalTo(..).window(..).apply(..)
>
> and
>
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>    firstInput.join(secondInput).where(..).equalTo(..).window(..).apply(..)

You only need the KeyedStream type if you want to use an operation that
is specific to keyed streams. Execution-wise, there is no difference
between the two examples: the join partitions both inputs by the keys
given in where(..) and equalTo(..) itself.
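For contrast, here is a hedged sketch of an operation that does need the KeyedStream type: a rolling reduce, which the Scala API offers on KeyedStream but not on DataStream. The Click type, the CollectedSums buffer (local environment only) and the sample data are hypothetical:

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical click-count record, used only for this illustration.
case class Click(userId: Long, count: Int)

// Hypothetical in-JVM buffer for the emitted running totals (local environment only).
object CollectedSums {
  val values = new java.util.concurrent.ConcurrentLinkedQueue[Click]()
}

object KeyedReduceExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val clicks: DataStream[Click] = env.fromElements(Click(1L, 1), Click(2L, 1), Click(1L, 1))

    // reduce() is available after keyBy: Flink keeps the running value per key
    // and emits the updated value for every incoming element.
    val addCounts: (Click, Click) => Click = (a, b) => Click(a.userId, a.count + b.count)
    val runningTotals: DataStream[Click] = clicks.keyBy(_.userId).reduce(addCounts)

    val storeSum: Click => Unit = c => { CollectedSums.values.add(c); () }
    runningTotals.addSink(storeSum)

    env.execute("keyed reduce sketch")
  }
}
```

With parallelism 1 this emits Click(1, 1), Click(2, 1), Click(1, 2): the first element of each key passes through unchanged, and later elements are combined with the stored value for that key.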

– Ufuk
