Re: Preparing keyed state before snapshot

2024-02-21 Thread Lorenzo Nicora
you to store operator state as BLOB directly if that would be a > doable option for you. > Sincere greetings > Thias > *From:* Zakelly Lan > *Sent:* Wednesday, February 21, 2024 8:04 AM > *To:* Lorenzo Nicora > *Cc:* F

RE: Preparing keyed state before snapshot

2024-02-21 Thread Schwalbe Matthias
directly if that would be a doable option for you. Sincere greetings Thias From: Zakelly Lan Sent: Wednesday, February 21, 2024 8:04 AM To: Lorenzo Nicora Cc: Flink User Group Subject: Re: Preparing keyed state before snapshot Hi Lorenzo

Re: Preparing keyed state before snapshot

2024-02-20 Thread Zakelly Lan
"state", similar to the kvCache in FastTop1Function. > But I am not sure I understand how I can set the keyed state for a > specific key in snapshotState(). > FastTop1Function seems to rely on keyContext set via setKeyContext(). This > method is not part of the API. I see it's set s

Re: Preparing keyed state before snapshot

2024-02-20 Thread Lorenzo Nicora
Thanks Zakelly, I'd need to do something similar, with a map containing my non-serializable "state", similar to the kvCache in FastTop1Function. But I am not sure I understand how I can set the keyed state for a specific key in snapshotState(). FastTop1Function seems to rely on keyC

Re: Preparing keyed state before snapshot

2024-02-17 Thread Zakelly Lan
Hi Lorenzo, It is not recommended to do this with keyed state. However, there is an example in the Flink code (FastTop1Function#snapshotState) [1] of setting keys in snapshotState(). Hope this helps. [1] https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table
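The pattern discussed in this thread (a transient per-key cache that is written into keyed state only when a snapshot is taken, switching the current key for each cached entry) can be modeled in plain Java without any Flink dependency. This is a sketch under stated assumptions, not Flink code: `ToyKeyedBackend`, `setCurrentKey`, `process` and `flushCacheBeforeSnapshot` are hypothetical names that only mimic the idea. In Flink itself the key is switched through operator-level internals, which, as noted in the thread, are not part of the public `KeyedProcessFunction` API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class SnapshotFlushModel {

    /** Hypothetical stand-in for a keyed state backend: every write goes to the current key. */
    static final class ToyKeyedBackend<K, V> {
        private final Map<K, V> store = new HashMap<>();
        private K currentKey;

        void setCurrentKey(K key) {
            this.currentKey = key;
        }

        void update(V value) {
            if (currentKey == null) {
                throw new IllegalStateException("No key set");
            }
            store.put(currentKey, value);
        }

        /** Copy of all per-key values, standing in for what a snapshot would persist. */
        Map<K, V> snapshot() {
            return new HashMap<>(store);
        }
    }

    // Transient working copies (stand-ins for non-serializable objects), kept outside state.
    private final Map<String, StringBuilder> cache = new LinkedHashMap<>();
    private final ToyKeyedBackend<String, String> backend = new ToyKeyedBackend<>();

    /** Regular processing only touches the in-memory cache, never the backend. */
    void process(String key, String event) {
        cache.computeIfAbsent(key, k -> new StringBuilder()).append(event);
    }

    /** Snapshot hook: set the key context for each cached entry, then write it to state. */
    Map<String, String> flushCacheBeforeSnapshot() {
        for (Map.Entry<String, StringBuilder> entry : cache.entrySet()) {
            backend.setCurrentKey(entry.getKey());
            backend.update(entry.getValue().toString());
        }
        return backend.snapshot();
    }

    public static void main(String[] args) {
        SnapshotFlushModel op = new SnapshotFlushModel();
        op.process("a", "x");
        op.process("b", "y");
        op.process("a", "z");
        System.out.println(op.flushCacheBeforeSnapshot());
    }
}
```

The model makes the core constraint explicit: every state write is scoped to whatever key is current, so a hook that persists many keys at once has to change that key once per entry.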

Re: Preparing keyed state before snapshot

2024-02-16 Thread Lorenzo Nicora
tate(…) you also get access to state of different operator > key. > SnapshotState(…) is called as part of the (each) checkpoint in order to > store data. > Sincere greetings > Thias > *From:* Lorenzo Nicora > *Sent:* Thursday, Fe

RE: Preparing keyed state before snapshot

2024-02-15 Thread Schwalbe Matthias
To: Flink User Group Subject: Preparing keyed state before snapshot Hello everyone, I have a convoluted problem. I am implementing a KeyedProcessFunction that keeps some non-serializable "state" in memory, in a transient Map (key = stream key, value = the non-serializable "state"

Preparing keyed state before snapshot

2024-02-15 Thread Lorenzo Nicora
" to Flink state. I need to do it only before the state snapshot. But KeyedProcessFunction has no entrypoint called before the state snapshot. I cannot use CheckpointedFunction.snapshotState(), because it does not work for keyed state. Any suggestions? Note that I cannot use operator state

Re: using CheckpointedFunction on a keyed state

2023-09-10 Thread liu ron
access state value from > open/init methods where there was no key context. > Regarding the CheckpointedFunction interface: from the javadoc example and > description I got the impression that this can be used to access keyed state > on a keyed stream. > Quote "The OperatorStateS

Re: using CheckpointedFunction on a keyed state

2023-09-08 Thread Krzysztof Chmielewski
My apologies Matthias, you are right. The issue was that I was trying to access state value from open/init methods where there was no key context. Regarding the CheckpointedFunction interface: from the javadoc example and description I got the impression that this can be used to access keyed state

Re: updating keyed state in open method.

2023-09-07 Thread Zakelly Lan
Hi, You cannot access the keyed state within #open(). It can only be accessed under a keyed context (a key is selected while processing an element, e.g. in #processElement). Best, Zakelly On Thu, Sep 7, 2023 at 4:55 PM Krzysztof Chmielewski wrote: > Hi, > I'm having a problem with my

RE: using CheckpointedFunction on a keyed state

2023-09-07 Thread Schwalbe Matthias
Hi Krzysztof again, just for clarity … your sample code [1] tries to count the number of events per key. I assume this is your intention? Anyway, your previous implementation initialized the keyed state keyCounterState in the open function, which is the right place to do this; you just wouldn’t

Re: updating keyed state in open method.

2023-09-07 Thread Krzysztof Chmielewski
Thanks, that helped. Regards, Krzysztof Chmielewski On Thu, Sep 7, 2023 at 09:52 Schwalbe Matthias wrote: > Hi Krzysztof, > You cannot access keyed state in open(). > Keyed state has a value per key. > In theory you would have to initialize per p

using CheckpointedFunction on a keyed state

2023-09-07 Thread Krzysztof Chmielewski
::initializeState method that says: "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation." I got the impression from the docs that CheckpointedFunction can be used on a keyed stream and CheckpointedFunction::initializeState is for initializing the state obje

RE: updating keyed state in open method.

2023-09-07 Thread Schwalbe Matthias
Hi Krzysztof, You cannot access keyed state in open(). Keyed state has a value per key. In theory you would have to initialize per possible key, which is quite impractical. However, you don’t need to initialize state: the initial state per key defaults to the default value of the type (null
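Matthias's point can be illustrated with a plain-Java model (no Flink dependency): state reads and writes are scoped to a current key, so they fail when no key is set (as in `open()`, compare the "No key" exception reported in this thread), and a key that was never written reads as `null`, which `processElement` can treat as 0. `ToyValueState` and its methods are hypothetical stand-ins, not Flink classes; only the field name `keyCounterState` is borrowed from the thread.

```java
import java.util.HashMap;
import java.util.Map;

public class LazyKeyedCounterModel {

    /** Hypothetical per-key value holder: reads and writes are scoped to the current key. */
    static final class ToyValueState<T> {
        private final Map<Object, T> values = new HashMap<>();
        private Object currentKey; // null until an element is being processed

        void setCurrentKey(Object key) {
            currentKey = key;
        }

        T value() {
            if (currentKey == null) {
                throw new NullPointerException("No key set.");
            }
            return values.get(currentKey); // null for a key that was never written
        }

        void update(T v) {
            if (currentKey == null) {
                throw new NullPointerException("No key set.");
            }
            values.put(currentKey, v);
        }
    }

    final ToyValueState<Long> keyCounterState = new ToyValueState<>();

    /** Like open(): no key is set here, so state may be created but not read or written. */
    void open() {
        // Intentionally no value()/update() calls.
    }

    /** Like processElement(): a key is set, and null simply means "not seen yet". */
    long processElement(String key) {
        keyCounterState.setCurrentKey(key); // the real runtime does this, not user code
        Long current = keyCounterState.value();
        long next = (current == null ? 0L : current) + 1;
        keyCounterState.update(next);
        return next;
    }

    public static void main(String[] args) {
        LazyKeyedCounterModel fn = new LazyKeyedCounterModel();
        fn.open();
        try {
            fn.keyCounterState.value(); // access outside any key context
        } catch (NullPointerException expected) {
            System.out.println("access without a key fails: " + expected.getMessage());
        }
        for (String k : new String[] {"a", "b", "a"}) {
            System.out.println(k + " -> " + fn.processElement(k));
        }
    }
}
```

No per-key initialization step is needed at all: the null check on first access plays that role for every key.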

updating keyed state in open method.

2023-09-07 Thread Krzysztof Chmielewski
ink()); As you can see, I'm using keyBy and KeyCounter extends KeyedProcessFunction. It seems that keyed state cannot be updated from the RichFunction::open() method. Is that intended? When I ran this example I got an exception that says: Caused by: java.lang.NullPointerException: No key

Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-15 Thread Lars Skjærven
e upgrade, but that was not >>>>> the case this time. >>>>> I've not been able to reproduce this issue. I've checked that I can >>>>> kill the taskmanager and jobmanager (using kubectl delete pod), and the >>>>> job

Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-14 Thread Lars Skjærven
s not >>>> the case this time. >>>> I've not been able to reproduce this issue. I've checked that I can >>>> kill the taskmanager and jobmanager (using kubectl delete pod), and the job >>>> restores as expected.

Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-13 Thread Martijn Visser
delete pod), and the job >>> restores as expected. >>> The job is running with kubernetes high availability, rocksdb and >>> incremental checkpointing. >>> Any tips are highly appreciated. >>> Thanks,

Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-09 Thread Lars Skjærven
ctl delete pod), and the job >> restores as expected. >> The job is running with kubernetes high availability, rocksdb and >> incremental checkpointing. >> Any tips are highly appreciated. >> Thanks, >> Lars >> Caused by: org.apache

Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-08 Thread Hangxiang Yu
ng. > Any tips are highly appreciated. > Thanks, > Lars > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for > KeyedProcessOperator_bf374b554824ef28e

Could not restore keyed state backend for KeyedProcessOperator

2022-12-08 Thread Lars Skjærven
are highly appreciated. Thanks, Lars Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the 1 provided restore options

Re: Re: Will Flink lose some old Keyed State when changing the parallelism

2021-12-20 Thread Seth Wiesman
le, you should take a savepoint, stop the job, then restart from > >> the savepoint with your new desired parallelism. This way, no data will be > >> lost. > >> Best, > >> Nico > >> On Thu, Nov 25, 2021 at 10:53 AM 杨浩 wrote: > >> > Will Flink lose some old Keyed State when changing the parallelism, like > >> > 2 -> 5, or 5->3?

Re: Re: Will Flink lose some old Keyed State when changing the parallelism

2021-12-20 Thread 杨浩
he job, then restart from >> the savepoint with your new desired parallelism. This way, no data will be >> lost. >> Best, >> Nico >> On Thu, Nov 25, 2021 at 10:53 AM 杨浩 wrote: >> > Will Flink lose some old Keyed State when changing the parallelism, like 2 >> > -> 5, or 5->3?

Re: Will Flink lose some old Keyed State when changing the parallelism

2021-11-26 Thread Yun Tang
cale, you should take a savepoint, stop the job, then restart from > the savepoint with your new desired parallelism. This way, no data will be > lost. > Best, > Nico > On Thu, Nov 25, 2021 at 10:53 AM 杨浩 wrote: > > Will Flink lose some old Keyed State when ch

Re: Will Flink lose some old Keyed State when changing the parallelism

2021-11-26 Thread Nicolaus Weidner
Hi, to rescale, you should take a savepoint, stop the job, then restart from the savepoint with your new desired parallelism. This way, no data will be lost. Best, Nico On Thu, Nov 25, 2021 at 10:53 AM 杨浩 wrote: > Will Flink lose some old Keyed State when changing the parallelism, lik
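Why a savepoint-based rescale (for example 2 -> 5 or 5 -> 3) loses no keyed state can be sketched with Flink's key-group scheme: keyed state is partitioned into a fixed number of key groups (the max parallelism), and on restore each subtask takes over a contiguous range of them. The plain-Java model below assumes a max parallelism of 128 and uses `Math.floorMod(key.hashCode(), ...)` as a simplified stand-in for Flink's hashing (Flink applies a murmur hash to `hashCode()` first); the class and method names are hypothetical. It shows that, as long as the max parallelism is unchanged, every key group, and therefore every key, is owned by exactly one subtask at any parallelism.

```java
import java.util.ArrayList;
import java.util.List;

public class KeyGroupRescaleModel {

    // Simplified key-group assignment; Flink murmur-hashes key.hashCode() before the modulo.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Contiguous ranges per subtask: index = keyGroup * parallelism / maxParallelism.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** For each subtask, the key groups it would restore from the savepoint. */
    static List<List<Integer>> assignment(int maxParallelism, int parallelism) {
        List<List<Integer>> owners = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            owners.add(new ArrayList<>());
        }
        for (int kg = 0; kg < maxParallelism; kg++) {
            owners.get(subtaskFor(kg, maxParallelism, parallelism)).add(kg);
        }
        return owners;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value, must stay the same across restores
        for (int p : new int[] {2, 5, 3}) {
            List<List<Integer>> owners = assignment(maxParallelism, p);
            int total = owners.stream().mapToInt(List::size).sum();
            System.out.println("parallelism " + p + ": " + total + " of "
                    + maxParallelism + " key groups assigned");
        }
        System.out.println("key 'user-42' -> key group " + keyGroupFor("user-42", maxParallelism));
    }
}
```

Note that the key group of a key never depends on the parallelism; only the mapping from key groups to subtasks does.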

Re: After re-specifying the parallelism for keyed state, e.g. 2->4 or 4->2, will some of the old state be lost?

2021-11-25 Thread yidan zhao
Of course not. On Thu, Nov 25, 2021 at 6:09 PM 杨浩 wrote: > After re-specifying the parallelism for keyed state, e.g. 2->4 or 4->2, will some of the old state be lost?

After re-specifying the parallelism for keyed state, e.g. 2->4 or 4->2, will some of the old state be lost?

2021-11-25 Thread 杨浩
After re-specifying the parallelism for keyed state, e.g. 2->4 or 4->2, will some of the old state be lost?

Will Flink lose some old Keyed State when changing the parallelism

2021-11-25 Thread 杨浩
Will Flink lose some old Keyed State when changing the parallelism, like 2 -> 5, or 5->3?

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Seth Wiesman
Hi Marc, I think you will find this is less efficient than just using keyed state. Remember that state backends are local, so reading and writing are extremely cheap. HashMapStateBackend is just an in-memory data structure and EmbeddedRocksDBStateBackend only works against local disk. Additionally

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Marc LEGER
d keys can then be retrieved inside every operator parallel task besides the other keyed state (control state): *// Control state partitioned by userId (keyed state) private ValueState controlState; // Data state partitioned by userId (operator state) private ListState dataState;* To avoid "stat

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Nicolaus Weidner
Hi Marc, thanks for clarifying, I had misunderstood some parts. Unfortunately, I don't think there is a way to update keyed state (for multiple keys even) outside of a keyed context. I will ask if someone else has an idea, but allow me to ask one counter-question first: Did you actually run

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-06 Thread Marc LEGER
Hello Nicolaus, Thank you for your quick feedback, sorry if I am not clear enough. Actually in the documented example, the state which is updated in the snapshotState method is an operator state and not a keyed state: *public void initializeState(FunctionInitializationContext context) throws

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-06 Thread Nicolaus Weidner
Hi Marc, I think you can just use keyed state in a CheckpointedFunction. FunctionInitializationContext gives you access to both keyed state and operator state (your stream needs to be keyed, of course). So you could just update your local custom state on regular invocations and update keyed state

Snapshot method for custom keyed state checkpointing ?

2021-10-05 Thread Marc LEGER
in a non-keyed context. Indeed, I am implementing a CoFlatMapFunction with: - a keyed state (state1) for a "control" stream (stream1) which is not often updated, - a keyed state (state2) for a "data" stream (stream2) with a high throughput and relying on a custom solution for int

Re: State processor API very slow reading a keyed state with RocksDB

2021-09-10 Thread David Causse
oc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/ReadOptions.html#setFillCache(boolean) > -- > *From:* Seth Wiesman > *Sent:* Friday, September 10, 2021 0:58 > *To:* David Causse ; user > *Cc:* Piotr Nowojski > *Subject:* Re: State processor A

Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Yun Tang
) From: Seth Wiesman Sent: Friday, September 10, 2021 0:58 To: David Causse ; user Cc: Piotr Nowojski Subject: Re: State processor API very slow reading a keyed state with RocksDB Hi David, I was also able to reproduce the behavior, but was able to get significant

Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Seth Wiesman
e same ballpark. >> >> I wrote a simple test case[1] to reproduce the problem. This program has >> 3 jobs: >> - CreateState: generate a keyed state (string->long) using the state >> processor api >> - StreamJob: reads all the keys using a StreamingExecutionEn

Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Piotr Nowojski
l hours. I would have expected these two job > runtimes to be in the same ballpark. > > I wrote a simple test case[1] to reproduce the problem. This program has 3 > jobs: > - CreateState: generate a keyed state (string->long) using the state > processor api > - StreamJob: re

State processor API very slow reading a keyed state with RocksDB

2021-09-08 Thread David Causse
: - CreateState: generate a keyed state (string->long) using the state processor api - StreamJob: reads all the keys using a StreamingExecutionEnvironment - ReadState: reads all the keys using the state processor api Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState & St

Re: Delete Keyed State outside of StateTTL

2021-08-31 Thread JING ZHANG
ive key, i.e. the key of the input element. >> Could we call the `clear()` method directly to remove the state under the >> specified key? >> Best, >> JING ZHANG >> On Tue, Aug 31, 2021 at 9:44 AM narasimha wrote: >>> Hi,
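JING ZHANG's suggestion can be modeled in plain Java: `clear()` removes the entry for the current key only (the key of the element being processed), so a "delete" rule leaves no entry behind, unlike a "reset" that stores a fresh value. The `create`/`reset`/`delete` actions and all class and method names are hypothetical and only loosely follow narasimha's description; this is a sketch, not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;

public class RuleDrivenDeleteModel {

    /** Hypothetical keyed value state; clear() affects only the current key. */
    static final class ToyValueState<T> {
        private final Map<String, T> values = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) { currentKey = key; }
        T value() { return values.get(currentKey); }
        void update(T v) { values.put(currentKey, v); }
        void clear() { values.remove(currentKey); }
        int size() { return values.size(); }
    }

    final ToyValueState<Integer> state = new ToyValueState<>();

    /** Applies a hypothetical rule action to the state of the element's key. */
    void processElement(String key, String action) {
        state.setCurrentKey(key); // in Flink, the runtime derives this from the element
        switch (action) {
            case "create":
                if (state.value() == null) {
                    state.update(1);
                }
                break;
            case "reset":
                state.update(0); // the entry still exists afterwards
                break;
            case "delete":
                state.clear(); // the entry is gone, as after TTL expiry
                break;
            default:
                throw new IllegalArgumentException("unknown action: " + action);
        }
    }

    public static void main(String[] args) {
        RuleDrivenDeleteModel fn = new RuleDrivenDeleteModel();
        fn.processElement("a", "create");
        fn.processElement("b", "create");
        fn.processElement("a", "delete");
        System.out.println("keys with state: " + fn.state.size());
    }
}
```

The limitation this makes visible is the one raised in the thread: `clear()` can only reach the key of the element currently being processed.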

Re: Delete Keyed State outside of StateTTL

2021-08-30 Thread narasimha
wrote on Tue, Aug 31, 2021 at 9:44 AM: >> Hi, >> I have a use case where the keyed state is managed (create, reset) by >> dynamically changing rules. New action "delete" has to be added. >> Delete is to completely delete the keyed state, same as how StateTTL doe

Re: Delete Keyed State outside of StateTTL

2021-08-30 Thread JING ZHANG
> I have a use case where the keyed state is managed (create, reset) by > dynamically changing rules. New action "delete" has to be added. > Delete is to completely delete the keyed state, same as how StateTTL does > post expiration time. > Use StateTTL?

Delete Keyed State outside of StateTTL

2021-08-30 Thread narasimha
Hi, I have a use case where the keyed state is managed (create, reset) by dynamically changing rules. New action "delete" has to be added. Delete is to completely delete the keyed state, same as how StateTTL does post expiration time. Use StateTTL? Initially used StateTTL, but i

Re: Is keyed state supported in PyFlink?

2021-05-05 Thread Sumeet Malhotra
> On May 4, 2021, at 2:09 AM, Sumeet Malhotra wrote: > Hi, > Is keyed state [1] supported by PyFlink yet? I can see some code for it in > the Flink master branch, but there's no mention of it in the 1.12 Python > documentation. > Thanks, > Sumeet > [1] > https://

Re: Is keyed state supported in PyFlink?

2021-05-05 Thread Dian Fu
Hi Sumeet, This feature is supported in 1.13.0, which was just released, so there is no documentation about it for 1.12. Regards, Dian > On May 4, 2021, at 2:09 AM, Sumeet Malhotra wrote: > Hi, > Is keyed state [1] supported by PyFlink yet? I can see some code for it in > the F

Is keyed state supported in PyFlink?

2021-05-03 Thread Sumeet Malhotra
Hi, Is keyed state [1] supported by PyFlink yet? I can see some code for it in the Flink master branch, but there's no mention of it in the 1.12 Python documentation. Thanks, Sumeet [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html

Re: How to report metric based on keyed state piece

2021-02-17 Thread Salva Alcántara
Awesome Piotr! -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to report metric based on keyed state piece

2021-02-17 Thread Salva Alcántara
Many thanks Kezhu for pointing me in that direction!

Re: How to report metric based on keyed state piece

2021-02-17 Thread Piotr Nowojski
Hi Salva, I'm not sure, but I think you cannot access the state (especially the keyed state) from within the metric, as metrics are evaluated outside of the keyed context, and also from another thread. Also, things like `ValueState`/`MapState` do not expose any size. So probably you

Re: How to report metric based on keyed state piece

2021-02-17 Thread Kezhu Wang
wonder what is the canonical way to accomplish the following: Given a Flink UDF, how to report a metric `y` which is a function of some (keyed) state `X`? That is, `y=f(X)`. Most commonly, we are interested in the size of the state X. For instance, consider a `CoFlatMap` function with: - `X

How to report metric based on keyed state piece

2021-02-16 Thread Salva Alcántara
I wonder what is the canonical way to accomplish the following: Given a Flink UDF, how to report a metric `y` which is a function of some (keyed) state `X`? That is, `y=f(X)`. Most commonly, we are interested in the size of the state X. For instance, consider a `CoFlatMap` function with: - `X

Re: How long will keyed state exist if no TTL given?

2020-12-08 Thread Marco Villalobos
2020 at 5:09 PM Marco Villalobos > wrote: >> >> After reading >> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html >> >> It is unclear to me how long keyed state will exist if it has no TTL. >> Is it cached foreve

Re: How long will keyed state exist if no TTL given?

2020-12-08 Thread Khachatryan Roman
stream/state/state.html > > It is unclear to me how long keyed state will exist if it has no TTL. > Is it cached forever, unless explicitly cleared or overwritten? > > can somebody please explain to me? > > Thank you. >

How long will keyed state exist if no TTL given?

2020-12-08 Thread Marco Villalobos
After reading https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html it is unclear to me how long keyed state will exist if it has no TTL. Is it cached forever, unless explicitly cleared or overwritten? Can somebody please explain this to me? Thank you.

Re: Adding keyed state to test harness before calling process function.

2020-11-15 Thread Guowei Ma
Hi Marco, I did not find a direct way to do this (maybe others know). A possible way might be: 1. Build the expected keyed state and get the `OperatorSubtaskState` from an `xxOperatorTestHarness`. 2. Use the `OperatorSubtaskState` to initialize the `xxOperatorTestHarness` that is needed

Re: Adding keyed state to test harness before calling process function.

2020-11-12 Thread Marco Villalobos
Thank you. I looked into that, but it does not initialize any values in keyed state; instead, it uses keyed state, and lines 407-412 show that it does not set keyed state values in advance, handling null values when they are not set in advance. public void processElement(String value, Context ctx

Re: Adding keyed state to test harness before calling process function.

2020-11-12 Thread Guowei Ma
> I would like to add keyed state to the test harness before calling the process > function. > I am using the OneInputStreamOperatorTestHarness. > I can't find any examples online on how to do that, and I am struggling to > figure this out. > Can somebody please provide guidan

Adding keyed state to test harness before calling process function.

2020-11-12 Thread Marco Villalobos
Hi, I would like to add keyed state to the test harness before calling the process function. I am using the OneInputStreamOperatorTestHarness. I can't find any examples online on how to do that, and I am struggling to figure this out. Can somebody please provide guidance? My test case has keyed

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-13 Thread 大森林
So state: stores the result of some operator (such as keyBy, map). Checkpoint: stores the last result from when the program was running OK. Am I right? Thanks for your help~! -- Original message -- From:

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-12 Thread Congxian Qiu
lease-1.11/concepts/stateful-stream-processing.html Best, Congxian On Mon, Oct 12, 2020 at 9:26 PM 大森林 wrote: > Thanks for your replies. > When I use no state-relevant code in my program, the checkpoint can be > saved and resumed.❶ > So then why do we need *Keyed State/Operato

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-12 Thread 大森林
Thanks for your replies. When I use no state-relevant code in my program, the checkpoint can be saved and resumed.❶ So then why do we need Keyed State/Operator State/Stateful Function?❷ "the operators are reset to the time of the respective checkpoint." We have already met the requirement: "resume

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-12 Thread Arvid Heise
Hi 大森林, You can always resume from checkpoints, independent of whether operators use keyed or non-keyed state. One checkpoint contains the state of all operators at a given point in time. Each operator may have keyed state, raw state, or non-keyed state. As long as you are not changing

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-07 Thread 大森林
Thanks for your replies, I now have some understanding. There are two cases. 1. If I use no keyed state in the program, when it's killed I can only resume from the previous result. 2. If I use keyed state in the program, when it's killed I can resume from the previous result and previous variable temporary

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-07 Thread Arvid Heise
I think there is some misunderstanding here: a checkpoint IS (a snapshot of) the keyed state and operator state (among a few more things). [1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions On Wed, Oct 7, 2020 at 6:51 AM 大森林 wrote
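Arvid's definition can be made concrete with a plain-Java model: a checkpoint copies the state that is registered with Flink, and a restore starts a fresh operator instance that reloads only that copy, so an ordinary field that was never registered as state starts over from its initial value. All names here (`CheckpointContentsModel`, `checkpoint`, `restore`) are hypothetical, and real checkpoints contain a few more things as well, as Arvid notes.

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointContentsModel {

    // Registered ("managed") state: copied into every checkpoint.
    Map<String, Long> managedCounts = new HashMap<>();
    // Plain field: never part of a checkpoint, so it is lost when the job is restored.
    long unmanagedTotal = 0;

    void process(String key) {
        managedCounts.merge(key, 1L, Long::sum);
        unmanagedTotal++;
    }

    /** A checkpoint is a consistent copy of the registered state. */
    Map<String, Long> checkpoint() {
        return new HashMap<>(managedCounts);
    }

    /** Restore builds a fresh instance and reloads only what the checkpoint holds. */
    static CheckpointContentsModel restore(Map<String, Long> checkpoint) {
        CheckpointContentsModel op = new CheckpointContentsModel();
        op.managedCounts = new HashMap<>(checkpoint);
        return op;
    }

    public static void main(String[] args) {
        CheckpointContentsModel op = new CheckpointContentsModel();
        op.process("a");
        op.process("a");
        op.process("b");
        CheckpointContentsModel restored = restore(op.checkpoint());
        System.out.println("managed: " + restored.managedCounts
                + ", unmanaged: " + restored.unmanagedTotal);
    }
}
```

In other words, keyed and operator state are not an alternative to checkpoints: they are the part of an operator that a checkpoint is able to bring back.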

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-06 Thread 大森林
When the job is killed, the state is also missing. So why do we need keyed state? Is keyed state useful when we try to resume the killed job

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-06 Thread Shengkai Fang
y we need keyed state and operator state when we already have checkpoint? > When a running jar crashes, we can resume from the checkpoint > automatically/manually. > So why do we still need keyed state and operator state? > Thanks

why we need keyed state and operator state when we already have checkpoint?

2020-10-06 Thread 大森林
Could you tell me why we need keyed state and operator state when we already have checkpoints? When a running jar crashes, we can resume from the checkpoint automatically/manually. So why do we still need keyed state and operator state? Thanks

Re: Monitor the usage of keyed state

2020-08-26 Thread Yun Tang
sday, August 25, 2020 22:12 To: Mu Kong Cc: flink-u...@apache.org Subject: Re: Monitor the usage of keyed state Hi Mu, I would suggest to look into RocksDB metrics which you can enable as Flink metrics [1] Best, Andrey [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.

Re: Monitor the usage of keyed state

2020-08-25 Thread Andrey Zagrebin
e a Flink job running with RichMapFunction that uses keyed state. > Although the TTL is enabled, I wonder if there is a way that I can monitor > the memory usage of the keyed state. I'm using RocksDB as the state backend. > > Best regards, > Mu >

Monitor the usage of keyed state

2020-08-20 Thread Mu Kong
Hi community, I have a Flink job running with RichMapFunction that uses keyed state. Although the TTL is enabled, I wonder if there is a way that I can monitor the memory usage of the keyed state. I'm using RocksDB as the state backend. Best regards, Mu

Re: Using managed keyed state with AsyncIO

2020-08-14 Thread Arvid Heise
Hi Kristoff, the answer to your big questions is unfortunately no, two times. I see two options in general: 1) process function (as you proposed). On processElement, you'd read the state and invoke your async operation. You enqueue your result in some result queue where you emit it in the next

Re: Using managed keyed state with AsyncIO

2020-08-14 Thread KristoffSC
Thanks Arvid, I like your propositions. In my case I wanted to use the state value to decide whether I should make the async call to the external system. The result of this call would be a state input. So having this: Process1(calculateValue or take it from state) -> AsyncCall to External system to

Re: Using managed keyed state with AsyncIO

2020-08-14 Thread Arvid Heise
ot > be shared between operators, and here you are suggesting doing that. Is it > possible then? > Taking this opportunity, I have an additional question: is there any difference > from Flink's perspective between these two approaches: > MyProcessFunction pf = new MyProcessFunction(); M

Re: Using managed keyed state with AsyncIO

2020-08-13 Thread KristoffSC
u are suggesting doing that. Is it possible then? Taking this opportunity, I have an additional question: is there any difference from Flink's perspective between these two approaches: MyProcessFunction pf = new MyProcessFunction(); MyProcessFunction is a stateless object, but it uses Flink keyed state. Setup 1

Re: Using managed keyed state with AsyncIO

2020-08-13 Thread Arvid Heise
ave the full context information on the subsequent operators. On Mon, Aug 10, 2020 at 4:24 PM KristoffSC wrote: > Hi guys, > I'm using Flink 1.9.2 > I have a question about a use case where I would like to use Flink's managed > keyed state with Async IO [1] > Let's take as

Re: State Processor API to bootstrap keyed state for Stream Application.

2020-08-13 Thread Arvid Heise
For future readers: this thread has been resolved in "Please help, I need to bootstrap keyed state into a stream" on the user mailing list asked by Marco. On Fri, Aug 7, 2020 at 11:52 PM Marco Villalobos wrote: > I have read the documentation and various blogs that state that it

Re: Please help, I need to bootstrap keyed state into a stream

2020-08-12 Thread Marco Villalobos
Hi Seth, Thank you for the advice. The solution you mentioned is exactly what I did. I wrote a small tutorial that explains how to repeat that pattern. You can read about my solution at https://github.com/minmay/flink-patterns/tree/master/bootstrap-keyed-state-into-stream <https://github.

Re: Please help, I need to bootstrap keyed state into a stream

2020-08-12 Thread Seth Wiesman
Just to summarize the conversation so far: The State Processor API reads data from a 3rd-party system (such as JDBC in this example) and generates a savepoint file that is written out to some DFS. This savepoint can then be used when starting a Flink streaming application. It is a two-step

Re: Please help, I need to bootstrap keyed state into a stream

2020-08-10 Thread Marco Villalobos
I think there is a bug in Flink when running locally without a cluster. My code worked in a cluster, but failed when run locally. My code does not save null values in Map State. > On Aug 9, 2020, at 11:27 PM, Tzu-Li Tai wrote: > > Hi, > > For the NullPointerException, what seems to be

Re: Please help, I need to bootstrap keyed state into a stream

2020-08-10 Thread Marco Villalobos
Thank you. Your instruction was helpful in my solving this. You can read about my solution at https://github.com/minmay/flink-patterns/tree/master/bootstrap-keyed-state-into-stream <https://github.com/minmay/flink-patterns/tree/master/bootstrap-keyed-state-into-stream> > On Aug 10, 2

Re: Please help, I need to bootstrap keyed state into a stream

2020-08-10 Thread Marco Villalobos
First, thank you. I want to believe you, but I don't see how that is possible. All of the code is self-contained, and at the bottom of all the code, I print out the non-null values before I attempt to put them in the map state. All of the debug output before and after indicates that there is a null

Using managed keyed state with AsyncIO

2020-08-10 Thread KristoffSC
Hi guys, I'm using Flink 1.9.2. I have a question about a use case where I would like to use Flink's managed keyed state with Async IO [1]. Let's take as a baseline the example below, taken from [1], and let's assume that we are executing this on a keyed stream. final Future result = client.query(key

Re: Please help, I need to bootstrap keyed state into a stream

2020-08-10 Thread orionemail
I was recently in the same situation as Marco. The docs do explain what you need to do, but without experience with Flink it might still not be obvious. What I did initially: Set up the job to run in a 'write a save state' mode by implementing a command line switch I could

Re: Please help, I need to bootstrap keyed state into a stream

2020-08-10 Thread Tzu-Li Tai
Hi, For the NullPointerException, what seems to be happening is that you are setting NULL values in your MapState, that is not allowed by the API. Otherwise, the code that you showed for bootstrapping state seems to be fine. > I have yet to find a working example that shows how to do both >

Please help, I need to bootstrap keyed state into a stream

2020-08-08 Thread Marco Villalobos
19/12/14/bootstrap-your-flink-jobs>) However, I have yet to find a working example that shows how to do both. I am reaching out to the community to help solve and document this common problem. My progress is at https://github.com/minmay/flink-patterns/blob/master/bootstrap-keyed-stat

State Processor API to bootstrap keyed state for Stream Application.

2020-08-07 Thread Marco Villalobos
I have read the documentation and various blogs that state that it is possible to load data into a data-set and use that data to bootstrap a stream application. The documentation literally says this, "...you can read a batch of data from any store, preprocess it, and write the result to a

Re: Using keyed state in different operators

2020-06-10 Thread Congxian Qiu
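Hello, currently KeyedState cannot be used across operators; that is, different operators use different state. Best, Congxian On Thu, Jun 11, 2020 at 10:11 AM Z-Z wrote: > May I ask: suppose two operators keyBy the same field, can they use the same StateDescriptor and thereby use the same keyed state?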

Using keyed state in different operators

2020-06-10 Thread Z-Z
May I ask: suppose two operators keyBy the same field, can they use the same StateDescriptor and thereby use the same keyed state?

Re: Flink: For terabytes of keyed state.

2020-05-06 Thread Congxian Qiu
//issues.apache.org/jira/browse/FLINK-10461 >>>> [3] https://issues.apache.org/jira/browse/FLINK-11008 >>>> Best, >>>> Congxian >>>> On Fri, May 1, 2020 at 6:29 PM Gowri Sundaram wrote:

Re: Flink: For terabytes of keyed state.

2020-05-05 Thread Gowri Sundaram
wse/FLINK-10461 >>> [3] https://issues.apache.org/jira/browse/FLINK-11008 >>> Best, >>> Congxian >>> On Fri, May 1, 2020 at 6:29 PM Gowri Sundaram wrote: >>>> Hello all, >>>> We have read in multiple

Re: Flink: For terabytes of keyed state.

2020-05-05 Thread Congxian Qiu
se/FLINK-11008 >> Best, >> Congxian >> On Fri, May 1, 2020 at 6:29 PM Gowri Sundaram wrote: >>> Hello all, >>> We have read in multiple >>> <https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html> >>> sources <

Re: Flink: For terabytes of keyed state.

2020-05-03 Thread Gowri Sundaram
//flink.apache.org/usecases.html> that Flink has been >> used for use cases with terabytes of application state. >> We are considering using Flink for a similar use case with *keyed state >> in the range of 20 to 30 TB*. We had a few questions regarding the same.

Re: Flink: For terabytes of keyed state.

2020-05-03 Thread Congxian Qiu
f application state. > We are considering using Flink for a similar use case with *keyed state > in the range of 20 to 30 TB*. We had a few questions regarding the same. > - *Is Flink a good option for this kind of scale of data*? We are > considering using RocksDB as the

Flink: For terabytes of keyed state.

2020-05-01 Thread Gowri Sundaram
lar use case with *keyed state in the range of 20 to 30 TB*. We had a few questions regarding the same. - *Is Flink a good option for this kind of scale of data*? We are considering using RocksDB as the state backend. - *What happens when we want to add a node to the cluster*?

Re: Re: Re: With RocksDB as the state backend, restoring the job fails when the task restarts: Could not restore keyed state backend for KeyedProcessOperator

2020-04-14 Thread chenxyz
tate.NoOpTaskLocalStateStoreImpl@3fd9cf09. 2020-04-14 11:42:46,070 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating keyed state backend for KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) and restoring with

Re: Is it possible to emulate keyed state with operator state?

2020-04-10 Thread David Anderson
-each-record-to-a-processor-task-slot-td16483.html On Wed, Apr 8, 2020 at 9:10 AM Salva Alcántara wrote: > > Just for the sake of experimenting and learning. Let's assume that we have a > keyed process function using keyed state and we want to rewrite it using > operator state.

Is it possible to emulate keyed state with operator state?

2020-04-08 Thread Salva Alcántara
Just for the sake of experimenting and learning. Let's assume that we have a keyed process function using keyed state and we want to rewrite it using operator state. The question is, would that be possible to keep the exact same behaviour? For example, one could use operator union list state

Re: On efficient checkpoints with dynamic (self-evolving) keyed state

2020-04-06 Thread Salva Alcántara
Yet another option would be to use operator state instead, but this looks trickier to me.

Re: On efficient checkpoints with dynamic (self-evolving) keyed state

2020-04-06 Thread Salva Alcántara
I guess another option not mentioned in my question could be to use a custom serializer for the models. This way, I would not need to consider serialization issues myself within the process function and the snapshots for my models would be taken only once per checkpoint as desired.
