Hi Julian,
I think it's currently not possible to do that in a fault-tolerant way.
(The problem is that the state that results from the broadcast input also
needs to be checkpointed, which is not possible right now.) A while back, I
created an issue for that: https://issues.apache.org/jira/browse/FLINK-3659.
I'm hoping we can still get this in in some form for Flink 1.2.

Cheers,
Aljoscha

On Thu, 27 Oct 2016 at 10:57 Julian Bauß <julian.ba...@gmail.com> wrote:

> Hi Ufuk,
>
> Thanks for your response. Unfortunately that does not work.
> Having ValueStateDescriptors in the CoFlatMap on the connected Stream
> requires a keyBy on the connected Stream.
>
> Another solution I can think of would be this:
>
> stream1.connect(stream2)
>             .map(new MergeStreamsMapFunction()) // Holds transient state
> of the last ConfigMessage and maps Stream1's data to a Tuple2<Stream1Data,
> ConfigMessage>
>             .keyBy(new SomeIdKeySelector())         // KeyBy Id to allow
> for ValueStateDescriptors and semantically correct partitioning according
> to business logic
>             .flatMap(new StatefulFlatMapFunction()) // Save latest
> received ConfigMessage-Value in ValueStateDescriptor here
>             .addSink(...);
>
> I have yet to test this.
> This seems a little complicated but it might work?
>
> Best Regards,
>
> Julian
>
> 2016-10-26 16:09 GMT+02:00 Ufuk Celebi <u...@apache.org>:
>
> Does the following work?
>
> stream1.keyBy().connect(stream2.broadcast())
>
> On Wed, Oct 26, 2016 at 2:01 PM, Julian Bauß <julian.ba...@gmail.com>
> wrote:
> > Hello Everybody,
> >
> > I'm currently trying to change the state of a CoFlatMapFunction with the
> > help of a connected configuration-stream. The code looks something like
> > this.
> >
> > streamToBeConfigured.connect(configMessageStream)
> > .keyBy(new SomeIdKeySelecor(), new ConfigMessageKeySelector)
> > .flatMap(new FunctionWithConfigurableState())
> > .addSink(...);
> >
> > The Stream with the actual functionality is keyedBy an Id but the
> > ConfigMessages don't contain any Id to key them by. They are just
> > "key=value"-Strings that should be broadcasted to all instances of the
> > CoFlatMapFunction() regardless of what Id they are keyed by.
> >
> > Is there any way to do that?
> >
> > Best Regards,
> >
> > Julian
>
>
>

Reply via email to