Hi, Why can not you use simple CoProcessFunction and handle cache updates within it’s processElement1 or processElement2 method?
Piotrek > On 1 May 2018, at 10:20, Lasse Nedergaard <lassenederga...@gmail.com> wrote: > > Hi. > > I have a case where I have a input stream that I want to enrich with external > data. I want to cache some of the external lookup data to improve the overall > performances. > To update my cache (a CoProcessFunction) I would use iteration to send the > external enriched information back to the cache and update a mapstate. I use > CoProcesFunction as the input stream and the enrich stream contains 2 > diff.object types and I don't want to mix them. > Because I use a ConnectedIterativeStream I can't use state in my > CoProcessFunction because the ConnectedIterativeStream create a DataStream > based on the Feedback signature and not the stream I close the iteration with > and it is not possible to provide a keySelector in the withFeedbackType > > Form Flink source > public ConnectedIterativeStreams(DataStream<I> input, TypeInformation<F> > feedbackType, long waitTime) { > super(input.getExecutionEnvironment(), input, new > DataStream(input.getExecutionEnvironment(), new > CoFeedbackTransformation(input.getParallelism(), feedbackType, waitTime))); > } > and both streams need to be keyed before state are assigned to the operator. > Any ideas how to workaround this problem? > > My sudo code is as below. > > IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration > = inputStream > .keyBy(obj -> obj.getkey)) > > .iterate(maxWaitForIterations).withFeedbackType(TypeInformation.of(new > TypeHint<EnrichData>() {})); > > DataStream<ReportMessageBase> enrichedStream = iteration > .process(new EnrichFromState()); > > DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream > .filter(obj -> obj.enriched); > > EnrichService EnrichService = new EnrichService(); > DataStream<InputObject> enrichedFromApi = > EnrichService.parse(notEnrichedOutput); > > DataStream<EnrichData> newEnrich = enrichedFromApi > .map(obj -> { > > EnrichData newData = new EnrichData(); > newData.xx = obj.xx(); > > return newData; > }) > .keyBy(obj -> obj.getkey); > > > iteration.closeWith(newAddresses); > ....