Re: Using managed keyed state with AsynIo

2020-08-14 Thread Arvid Heise
Hi Kristoff, the answer to your big questions is unfortunately no on both counts. I see two options in general: 1) process function (as you proposed). On processElement, you'd read the state and invoke your async operation. You enqueue your result in some result queue where you emit it in the next

Re: Using managed keyed state with AsynIo

2020-08-14 Thread KristoffSC
Thanks Arvid, I like your propositions. In my case I wanted to use the state value to decide whether I should make the async call to the external system. The result of this call would be a state input. So having this: Process1(calculateValue or take it from state) -> AsyncCall to External system to
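
The idea in this message (consult per-key state first, call the external system only when the state has no value, then feed the result back into state) can be sketched in plain Java. This is only an illustration under stated assumptions: the class `StateGatedAsyncLookup`, its `process` method, and the in-memory map standing in for keyed state are all hypothetical, and none of this is Flink API. It does not address the point raised elsewhere in the thread that Flink state cannot be accessed from a RichAsyncFunction.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Plain-Java stand-in for "keyed state gates an async call"; NOT Flink API. */
class StateGatedAsyncLookup {
    // Hypothetical stand-in for keyed state: one pending or completed result per key.
    private final Map<String, CompletableFuture<String>> state = new ConcurrentHashMap<>();
    // Hypothetical async client call to the external system.
    private final Function<String, CompletableFuture<String>> externalCall;
    // Counts how often the external system was actually contacted.
    private final AtomicInteger externalCalls = new AtomicInteger();

    StateGatedAsyncLookup(Function<String, CompletableFuture<String>> externalCall) {
        this.externalCall = externalCall;
    }

    /** Returns the stored result for the key, or starts one async call and stores its future. */
    CompletableFuture<String> process(String key) {
        // computeIfAbsent is atomic per key on ConcurrentHashMap, so concurrent
        // elements with the same key share a single in-flight call.
        return state.computeIfAbsent(key, k -> {
            externalCalls.incrementAndGet();
            return externalCall.apply(k);
        });
    }

    int externalCallCount() {
        return externalCalls.get();
    }

    public static void main(String[] args) {
        StateGatedAsyncLookup op = new StateGatedAsyncLookup(
                key -> CompletableFuture.supplyAsync(() -> "value-for-" + key));
        System.out.println(op.process("a").join());
        System.out.println(op.process("a").join());
        System.out.println("external calls: " + op.externalCallCount());
    }
}
```

Storing the future itself, rather than the finished value, is one possible design choice: a second element with the same key that arrives while the first call is still in flight reuses the pending result instead of triggering another call.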

Re: Using managed keyed state with AsynIo

2020-08-14 Thread Arvid Heise
Hi KristoffSC, you are right that state is not shared across operators - I forgot about that. So the approach would only be valid as is if the state can be properly separated into two independent subtasks. For example, you need the state to find the database key and you store the full entry in

Re: Using managed keyed state with AsynIo

2020-08-13 Thread KristoffSC
Hi Arvid, thank you for the response. Yeah, I tried to run my job shortly after posting my message and I got "State is not supported in rich async function" ;) I came up with a solution that would solve my initial problem - the concurrent/async problem of processing messages with the same key but

Re: Using managed keyed state with AsynIo

2020-08-13 Thread Arvid Heise
Hi KristoffSC, AFAIK asyncIO does not support state operations at all because of the issues you mentioned (RichAsyncFunction fails if you access state). I'd probably solve it by having a map or process function before and after the asyncIO for the state operations. If you enable object reuse,

Using managed keyed state with AsynIo

2020-08-10 Thread KristoffSC
Hi guys, I'm using Flink 1.9.2. I have a question about a use case where I would like to use Flink's managed keyed state with Async IO [1]. Let's take as a baseline the example below, taken from [1], and let's assume that we are executing this on a keyed stream. final Future result = client.query(key);