Hi KristoffSC,

you are right that state is not shared across operators - I forgot about
that. So the approach only works as is if the state can be cleanly
separated into two independent parts, one per operator. For example, you
need state to find the database key, and you store the full entry in Flink
state afterwards. Then you could look up the key in the map before async IO
and keep the full record in the map after async IO.
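
To make the split concrete, here is a minimal plain-Java sketch, not Flink
code. It only models the idea that each map owns its own piece of state and
the async step owns none. All class, method, and key names are hypothetical,
and the database call is simulated.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java stand-in for: map (find key) -> async IO -> map (store record).
// All names are hypothetical; in Flink each stage would be its own operator.
class SplitStatePipeline {

    // State owned by the first map only: stream key -> database key.
    private final Map<String, String> dbKeyState = new ConcurrentHashMap<>();

    // State owned by the second map only: stream key -> full record.
    private final Map<String, String> recordState = new ConcurrentHashMap<>();

    void registerDbKey(String key, String dbKey) {
        dbKeyState.put(key, dbKey);
    }

    // Stage 1 (map before async IO): reads only dbKeyState.
    String lookupDbKey(String key) {
        return dbKeyState.get(key);
    }

    // Stage 2 (async IO): touches no state; the "database" is simulated.
    CompletableFuture<String> fetchRecord(String dbKey) {
        return CompletableFuture.supplyAsync(() -> "record-for-" + dbKey);
    }

    // Stage 3 (map after async IO): writes only recordState.
    void storeRecord(String key, String record) {
        recordState.put(key, record);
    }

    String storedRecord(String key) {
        return recordState.get(key);
    }

    // Chains the three stages for one element.
    CompletableFuture<Void> process(String key) {
        String dbKey = lookupDbKey(key);
        return fetchRecord(dbKey).thenAccept(record -> storeRecord(key, record));
    }

    public static void main(String[] args) {
        SplitStatePipeline pipeline = new SplitStatePipeline();
        pipeline.registerDbKey("user-1", "db-42");
        pipeline.process("user-1").join();
        System.out.println(pipeline.storedRecord("user-1"));
    }
}
```

Note that stage 3 never reads dbKeyState and stage 1 never reads
recordState, which is the condition for this layout to work. In an actual
job, the stream coming out of async IO is no longer keyed, so the second map
would presumably need another keyBy before it can use keyed state.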

Another approach is to feed results back from async IO to the first map.
There is usually a tradeoff between performance and complexity: a Kafka
topic for feedback is simple but costs performance, while some TCP socket
magic is faster but adds complexity. I'd rather recommend having a look at
statefun [1] though, which implements this feedback efficiently and
provides a good abstraction for everything state-related. Unfortunately,
mixing Flink jobs and statefun applications is still not easily possible -
I assume that will come in the next major release. But if you can express
everything in statefun, it is the best choice.

For your question: it shouldn't make any difference, as the function gets
serialized in main() and deserialized on each JobManager/TaskManager
(JM/TM), resulting in many copies. The only difference is that in your
main() you have one fewer copy. Since Flink state is only touched on the
TMs, the function instances are different anyway.
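
The effect can be reproduced with plain Java serialization. This is only a
rough sketch of the idea, not how Flink ships functions internally:
MyFunction and shipToWorker are hypothetical names, and each call to
shipToWorker stands for one operator's function being serialized on the
client and deserialized on a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for MyProcessFunction: no fields, just Serializable.
class MyFunction implements Serializable {
    private static final long serialVersionUID = 1L;
}

class SerializationCopies {

    // Round-trips an object through Java serialization and returns the copy.
    static <T extends Serializable> T shipToWorker(T function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // Setup 1: one instance handed to two operators.
        MyFunction pf = new MyFunction();
        MyFunction firstOperator = shipToWorker(pf);
        MyFunction secondOperator = shipToWorker(pf);

        // Both comparisons print false: every round trip yields a new object.
        System.out.println(firstOperator == pf);
        System.out.println(firstOperator == secondOperator);
    }
}
```

So even when the same pf is passed twice in main(), the operators end up
with separate instances after deserialization, just as they would with two
separate new MyProcessFunction() calls.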

[1] https://github.com/apache/flink-statefun

On Thu, Aug 13, 2020 at 2:53 PM KristoffSC <krzysiek.chmielew...@gmail.com>
wrote:

> Hi Arvid,
> thank you for the response.
> Yeah, I tried to run my job shortly after posting my message and I got
> "State is not supported in rich async function" ;)
>
> I came up with a solution that would solve my initial problem -
> concurrent/async processing of messages with the same key - but
> unfortunately state is not supported here.
>
> Thank you for the proposition
> source -> keyby -> map (retrieve state) -> async IO (use state) -> map
> (update state)
>
> However, I'm a little bit surprised. I thought that keyed state cannot
> be shared between operators, and here you are suggesting doing that. Is it
> possible then?
>
>
> While I'm at it, I have an additional question: is there any difference
> from Flink's perspective between these two approaches?
>
> MyProcessFunction pf = new MyProcessFunction(); MyProcessFunction is a
> stateless object, but it uses Flink keyed state.
>
> Setup 1:
>
> source -> keyBy(key) -> process(pf) -> map() -> process(pf) -> sink
>
> Setup 2:
> source -> keyBy(key) -> process(new MyProcessFunction()) -> map() ->
> process(new MyProcessFunction()) -> sink
>


-- 

Arvid Heise | Senior Java Developer

