I'm trying to do something that seems like it should be possible, but my implementation doesn't behave as expected, and I'm not sure how else to express it.
Let's say the stream is composed of tuples like this: (Alice, Bob, 1) and I want to keyBy(1), flatMap with state associated with Alice, then keyBy(2) with state associated with Bob. The trick is, when I later get a tuple like (Bob, Alice, 1), I want the first operator to see the state that was updated in the second op previously. Is this possible? I tried implementing both operators as one, getting the state by descriptor in the flatMap body, and even instantiating the operator only once; the behavior is, as you might guess, that the state in stage 1 doesn't include changes made previously in stage 2. Is there any way to do this without throwing away the parallelism? Thanks in advance! ~Michael