Hi, Huang >From the document[1] it seems that you need to close the iterate stream. such as `iteration.closeWith(feedback);` BTW You also could get a detailed iteration example from here [2].
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/#iterate [2] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java Best, Guowei On Mon, Mar 21, 2022 at 2:27 PM Chen-Che Huang <acmic...@gmail.com> wrote: > Hi all, > > We have an application where the operations on some keys depend on the > results of related keys. Assume that there are > two keys k1 and k2 that have some relationship between them. Our > application won't send the value for key k1 to the data sink > when the value for key k2 was sent to the data sink earlier. To do so, we > hope that our Flink application can send some value > information for key k2 to SideOutput and the SideOutput becomes the input > of the original stream (see below). > > dataSource1 > .union(dataSource2) > .iterate( > inStream => { > val outStream = inStream > .keyBy(_.key) > .connect(relationshipSource) > .process(new CustomOperator()) > > (outStream.getSideOutput(CustomOperator.Result), outStream) > } > ) > .disableChaining() > .name(OperatorKey.Name).uid(OperatorKey.Name) > > However, although our Flink application can write value info to SideOutput > successfully, the data in SideOutput won't be > sent to the input stream. We wonder whether it's doable for our scenario > with Flink? If so, how should we modify our code to > achieve the goal? Many thanks for any comments. > > Best regards, > Chen-Che Huang >