There is no guarantee about the order in which elements from the two streams
arrive in a connected stream. You have to check that the relevant elements
from stream A have already arrived before using their information to process
elements from stream B. If they have not, you have to buffer the elements
from stream B and process that buffer whenever matching elements arrive from
stream A. Depending on how you use them, you might need to do this for
elements from both streams.

You will get an NPE if you assume events from A have already arrived when
they might still be lagging behind.
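
A rough sketch of the buffering I mean, assuming the connected stream is
keyed by id and the A side still carries the EventA elements; Output and the
join logic are placeholders, the event types are the ones from your mail
below:

import org.apache.flink.api.common.state.{ListStateDescriptor, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Placeholder type for the joined result.
case class Output(id: Id, a: InfoA, b: InfoB)

class JoinBWithAState extends RichCoFlatMapFunction[EventA, EventB, Output] {

  // Latest info from stream A for the current key.
  private lazy val infoA = getRuntimeContext.getState(
    new ValueStateDescriptor[InfoA]("infoA", classOf[InfoA]))

  // EventB elements that arrived before any EventA for this key.
  private lazy val pendingB = getRuntimeContext.getListState(
    new ListStateDescriptor[EventB]("pendingB", classOf[EventB]))

  override def flatMap1(a: EventA, out: Collector[Output]): Unit = {
    infoA.update(a.info)
    // Flush the B elements that were buffered while A was lagging behind.
    val buffered = pendingB.get()
    if (buffered != null) {
      buffered.asScala.foreach(b => out.collect(Output(b.id, a.info, b.info)))
      pendingB.clear()
    }
  }

  override def flatMap2(b: EventB, out: Collector[Output]): Unit = {
    val info = infoA.value()
    if (info != null) {
      out.collect(Output(b.id, info, b.info))
    } else {
      // No A state for this key yet: buffer instead of failing with an NPE.
      pendingB.add(b)
    }
  }
}

wired up roughly as

typeAStream
  .connect(typeBStream)
  .keyBy(_.id, _.id)
  .flatMap(new JoinBWithAState)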

On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizera...@hotmail.com> wrote:

> Let's say I have two types sharing the same trait
>
> trait Event {
>   def id: Id
> }
>
> case class EventA(id: Id, info: InfoA) extends Event
> case class EventB(id: Id, info: InfoB) extends Event
>
> Each of these events gets pushed to a Kafka topic and gets consumed by a
> stream in Flink.
>
> Let's say I have two streams
>
> Events of type A create state:
>
> val typeAStream = env.addSource(...)
>   .flatMap(someUnmarshallerForA)
>   .keyBy(_.id)
>   .mapWithState(...)
>
> val typeBStream = env.addSource(...)
>   .flatMap(someUnmarshallerForB)
>   .keyBy(_.id)
>
> I now want to process the events in typeBStream using the information
> stored in the state of typeAStream.
>
> One approach would be to use a single stream for the two topics and then
> pattern match, but Event subclasses may grow in number and may have
> different loads, so I may want to keep things separate.
>
> Would something along the lines of:
>
> typeAStream.connect(typeBStream)
>   .flatMap(
>     new IdentityFlatMapFunction(),
>     new SomeRichFlatMapFunctionForEventB[EventB, O]
>       with StatefulFunction[EventB, O, G[EventA]] { ... }
>   )
>
> work?
>
> I tried this approach and ended up with an NPE because the state object
> was not initialized (meaning it was not there).
>
>
> Thanks,
> Aris
>
>
