Hi Julian,

I think it's currently not possible to do that in a fault-tolerant way. (The problem is that the state that results from the broadcast input also needs to be checkpointed, which is not possible right now.) A while back, I created an issue for that: https://issues.apache.org/jira/browse/FLINK-3659. I'm hoping we can still get this into Flink 1.2 in some form.

Cheers,
Aljoscha

On Thu, 27 Oct 2016 at 10:57 Julian Bauß <julian.ba...@gmail.com> wrote:

> Hi Ufuk,
>
> Thanks for your response. Unfortunately that does not work.
> Having ValueStateDescriptors in the CoFlatMap on the connected Stream
> requires a keyBy on the connected Stream.
>
> Another solution I can think of would be this:
>
> stream1.connect(stream2)
>     // Holds transient state of the last ConfigMessage and maps
>     // Stream1's data to a Tuple2<Stream1Data, ConfigMessage>
>     .map(new MergeStreamsMapFunction())
>     // KeyBy Id to allow for ValueStateDescriptors and semantically
>     // correct partitioning according to business logic
>     .keyBy(new SomeIdKeySelector())
>     // Save latest received ConfigMessage-Value in
>     // ValueStateDescriptor here
>     .flatMap(new StatefulFlatMapFunction())
>     .addSink(...);
>
> I have yet to test this.
> This seems a little complicated but it might work?
>
> Best Regards,
>
> Julian
>
> 2016-10-26 16:09 GMT+02:00 Ufuk Celebi <u...@apache.org>:
>
> Does the following work?
>
> stream1.keyBy().connect(stream2.broadcast())
>
> On Wed, Oct 26, 2016 at 2:01 PM, Julian Bauß <julian.ba...@gmail.com>
> wrote:
> > Hello Everybody,
> >
> > I'm currently trying to change the state of a CoFlatMapFunction with the
> > help of a connected configuration-stream. The code looks something like
> > this.
> >
> > streamToBeConfigured.connect(configMessageStream)
> >     .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
> >     .flatMap(new FunctionWithConfigurableState())
> >     .addSink(...);
> >
> > The Stream with the actual functionality is keyed by an Id but the
> > ConfigMessages don't contain any Id to key them by. They are just
> > "key=value"-Strings that should be broadcast to all instances of the
> > CoFlatMapFunction() regardless of what Id they are keyed by.
> >
> > Is there any way to do that?
> >
> > Best Regards,
> >
> > Julian
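
To make the fault-tolerance concern concrete, here is a minimal plain-Java simulation of the two-stage workaround quoted above. It deliberately uses no Flink classes: `ConfigBroadcastSketch`, `MergeStage`, `KeyedStage`, and `Enriched` are hypothetical names invented for this sketch, and the `Map` only stands in for checkpointed keyed state. It assumes the merge step's last-config field is not part of any checkpoint, as the "transient state" comment in the quoted code suggests.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation only; no Flink API is used. All names are hypothetical.
final class ConfigBroadcastSketch {

    /** Stand-in for Tuple2<Stream1Data, ConfigMessage>. */
    static final class Enriched {
        final String id;
        final String payload;
        final String config; // null if the merge stage has not seen a config yet

        Enriched(String id, String payload, String config) {
            this.id = id;
            this.payload = payload;
            this.config = config;
        }
    }

    /** Stage 1: non-keyed merge step. Its field is assumed NOT to be checkpointed. */
    static final class MergeStage {
        private String lastConfig; // lost whenever the stage is recreated

        void onConfig(String config) {
            lastConfig = config;
        }

        Enriched onData(String id, String payload) {
            return new Enriched(id, payload, lastConfig);
        }
    }

    /** Stage 2: keyed step. The map stands in for checkpointed per-key state. */
    static final class KeyedStage {
        private final Map<String, String> configByKey;

        KeyedStage(Map<String, String> restoredState) {
            this.configByKey = new HashMap<>(restoredState);
        }

        /** Returns the payload tagged with the config in effect for its key. */
        String process(Enriched in) {
            if (in.config != null) {
                configByKey.put(in.id, in.config);
            }
            String effective = configByKey.get(in.id);
            return in.payload + "@" + (effective == null ? "no-config" : effective);
        }

        /** Copy of the keyed state, playing the role of a checkpoint. */
        Map<String, String> snapshot() {
            return new HashMap<>(configByKey);
        }
    }

    public static void main(String[] args) {
        MergeStage merge = new MergeStage();
        KeyedStage keyed = new KeyedStage(new HashMap<>());

        merge.onConfig("mode=fast");
        System.out.println(keyed.process(merge.onData("a", "e1")));

        // Simulated failure: keyed state is restored, the merge stage starts empty.
        Map<String, String> checkpoint = keyed.snapshot();
        merge = new MergeStage();
        keyed = new KeyedStage(checkpoint);

        System.out.println(keyed.process(merge.onData("a", "e2")));
        System.out.println(keyed.process(merge.onData("b", "e3")));
    }
}
```

In this simulation, key "a" recovers its config from the restored keyed state, while key "b", which never saw a record before the simulated failure, runs without any config until the next ConfigMessage arrives. Whether a real job behaves the same way depends on how the merge step's state is handled, so treat this as an illustration of the gap rather than a statement about Flink's behaviour.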