Hi Orionemail,

The async I/O operator has no simple way to access Flink state, so I think
this would require a custom caching solution.
Perhaps other community users have solved this problem in some other way.
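
As a rough sketch of what such a custom cache could look like (plain Java, no
Flink dependencies): `IdMappingCache` and `loader` are hypothetical names, and
`loader` stands in for the existing "fetch from Dynamo, or generate and write
back" call. An instance could be held as a field of the async function and
consulted from `asyncInvoke`. This assumes an unbounded in-memory map is
acceptable; the cache is local to each parallel instance and is not
checkpointed, so it starts empty after a restart.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical in-memory cache in front of an asynchronous ID lookup. */
class IdMappingCache {
    // Original id -> future of the mapped id. Caching the future (not the
    // value) lets records that arrive during a lookup share its result.
    private final ConcurrentHashMap<String, CompletableFuture<String>> cache =
            new ConcurrentHashMap<>();

    // Assumed to wrap the DB call: fetch the mapping, or generate and store it.
    private final Function<String, CompletableFuture<String>> loader;

    IdMappingCache(Function<String, CompletableFuture<String>> loader) {
        this.loader = loader;
    }

    /** Returns the cached mapping, calling the loader only on a cache miss. */
    CompletableFuture<String> resolve(String originalId) {
        CompletableFuture<String> future = cache.computeIfAbsent(originalId, loader);
        // Evict failed lookups so that a later record can retry the DB call.
        future.whenComplete((value, error) -> {
            if (error != null) {
                cache.remove(originalId, future);
            }
        });
        return future;
    }
}
```

Because the stream is keyed by id before the async operator, records with the
same id reach the same parallel instance, so each id should normally hit the
DB once per instance.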

Best,
Andrey


On Mon, Jun 8, 2020 at 5:33 PM orionemail <orionem...@protonmail.com> wrote:

> Hi,
>
> Following on from an earlier email, my approach has changed but I am still
> unsure how best to achieve my goal.
>
> I have records coming through a Kinesis stream into Flink:
>
> { id: <string>
>   var1: <string>
>   ...
> }
>
> 'id' needs to be replaced with a value from a DB store or, if it is not
> present in the DB, a new ID must be generated in Flink, cached, and then
> stored back in the DB. This is essentially a basic ID mapping service.
>
> Currently for each record I use asyncIO to get a value from Dynamo or
> generate and write the new value back to the DB.
>
> This is unnecessary, as I should be able to cache this value after the
> first time it is seen/generated.
>
> What I want to do is cache the value from the DB after the first fetch in
> some form of local state, but also update the DB.
>
> My confusion is over which of the APIs, or what else, I should use to do this.
>
> Currently my code looks something like:
>
> KeyedStream<Pojo, Object> source = getKinesisSource().keyBy(pojo -> pojo.id);
>
> SingleOutputStreamOperator<Pojo2> ps = AsyncDataStream.unorderedWait(source,
> new DynoProcessingCode(),
> ..
> ..).process(new processFunction())
>
> class processFunction extends ProcessFunction<Pojo, O> {
>     ..
> }
>
> If I insert a KeyedProcessFunction after the keyBy and before the async I/O,
> I could skip the async call if the ID has already been read from the
> cache. But if I do need to fetch from the DB, how do I store that value in
> the keyed cache from within the async I/O process? It seems that this may
> not be possible and that I should use Operator State instead?
>
> Any help appreciated.
>
> Thanks,
>
> O