Hi,

Following on from an earlier email, my approach has changed, but I am still 
unsure how best to achieve my goal.

I have records coming through a Kinesis stream into Flink:

{ id: <string>
  var1: <string>
  ...
}

'id' needs to be replaced with a value from a DB store or, if it is not present 
in the DB, a new ID must be generated in Flink, cached, and then stored back in 
the DB. This is essentially a basic ID mapping service.

Currently, for each record I use async I/O to fetch the value from Dynamo, or to 
generate a new value and write it back to the DB.

This is unnecessary, as I should be able to cache the value after the first 
time it is seen/generated.

What I want to do is cache the value from the DB after the first fetch in some 
form of local state, but also update the DB.
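
To make the intended behaviour concrete, here is a minimal plain-Java sketch 
(no Flink involved). It assumes a HashMap can stand in for the Dynamo table and 
another for the local state; the class, field and method names are hypothetical 
and only for illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: `store` stands in for the Dynamo table,
// `cache` stands in for whatever local state Flink would hold.
class IdMappingSketch {
    private final Map<String, String> cache = new HashMap<>();
    private final Map<String, String> store;
    int storeReads = 0; // number of lookups that had to go to the store

    IdMappingSketch(Map<String, String> store) {
        this.store = store;
    }

    String resolve(String externalId) {
        // 1. Serve from the local cache if this id has been seen before.
        String cached = cache.get(externalId);
        if (cached != null) {
            return cached;
        }
        // 2. Otherwise look it up in the store.
        storeReads++;
        String mapped = store.get(externalId);
        if (mapped == null) {
            // 3. Not in the store either: generate a new id and write it back.
            mapped = UUID.randomUUID().toString();
            store.put(externalId, mapped);
        }
        // 4. Remember the result so later records skip the store entirely.
        cache.put(externalId, mapped);
        return mapped;
    }
}
```

With this, only the first record for a given id touches the store; every later 
record with the same id is answered from the cache.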

My confusion is over which of the APIs (or what else) I should use to do this.

Currently my code looks something like:

KeyedStream<Pojo, Object> source = getKinesisSource().keyBy(pojo -> pojo.id);

SingleOutputStreamOperator<Pojo2> ps = AsyncDataStream.unorderedWait(source,
        new DynoProcessingCode(),
        ..
        ..).process(new processFunction());

class processFunction extends ProcessFunction<Pojo, O> {
    ..
}

If I insert a KeyedProcessFunction after the keyBy and before the async I/O, I 
could skip the async call when the ID has already been read from the cache. 
But if I do need to fetch from the DB, how do I store the result in the keyed 
cache from within the async I/O operator? It seems that this may not be 
possible, and that I should use Operator State instead?

Any help appreciated.

Thanks,

O
