Hi Kristoff,

The answer to both of your big questions is unfortunately no. I see two
options in general:

1) Process function (as you proposed). In processElement, you'd read the
state and invoke your async operation. The async callback enqueues its
result in a thread-safe result queue, and you emit it in the next call of
processElement. To deal with rare keys, which may not receive another
element for a long time, you'd probably also want to register a timer that
flushes the pending outputs. In the timer or the next processElement, you
can also access the keyed state. However, you also need to ensure that these
pending results are snapshotted, so that they are not lost on a crash. I'd
expect that you can mix ProcessFunction and CheckpointedFunction, but I
haven't done it myself yet.

2) Implement your own operator, starting by copying or subclassing the
existing AsyncWaitOperator [1]. One thing you need to look out for is to
access the state and the output collector only in the mailbox thread (i.e.,
the main task thread). You can use the mailboxExecutor to execute a piece of
code in the mailbox.
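A rough, Flink-free sketch of option 1 follows, assuming plain Java collections stand in for Flink's keyed state and Collector. All names here (DeferredEmitSketch, callExternal, onTimer, snapshotPending, awaitInFlight) are hypothetical. What it illustrates: the async callback only enqueues its result, while state updates and emits happen on the calling (task) thread, either in the next processElement for that key or in the timer. Results are drained per key because, in a real KeyedProcessFunction, keyed state is scoped to the current key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical plain-Java model of option 1; no Flink classes involved.
class DeferredEmitSketch {
    // Stand-in for keyed state: read and written only on the task thread.
    final Map<String, String> state = new HashMap<>();
    // Stand-in for the Collector: written only on the task thread.
    final List<String> output = new ArrayList<>();
    // Completed-but-not-yet-emitted results per key, filled by pool threads.
    final Map<String, Queue<String>> completed = new ConcurrentHashMap<>();
    // Futures of in-flight calls, kept only so callers can wait for them.
    final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
    final ExecutorService pool = Executors.newFixedThreadPool(2);

    // Hypothetical external call: reuse the cached value if there is one.
    static String callExternal(String value, String cached) {
        return cached != null ? cached : "validated:" + value;
    }

    void processElement(String key, String value) {
        drain(key);                     // emit earlier results for this key
        String cached = state.get(key); // state is read on the task thread
        inFlight.add(CompletableFuture
            .supplyAsync(() -> callExternal(value, cached), pool)
            // The callback only enqueues; it never touches state or output.
            .thenAccept(r -> completed
                .computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>())
                .add(r)));
    }

    // Timer callback: flushes results for rare keys that see no new elements.
    void onTimer(String key) {
        drain(key);
    }

    private void drain(String key) {
        Queue<String> queue = completed.get(key);
        if (queue == null) {
            return;
        }
        String r;
        while ((r = queue.poll()) != null) {
            state.put(key, r); // update state on the task thread
            output.add(r);     // emit on the task thread
        }
    }

    // Completed-but-unemitted results: part of what snapshotState must persist.
    List<String> snapshotPending() {
        List<String> pending = new ArrayList<>();
        for (Queue<String> queue : completed.values()) {
            pending.addAll(queue);
        }
        return pending;
    }

    void awaitInFlight() {
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
        inFlight.clear();
    }

    void close() {
        pool.shutdown();
    }
}
```

Note that results for the same key may complete out of order in this sketch; if order matters, the ordered mode of AsyncWaitOperator is the part to copy. Also, snapshotPending only covers results that already completed; requests still in flight at checkpoint time would need their inputs stored as well, which is what AsyncWaitOperator does.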

Even if you go with 1), have a look at the AsyncWaitOperator, as it should
serve as a good template.
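The rule from option 2 (touch state and output only in the mailbox thread) can be illustrated with a small plain-Java model. Mailbox and MailboxOperatorSketch are hypothetical stand-ins: in Flink, the mailboxExecutor plays the role of Mailbox.execute, and the task thread runs the mailbox loop itself. The async callback never touches state or output directly; it only enqueues a closure, which the task thread runs later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical minimal mailbox: any thread may enqueue, only the task thread runs mails.
class Mailbox {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

    void execute(Runnable mail) {
        mails.add(mail);
    }

    // Called by the task thread: runs one mail, waiting up to timeoutMs for it.
    boolean runOne(long timeoutMs) {
        try {
            Runnable mail = mails.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (mail == null) {
                return false;
            }
            mail.run();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

// Hypothetical operator: async work on a pool, state/output access via the mailbox.
class MailboxOperatorSketch {
    final Mailbox mailbox = new Mailbox();
    final Map<String, String> state = new HashMap<>();
    final List<String> output = new ArrayList<>();
    final ExecutorService pool = Executors.newFixedThreadPool(2);
    final Thread taskThread = Thread.currentThread();
    boolean allOnTaskThread = true;

    void processElement(String key, String value) {
        String cached = state.get(key); // read on the task thread
        CompletableFuture
            .supplyAsync(() -> cached != null ? cached : "validated:" + value, pool)
            // May run on a pool thread: only hand the result over to the mailbox.
            .thenAccept(result -> mailbox.execute(() -> {
                // Runs on the task thread: safe to touch state and output.
                allOnTaskThread &= Thread.currentThread() == taskThread;
                state.put(key, result);
                output.add(result);
            }));
    }

    void close() {
        pool.shutdown();
    }
}
```

Because every mail runs on the single task thread, state and output need no locking; this is the alternative to calling out.collect and mapState.put from executor threads.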

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java

On Fri, Aug 14, 2020 at 12:14 PM KristoffSC <krzysiek.chmielew...@gmail.com>
wrote:

> Thanks Arvid,
> I like your propositions. In my case, I wanted to use the state value to
> decide whether I should make the async call to the external system. The
> result of this call would be a state input. So I have this:
>
> Process1 (calculate value or take it from state) -> AsyncCall to external
> system to persist/validate the value -> Process2 (feedback loop via
> messaging queue to Process1).
>
> Apart from the fact that Process1 would have to consume two streams, which
> is OK, I would actually have a delay. I wanted to avoid unnecessary calls to
> the external system by keeping the cached/validated value in state.
>
> This would work without the delay if I could use state in async
> operators.
>
> I'm thinking about building my own semi-async operator. My idea is that I
> would have a normal KeyedProcessFunction that wraps a list of
> SingleThreadExecutors.
>
> In the processElement method, I will use the key to calculate the index
> into that array, to make sure that messages for the same key go to the
> same thread executor. I do want to keep the message order.
>
> I will submit a task like this:
> executor.submit(() -> {
>     MyResult result = rservice.process(message, mapState.get(key));
>     mapState.put(key, result);
>     out.collect(newMessage);
> });
>
> Big questions:
> 1. In my solution, out.collect(newMessage) will be called from several
> threads (each with a different message). Is it thread-safe?
> 2. Is using the MapState in a multithreaded environment like this
> thread-safe?
> Alternatively, I could associate a list of MapStates, one for each
> SingleThreadExecutor, so that each is used by only one thread.
>
> With this setup I will not block my pipeline and I will be able to use
> state. I agree that the size of the SingleThreadExecutors list will be a
> limiting factor.
>
> Is this setup possible with Flink?
>
>
> Btw, I will use RocksDBStateBackend.
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
