Hi Huang,

From the document [1], it seems that you need to close the iteration stream, e.g. `iteration.closeWith(feedback);`.
BTW, you can also find a detailed iteration example here [2].
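A minimal sketch of that pattern with the Java DataStream API (the stream names and filter predicates below are placeholders for illustration, not taken from your code):

```java
// Sketch only: assumes Flink's DataStream API on the classpath.
// `source` is a placeholder DataStream<Long>, not from the original thread.
IterativeStream<Long> iteration = source.iterate();

// The body of the loop: whatever transformation each round applies.
DataStream<Long> iterationBody = iteration.map(v -> v - 1);

// Records that should go around the loop again form the feedback stream ...
DataStream<Long> feedback = iterationBody.filter(v -> v > 0);

// ... and closeWith() is what actually wires the feedback edge back to
// the head of the iteration. Without this call the loop is never closed.
iteration.closeWith(feedback);

// Records that are done leave the iteration as the regular output.
DataStream<Long> output = iterationBody.filter(v -> v <= 0);
```

In your case the feedback stream would be the side output you obtain from `CustomOperator`, passed to `closeWith` before the job is submitted.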

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/#iterate
[2]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java

Best,
Guowei


On Mon, Mar 21, 2022 at 2:27 PM Chen-Che Huang <acmic...@gmail.com> wrote:

> Hi all,
>
> We have an application where the operations on some keys depend on the
> results of related keys. Assume there are two keys, k1 and k2, that have
> some relationship between them. Our application won't send the value for
> key k1 to the data sink if the value for key k2 was already sent to the
> data sink earlier. To do so, we hope that our Flink application can send
> some value information for key k2 to a SideOutput and feed that SideOutput
> back as input to the original stream (see below).
>
> dataSource1
>   .union(dataSource2)
>   .iterate(
>     inStream => {
>       val outStream = inStream
>         .keyBy(_.key)
>         .connect(relationshipSource)
>         .process(new CustomOperator())
>
>       (outStream.getSideOutput(CustomOperator.Result), outStream)
>     }
>   )
>   .disableChaining()
>   .name(OperatorKey.Name).uid(OperatorKey.Name)
>
> However, although our Flink application writes value info to the SideOutput
> successfully, the data in the SideOutput is never fed back to the input
> stream. We wonder whether our scenario is doable with Flink. If so, how
> should we modify our code to achieve the goal? Many thanks for any comments.
>
> Best regards,
> Chen-Che Huang
>
