Hi Orionemail,

There is no simple way to access state from inside the async I/O operator. I think this would require a custom caching solution. Maybe other community users have solved this problem in some other way.
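Here is a rough sketch of what such a custom cache could look like. It is plain Java with no Flink dependency, and it assumes a hypothetical `IdStore` interface standing in for your Dynamo lookups and writes. `IdStore`, `InMemoryIdStore` and `CachingIdMapper` are illustrative names, not Flink or AWS APIs. The idea is a cache-aside map held by each parallel instance of the async function. It caches the *future* rather than the value, so two records with the same ID that arrive while the first lookup is still in flight share a single DB call.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the Dynamo table; not a real Flink or AWS API. */
interface IdStore {
    /** Completes with the mapped ID, or null if the source ID is unknown. */
    CompletableFuture<String> lookup(String sourceId);

    /** Persists a newly generated mapping. */
    CompletableFuture<Void> store(String sourceId, String mappedId);
}

/** In-memory IdStore that only makes the sketch runnable; it counts lookups. */
class InMemoryIdStore implements IdStore {
    final Map<String, String> table = new ConcurrentHashMap<>();
    final AtomicInteger lookups = new AtomicInteger();

    public CompletableFuture<String> lookup(String sourceId) {
        lookups.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> table.get(sourceId));
    }

    public CompletableFuture<Void> store(String sourceId, String mappedId) {
        return CompletableFuture.runAsync(() -> table.put(sourceId, mappedId));
    }
}

/**
 * Cache-aside ID mapper (assumption: one instance per parallel instance of
 * the async function). The cache is plain heap memory, NOT Flink managed
 * state: it is not checkpointed and starts empty after a restart.
 */
class CachingIdMapper {
    private final IdStore store;
    // Futures are cached so concurrent requests for one ID share a lookup.
    private final ConcurrentHashMap<String, CompletableFuture<String>> cache =
            new ConcurrentHashMap<>();

    CachingIdMapper(IdStore store) {
        this.store = store;
    }

    CompletableFuture<String> resolve(String sourceId) {
        CompletableFuture<String> f = cache.computeIfAbsent(sourceId, this::lookupOrCreate);
        // Evict failed lookups (outside computeIfAbsent) so a transient
        // DB error is retried on the next record with this ID.
        f.whenComplete((id, err) -> {
            if (err != null) {
                cache.remove(sourceId, f);
            }
        });
        return f;
    }

    private CompletableFuture<String> lookupOrCreate(String sourceId) {
        return store.lookup(sourceId).thenCompose(existing -> {
            if (existing != null) {
                return CompletableFuture.completedFuture(existing);
            }
            // Unknown ID: generate one and write it back before completing.
            String generated = UUID.randomUUID().toString();
            return store.store(sourceId, generated).thenApply(ignored -> generated);
        });
    }
}
```

To use it, you could create the mapper in the `open()` method of a `RichAsyncFunction` and, in `asyncInvoke(input, resultFuture)`, call `mapper.resolve(input.id)`, then finish with `resultFuture.complete(Collections.singleton(...))` or `resultFuture.completeExceptionally(err)` once the returned future completes. Because the stream is keyed by `id` before the async operator, the same ID should always reach the same parallel instance, so a per-instance cache stays consistent. In this sketch the cache is unbounded; for a large ID space you would probably want a bounded cache with eviction.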
Best,
Andrey

On Mon, Jun 8, 2020 at 5:33 PM orionemail <orionem...@protonmail.com> wrote:

> Hi,
>
> Following on from an earlier email, my approach has changed, but I am still
> unsure how best to achieve my goal.
>
> I have records coming through a Kinesis stream into Flink:
>
> {
>   id: <string>
>   var1: <string>
>   ...
> }
>
> 'id' needs to be replaced with a value from a DB store or, if it is not
> present in the DB, Flink should generate a new ID, cache the value and then
> store it back in the DB. This is essentially a basic ID mapping service.
>
> Currently, for each record I use async I/O to get a value from Dynamo, or to
> generate a new value and write it back to the DB.
>
> This is unnecessary, as I should be able to cache this value after the
> first time it is seen or generated.
>
> What I want to do is cache the value from the DB after the first fetch in
> some form of local state, but also update the DB.
>
> My confusion is over which of the APIs I should use to do this.
>
> Currently my code looks something like:
>
> KeyedStream<Pojo, Object> source = getKinesisSource().keyBy(pojo -> pojo.id);
>
> SingleOutputStreamOperator<Pojo2> ps = AsyncDataStream.unorderedWait(source,
>         new DynoProcessingCode(),
>         ..
>         ..).process(new processFunction());
>
> class processFunction extends ProcessFunction<Pojo, O> {
>     ..
> }
>
> If I insert a KeyedProcessFunction after the keyBy and before the async I/O,
> I could skip the async call if the ID has already been read from the cache.
> But if I do need to fetch from the DB, how do I store that value in the
> keyed cache from within the async I/O operator? It seems that this may not
> be possible, and that I should use operator state instead?
>
> Any help appreciated.
>
> Thanks,
>
> O