Hi all,

We have an application where the operations on some keys depend on the
results of related keys. Assume that there are
two keys k1 and k2 that have some relationship between them. Our
application won't send the value for key k1 to the data sink
when the value for key k2 was sent to the data sink earlier. To do so, we
hope that our Flink application can send some value
information for key k2 to SideOutput and the SideOutput becomes the input
of the original stream (see below).

dataSource1
.union(dataSource2)
.iterate(
inStream => {
val outStream = inStream
.keyBy(_.key)
.connect(relationshipSource)
.process(new CustomOperator())

(outStream.getSideOutput(CustomOperator.Result), outStream)
}
)
.disableChaining()
.name(OperatorKey.Name).uid(OperatorKey.Name)

However, although our Flink application can write value info to SideOutput
successfully, the data in SideOutput won't be
sent to the input stream. We wonder whether it's doable for our scenario
with Flink? If so, how should we modify our code to
achieve the goal? Many thanks for any comments.

Best regards,
Chen-Che Huang

Reply via email to