Hi. Because the data I want to cache comes from a downstream operator, and as far as I know, iterations are the only way to loop data back to a previous operator.
Best regards
Lasse Nedergaard

> On 2 May 2018 at 15:35, Piotr Nowojski <pi...@data-artisans.com> wrote:
>
> Hi,
>
> Why can't you use a simple CoProcessFunction and handle cache updates within
> its processElement1 or processElement2 method?
>
> Piotrek
>
>> On 1 May 2018, at 10:20, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>>
>> Hi.
>>
>> I have a case where I have an input stream that I want to enrich with
>> external data. I want to cache some of the external lookup data to improve
>> overall performance.
>> To update my cache (a CoProcessFunction), I would use an iteration to send the
>> externally enriched information back to the cache and update a MapState. I use
>> a CoProcessFunction because the input stream and the enrich stream contain two
>> different object types, and I don't want to mix them.
>> Because I use ConnectedIterativeStreams, I can't use state in my
>> CoProcessFunction: ConnectedIterativeStreams creates the second DataStream
>> from the feedback type signature, not from the stream I close the iteration
>> with, and it is not possible to provide a KeySelector in withFeedbackType.
>>
>> From the Flink source:
>>
>> public ConnectedIterativeStreams(DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) {
>>     super(input.getExecutionEnvironment(), input, new DataStream(input.getExecutionEnvironment(),
>>         new CoFeedbackTransformation(input.getParallelism(), feedbackType, waitTime)));
>> }
>>
>> Both streams need to be keyed before state can be assigned to the operator.
>> Any ideas how to work around this problem?
>>
>> My pseudocode is below.
>>
>> IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration = inputStream
>>     .keyBy(obj -> obj.getKey())
>>     .iterate(maxWaitForIterations)
>>     .withFeedbackType(TypeInformation.of(new TypeHint<EnrichData>() {}));
>>
>> // Enrich from the cached state where possible
>> DataStream<ReportMessageBase> enrichedStream = iteration
>>     .process(new EnrichFromState());
>>
>> // Keep only the messages that could not be enriched from the cache
>> DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
>>     .filter(obj -> !obj.enriched);
>>
>> EnrichService enrichService = new EnrichService();
>> DataStream<InputObject> enrichedFromApi =
>>     enrichService.parse(notEnrichedOutput);
>>
>> // Turn the API results into cache updates for the feedback edge
>> DataStream<EnrichData> newEnrich = enrichedFromApi
>>     .map(obj -> {
>>         EnrichData newData = new EnrichData();
>>         newData.xx = obj.xx();
>>         return newData;
>>     })
>>     .keyBy(obj -> obj.getKey());
>>
>> // Feed the new enrich data back into the iteration head
>> iteration.closeWith(newEnrich);
>> ....
>
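To make the intended data flow concrete, here is a minimal, dependency-free Java model of the loop. It is not Flink code. The first input reads a per-key cache, a cache miss goes to an external lookup, and the result comes back on the second input to update the cache for the same key. Everything in it (EnrichLoopModel, the String key and payload types, the lookup function) is hypothetical. The synchronous feedback is a simplification: a real Flink iteration delivers feedback asynchronously, so several events for one key can miss the cache before the first lookup result arrives. In the real job, the cache would be keyed state (for example a MapState) inside a CoProcessFunction. That only works when both inputs are keyed by the same key, which is the limitation on the feedback side described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Dependency-free model of the enrich/cache loop; NOT Flink code.
// All names and types here are hypothetical.
class EnrichLoopModel {
    // Stand-in for keyed state: one cache entry per key. In Flink this only
    // works if BOTH inputs of the two-input operator are keyed by the same key.
    private final Map<String, String> cache = new HashMap<>();
    // Stand-in for the external enrichment service (hypothetical).
    private final Function<String, String> externalLookup;
    private int lookups = 0;

    EnrichLoopModel(Function<String, String> externalLookup) {
        this.externalLookup = externalLookup;
    }

    // First input: try to enrich from the cache; null means "not enriched".
    String processElement1(String key) {
        return cache.get(key);
    }

    // Second input: enrich data coming back (over the feedback edge in the real job).
    void processElement2(String key, String enrichData) {
        cache.put(key, enrichData);
    }

    // Drives events through the loop. Feedback is applied synchronously here,
    // unlike a real Flink iteration, where it arrives asynchronously.
    List<String> run(List<String> keys) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            String cached = processElement1(key);
            if (cached != null) {
                out.add(key + "=" + cached + " (cache)");
                continue;
            }
            String fetched = externalLookup.apply(key); // "not enriched" path
            lookups++;
            processElement2(key, fetched);             // close the loop
            out.add(key + "=" + fetched + " (lookup)");
        }
        return out;
    }

    int lookups() {
        return lookups;
    }
}
```

For example, `new EnrichLoopModel(k -> "data-" + k).run(Arrays.asList("a", "b", "a"))` returns `[a=data-a (lookup), b=data-b (lookup), a=data-a (cache)]` and performs two lookups: the repeated key is served from the cache because its feedback update arrived first.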