Hi, I agree with Aljoscha that this needs to be solved properly, but it is technically possible to do it now as well (he actually had a PR a while back doing this).
You need to manually change how the transform method works on the connected stream so that the key can be set on only one input. You do need some reflection magic to create the output operator if you don't want to recompile your own Flink version, but it's definitely doable. (I use this technique in several of my production jobs.)

https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java#L234

As for fault tolerance, you need to make sure to checkpoint the broadcast state using the Checkpointed interface.

Cheers,
Gyula

Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, 27 Oct 2016, 12:07):

> Hi Julian,
> I think it's currently not possible to do that in a fault-tolerant way.
> (The problem is that the state that results from the broadcast input also
> needs to be checkpointed, which is not possible right now.) A while back, I
> created an issue for that:
> https://issues.apache.org/jira/browse/FLINK-3659. I'm hoping we can still
> get this in in some form for Flink 1.2.
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 10:57 Julian Bauß <julian.ba...@gmail.com> wrote:
>
> Hi Ufuk,
>
> Thanks for your response. Unfortunately that does not work.
> Having ValueStateDescriptors in the CoFlatMap on the connected Stream
> requires a keyBy on the connected Stream.
>
> Another solution I can think of would be this:
>
> stream1.connect(stream2)
>         // Holds transient state of the last ConfigMessage and maps
>         // Stream1's data to a Tuple2<Stream1Data, ConfigMessage>
>         .map(new MergeStreamsMapFunction())
>         // KeyBy Id to allow for ValueStateDescriptors and semantically
>         // correct partitioning according to business logic
>         .keyBy(new SomeIdKeySelector())
>         // Save latest received ConfigMessage-Value in
>         // ValueStateDescriptor here
>         .flatMap(new StatefulFlatMapFunction())
>         .addSink(...);
>
> I have yet to test this.
> This seems a little complicated but it might work?
>
> Best Regards,
>
> Julian
>
> 2016-10-26 16:09 GMT+02:00 Ufuk Celebi <u...@apache.org>:
>
> Does the following work?
>
> stream1.keyBy().connect(stream2.broadcast())
>
> On Wed, Oct 26, 2016 at 2:01 PM, Julian Bauß <julian.ba...@gmail.com>
> wrote:
> > Hello Everybody,
> >
> > I'm currently trying to change the state of a CoFlatMapFunction with the
> > help of a connected configuration-stream. The code looks something like
> > this.
> >
> > streamToBeConfigured.connect(configMessageStream)
> >         .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
> >         .flatMap(new FunctionWithConfigurableState())
> >         .addSink(...);
> >
> > The Stream with the actual functionality is keyed by an Id but the
> > ConfigMessages don't contain any Id to key them by. They are just
> > "key=value"-Strings that should be broadcasted to all instances of the
> > CoFlatMapFunction() regardless of what Id they are keyed by.
> >
> > Is there any way to do that?
> >
> > Best Regards,
> >
> > Julian
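
For anyone reading this thread later, here is a rough plain-Java simulation of the idea discussed above: data elements are routed by key to one parallel instance, "key=value" config messages go to every instance, and the resulting config state lives outside the keyed state, so it has to be snapshotted and restored separately. This is only a sketch and does not use any Flink API; the class and method names (BroadcastStateSketch, Instance, onData, onConfig, snapshotConfig, restoreConfig) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation, NOT the Flink API. All names here are hypothetical. */
class BroadcastStateSketch {

    /** Stands in for one parallel instance of the two-input function. */
    static class Instance {
        // Non-keyed config state: every instance holds its own full copy.
        private Map<String, String> config = new HashMap<>();
        // Keyed state: only the ids routed to this instance show up here.
        private final Map<Integer, Integer> countPerKey = new HashMap<>();

        /** Input 1: a data element that was routed here by its id. */
        String onData(int id, String payload) {
            int seen = countPerKey.merge(id, 1, Integer::sum);
            String mode = config.getOrDefault("mode", "default");
            return id + ":" + payload + ":mode=" + mode + ":seen=" + seen;
        }

        /** Input 2: a "key=value" config message delivered to every instance. */
        void onConfig(String message) {
            String[] kv = message.split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("expected key=value: " + message);
            }
            config.put(kv[0], kv[1]);
        }

        /** Copy of the config state, as a checkpoint would have to store it. */
        Map<String, String> snapshotConfig() {
            return new HashMap<>(config);
        }

        /** Reinstall config state after a simulated failure. */
        void restoreConfig(Map<String, String> snapshot) {
            config = new HashMap<>(snapshot);
        }
    }

    private final List<Instance> instances = new ArrayList<>();

    BroadcastStateSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    /** Broadcast: every instance receives the config message. */
    void broadcastConfig(String message) {
        for (Instance instance : instances) {
            instance.onConfig(message);
        }
    }

    /** Keyed routing: exactly one instance receives the data element. */
    String sendData(int id, String payload) {
        int target = Math.floorMod(Integer.hashCode(id), instances.size());
        return instances.get(target).onData(id, payload);
    }

    Instance instance(int index) {
        return instances.get(index);
    }

    public static void main(String[] args) {
        BroadcastStateSketch job = new BroadcastStateSketch(2);
        System.out.println(job.sendData(1, "a"));   // processed before any config arrived
        job.broadcastConfig("mode=strict");
        System.out.println(job.sendData(1, "b"));   // same key, config now visible
        System.out.println(job.sendData(2, "c"));   // other instance sees the config too

        // Simulated recovery: a fresh instance only knows the config if it is restored.
        Map<String, String> snapshot = job.instance(0).snapshotConfig();
        Instance recovered = new Instance();
        recovered.restoreConfig(snapshot);
        System.out.println(recovered.onData(2, "d"));
    }
}
```

The last step is the part that matters for fault tolerance: a fresh instance that never gets restoreConfig falls back to the default mode, which is the silent loss of configuration that checkpointing the broadcast state is meant to prevent.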