Check this image for clarification, this is what I'm trying to do:
http://i.imgur.com/iZxPv04.png

The rectangles are the two CoFlatMapFunctions, each sharing state between
its process and update methods (flatMap1 and flatMap2). As the image shows,
I need input1 and the green box to create the blue box, and input2 and the
blue box to create the green one.

---
*blue*  = *input1*.connect(*green*).keyBy(...).flatMap(...);
*green* = *input2*.connect(*blue*).keyBy(...).flatMap(...);
---

As you can see, there's no cycle in the flow of data, so I guess this
topology is valid. The problem is that there is no way to define such a flow.

For instance, with the appropriate setters we would be able to do this:

---
*blue*  = *input1*.connect();
*green* = *input2*.connect();

*blue*.setConnection(*green*);
*green*.setConnection(*blue*);

*blue*.keyBy(...).flatMap(...);
*green*.keyBy(...).flatMap(...);
---
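
To make the idea concrete, here is a toy model in plain Java. It is not Flink code and uses no Flink API: `LateBindingSketch`, `Output`, `Predictor`, `StatsCollector` and the arithmetic in them are hypothetical names and placeholders, and `setConnection` only mirrors the setter proposed above. It is a sketch of the wiring order (create both boxes, then connect them), assuming elements from the two inputs simply alternate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy, non-Flink model of "declare both operators first, connect them afterwards". */
final class LateBindingSketch {

    /** Output of an operator; its consumer is attached after construction. */
    static final class Output<T> {
        private Consumer<T> target = value -> { }; // drops elements until connected
        void setConnection(Consumer<T> consumer) { target = consumer; }
        void collect(T value) { target.accept(value); }
    }

    /** "Blue" box: predicts from input1 elements, updates its model from stats. */
    static final class Predictor {
        final Output<Double> out = new Output<>();
        private double model = 1.0;                              // shared state
        void flatMap1(double obj) { out.collect(obj * model); }  // process (placeholder math)
        void flatMap2(double stat) { model = stat; }             // update
    }

    /** "Green" box: computes stats from input2 elements and the last prediction. */
    static final class StatsCollector {
        final Output<Double> out = new Output<>();
        private double lastPrediction = 0.0;                     // shared state
        void flatMap1(double obj2) { out.collect(obj2 + lastPrediction); }
        void flatMap2(double prediction) { lastPrediction = prediction; }
    }

    /** Wires the two boxes together and feeds alternating input1/input2 elements. */
    static List<Double> run(double[] input1, double[] input2) {
        Predictor blue = new Predictor();
        StatsCollector green = new StatsCollector();
        List<Double> predictions = new ArrayList<>();

        // Both operators exist already, so each output can point at the other one.
        blue.out.setConnection(p -> { predictions.add(p); green.flatMap2(p); });
        green.out.setConnection(blue::flatMap2);

        for (int i = 0; i < input1.length; i++) {
            blue.flatMap1(input1[i]);
            if (i < input2.length) {
                green.flatMap1(input2[i]);
            }
        }
        return predictions;
    }

    public static void main(String[] args) {
        System.out.println(run(new double[] {2.0, 3.0}, new double[] {1.0, 1.0}));
    }
}
```

With the inputs in `main`, the first prediction is 2.0 (initial model 1.0), the stat 1.0 + 2.0 then replaces the model, and the second prediction is 9.0. Neither `flatMap2` emits anything, which is why the mutual references do not turn into an endless loop.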

Any idea is welcome.

Matt

On Sat, Jan 28, 2017 at 5:31 PM, Matt <dromitl...@gmail.com> wrote:

> I'm aware of IterativeStream but I don't think it's useful in this case.
>
> As shown in the example above, my use case is "cyclic" in that the same
> object goes from *Input* to *predictionStream* (flatMap1), then to
> *statsStream* (flatMap2, where it's updated with an object from *Input2*)
> and finally to *predictionStream* (flatMap2).
>
> The same operator is never applied twice to the object, so I would say
> this dataflow is cyclic only in the dependencies between the streams
> (predictionStream depends on statsStream, which in turn depends on
> predictionStream in the first place).
>
> I hope it is clear now.
>
> Matt
>
> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote:
>
>> Hello,
>>
>> Cyclic dataflows can be built using iterations:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations
>>
>> Best,
>> Gábor
>>
>> 2017-01-28 18:39 GMT+01:00 Matt <dromitl...@gmail.com>:
>> > I have a ConnectedStream (A) that depends on another ConnectedStream (B),
>> > which depends on the first one (A).
>> >
>> > Simplified code:
>> >
>> > predictionStream = input
>> >   .connect(statsStream)
>> >   .keyBy(...)
>> >   .flatMap(CoFlatMapFunction {
>> >      flatMap1(obj, output) {
>> >          p = prediction(obj)
>> >          output.collect(p)
>> >      }
>> >      flatMap2(stat, output) {
>> >          updateModel(stat)
>> >      }
>> >   })
>> >
>> > statsStream = input2
>> >   .connect(predictionStream)
>> >   .keyBy(...)
>> >   .flatMap(CoFlatMapFunction {
>> >      flatMap1(obj2, output) {
>> >         s = getStats(obj2, p)
>> >         output.collect(s)
>> >      }
>> >      flatMap2(prediction, output) {
>> >         p = prediction
>> >      }
>> >   })
>> >
>> > I'm guessing this should be possible to achieve. One way would be to add a
>> > sink on statsStream to save the elements into Kafka and read from that topic
>> > on predictionStream instead of initializing it with a reference to
>> > statsStream. But I would rather avoid writing unnecessarily into Kafka.
>> >
>> > Is there any other way to achieve this?
>> >
>> > Thanks,
>> > Matt
>>
>
>
