Thanks a lot Aljoscha. That helps.

On Mon, Sep 25, 2017 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> I think this is a valid approach; you can even use "operator state" in
> your map function to make the broadcast config stateful.
>
> Another approach would be to use internal APIs to hack an operator that
> has a keyed stream on one input and a broadcast stream on the second input.
> You can see that approach in action in the Beam Flink Runner [1], but I
> would strongly recommend against doing that because it uses internal
> APIs, and if the other approach works for you I would stay with that.
>
> Best,
> Aljoscha
>
> [1] https://github.com/apache/beam/blob/be9fb29901cf4a1ae7b4a9d8e9f25f4ea78359fd/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L488
>
> On 15. Sep 2017, at 07:04, Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
>
> Hi,
>
> Any suggestions on how this could be achieved?
>
> Thanks
>
> On Thu, Sep 7, 2017 at 8:02 AM, Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> Any suggestions on this would really help.
>>
>> Thanks.
>>
>> On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I looked into an earlier email thread on broadcasting config through a
>>> connected stream, and I couldn't find the conclusion.
>>>
>>> I can't use the approach below, since I need the config to be published to
>>> all operator instances, but I also need keyed state for external querying.
>>>
>>> streamToBeConfigured.connect(configMessageStream)
>>>         .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
>>>         .flatMap(new FunctionWithConfigurableState())
>>>         .addSink(...);
>>>
>>> One of the resolutions I found in that mail chain is below. I can use
>>> this to solve my issue, but is this still the recommended approach?
>>>
>>> stream1.connect(stream2)
>>>         // Holds transient state of the last ConfigMessage and maps
>>>         // Stream1's data to a Tuple2<Stream1Data, ConfigMessage>
>>>         .map(new MergeStreamsMapFunction())
>>>         // KeyBy Id to allow for ValueStateDescriptors and semantically
>>>         // correct partitioning according to business logic
>>>         .keyBy(new SomeIdKeySelector())
>>>         // Save latest received ConfigMessage-Value in
>>>         // ValueStateDescriptor here
>>>         .flatMap(new StatefulFlatMapFunction())
>>>         .addSink(...);
>>>
>>> Thanks,
>>> Navneeth
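
For anyone reading this thread later, here is a rough sketch of what the merge step in the second snippet does. It is plain Java with no Flink dependency, so it only illustrates the logic and is not a drop-in operator. The class name MergeWithLatestConfig and the methods onConfig/onData are hypothetical; in a real job they would roughly correspond to the two map methods of a CoMapFunction, and Map.Entry stands in for Tuple2.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/**
 * Plain-Java stand-in for the MergeStreamsMapFunction idea from the thread.
 * All names are hypothetical and no Flink classes are used.
 */
class MergeWithLatestConfig<D, C> {
    // Last config seen by this instance; null until the first config arrives.
    // As written this is transient: it is lost on restart unless it is also
    // kept in operator state, as suggested earlier in the thread.
    private C latestConfig;

    /** Called for each element of the config input. */
    void onConfig(C config) {
        latestConfig = config;
    }

    /** Called for each data element; pairs it with the current config. */
    Map.Entry<D, C> onData(D data) {
        return new SimpleImmutableEntry<>(data, latestConfig);
    }
}
```

Note that data elements arriving before the first config message are paired with null, so the downstream keyed function has to handle a missing config (or such elements have to be buffered or dropped).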