Re: Internal state and external stores conditional usage advice sought (dynamodb asyncIO)

2020-06-09 Thread Andrey Zagrebin
Hi Orionemail,

There is no simple way to access state in the async I/O operator. I think this
would require a custom caching solution.
Maybe other community users have solved this problem in some other way.
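A minimal sketch of such a custom cache, written in plain Java so it runs without Flink. `AsyncIdStore` is a hypothetical stand-in for the DynamoDB client, and `CachingAsyncIdResolver` is an illustrative name, not a Flink API. The resolver keeps one `CompletableFuture` per id, so repeated or concurrent records for the same id share a single lookup. It assumes the stream is keyed by `id` upstream, so every record with a given id reaches the same parallel instance of the async function. UUIDs are just one possible way to generate new IDs.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CachingAsyncIdResolver {

    // Hypothetical async view of the external ID table (e.g. a DynamoDB table).
    public interface AsyncIdStore {
        CompletableFuture<Optional<String>> lookup(String externalId);
        CompletableFuture<Void> store(String externalId, String internalId);
    }

    private final AsyncIdStore store;
    // One future per id: concurrent requests for the same id share a single lookup.
    private final Map<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    public CachingAsyncIdResolver(AsyncIdStore store) {
        this.store = store;
    }

    public CompletableFuture<String> resolve(String externalId) {
        CompletableFuture<String> result = cache.computeIfAbsent(externalId, this::fetchOrCreate);
        // Drop failed lookups so a later record retries instead of seeing a cached error.
        result.whenComplete((value, error) -> {
            if (error != null) {
                cache.remove(externalId, result);
            }
        });
        return result;
    }

    private CompletableFuture<String> fetchOrCreate(String externalId) {
        return store.lookup(externalId).thenCompose(found -> {
            if (found.isPresent()) {
                return CompletableFuture.completedFuture(found.get());
            }
            // Not in the DB yet: generate a new id (UUID as an illustrative choice).
            String newId = UUID.randomUUID().toString();
            // Write the mapping back before completing, so the DB stays authoritative.
            return store.store(externalId, newId).thenApply(ignored -> newId);
        });
    }
}
```

In a Flink `RichAsyncFunction`, one option would be to create the resolver in `open()` and, in `asyncInvoke`, complete the `ResultFuture` from the returned future. The cache is plain object memory, not Flink state. It is not checkpointed, it refills from the DB after a restart, and it grows without bound unless eviction is added. Lookup-then-write is also not atomic across writers, so if other processes can insert mappings, the store write would need to be conditional.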

Best,
Andrey


On Mon, Jun 8, 2020 at 5:33 PM orionemail  wrote:



Internal state and external stores conditional usage advice sought (dynamodb asyncIO)

2020-06-08 Thread orionemail
Hi,

Following on from an earlier email, my approach has changed, but I am still
unsure how best to achieve my goal.

I have records coming through a Kinesis stream into Flink:

{ id: 
  var1: 
  ...
}

'id' needs to be replaced with a value from a DB store or, if it is not present in
the DB, Flink should generate a new ID, cache the value, and then store it back in
the DB. This is essentially a basic ID mapping service.

Currently, for each record I use async I/O to get the value from DynamoDB, or to
generate a new value and write it back to the DB.

This is unnecessary, as I should be able to cache the value after the first
time it is seen or generated.

What I want to do is cache the value from the DB after the first fetch in some
form of local state, while still updating the DB.
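The lookup-or-generate rule described above can be sketched synchronously as follows. `IdStore` and `CachedIdMapper` are hypothetical names, with `IdStore` standing in for the DynamoDB table, and UUIDs are only one possible way to generate new IDs. The store is consulted only the first time an id is seen, and a newly generated mapping is written back before it is cached.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public class CachedIdMapper {

    // Hypothetical synchronous interface to the external ID table.
    public interface IdStore {
        Optional<String> lookup(String externalId);
        void store(String externalId, String internalId);
    }

    private final IdStore store;
    // Local cache: external id -> mapped id, filled on first sighting.
    private final Map<String, String> cache = new HashMap<>();

    public CachedIdMapper(IdStore store) {
        this.store = store;
    }

    // Returns the mapped id from the cache, else from the store, else newly generated.
    public String map(String externalId) {
        String cached = cache.get(externalId);
        if (cached != null) {
            return cached; // no DB round trip after the first sighting
        }
        String internalId = store.lookup(externalId).orElseGet(() -> {
            String generated = UUID.randomUUID().toString();
            store.store(externalId, generated); // write the new mapping back to the DB
            return generated;
        });
        cache.put(externalId, internalId);
        return internalId;
    }
}
```

In Flink terms, the `HashMap` would roughly correspond to keyed state such as a `ValueState<String>` in a `KeyedProcessFunction` after the `keyBy`, but a blocking `lookup` call inside such a function is what the async I/O operator exists to avoid.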

My confusion is over which of the APIs I should use to do this.

Currently, my code looks something like this:

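// key by the incoming id so each id is always handled by the same parallel instance
KeyedStream source = getKinesisSource().keyBy(pojo -> pojo.id);

// async DynamoDB lookup/insert per record; results are emitted as they complete
SingleOutputStreamOperator ps = AsyncDataStream.unorderedWait(source,
        new DynoProcessingCode(),
        ..,
        ..).process(new processFunction());

class processFunction extends ProcessFunction {
    ..
}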

If I insert a KeyedProcessFunction after the keyBy and before the async I/O
operator, I could skip the async lookup when the ID has already been cached. But
if I do need to fetch from the DB, how do I store the result in the keyed cache
from within the async I/O operator? It seems that this may not be possible;
should I use operator state instead?

Any help appreciated.

Thanks,

O
