I really don't know what you mean. I've been reading the documentation and the examples that show iterations, but I believe they just won't work for me. Maybe you can write a quick example? The details don't matter, only the topology.
If anyone else has an idea, it's very welcome!

Matt

On Tue, Jan 31, 2017 at 3:07 PM, Gábor Gévay <gga...@gmail.com> wrote:
> I somehow still suspect that iterations might work for your use case.
> Note that in the streaming API, iterations are currently nothing more than
> a back-edge in the topology, i.e. a low-level tool to create a cyclic
> topology, like the one you describe with your hypothetical setter syntax.
> (It's quite different from the iterations of the batch API.)
>
> The tricky part for your use case is that you would want a ConnectedStream
> as your iteration head, which should receive the elements from the
> back-edge separately from the normal input. You could simulate this by
> using not ConnectedStream.flatMap, but just a simple Stream.flatMap whose
> input element type is an Either type, whose two components would be the
> normal input and the back-edge input. (You would add maps to your input1
> and before the closeWith that wrap the elements into the two alternatives
> of the Either type.)
>
> Best,
> Gábor
>
> 2017-01-29 15:39 GMT+01:00 Matt <dromitl...@gmail.com>:
>
>> Check this image for clarification; this is what I'm trying to do:
>> http://i.imgur.com/iZxPv04.png
>>
>> The rectangles are the two CoFlatMapFunctions, each sharing state between
>> process and update (map1 and map2). It's clear from the image that I need
>> input1 and the green box to create the blue box, and input2 and the blue
>> box to create the green one.
>>
>> ---
>> *blue* = *input1*.connect(*green*).keyBy(...).flatMap(...);
>> *green* = *input2*.connect(*blue*).keyBy(...).flatMap(...);
>> ---
>>
>> As you can see, there's no cycle in the flow of data, so I guess this
>> topology is valid. The problem is that there is no way to define such a flow.
>>
>> For instance, with the appropriate setters we would be able to do this:
>>
>> ---
>> *blue* = *input1*.connect();
>> *green* = *input2*.connect();
>>
>> *blue*.setConnection(*green*);
>> *green*.setConnection(*blue*);
>>
>> *blue*.keyBy(...).flatMap(...);
>> *green*.keyBy(...).flatMap(...);
>> ---
>>
>> Any idea is welcome.
>>
>> Matt
>>
>> On Sat, Jan 28, 2017 at 5:31 PM, Matt <dromitl...@gmail.com> wrote:
>>
>>> I'm aware of IterativeStream, but I don't think it's useful in this case.
>>>
>>> As shown in the example above, my use case is "cyclic" in that the same
>>> object goes from *Input* to *predictionStream* (flatMap1), then to
>>> *statsStream* (flatMap2, where it's updated with an object from *Input2*),
>>> and finally to *predictionStream* (flatMap2).
>>>
>>> The same operator is never applied twice to the object, so I would say
>>> this dataflow is cyclic only in the dependencies between the streams
>>> (predictionStream depends on statsStream, which in turn depends on
>>> predictionStream).
>>>
>>> I hope it is clear now.
>>>
>>> Matt
>>>
>>> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Cyclic dataflows can be built using iterations:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations
>>>>
>>>> Best,
>>>> Gábor
>>>>
>>>> 2017-01-28 18:39 GMT+01:00 Matt <dromitl...@gmail.com>:
>>>> > I have a ConnectedStream (A) that depends on another ConnectedStream
>>>> > (B), which depends on the first one (A).
>>>> >
>>>> > Simplified code:
>>>> >
>>>> > predictionStream = input
>>>> >     .connect(statsStream)
>>>> >     .keyBy(...)
>>>> >     .flatMap(CoFlatMapFunction {
>>>> >         flatMap1(obj, output) {
>>>> >             p = prediction(obj)
>>>> >             output.collect(p)
>>>> >         }
>>>> >         flatMap2(stat, output) {
>>>> >             updateModel(stat)
>>>> >         }
>>>> >     })
>>>> >
>>>> > statsStream = input2
>>>> >     .connect(predictionStream)
>>>> >     .keyBy(...)
>>>> >     .flatMap(CoFlatMapFunction {
>>>> >         flatMap1(obj2, output) {
>>>> >             s = getStats(obj2, p)
>>>> >             output.collect(s)
>>>> >         }
>>>> >         flatMap2(prediction, output) {
>>>> >             p = prediction
>>>> >         }
>>>> >     })
>>>> >
>>>> > I'm guessing this should be possible to achieve. One way would be to
>>>> > add a sink on statsStream that writes the elements into Kafka, and to
>>>> > read from that topic in predictionStream instead of initializing it
>>>> > with a reference to statsStream. But I would rather avoid writing to
>>>> > Kafka unnecessarily.
>>>> >
>>>> > Is there any other way to achieve this?
>>>> >
>>>> > Thanks,
>>>> > Matt
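Since the thread asks for a quick example of the topology only, here is a minimal, Flink-free sketch of what Gábor's suggestion amounts to, assuming a single key and single-threaded execution. The "blue" box becomes the iteration head: one flatMap over an Either type, where Left carries normal input1 elements and Right carries stats arriving over the back-edge, so both branches share one piece of state. The "green" box stays an ordinary two-input function over input2 and blue's output; its stats are wrapped as Right and fed back. In an actual Flink job the corresponding pieces would be `DataStream.iterate()` for the head, `IterativeStream.closeWith(...)` for the back-edge (it takes a stream of the same element type as the head, which is why both inputs are wrapped into one Either type), and `org.apache.flink.types.Either`. Everything else here is hypothetical: the local `Either` stand-in, the "bias" model, the prediction-error stat, and the fixed event order (in a real job the interleaving of input and feedback records is not deterministic).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Flink-free model of the proposed loop; all names are hypothetical.
public class FeedbackTopology {

    // Stand-in for org.apache.flink.types.Either: holds exactly one side.
    static final class Either<L, R> {
        final boolean isLeft;
        final L left;
        final R right;

        private Either(boolean isLeft, L left, R right) {
            this.isLeft = isLeft;
            this.left = left;
            this.right = right;
        }

        static <L, R> Either<L, R> left(L value) { return new Either<>(true, value, null); }
        static <L, R> Either<L, R> right(R value) { return new Either<>(false, null, value); }
    }

    // "Blue" box = iteration head: a single flatMap over Either<input1 element, stat>.
    // Both branches see the same state, like flatMap1/flatMap2 of a CoFlatMapFunction.
    static final class Blue {
        double bias = 0.0; // hypothetical model: prediction = x + bias

        void flatMap(Either<Double, Double> in, List<Double> out) {
            if (in.isLeft) {
                out.add(in.left + bias); // Left: normal input1 element -> emit a prediction
            } else {
                bias += in.right;        // Right: stat from the back-edge -> update the model
            }
        }
    }

    // "Green" box: an ordinary two-input function over input2 and Blue's predictions.
    static final class Green {
        Double lastPrediction = null; // state shared by both inputs

        // input2 element: emit a stat once a prediction has been seen
        void flatMap1(double observed, List<Double> out) {
            if (lastPrediction != null) {
                out.add(observed - lastPrediction); // hypothetical stat: prediction error
            }
        }

        // element from Blue: remember the latest prediction
        void flatMap2(double prediction) {
            lastPrediction = prediction;
        }
    }

    // Replays external events in order: tag 1 = input1, tag 2 = input2.
    // Feedback is drained immediately here; a real job gives no such ordering guarantee.
    static List<Double> run(int[] tags, double[] values) {
        Blue blue = new Blue();
        Green green = new Green();
        Deque<Either<Double, Double>> head = new ArrayDeque<>(); // input of the iteration head
        List<Double> predictions = new ArrayList<>();

        for (int i = 0; i < tags.length; i++) {
            if (tags[i] == 1) {
                head.add(Either.<Double, Double>left(values[i])); // map on input1: wrap as Left
            } else {
                List<Double> stats = new ArrayList<>();
                green.flatMap1(values[i], stats);
                for (double s : stats) {
                    head.add(Either.<Double, Double>right(s));    // map before closeWith: wrap as Right
                }
            }
            while (!head.isEmpty()) {
                List<Double> out = new ArrayList<>();
                blue.flatMap(head.poll(), out);
                for (double p : out) {
                    predictions.add(p); // Blue's output leaves the loop...
                    green.flatMap2(p);  // ...and also flows forward into Green
                }
            }
        }
        return predictions;
    }

    public static void main(String[] args) {
        int[] tags = {1, 2, 1, 2, 1};
        double[] values = {10, 13, 20, 22, 5};
        System.out.println(run(tags, values)); // prints [10.0, 23.0, 7.0]
    }
}
```

Running `main` prints `[10.0, 23.0, 7.0]`: the second and third predictions already reflect the stats fed back from Green, while each record still crosses each box at most once per hop, which matches the point that the cycle exists only in the stream dependencies.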