Hi all,

We have an application in which the handling of some keys depends on the results for related keys. Suppose two keys, k1 and k2, are related. Our application must not send the value for key k1 to the data sink if the value for key k2 has already been sent to the data sink. To achieve this, we would like our Flink application to emit some value information for key k2 to a side output and feed that side output back in as input to the original stream (see below).

    dataSource1
      .union(dataSource2)
      .iterate(inStream => {
        val outStream = inStream
          .keyBy(_.key)
          .connect(relationshipSource)
          .process(new CustomOperator())
        (outStream.getSideOutput(CustomOperator.Result), outStream)
      })
      .disableChaining()
      .name(OperatorKey.Name)
      .uid(OperatorKey.Name)

However, although our Flink application writes the value info to the side output successfully, the data in the side output is never delivered back to the input stream. Is this scenario feasible with Flink? If so, how should we modify our code to achieve it?

Many thanks for any comments.

Best regards,
Chen-Che Huang