Thanks a lot Aljoscha. That helps.

On Mon, Sep 25, 2017 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> I think this is a valid approach, you can even use "operator state" in
> your map function to make the broadcast config state stateful.
>
> Another approach would be to use internal APIs to hack an operator that
> has a keyed stream on one input and a broadcast stream on the second input.
> You can see that approach in action in the Beam Flink Runner [1] but I
> would strongly recommend against doing that because it is using internal
> APIs and if the other approach works for you I would stay with that.
>
> Best,
> Aljoscha
>
> [1] https://github.com/apache/beam/blob/be9fb29901cf4a1ae7b4a9d8e9f25f
> 4ea78359fd/runners/flink/src/main/java/org/apache/beam/runners/flink/
> FlinkStreamingTransformTranslators.java#L488
>
> On 15. Sep 2017, at 07:04, Navneeth Krishnan <reachnavnee...@gmail.com>
> wrote:
>
> Hi,
>
> Any suggestions on this could be achieved?
>
> Thanks
>
> On Thu, Sep 7, 2017 at 8:02 AM, Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> Any suggestions on this would really help.
>>
>> Thanks.
>>
>> On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I looked into an earlier email about the topic broadcast config through
>>> connected stream and I couldn't find the conclusion.
>>>
>>> I can't do the below approach since I need the config to be published to
>>> all operator instances but I need keyed state for external querying.
>>>
>>> streamToBeConfigured.connect(configMessageStream)
>>> .keyBy(new SomeIdKeySelecor(), new ConfigMessageKeySelector)
>>> .flatMap(new FunctionWithConfigurableState())
>>> .addSink(...);
>>>
>>> One of the resolution I found in that mail chain was below. I can use
>>> this to solve my issue but is this still the recommended approach?
>>>
>>> stream1.connect(stream2)
>>>             .map(new MergeStreamsMapFunction()) // Holds transient state
>>> of the last ConfigMessage and maps Stream1's data to a Tuple2<Stream1Data,
>>> ConfigMessage>
>>>             .keyBy(new SomeIdKeySelector())         // KeyBy Id to allow
>>> for ValueStateDescriptors and semantically correct partitioning according
>>> to business logic
>>>             .flatMap(new StatefulFlatMapFunction()) // Save latest
>>> received ConfigMessage-Value in ValueStateDescriptor here
>>>             .addSink(...);
>>>
>>> Thanks,
>>> Navneeth
>>>
>>
>>
>
>

Reply via email to