Hi All,

I looked at an earlier email thread about broadcasting config through a
connected stream, but I couldn't find a conclusion.

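I can't use the approach below. Keying the config stream sends each config
message only to the instance that owns its key, but I need the config
published to all operator instances. At the same time, I need keyed state
for external querying.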

streamToBeConfigured.connect(configMessageStream)
.keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
.flatMap(new FunctionWithConfigurableState())
.addSink(...);

One of the resolutions I found in that mail chain is below. I could use it
to solve my issue, but is it still the recommended approach?

stream1.connect(stream2)
            // Holds transient state of the last ConfigMessage and maps
            // Stream1's data to a Tuple2<Stream1Data, ConfigMessage>
            .map(new MergeStreamsMapFunction())
            // KeyBy Id to allow for ValueStateDescriptors and semantically
            // correct partitioning according to business logic
            .keyBy(new SomeIdKeySelector())
            // Save latest received ConfigMessage-Value in
            // ValueStateDescriptor here
            .flatMap(new StatefulFlatMapFunction())
            .addSink(...);
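
To check the data flow in that suggestion, the two steps can be modelled in
plain Java with no Flink dependency. This is only a sketch under
assumptions: DataEvent, ConfigMessage, Tagged, MergeStep and KeyedStep are
hypothetical stand-ins (not Flink classes), the HashMap stands in for
per-key ValueState, the threshold filter is an invented example of "using
the config", and a single MergeStep models one parallel instance of the
merge operator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeThenKeySketch {

    // Hypothetical stand-in for Stream1Data.
    static final class DataEvent {
        final String id;
        final int value;
        DataEvent(String id, int value) { this.id = id; this.value = value; }
    }

    // Hypothetical stand-in for ConfigMessage.
    static final class ConfigMessage {
        final int threshold;
        ConfigMessage(int threshold) { this.threshold = threshold; }
    }

    // Models the Tuple2<Stream1Data, ConfigMessage> emitted by the merge step.
    static final class Tagged {
        final DataEvent data;
        final ConfigMessage config; // null until a config has been seen
        Tagged(DataEvent data, ConfigMessage config) {
            this.data = data;
            this.config = config;
        }
    }

    // Step 1 (not keyed): remembers the last config and attaches it to each event.
    static final class MergeStep {
        private ConfigMessage lastConfig; // transient: not checkpointed in this sketch
        void onConfig(ConfigMessage c) { lastConfig = c; }
        Tagged onData(DataEvent d) { return new Tagged(d, lastConfig); }
    }

    // Step 2 (keyed by id): the map models one ValueState entry per key.
    static final class KeyedStep {
        private final Map<String, ConfigMessage> configById = new HashMap<>();

        List<String> onTagged(Tagged t) {
            String key = t.data.id;
            if (t.config != null) {
                configById.put(key, t.config); // save latest config for this key
            }
            List<String> out = new ArrayList<>();
            ConfigMessage current = configById.get(key);
            // Hypothetical use of the config: emit values above the threshold.
            if (current != null && t.data.value > current.threshold) {
                out.add(key + ":" + t.data.value);
            }
            return out;
        }

        // Models an external query against the keyed state.
        ConfigMessage configFor(String id) { return configById.get(id); }
    }

    static List<String> run() {
        MergeStep merge = new MergeStep();
        KeyedStep keyed = new KeyedStep();
        List<String> out = new ArrayList<>();
        // Arrives before any config, so it is tagged with null and dropped.
        out.addAll(keyed.onTagged(merge.onData(new DataEvent("a", 5))));
        merge.onConfig(new ConfigMessage(6));
        out.addAll(keyed.onTagged(merge.onData(new DataEvent("a", 7))));
        out.addAll(keyed.onTagged(merge.onData(new DataEvent("b", 3))));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a:7]
    }
}
```

One thing the sketch makes visible: the keyed state for an id is only
updated when the next data event for that id arrives, so a key with no
traffic keeps its old config until then.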

Thanks,
Navneeth
