Re: TTL issue with large RocksDB keyed state

2024-06-03 Thread Yanfei Lei
Hi,

> 1. After multiple full checkpoints and a NATIVE savepoint the size was 
> unchanged. I'm wondering if RocksDB compaction is ineffective because we never
> update key values? The state is composed almost entirely of key space. Do keys
> not get freed by the RocksDB compaction filter for TTL?

Regarding TTL-related questions, has your job been running for 30
days? TTL is checked based on the last time a key was created or
updated.

Regarding “I'm wondering if RocksDB compaction is ineffective because we
never update key values”: periodic compaction can speed up the cleanup of
expired state entries, especially for entries that are rarely accessed [1];
maybe you can try enabling it. BTW, are there any deletion operations in
your job?

> 2. That should work but will doing that "reset the clock" for the TTL?

No, the TTL timestamp is stored as part of each KV entry and therefore
will not be reset. I think you can try “TTL periodic compaction” [1] first.
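
A minimal sketch of what enabling it could look like, following the docs in
[1] (builder signatures vary slightly across Flink versions, and the numbers
here are illustrative only):

import java.time.Duration;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;

// 30-day TTL; the RocksDB compaction filter drops expired entries during
// compaction, and the periodic-compaction interval forces rarely-compacted
// files to be compacted (and thus cleaned) eventually as well.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofDays(30))
    // re-query the current timestamp from Flink every 1000 processed entries
    .cleanupInRocksdbCompactFilter(1000, Duration.ofDays(1))
    .build();

MapStateDescriptor<Long, byte[]> descriptor =
    new MapStateDescriptor<>("events", Long.class, byte[].class);
descriptor.enableTimeToLive(ttlConfig);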

3. Yes, restoring from a canonical savepoint can bypass FLINK-34050,
and a canonical savepoint should be generated first.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#cleanup-of-expired-state

Cliff Resnick wrote on Mon, Jun 3, 2024 at 02:44:
>
> Hi everyone,
>
>
> We have a Flink application that has a very large and perhaps unusual state. 
> The basic shape of it is a very large and somewhat random keyed-stream 
> partition space, each with a continuously growing map-state keyed by 
> microsecond time Long values. There are never any overwrites in the map state, 
> which is monotonic per partition key. Map state was chosen over list state 
> in the hope that we can manage a sliding window using TTL. Using RocksDB 
> incremental checkpointing, the app runs very well despite the large total 
> checkpoint size. Our current checkpoint size is 3.2TB.
>
>
> We have multiple questions around space amplification problems when using the 
> RocksDB backend and I'm wondering if anyone can suggest or confirm answers.
>
>
>
> 1. Using LEVEL compaction we have not seen any decrease in total checkpoint 
> size through TTL compaction. To test the TTL, I cut the period from 60 to 30 
> days (we have well over 60 days processing time), enabled 
> cleanupFullSnapshot() and ran a test job without incremental checkpointing 
> enabled. After multiple full checkpoints and a NATIVE savepoint the size was 
> unchanged. I'm wondering if RocksDB compaction is ineffective because we never
> update key values? The state is composed almost entirely of key space. Do keys
> not get freed by the RocksDB compaction filter for TTL?
>
> 2. I'm wondering if FIFO compaction is a solution for the above. To move to
> that we would need to first take a canonical savepoint, then redeploy with
> RocksDB/FIFO. That should work, but will doing that "reset the clock" for the
> TTL? Given its nature though, I am leaning toward this as our only option.
>
>
> 3. Rescaling is a problem because of this issue: 
> https://issues.apache.org/jira/browse/FLINK-34050. The fix for this is not 
> yet released. Because of this bug the checkpoint size scales somewhat more
> than proportionately to the job rescaling. For example, if we go from 44
> slots to 60, the checkpoint will scale from 3.2 TB to 4.9 TB. Before 1.19.1
> is released we could cherry-pick the fix and create our own Docker image, or
> will restoring from a canonical savepoint as described above sidestep this bug?
>
>
> If anyone can help with any insights, please do!
>
>



-- 
Best,
Yanfei


TTL issue with large RocksDB keyed state

2024-06-02 Thread Cliff Resnick
Hi everyone,


We have a Flink application that has a very large and perhaps unusual
state. The basic shape of it is a very large and somewhat random
keyed-stream partition space, each with a continuously growing map-state
keyed by microsecond time Long values. There are never any overwrites in
the map state, which is monotonic per partition key. Map state was chosen
over list state in the hope that we can manage a sliding window using TTL.
Using RocksDB incremental checkpointing, the app runs very well despite the
large total checkpoint size. Our current checkpoint size is 3.2TB.


We have multiple questions around space amplification problems when using
the RocksDB backend and I'm wondering if anyone can suggest or confirm
answers.



1. Using LEVEL compaction we have not seen any decrease in total checkpoint
size through TTL compaction. To test the TTL, I cut the period from 60 to
30 days (we have well over 60 days processing time), enabled
cleanupFullSnapshot() and ran a test job without incremental checkpointing
enabled. After multiple full checkpoints and a NATIVE savepoint the size
was unchanged. I'm wondering if RocksDB compaction is ineffective because we
never update key values? The state is composed almost entirely of key space.
Do keys not get freed by the RocksDB compaction filter for TTL?

2. I'm wondering if FIFO compaction is a solution for the above. To move to
that we would need to first take a canonical savepoint, then redeploy with
RocksDB/FIFO. That should work, but will doing that "reset the clock" for
the TTL? Given its nature though, I am leaning toward this as our only
option.
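
For context, switching the compaction style is a RocksDB state backend
configuration change; a rough sketch (the option key and value are
assumptions based on the backend's configurable options, so verify them
against your Flink version):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Assumed option key; check your version's RocksDB backend documentation.
Configuration conf = new Configuration();
conf.setString("state.backend.rocksdb.compaction.style", "FIFO");

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment(conf);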


3. Rescaling is a problem because of this issue:
https://issues.apache.org/jira/browse/FLINK-34050. The fix for this is not
yet released. Because of this bug the checkpoint size scales somewhat more
than proportionately to the job rescaling. For example, if we go from 44
slots to 60, the checkpoint will scale from 3.2 TB to 4.9 TB. Before 1.19.1
is released we could cherry-pick the fix and create our own Docker image, or
will restoring from a canonical savepoint as described above sidestep this
bug?


If anyone can help with any insights, please do!


Re: Preparing keyed state before snapshot

2024-02-21 Thread Lorenzo Nicora
Thanks Thias and Zakelly,

I probably muddied the waters saying that my use case was similar to
kvCache.
What I was calling "non-serializable state" is actually a Random Cut Forest
ML model that cannot be serialized by itself, though you can extract a
serializable state from it. That state is serializable, but definitely not
a primitive type.
To be specific, I am trying to implement a keyed version of this RCF
operator [1]. I need one RCF model (a separate "forest") per key. Key
cardinality is not very high, and the size of the state should not be
a problem.

I guess the only feasible way is what Zakelly is suggesting, using
reflection to extract and set the keyContext from within processElement().
I will explore this option.
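
For reference, a minimal sketch of that hack (Event, Output and Model are
hypothetical placeholders; the synthetic field name "this$0" and the cast to
internal operator classes are javac/Flink implementation details, so treat
this as an illustration of the idea rather than a supported API):

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.util.Collector;

public class KeyedRcfFunction extends KeyedProcessFunction<String, Event, Output>
        implements CheckpointedFunction {

    private transient Map<String, Model> models;      // non-serializable, per key
    private transient ValueState<byte[]> modelState;  // serialized form, keyed
    private transient AbstractStreamOperator<?> operator;

    @Override
    public void processElement(Event value, Context ctx, Collector<Output> out)
            throws Exception {
        if (operator == null) {
            // Context is a non-static inner class of the operator; reach the
            // enclosing KeyedProcessOperator through its synthetic outer field.
            Field outer = ctx.getClass().getDeclaredField("this$0");
            outer.setAccessible(true);
            operator = (AbstractStreamOperator<?>) outer.get(ctx);
        }
        // ... lazily load models.get(ctx.getCurrentKey()) and update it ...
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        for (Map.Entry<String, Model> e : models.entrySet()) {
            operator.setCurrentKey(e.getKey());           // switch keyed context
            modelState.update(e.getValue().serialize());  // expensive, once per snapshot
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        models = new HashMap<>();
        modelState = context.getKeyedStateStore().getState(
            new ValueStateDescriptor<>("rcf-model", byte[].class));
    }
}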

Thanks again
Lorenzo

[1]
https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/AnomalyDetection/RandomCutForest/src/main/java/software/amazon/flink/example/operator/RandomCutForestOperator.java


On Wed, 21 Feb 2024 at 08:13, Schwalbe Matthias 
wrote:

> Good morning all,
>
>
>
> Let me loop myself in …
>
>
>
>    1. Another, even more convenient, way to enable caching is to actually
>    configure/assign RocksDB to use more off-heap memory for its cache; you
>    might also consider enabling bloom filters (it all depends on how large
>    your key-space is: thousands/millions/billions/…)
>
> Within the technological limits, RocksDB is hard to top, if keeping all
> data in memory is no option, this is the path I usually follow.
>
>    2. The other question, on how to control the current key from within
>    snapshot state: you can acquire a pointer to the underlying state backend,
>    e.g. from within open(), then get hold of a pointer to the specific state
>    primitive, and set the current key directly.
>
> In order to find out how to do that, put a breakpoint in the debugger and walk
> up a couple of call stack frames, and/or walk into the value setters and
> model it after how it is done there.
>
> Mind, though, to restore the current key if you happen to change it to
> another key.
>
> Doing this e.g. in initializeState() is time-insensitive, because this
> happens outside the ‘hot’ code paths.
>
>    3. If the number of elements to store is small, you can store it in
>    operator state and initialize your local structure in initializeState()
>    from it; you probably would want to keep the data in serialized form in
>    operator state, since you mentioned serialization would be expensive.
>    4. There is another API (which I don’t remember the name of) that
>    allows you to store operator state as a BLOB directly, if that would be a
>    doable option for you.
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Zakelly Lan 
> *Sent:* Wednesday, February 21, 2024 8:04 AM
> *To:* Lorenzo Nicora 
> *Cc:* Flink User Group 
> *Subject:* Re: Preparing keyed state before snapshot
>
>
>
>
>
>
> Hi Lorenzo,
>
>
>
> I think the most convenient way is to modify the code of the state
> backend, adding a k-v cache as you want.
>
>
>
> Otherwise IIUC, there's no public interface to get keyContext. But well,
> you may try something hacky. You may use the passed-in `Context` instance
> in processElement, and leverage java reflection to get
> the KeyedProcessOperator instance, where you can perform setCurrentKey().
>
>
>
>
>
> Best,
>
> Zakelly
>
>
>
> On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora 
> wrote:
>
> Thanks Zakelly,
>
>
>
> I'd need to do something similar, with a map containing my
> non-serializable "state", similar to the kvCache in FastTop1Function.
>
>
>
> But I am not sure I understand how I can set the keyed state for a
> specific key, in snapshotState().
>
> FastTop1Function seems to rely on keyContext set via setKeyContext(). This
> method is not part of the API. I see it's set specifically for
> AbstractTopNFunction in StreamExecRank.
>
> How can I do something similar without modifying the Flink runtime?
>
>
>
> Lorenzo
>
>
>
>
>
> On Sun, 18 Feb 2024 at 03:42, Zakelly Lan  wrote:
>
> Hi Lorenzo,
>
>
>
> It is not recommended to do this with keyed state. However, there is an
> example in the Flink code (FastTop1Function#snapshotState) [1] of setting
> keys in snapshotState().
>
>
>
> Hope this helps.
>
>
>
> [1]
> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>
>
>
> Best,
>
> Zakelly
>

RE: Preparing keyed state before snapshot

2024-02-21 Thread Schwalbe Matthias
Good morning all,

Let me loop myself in …


  1.  Another, even more convenient, way to enable caching is to actually 
configure/assign RocksDB to use more off-heap memory for its cache; you might 
also consider enabling bloom filters (it all depends on how large your 
key-space is: thousands/millions/billions/…); see the configuration sketch 
after this list.
Within the technological limits, RocksDB is hard to top; if keeping all data in 
memory is no option, this is the path I usually follow.

  2.  The other question, on how to control the current key from within snapshot 
state: you can acquire a pointer to the underlying state backend, e.g. from 
within open(), then get hold of a pointer to the specific state primitive, and 
set the current key directly.
In order to find out how to do that, put a breakpoint in the debugger and walk 
up a couple of call stack frames, and/or walk into the value setters and model 
it after how it is done there.
Mind, though, to restore the current key if you happen to change it to another 
key.
Doing this e.g. in initializeState() is time-insensitive, because it happens 
outside the ‘hot’ code paths.

  3.  If the number of elements to store is small, you can store it in operator 
state and initialize your local structure in initializeState() from it; you 
probably would want to keep the data in serialized form in operator state, 
since you mentioned serialization would be expensive.
  4.  There is another API (which I don’t remember the name of) that allows you 
to store operator state as a BLOB directly, if that would be a doable option 
for you.
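
A minimal sketch of the tuning mentioned in point 1 (the option keys are
assumptions taken from the RocksDB state backend's configurable options;
verify them against your Flink version before relying on them):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// give the RocksDB backend a larger share of off-heap managed memory
conf.setString("taskmanager.memory.managed.fraction", "0.6");
// bloom filters let point lookups skip SST files that cannot contain the key
conf.setString("state.backend.rocksdb.use-bloom-filter", "true");

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment(conf);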

Sincere greetings

Thias




From: Zakelly Lan 
Sent: Wednesday, February 21, 2024 8:04 AM
To: Lorenzo Nicora 
Cc: Flink User Group 
Subject: Re: Preparing keyed state before snapshot



Hi Lorenzo,

I think the most convenient way is to modify the code of the state backend, 
adding a k-v cache as you want.

Otherwise IIUC, there's no public interface to get keyContext. But well, you 
may try something hacky. You may use the passed-in `Context` instance in 
processElement, and leverage java reflection to get the KeyedProcessOperator 
instance, where you can perform setCurrentKey().


Best,
Zakelly

On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora 
<lorenzo.nic...@gmail.com> wrote:
Thanks Zakelly,

I'd need to do something similar, with a map containing my non-serializable 
"state", similar to the kvCache in FastTop1Fucntion.

But I am not sure I understand how I can set the keyed state for a specific 
key, in snapshotState().
FastTop1Function seems to rely on keyContext set via setKeyContext(). This 
method is not part of the API. I see it's set specifically for 
AbstractTopNFunction in StreamExecRank.
How can I do something similar without modifying the Flink runtime?

Lorenzo


On Sun, 18 Feb 2024 at 03:42, Zakelly Lan 
<zakelly@gmail.com> wrote:
Hi Lorenzo,

It is not recommended to do this with keyed state. However, there is an 
example in the Flink code (FastTop1Function#snapshotState) [1] of setting keys 
in snapshotState().

Hope this helps.

[1] 
https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165

Best,
Zakelly

On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
<lorenzo.nic...@gmail.com> wrote:
Hi Thias

I considered CheckpointedFunction.
In snapshotState() I would have to update the state of each key, extracting the 
in-memory "state" of each key and putting it in the state with 
state.update(...).
This must happen per key,
but snapshotState() has no visibility of the keys. And I have no way of 
selectively accessing the state of a specific key to update it.
Unless I am missing something

Thanks
Lorenzo


On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias 
<matthias.schwa...@viseca.ch> wrote:
Good morning Lorenzo,

You may want to implement 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in 
your KeyedProcessFunction.
Btw. by the time initializeState(…) is called, the state backend is fully 
initialized and can be read and written to (which is not the case when the 
open(…) function is called).
In initializeState(…) you also get access to the state of different operator keys.
SnapshotState(…) is called as part of the (each) checkpoint in order to store 
data.

Sincere greetings

Thias

From: Lorenzo Nicora <lorenzo.nic...@gmail.com>
Sent: Thursday, February 15, 2024 7:50 PM
To: Flink User Group <user@flink.apache.org>
Subject: Preparing keyed state before snapshot

Hello everyone,

I have a convoluted problem.

I am implementing a KeyedProcessFunction that keeps some non-serializable 
"state" in memory, in a transient Map (key = stream key, value = the 
non-serializable "state").

I can extract a serializable representation to put in Flink state, and I can 
load my in-memory "state" from the Flink state. But these operations are 
expensive.

Re: Preparing keyed state before snapshot

2024-02-20 Thread Zakelly Lan
Hi Lorenzo,

I think the most convenient way is to modify the code of the state backend,
adding a k-v cache as you want.

Otherwise IIUC, there's no public interface to get keyContext. But well,
you may try something hacky. You may use the passed-in `Context` instance
in processElement, and leverage java reflection to get
the KeyedProcessOperator instance, where you can perform setCurrentKey().


Best,
Zakelly

On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora 
wrote:

> Thanks Zakelly,
>
> I'd need to do something similar, with a map containing my
> non-serializable "state", similar to the kvCache in FastTop1Function.
>
> But I am not sure I understand how I can set the keyed state for a
> specific key, in snapshotState().
> FastTop1Function seems to rely on keyContext set via setKeyContext(). This
> method is not part of the API. I see it's set specifically for
> AbstractTopNFunction in StreamExecRank.
> How can I do something similar without modifying the Flink runtime?
>
> Lorenzo
>
>
> On Sun, 18 Feb 2024 at 03:42, Zakelly Lan  wrote:
>
>> Hi Lorenzo,
>>
>> It is not recommended to do this with keyed state. However, there is
>> an example in the Flink code (FastTop1Function#snapshotState) [1] of setting
>> keys in snapshotState().
>>
>> Hope this helps.
>>
>> [1]
>> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>>
>> Best,
>> Zakelly
>>
>> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
>> wrote:
>>
>>> Hi Thias
>>>
>>> I considered CheckpointedFunction.
>>> In snapshotState() I would have to update the state of each key,
>>> extracting the in-memory "state" of each key and putting it in the state
>>> with state.update(...).
>>> This must happen per key,
>>> but snapshotState() has no visibility of the keys. And I have no way of
>>> selectively accessing the state of a specific key to update it.
>>> Unless I am missing something
>>>
>>> Thanks
>>> Lorenzo
>>>
>>>
>>> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
>>> matthias.schwa...@viseca.ch> wrote:
>>>
>>>> Good morning Lorenzo,
>>>>
>>>>
>>>>
>>>> You may want to implement
>>>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>>>> your KeyedProcessFunction.
>>>>
>>>> Btw. by the time initializeState(…) is called, the state backend is
>>>> fully initialized and can be read and written to (which is not the case
>>>> when the open(…) function is called).
>>>>
>>>> In initializeState(…) you also get access to the state of different
>>>> operator keys.
>>>>
>>>> SnapshotState(…) is called as part of the (each) checkpoint in order to
>>>> store data.
>>>>
>>>>
>>>>
>>>> Sincere greetings
>>>>
>>>>
>>>>
>>>> Thias
>>>>
>>>>
>>>>
>>>> *From:* Lorenzo Nicora 
>>>> *Sent:* Thursday, February 15, 2024 7:50 PM
>>>> *To:* Flink User Group 
>>>> *Subject:* Preparing keyed state before snapshot
>>>>
>>>>
>>>>
>>>> Hello everyone,
>>>>
>>>>
>>>>
>>>> I have a convoluted problem.
>>>>
>>>>
>>>>
>>>> I am implementing a KeyedProcessFunction that keeps some
>>>> non-serializable "state" in memory, in a transient Map (key = stream key,
>>>> value = the non-serializable "state").
>>>>
>>>>
>>>>
>>>> I can extract a serializable representation to put in Flink state, and
>>>> I can load my in-memory "state" from the Flink state. But these operations
>>>> are expensive.
>>>>
>>>>
>>>>
>>>> Initializing the in-memory "state" is relatively easy. I do it lazily,
>>>> in processElement(), on the first record for the key.
>>>>
>>>>
>>>>
>>>> The problem is saving the in-memory "state" to Flink state.
>>>>
>>>> I need to do it only before the state snapshot. But
>>>> KeyedProcessFunction has no entrypoint called before the state snapshot.
>>>>

Re: Preparing keyed state before snapshot

2024-02-20 Thread Lorenzo Nicora
Thanks Zakelly,

I'd need to do something similar, with a map containing my non-serializable
"state", similar to the kvCache in FastTop1Fucntion.

But I am not sure I understand how I can set the keyed state for a specific
key, in snapshotState().
FastTop1Function seems to rely on keyContext set via setKeyContext(). This
method is not part of the API. I see it's set specifically for
AbstractTopNFunction in StreamExecRank.
How can I do something similar without modifying the Flink runtime?

Lorenzo


On Sun, 18 Feb 2024 at 03:42, Zakelly Lan  wrote:

> Hi Lorenzo,
>
> It is not recommended to do this with keyed state. However, there is an
> example in the Flink code (FastTop1Function#snapshotState) [1] of setting keys
> in snapshotState().
>
> Hope this helps.
>
> [1]
> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>
> Best,
> Zakelly
>
> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
> wrote:
>
>> Hi Thias
>>
>> I considered CheckpointedFunction.
>> In snapshotState() I would have to update the state of each key,
>> extracting the in-memory "state" of each key and putting it in the state
>> with state.update(...).
>> This must happen per key,
>> but snapshotState() has no visibility of the keys. And I have no way of
>> selectively accessing the state of a specific key to update it.
>> Unless I am missing something
>>
>> Thanks
>> Lorenzo
>>
>>
>> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
>> matthias.schwa...@viseca.ch> wrote:
>>
>>> Good morning Lorenzo,
>>>
>>>
>>>
>>> You may want to implement
>>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>>> your KeyedProcessFunction.
>>>
>>> Btw. by the time initializeState(…) is called, the state backend is
>>> fully initialized and can be read and written to (which is not the case
>>> when the open(…) function is called).
>>>
>>> In initializeState(…) you also get access to the state of different operator
>>> keys.
>>>
>>> SnapshotState(…) is called as part of the (each) checkpoint in order to
>>> store data.
>>>
>>>
>>>
>>> Sincere greetings
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>> *From:* Lorenzo Nicora 
>>> *Sent:* Thursday, February 15, 2024 7:50 PM
>>> *To:* Flink User Group 
>>> *Subject:* Preparing keyed state before snapshot
>>>
>>>
>>>
>>> Hello everyone,
>>>
>>>
>>>
>>> I have a convoluted problem.
>>>
>>>
>>>
>>> I am implementing a KeyedProcessFunction that keeps some
>>> non-serializable "state" in memory, in a transient Map (key = stream key,
>>> value = the non-serializable "state").
>>>
>>>
>>>
>>> I can extract a serializable representation to put in Flink state, and I
>>> can load my in-memory "state" from the Flink state. But these operations
>>> are expensive.
>>>
>>>
>>>
>>> Initializing the in-memory "state" is relatively easy. I do it lazily,
>>> in processElement(), on the first record for the key.
>>>
>>>
>>>
>>> The problem is saving the in-memory "state" to Flink state.
>>>
>>> I need to do it only before the state snapshot. But KeyedProcessFunction
>>> has no entrypoint called before the state snapshot.
>>>
>>> I cannot use CheckpointedFunction.snapshotState(), because it does not
>>> work for keyed state.
>>>
>>>
>>>
>>> Any suggestions?
>>>
>>>
>>>
>>> Note that I cannot use operator state nor a broadcast state.
>>>
>>> Processing is keyed. Every processed record modifies the in-memory
>>> "state" of that key. If the job rescale, the state of the key must follow
>>> the partition.
>>>
>>>
>>>
>>>
>>>
>>> Regards
>>>
>>> Lorenzo
>>>
>>


Re: Preparing keyed state before snapshot

2024-02-17 Thread Zakelly Lan
Hi Lorenzo,

It is not recommended to do this with keyed state. However, there is an
example in the Flink code (FastTop1Function#snapshotState) [1] of setting keys
in snapshotState().

Hope this helps.

[1]
https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165

Best,
Zakelly

On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
wrote:

> Hi Thias
>
> I considered CheckpointedFunction.
> In snapshotState() I would have to update the state of each key,
> extracting the in-memory "state" of each key and putting it in the state
> with state.update(...).
> This must happen per key,
> but snapshotState() has no visibility of the keys. And I have no way of
> selectively accessing the state of a specific key to update it.
> Unless I am missing something
>
> Thanks
> Lorenzo
>
>
> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
>> Good morning Lorenzo,
>>
>>
>>
>> You may want to implement
>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>> your KeyedProcessFunction.
>>
>> Btw. by the time initializeState(…) is called, the state backend is fully
>> initialized and can be read and written to (which is not the case when
>> the open(…) function is called).
>>
>> In initializeState(…) you also get access to the state of different operator
>> keys.
>>
>> SnapshotState(…) is called as part of the (each) checkpoint in order to
>> store data.
>>
>>
>>
>> Sincere greetings
>>
>>
>>
>> Thias
>>
>>
>>
>> *From:* Lorenzo Nicora 
>> *Sent:* Thursday, February 15, 2024 7:50 PM
>> *To:* Flink User Group 
>> *Subject:* Preparing keyed state before snapshot
>>
>>
>>
>> Hello everyone,
>>
>>
>>
>> I have a convoluted problem.
>>
>>
>>
>> I am implementing a KeyedProcessFunction that keeps some non-serializable
>> "state" in memory, in a transient Map (key = stream key, value = the
>> non-serializable "state").
>>
>>
>>
>> I can extract a serializable representation to put in Flink state, and I
>> can load my in-memory "state" from the Flink state. But these operations
>> are expensive.
>>
>>
>>
>> Initializing the in-memory "state" is relatively easy. I do it lazily, in
>> processElement(), on the first record for the key.
>>
>>
>>
>> The problem is saving the in-memory "state" to Flink state.
>>
>> I need to do it only before the state snapshot. But KeyedProcessFunction
>> has no entrypoint called before the state snapshot.
>>
>> I cannot use CheckpointedFunction.snapshotState(), because it does not
>> work for keyed state.
>>
>>
>>
>> Any suggestions?
>>
>>
>>
>> Note that I cannot use operator state nor a broadcast state.
>>
>> Processing is keyed. Every processed record modifies the in-memory
>> "state" of that key. If the job rescale, the state of the key must follow
>> the partition.
>>
>>
>>
>>
>>
>> Regards
>>
>> Lorenzo
>>
>


Re: Preparing keyed state before snapshot

2024-02-16 Thread Lorenzo Nicora
Hi Thias

I considered CheckpointedFunction.
In snapshotState() I would have to update the state of each key, extracting
the in-memory "state" of each key and putting it in the state with
state.update(...).
This must happen per key,
but snapshotState() has no visibility of the keys. And I have no way of
selectively accessing the state of a specific key to update it.
Unless I am missing something

Thanks
Lorenzo


On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias 
wrote:

> Good morning Lorenzo,
>
>
>
> You may want to implement
> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
> your KeyedProcessFunction.
>
> Btw. by the time initializeState(…) is called, the state backend is fully
> initialized and can be read and written to (which is not the case when
> the open(…) function is called).
>
> In initializeState(…) you also get access to the state of different operator
> keys.
>
> SnapshotState(…) is called as part of the (each) checkpoint in order to
> store data.
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
> *From:* Lorenzo Nicora 
> *Sent:* Thursday, February 15, 2024 7:50 PM
> *To:* Flink User Group 
> *Subject:* Preparing keyed state before snapshot
>
>
>
> Hello everyone,
>
>
>
> I have a convoluted problem.
>
>
>
> I am implementing a KeyedProcessFunction that keeps some non-serializable
> "state" in memory, in a transient Map (key = stream key, value = the
> non-serializable "state").
>
>
>
> I can extract a serializable representation to put in Flink state, and I
> can load my in-memory "state" from the Flink state. But these operations
> are expensive.
>
>
>
> Initializing the in-memory "state" is relatively easy. I do it lazily, in
> processElement(), on the first record for the key.
>
>
>
> The problem is saving the in-memory "state" to Flink state.
>
> I need to do it only before the state snapshot. But KeyedProcessFunction
> has no entrypoint called before the state snapshot.
>
> I cannot use CheckpointedFunction.snapshotState(), because it does not
> work for keyed state.
>
>
>
> Any suggestions?
>
>
>
> Note that I cannot use operator state nor a broadcast state.
>
> Processing is keyed. Every processed record modifies the in-memory "state"
> of that key. If the job rescales, the state of the key must follow the
> partition.
>
>
>
>
>
> Regards
>
> Lorenzo
>


RE: Preparing keyed state before snapshot

2024-02-15 Thread Schwalbe Matthias
Good morning Lorenzo,

You may want to implement 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in 
your KeyedProcessFunction.
Btw. by the time initializeState(…) is called, the state backend is fully 
initialized and can be read and written to (which is not the case when the 
open(…) function is called).
In initializeState(…) you also get access to the state of different operator keys.
SnapshotState(…) is called as part of the (each) checkpoint in order to store 
data.

Sincere greetings

Thias

From: Lorenzo Nicora 
Sent: Thursday, February 15, 2024 7:50 PM
To: Flink User Group 
Subject: Preparing keyed state before snapshot

Hello everyone,

I have a convoluted problem.

I am implementing a KeyedProcessFunction that keeps some non-serializable 
"state" in memory, in a transient Map (key = stream key, value = the 
non-serializable "state").

I can extract a serializable representation to put in Flink state, and I can 
load my in-memory "state" from the Flink state. But these operations are 
expensive.

Initializing the in-memory "state" is relatively easy. I do it lazily, in 
processElement(), on the first record for the key.

The problem is saving the in-memory "state" to Flink state.
I need to do it only before the state snapshot. But KeyedProcessFunction has no 
entrypoint called before the state snapshot.
I cannot use CheckpointedFunction.snapshotState(), because it does not work for 
keyed state.

Any suggestions?

Note that I cannot use operator state nor a broadcast state.
Processing is keyed. Every processed record modifies the in-memory "state" of 
that key. If the job rescales, the state of the key must follow the partition.


Regards
Lorenzo


Preparing keyed state before snapshot

2024-02-15 Thread Lorenzo Nicora
Hello everyone,

I have a convoluted problem.

I am implementing a KeyedProcessFunction that keeps some non-serializable
"state" in memory, in a transient Map (key = stream key, value = the
non-serializable "state").

I can extract a serializable representation to put in Flink state, and I
can load my in-memory "state" from the Flink state. But these operations
are expensive.

Initializing the in-memory "state" is relatively easy. I do it lazily, in
processElement(), on the first record for the key.

The problem is saving the in-memory "state" to Flink state.
I need to do it only before the state snapshot. But KeyedProcessFunction
has no entrypoint called before the state snapshot.
I cannot use CheckpointedFunction.snapshotState(), because it does not work
for keyed state.

Any suggestions?

Note that I cannot use operator state nor a broadcast state.
Processing is keyed. Every processed record modifies the in-memory "state"
of that key. If the job rescales, the state of the key must follow the
partition.


Regards
Lorenzo


Re: using CheckpointedFunction on a keyed state

2023-09-10 Thread liu ron
Hi Krzysztof


Can you give the original code in initializeState method and the
corresponding exception stack trace? It looks a little interesting.


Best,

Ron


Krzysztof Chmielewski wrote on Sat, Sep 9, 2023 at 07:12:

> My apologies Matthias,
> you are right. The issue was that I was trying to access the state value from
> open/init methods where there was no key context.
>
> Regarding the CheckpointedFunction interface: from the javadoc example and
> description I got the impression that this can be used to access keyed state
> on a keyed stream.
> Quote "The OperatorStateStore and KeyedStateStore give access to the data
> structures in which state should be stored for Flink to transparently
> manage and checkpoint it,"
>
> The way I believe I should do it is this:
>
>   public void initializeState(FunctionInitializationContext context) throws Exception {
>       keyCounterState = context.getKeyedStateStore()
>           .getState(new ValueStateDescriptor<>("keyCounter", Integer.class));
>   }
>
> So use `context.getKeyedStateStore()`, to which I should have access
> since my stream is keyed.
> I have updated my code [1] and ran the toy job, which now works. I think
> that did the trick. WDYT?
>
> [1]
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/ae449b6b4b054f64f0828343456d814c3aaba962/src/main/java/org/example/KeyedCounter2.java#L19C5-L19C51
>
> On Thu, 7 Sep 2023 at 10:40, Schwalbe Matthias
> wrote:
>
>> Hi Krzysztof again,
>>
>>
>>
>> Just for clarity … your sample code [1] tries to count the number of
>> events per key.
>>
>> Assuming this is your intention?
>>
>>
>>
>> Anyway, your previous implementation initialized the keyed state
>> keyCounterState in the open() function, which is the right place to do this;
>>
>> you just wouldn’t want to store values in the state from within the
>> open() function.
>>
>>
>>
>> InitializeState() and snapshotState() are mainly used to initialize
>> operator state, not keyed state … refer to the relevant documentation.
>>
>>
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>> *From:* Krzysztof Chmielewski 
>> *Sent:* Donnerstag, 7. September 2023 09:59
>> *To:* user 
>> *Subject:* using CheckpointedFunction on a keyed state
>>
>>
>>
>>
>>
>>
>> Hi,
>> I have a toy Flink job [1] where I have a KeyedProcessFunction
>> implementation [2] that also implements the CheckpointedFunction. My stream
>> definition has .keyBy(...) call as you can see in [1].
>>
>> However when I'm trying to run this toy job I'm getting an exception
>> from CheckpointedFunction::initializeState method that says:
>> "Keyed state can only be used on a 'keyed stream', i.e., after a
>> 'keyBy()' operation."
>>
>> I got the impression from the docs that CheckpointedFunction can be used
>> on a keyed stream and that CheckpointedFunction::initializeState is for
>> initializing the state object.
>> Are my assumptions wrong? Is initializeState only to set an initial
>> value of the state per key, and must the state object be initialized in the
>> open() method?
>>
>>
>> Thanks,
>> Krzysztof
>>
>> [1]
>> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/DataStreamJob.java
>>
>> [2]
>> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/KeyCounter.java
>>
>


Re: using CheckpointedFunction on a keyed state

2023-09-08 Thread Krzysztof Chmielewski
My apologies Matthias,
you are right. The issue was that I was trying to access the state value from
open/init methods where there was no key context.

Regarding the CheckpointedFunction interface: from the javadoc example and
description I got the impression that this can be used to access keyed state
on a keyed stream.
Quote "The OperatorStateStore and KeyedStateStore give access to the data
structures in which state should be stored for Flink to transparently
manage and checkpoint it,"

The way I believe I should do it is this:

  public void initializeState(FunctionInitializationContext context) throws Exception {
      keyCounterState = context.getKeyedStateStore()
          .getState(new ValueStateDescriptor<>("keyCounter", Integer.class));
  }

So use `context.getKeyedStateStore()`, to which I should have access
since my stream is keyed.
I have updated my code [1] and ran the toy job, which now works. I think
that did the trick. WDYT?

[1]
https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/ae449b6b4b054f64f0828343456d814c3aaba962/src/main/java/org/example/KeyedCounter2.java#L19C5-L19C51

On Thu, 7 Sep 2023 at 10:40, Schwalbe Matthias
wrote:

> Hi Krzysztof again,
>
>
>
> Just for clarity … your sample code [1] tries to count the number of
> events per key.
>
> Assuming this is your intention?
>
>
>
> Anyway, your previous implementation initialized the keyed state
> keyCounterState in the open() function, which is the right place to do this;
>
> you just wouldn’t want to store values in the state from within the open()
> function.
>
>
>
> InitializeState() and snapshotState() are mainly used to initialize
> operator state, not keyed state … refer to the relevant documentation.
>
>
>
>
>
> Thias
>
>
>
>
>
> *From:* Krzysztof Chmielewski 
> *Sent:* Donnerstag, 7. September 2023 09:59
> *To:* user 
> *Subject:* using CheckpointedFunction on a keyed state
>
>
>
>
>
>
> Hi,
> I have a toy Flink job [1] where I have a KeyedProcessFunction
> implementation [2] that also implements the CheckpointedFunction. My stream
> definition has .keyBy(...) call as you can see in [1].
>
> However when I'm trying to run this toy job I'm getting an exception
> from CheckpointedFunction::initializeState method that says:
> "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()'
> operation."
>
> I got the impression from the docs that CheckpointedFunction can be used
> on a keyed stream and that CheckpointedFunction::initializeState is for
> initializing the state object.
> Are my assumptions wrong? Is initializeState only to set an initial
> value of the state per key, and must the state object be initialized in the
> open() method?
>
>
> Thanks,
> Krzysztof
>
> [1]
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/DataStreamJob.java
>
> [2]
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/KeyCounter.java
>


Re: updating keyed state in open method.

2023-09-07 Thread Zakelly Lan
Hi,

You cannot access keyed state within #open(). It can only be
accessed under a keyed context (a key is selected while processing an
element, e.g. in #processElement).
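
A minimal sketch of the working pattern (types assumed from the linked toy
job): create the state handle in open(), and read or write values only inside
processElement(), where a key is in context:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyCounter extends KeyedProcessFunction<String, String, Integer> {

    private transient ValueState<Integer> counter;

    @Override
    public void open(Configuration parameters) {
        // OK in open(): this only creates the handle, no key context needed
        counter = getRuntimeContext().getState(
            new ValueStateDescriptor<>("keyCounter", Integer.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Integer> out)
            throws Exception {
        // OK here: the current key is set, so value()/update() work
        Integer current = counter.value();  // null on the first element per key
        int next = (current == null ? 0 : current) + 1;
        counter.update(next);
        out.collect(next);
    }
}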

Best,
Zakelly

On Thu, Sep 7, 2023 at 4:55 PM Krzysztof Chmielewski
 wrote:
>
> Hi,
> I'm having a problem with my toy Flink job where I would like to access a
> ValueState of a keyed stream. The job setup can be found here [1]; it is
> fairly simple:
>
> env
>     .addSource(new CheckpointCountingSource(100, 60))
>     .keyBy(value -> value)
>     .process(new KeyCounter())
>     .addSink(new ConsoleSink());
>
> As you can see I'm using a keyBy and KeyCounter is extending
> KeyedProcessFunction.
> It seems that keyed state cannot be updated from the RichFunction::open() method.
> Is that intended?
>
> When I ran this example I have an exception that says:
>
> Caused by: java.lang.NullPointerException: No key set. This method should not 
> be called outside of a keyed context.
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
> at 
> org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:270)
> at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:260)
> at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
> at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
> at org.example.KeyCounter.open(KeyCounter.java:26)
>
>
> [1] 
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/KeyBayIssue/src/main/java/org/example/DataStreamJob.java


RE: using CheckpointedFunction on a keyed state

2023-09-07 Thread Schwalbe Matthias
Hi Krzysztof again,

Just for clarity … your sample code [1] tries to count the number of events per 
key.
Assuming this is your intention?

Anyway, your previous implementation initialized the keyed state keyCounterState 
in the open() function, which is the right place to do this;
you just wouldn’t want to store values in the state from within the open() 
function.

InitializeState() and snapshotState() are mainly used to initialize operator 
state, not keyed state … refer to the relevant documentation.
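
For the operator-state route mentioned above, a minimal sketch loosely
following the CheckpointedFunction javadoc pattern (names are illustrative):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class BufferingFunction implements CheckpointedFunction {

    private transient ListState<String> checkpointedBuffer; // operator state
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("buffer", String.class));
        if (context.isRestored()) {
            for (String element : checkpointedBuffer.get()) {
                buffer.add(element); // rebuild the local structure on restore
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedBuffer.update(buffer); // replace the snapshot contents
    }
}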


Thias


From: Krzysztof Chmielewski 
Sent: Donnerstag, 7. September 2023 09:59
To: user 
Subject: using CheckpointedFunction on a keyed state



Hi,
I have a toy Flink job [1] where I have a KeyedProcessFunction implementation 
[2] that also implements the CheckpointedFunction. My stream definition has 
.keyBy(...) call as you can see in [1].

However when I'm trying to run this toy job I'm getting an exception from 
CheckpointedFunction::initializeState method that says:
"Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' 
operation."

I got the impression from the docs that CheckpointedFunction can be used on a 
keyed stream and that CheckpointedFunction::initializeState is for initializing 
the state object.
Are my assumptions wrong? Is initializeState only to set an initial value of 
the state per key, and must the state object be initialized in the open() method?

Thanks,
Krzysztof
[1] 
https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/DataStreamJob.java

[2] 
https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/KeyCounter.java


Re: updating keyed state in open method.

2023-09-07 Thread Krzysztof Chmielewski
Thanks,
that helped.

Regards,
Krzysztof Chmielewski

On Thu, 7 Sep 2023 at 09:52, Schwalbe Matthias
wrote:

> Hi Krzysztof,
>
>
>
> You cannot access keyed state in open().
>
> Keyed state has a value per key.
>
> In theory you would have to initialize per possible key, which is quite
> impractical.
>
> However you don’t need to initialize state; the initial state per key
> defaults to the default value of the type (null for objects).
>
> Just drop the initializer [1]
>
>
>
> Hope this helps
>
>
>
> Thias
>
>
>
>
>
> [1]
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/033f74c427553fbfe0aaffe7d2af4382c09734ad/src/main/java/org/example/KeyCounter.java#L26
>
>
>
>
>
>
>
>
>
> *From:* Krzysztof Chmielewski 
> *Sent:* Donnerstag, 7. September 2023 09:38
> *To:* user 
> *Subject:* updating keyed state in open method.
>
>
>
>
>
>
> Hi,
> I'm having a problem with my toy Flink job where I would like to access a
> ValueState of a keyed stream. The job setup can be found here [1]; it is
> fairly simple:
>
> env
>     .addSource(new CheckpointCountingSource(100, 60))
>     .keyBy(value -> value)
>     .process(new KeyCounter())
>     .addSink(new ConsoleSink());
>
>
> As you can see I'm using a keyBy and KeyCounter is
> extending KeyedProcessFunction.
> It seems that keyed state cannot be updated from RichFunction::open()
> method. Is that intended?
>
> When I ran this example I have an exception that says:
>
>
> Caused by: java.lang.NullPointerException: No key set. This method should
> not be called outside of a keyed context.
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
> at
> org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:270)
> at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:260)
> at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
> at
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
> at org.example.KeyCounter.open(KeyCounter.java:26)
>
>
> [1]
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/KeyBayIssue/src/main/java/org/example/DataStreamJob.java
>


using CheckpointedFunction on a keyed state

2023-09-07 Thread Krzysztof Chmielewski
Hi,
I have a toy Flink job [1] where I have a KeyedProcessFunction
implementation [2] that also implements the CheckpointedFunction. My stream
definition has .keyBy(...) call as you can see in [1].

However when I'm trying to run this toy job I'm getting an exception
from CheckpointedFunction::initializeState method that says:
"Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()'
operation."

I got the impression from the docs that CheckpointedFunction can be used
on a keyed stream and that CheckpointedFunction::initializeState is for
initializing the state object.
Are my assumptions wrong? Is initializeState only to set an initial value
of the state per key, and must the state object be initialized in the open() method?


Thanks,
Krzysztof

[1]
https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/DataStreamJob.java

[2]
https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/KeyCounter.java


RE: updating keyed state in open method.

2023-09-07 Thread Schwalbe Matthias
Hi Krzysztof,

You cannot access keyed state in open().
Keyed state has a value per key.
In theory you would have to initialize per possible key, which is quite 
impractical.
However you don’t need to initialize state; the initial state per key defaults 
to the default value of the type (null for objects).
Just drop the initializer [1].

Hope this helps

Thias


[1] 
https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/033f74c427553fbfe0aaffe7d2af4382c09734ad/src/main/java/org/example/KeyCounter.java#L26




From: Krzysztof Chmielewski 
Sent: Donnerstag, 7. September 2023 09:38
To: user 
Subject: updating keyed state in open method.



Hi,
I'm having a problem with my toy Flink job where I would like to access a 
ValueState of a keyed stream. The job setup can be found here [1]; it is fairly 
simple:

env
    .addSource(new CheckpointCountingSource(100, 60))
    .keyBy(value -> value)
    .process(new KeyCounter())
    .addSink(new ConsoleSink());

As you can see I'm using a keyBy and KeyCounter is extending 
KeyedProcessFunction.
It seems that keyed state cannot be updated from RichFunction::open() method. Is 
that intended?

When I ran this example I have an exception that says:

Caused by: java.lang.NullPointerException: No key set. This method should not 
be called outside of a keyed context.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
at 
org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:270)
at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:260)
at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
at 
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
at org.example.KeyCounter.open(KeyCounter.java:26)


[1] 
https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/KeyBayIssue/src/main/java/org/example/DataStreamJob.java


updating keyed state in open method.

2023-09-07 Thread Krzysztof Chmielewski
Hi,
I'm having a problem with my toy Flink job where I would like to access a
ValueState of a keyed stream. The job setup can be found here [1]; it is
fairly simple:

env
    .addSource(new CheckpointCountingSource(100, 60))
    .keyBy(value -> value)
    .process(new KeyCounter())
    .addSink(new ConsoleSink());

As you can see I'm using a keyBy and KeyCounter is
extending KeyedProcessFunction.
It seems that keyed state cannot be updated from RichFunction::open()
method. Is that intended?

When I ran this example I have an exception that says:

Caused by: java.lang.NullPointerException: No key set. This method should
not be called outside of a keyed context.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
at
org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:270)
at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:260)
at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
at
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
at org.example.KeyCounter.open(KeyCounter.java:26)


[1]
https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/KeyBayIssue/src/main/java/org/example/DataStreamJob.java


Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-15 Thread Lars Skjærven
Same error again today. Any tips? I'm considering downgrading to Flink
1.14.

On Wed, Dec 14, 2022 at 11:51 AM Lars Skjærven  wrote:

> As far as I understand we are not specifying anything on restore mode. so
> I guess default (NO_CLAIM) is what we're using.
>
> We're using ververica platform to handle deploys, and things are a bit
> obscure on what happens underneath.
>
> It happened again this morning:
>
> Caused by: java.io.FileNotFoundException: Item not found: 
> 'gs://bucketname/namespace/flink-jobs/namespaces/default/jobs/fbdde9e7-cf5a-44b4-a3d4-d3ed517432a0/checkpoints/fbdde9e7cf5a44b4a3d4d3ed517432a0/shared/ae551eda-a588-45be-ba08-32bfbc50e965'.
>  Note, it is possible that the live version is still available but the 
> requested generation is deleted.
>
>
> On Tue, Dec 13, 2022 at 11:37 PM Martijn Visser 
> wrote:
>
>> Hi Lars,
>>
>> Have you used any of the new restore modes that were introduced with
>> 1.15? https://flink.apache.org/2022/05/06/restore-modes.html
>>
>> Best regards,
>>
>> Martijn
>>
>> On Fri, Dec 9, 2022 at 2:52 PM Lars Skjærven  wrote:
>>
>>> Lifecycle rules: None
>>>
>>> On Fri, Dec 9, 2022 at 3:17 AM Hangxiang Yu  wrote:
>>>
>>>> Hi, Lars.
>>>> Could you check whether you have configured the lifecycle of google
>>>> cloud storage[1] which is not recommended in the flink checkpoint usage?
>>>>
>>>> [1] https://cloud.google.com/storage/docs/lifecycle
>>>>
>>>> On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven  wrote:
>>>>
>>>>> Hello,
>>>>> We had an incident today with a job that could not restore after crash
>>>>> (for unknown reason). Specifically, it fails due to a missing checkpoint
>>>>> file. We've experienced this a total of three times with Flink 1.15.2, but
>>>>> never with 1.14.x. Last time was during a node upgrade, but that was not
>>>>> the case this time.
>>>>>
>>>>> I've not been able to reproduce this issue. I've checked that I can
>>>>> kill the taskmanager and jobmanager (using kubectl delete pod), and the 
>>>>> job
>>>>> restores as expected.
>>>>>
>>>>> The job is running with kubernetes high availability, rocksdb and
>>>>> incremental checkpointing.
>>>>>
>>>>> Any tips are highly appreciated.
>>>>>
>>>>> Thanks,
>>>>> Lars
>>>>>
>>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>>>>> keyed state backend for
>>>>> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of 
>>>>> the
>>>>> 1 provided restore options.
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>>>>> ... 11 more
>>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>>>>> Caught unexpected exception.
>>>>> at
>>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
>>>>> at
>>>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
>>>>> at
>>>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>>>> ... 13 more
>>>>> Caused by: java.io.FileNotFoundException: Item not found:
>>>>> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
>>>>> Note, it is possible that the live version is still available but the
>>>>> requested generation is deleted.
>>>>> at
>>>>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)
>>>>>
>>>>>
>>>>
>>>> --
>>>> Best,
>>>> Hangxiang.
>>>>
>>>


Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-14 Thread Lars Skjærven
As far as I understand we are not specifying anything on restore mode, so I
guess the default (NO_CLAIM) is what we're using.
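For reference, a hedged sketch (the path is illustrative, not from this thread) 
of how a restore mode can be selected explicitly when resuming from a snapshot, 
rather than relying on the NO_CLAIM default. In CLAIM mode Flink takes ownership 
of the snapshot files and may delete them once they are no longer needed, which 
avoids later restores referencing shared files that were cleaned up:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RestoreWithClaim {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Illustrative path; point this at the checkpoint/savepoint to resume from.
            conf.setString("execution.savepoint.path",
                    "gs://bucketname/flink-jobs/checkpoints/chk-42");
            // One of NO_CLAIM (default), CLAIM, LEGACY -- available since Flink 1.15.
            conf.setString("execution.savepoint-restore-mode", "CLAIM");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            // ... build and execute the job as usual ...
            // env.execute("restored-job");
        }
    }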

We're using ververica platform to handle deploys, and things are a bit
obscure on what happens underneath.

It happened again this morning:

Caused by: java.io.FileNotFoundException: Item not found:
'gs://bucketname/namespace/flink-jobs/namespaces/default/jobs/fbdde9e7-cf5a-44b4-a3d4-d3ed517432a0/checkpoints/fbdde9e7cf5a44b4a3d4d3ed517432a0/shared/ae551eda-a588-45be-ba08-32bfbc50e965'.
Note, it is possible that the live version is still available but the
requested generation is deleted.


On Tue, Dec 13, 2022 at 11:37 PM Martijn Visser 
wrote:

> Hi Lars,
>
> Have you used any of the new restore modes that were introduced with 1.15?
> https://flink.apache.org/2022/05/06/restore-modes.html
>
> Best regards,
>
> Martijn
>
> On Fri, Dec 9, 2022 at 2:52 PM Lars Skjærven  wrote:
>
>> Lifecycle rules: None
>>
>> On Fri, Dec 9, 2022 at 3:17 AM Hangxiang Yu  wrote:
>>
>>> Hi, Lars.
>>> Could you check whether you have configured the lifecycle of google
>>> cloud storage[1] which is not recommended in the flink checkpoint usage?
>>>
>>> [1] https://cloud.google.com/storage/docs/lifecycle
>>>
>>> On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven  wrote:
>>>
>>>> Hello,
>>>> We had an incident today with a job that could not restore after crash
>>>> (for unknown reason). Specifically, it fails due to a missing checkpoint
>>>> file. We've experienced this a total of three times with Flink 1.15.2, but
>>>> never with 1.14.x. Last time was during a node upgrade, but that was not
>>>> the case this time.
>>>>
>>>> I've not been able to reproduce this issue. I've checked that I can
>>>> kill the taskmanager and jobmanager (using kubectl delete pod), and the job
>>>> restores as expected.
>>>>
>>>> The job is running with kubernetes high availability, rocksdb and
>>>> incremental checkpointing.
>>>>
>>>> Any tips are highly appreciated.
>>>>
>>>> Thanks,
>>>> Lars
>>>>
>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>>>> keyed state backend for
>>>> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the
>>>> 1 provided restore options.
>>>> at
>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>>>> ... 11 more
>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>>>> Caught unexpected exception.
>>>> at
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
>>>> at
>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>>>> at
>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>>> ... 13 more
>>>> Caused by: java.io.FileNotFoundException: Item not found:
>>>> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
>>>> Note, it is possible that the live version is still available but the
>>>> requested generation is deleted.
>>>> at
>>>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)
>>>>
>>>>
>>>
>>> --
>>> Best,
>>> Hangxiang.
>>>
>>


Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-13 Thread Martijn Visser
Hi Lars,

Have you used any of the new restore modes that were introduced with 1.15?
https://flink.apache.org/2022/05/06/restore-modes.html

Best regards,

Martijn

On Fri, Dec 9, 2022 at 2:52 PM Lars Skjærven  wrote:

> Lifecycle rules: None
>
> On Fri, Dec 9, 2022 at 3:17 AM Hangxiang Yu  wrote:
>
>> Hi, Lars.
>> Could you check whether you have configured the lifecycle of google cloud
>> storage[1] which is not recommended in the flink checkpoint usage?
>>
>> [1] https://cloud.google.com/storage/docs/lifecycle
>>
>> On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven  wrote:
>>
>>> Hello,
>>> We had an incident today with a job that could not restore after crash
>>> (for unknown reason). Specifically, it fails due to a missing checkpoint
>>> file. We've experienced this a total of three times with Flink 1.15.2, but
>>> never with 1.14.x. Last time was during a node upgrade, but that was not
>>> the case this time.
>>>
>>> I've not been able to reproduce this issue. I've checked that I can kill
>>> the taskmanager and jobmanager (using kubectl delete pod), and the job
>>> restores as expected.
>>>
>>> The job is running with kubernetes high availability, rocksdb and
>>> incremental checkpointing.
>>>
>>> Any tips are highly appreciated.
>>>
>>> Thanks,
>>> Lars
>>>
>>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
>>> state backend for
>>> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the
>>> 1 provided restore options.
>>> at
>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>>> ... 11 more
>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>>> Caught unexpected exception.
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
>>> at
>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
>>> at
>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
>>> at
>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>>> at
>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>> ... 13 more
>>> Caused by: java.io.FileNotFoundException: Item not found:
>>> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
>>> Note, it is possible that the live version is still available but the
>>> requested generation is deleted.
>>> at
>>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)
>>>
>>>
>>
>> --
>> Best,
>> Hangxiang.
>>
>


Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-09 Thread Lars Skjærven
Lifecycle rules: None

On Fri, Dec 9, 2022 at 3:17 AM Hangxiang Yu  wrote:

> Hi, Lars.
> Could you check whether you have configured the lifecycle of google cloud
> storage[1] which is not recommended in the flink checkpoint usage?
>
> [1] https://cloud.google.com/storage/docs/lifecycle
>
> On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven  wrote:
>
>> Hello,
>> We had an incident today with a job that could not restore after crash
>> (for unknown reason). Specifically, it fails due to a missing checkpoint
>> file. We've experienced this a total of three times with Flink 1.15.2, but
>> never with 1.14.x. Last time was during a node upgrade, but that was not
>> the case this time.
>>
>> I've not been able to reproduce this issue. I've checked that I can kill
>> the taskmanager and jobmanager (using kubectl delete pod), and the job
>> restores as expected.
>>
>> The job is running with kubernetes high availability, rocksdb and
>> incremental checkpointing.
>>
>> Any tips are highly appreciated.
>>
>> Thanks,
>> Lars
>>
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
>> state backend for
>> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the
>> 1 provided restore options.
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>> ... 11 more
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>> Caught unexpected exception.
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
>> at
>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
>> at
>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> ... 13 more
>> Caused by: java.io.FileNotFoundException: Item not found:
>> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
>> Note, it is possible that the live version is still available but the
>> requested generation is deleted.
>> at
>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)
>>
>>
>
> --
> Best,
> Hangxiang.
>


Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-08 Thread Hangxiang Yu
Hi, Lars.
Could you check whether you have configured the lifecycle of google cloud
storage[1] which is not recommended in the flink checkpoint usage?

[1] https://cloud.google.com/storage/docs/lifecycle

On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven  wrote:

> Hello,
> We had an incident today with a job that could not restore after crash
> (for unknown reason). Specifically, it fails due to a missing checkpoint
> file. We've experienced this a total of three times with Flink 1.15.2, but
> never with 1.14.x. Last time was during a node upgrade, but that was not
> the case this time.
>
> I've not been able to reproduce this issue. I've checked that I can kill
> the taskmanager and jobmanager (using kubectl delete pod), and the job
> restores as expected.
>
> The job is running with kubernetes high availability, rocksdb and
> incremental checkpointing.
>
> Any tips are highly appreciated.
>
> Thanks,
> Lars
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the
> 1 provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
> ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
> unexpected exception.
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
> at
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
> at
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ... 13 more
> Caused by: java.io.FileNotFoundException: Item not found:
> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
> Note, it is possible that the live version is still available but the
> requested generation is deleted.
> at
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)
>
>

-- 
Best,
Hangxiang.


Could not restore keyed state backend for KeyedProcessOperator

2022-12-08 Thread Lars Skjærven
Hello,
We had an incident today with a job that could not restore after crash (for
unknown reason). Specifically, it fails due to a missing checkpoint file.
We've experienced this a total of three times with Flink 1.15.2, but never
with 1.14.x. Last time was during a node upgrade, but that was not the case
this time.

I've not been able to reproduce this issue. I've checked that I can kill
the taskmanager and jobmanager (using kubectl delete pod), and the job
restores as expected.

The job is running with kubernetes high availability, rocksdb and
incremental checkpointing.

Any tips are highly appreciated.

Thanks,
Lars

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for
KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the
1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
unexpected exception.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: java.io.FileNotFoundException: Item not found:
'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
Note, it is possible that the live version is still available but the
requested generation is deleted.
at
com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)


Re: Re: Will Flink lose some old Keyed State when changing the parallelism

2021-12-20 Thread Seth Wiesman
No. The default max parallelism of 128 will be applied. If you try to
restore above that value, the restore will fail and you can simply restore
at a smaller value.

No data loss.
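For reference, a minimal sketch of pinning the max parallelism explicitly so 
that future rescaling above the default bound of 128 stays possible (the values 
here are illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MaxParallelismExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Upper bound on rescaling (the number of key groups). Changing this
            // later makes existing keyed state non-restorable, so pick it once.
            env.setMaxParallelism(720);
            // The actual parallelism can later be changed to any value <= 720
            // via a savepoint-and-restart.
            env.setParallelism(5);
            // ... define sources, keyBy, operators, sinks ...
            // env.execute("rescalable-job");
        }
    }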

On Mon, Dec 20, 2021 at 2:28 AM 杨浩  wrote:

>
> Thanks for your replay. If we don't set the max parallelism, and we change
> the parallelism to a very big num, will the state loss?
>
>
>
>
>
> At 2021-11-27 01:20:49, "Yun Tang"  wrote:
> >Hi Yang,
> >
> >Flink keeps the max key groups the same no matter how parallelism changes, 
> >and use this to avoid state data lost [1]
> >
> >[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
> >
> >
> >Best
> >Yun Tang
> >
> >On 2021/11/26 10:07:29 Nicolaus Weidner wrote:
> >> Hi,
> >>
> >> to rescale, you should take a savepoint, stop the job, then restart from
> >> the savepoint with your new desired parallelism. This way, no data will be
> >> lost.
> >>
> >> Best,
> >> Nico
> >>
> >> On Thu, Nov 25, 2021 at 10:53 AM 杨浩  wrote:
> >>
> >> > Will Flink loss some old Keyed State when changing the parallelism, like 
> >> > 2
> >> > -> 5, or 5->3?
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
>
>
>
>
>


Reply: Re: Will Flink lose some old Keyed State when changing the parallelism

2021-12-20 Thread 杨浩



Thanks for your reply. If we don't set the max parallelism and we change the 
parallelism to a very big number, will the state be lost?











At 2021-11-27 01:20:49, "Yun Tang"  wrote:
>Hi Yang,
>
>Flink keeps the max key groups the same no matter how parallelism changes, and 
>use this to avoid state data lost [1]
>
>[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>
>
>Best
>Yun Tang
>
>On 2021/11/26 10:07:29 Nicolaus Weidner wrote:
>> Hi,
>> 
>> to rescale, you should take a savepoint, stop the job, then restart from
>> the savepoint with your new desired parallelism. This way, no data will be
>> lost.
>> 
>> Best,
>> Nico
>> 
>> On Thu, Nov 25, 2021 at 10:53 AM 杨浩  wrote:
>> 
>> > Will Flink loss some old Keyed State when changing the parallelism, like 2
>> > -> 5, or 5->3?
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> 


Re: Will Flink lose some old Keyed State when changing the parallelism

2021-11-26 Thread Yun Tang
Hi Yang,

Flink keeps the max key groups the same no matter how parallelism changes, and 
uses this to avoid losing state data [1]

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html


Best
Yun Tang

On 2021/11/26 10:07:29 Nicolaus Weidner wrote:
> Hi,
> 
> to rescale, you should take a savepoint, stop the job, then restart from
> the savepoint with your new desired parallelism. This way, no data will be
> lost.
> 
> Best,
> Nico
> 
> On Thu, Nov 25, 2021 at 10:53 AM 杨浩  wrote:
> 
> > Will Flink loss some old Keyed State when changing the parallelism, like 2
> > -> 5, or 5->3?
> >
> >
> >
> >
> >
> >
> >
> 


Re: Will Flink lose some old Keyed State when changing the parallelism

2021-11-26 Thread Nicolaus Weidner
Hi,

to rescale, you should take a savepoint, stop the job, then restart from
the savepoint with your new desired parallelism. This way, no data will be
lost.

Best,
Nico

On Thu, Nov 25, 2021 at 10:53 AM 杨浩  wrote:

> Will Flink loss some old Keyed State when changing the parallelism, like 2
> -> 5, or 5->3?
>
>
>
>
>
>
>


Re: After changing the parallelism of keyed state (e.g. 2->4 or 4->2), will part of the old state be lost?

2021-11-25 Thread yidan zhao
Of course not.

杨浩 wrote on Thursday, Nov 25, 2021 at 6:09 PM:

> After changing the parallelism of keyed state (e.g. 2->4 or 4->2), will part of the old state be lost?


After changing the parallelism of keyed state (e.g. 2->4 or 4->2), will part of the old state be lost?

2021-11-25 Thread 杨浩
After changing the parallelism of keyed state (e.g. 2->4 or 4->2), will part of the old state be lost?

Will Flink lose some old Keyed State when changing the parallelism

2021-11-25 Thread 杨浩
Will Flink lose some old Keyed State when changing the parallelism, like 2 -> 
5, or 5 -> 3?





 

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Seth Wiesman
Hi Marc,

I think you will find this is less efficient than just using keyed state.
Remember state backends are local; reading and writing are extremely cheap.
HashMapStateBackend is just an in-memory data structure, and
EmbeddedRocksDBStateBackend only works against local disk. Additionally,
the embedded RocksDB state backend already supports incremental
checkpointing, so when an asynchronous checkpoint does occur you are not
paying transfer cost on slow-changing state values.
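For reference, a minimal sketch of enabling the incremental checkpointing 
mentioned above (the interval value is illustrative):

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IncrementalRocksDbSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // true => incremental checkpoints: only new/changed SST files are
            // uploaded, so slow-changing state is not re-transferred each time.
            env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
            env.enableCheckpointing(60_000);  // checkpoint every 60s (illustrative)
            // ... build the job ...
        }
    }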

Seth



On Tue, Oct 12, 2021 at 10:12 AM Marc LEGER  wrote:

> Hello Nicolaus,
>
> Unfortunately, I don't really have the hand on the custom state solution
> since it is managed by an existing system which cannot be easily modified.
>
> What I finally did for the "data state" in my CoFlatMapFunction is to use a*
> list-style operator state* to store the partitioned state for a key per
> element in the list with an *union redistribution* scheme in case of
> restore/redistribution.
> Not sure if it's really efficient (need to do more tests) but all
> operators are then receiving the same whole custom state from which the
> partitioned state for the assigned keys can then be retrieved inside every
> operator parallel task besides the other keyed state (control state):
>
>
>
>
> *// Control state partitioned by userId (keyed state) private
> ValueState controlState; // Data state partitioned by userId
> (operator state) private ListState dataState;*
>
> To avoid "state explosion", I also added a custom TTL-based cleanup
> mechanism for this operator state to remove elements in the list which are
> not used for some time.
> However, I am still interested in any other better solution if available
> in Flink.
>
> Thank you for your help.
>
> Best Regards,
> Marc
>
>
> On Tue, Oct 12, 2021 at 09:02 Nicolaus Weidner <
> nicolaus.weid...@ververica.com> wrote:
>
>> Hi Marc,
>>
>> thanks for clarifying, I had misunderstood some parts.
>> Unfortunately, I don't think there is a way to update keyed state (for
>> multiple keys even) outside of a keyed context.
>>
>> I will ask if someone else has an idea, but allow me to ask one
>> counter-question first: Did you actually run tests to verify that using the
>> custom state solution is more efficient than using Flink's keyed state
>> regularly (in the end, you would even have to include the state
>> synchronization in the performance test)? Efficient stateful stream
>> processing is one of the key features of Flink, and you are essentially
>> trying to override a specific piece of it with custom logic.
>>
>> Best regards,
>> Nico
>>
>> On Wed, Oct 6, 2021 at 5:50 PM Marc LEGER  wrote:
>>
>>> Hello Nicolaus,
>>>
>>> Thank you for your quick feedback, sorry if I am not clear enough.
>>> Actually in the documented example, the state which is updated in the
>>> snapshotState method is an operator state and not a keyed state:
>>>
>>> *public void initializeState(FunctionInitializationContext context)
>>> throws Exception {*
>>>
>>>
>>> *  [...]*
>>>
>>> *  countPerPartition =
>>> context.getOperatorStateStore().getOperatorState(new
>>> ListStateDescriptor<>("perPartitionCount", Long.class));*
>>>
>>>
>>>
>>>
>>> *  [...] } public void snapshotState(FunctionSnapshotContext context)
>>> throws Exception {*
>>>
>>>
>>> *  [...]*
>>>
>>> *  countPerPartition.add(localCount);*
>>>
>>> *}*
>>>
>>>
>>> It seems that the method is then only called once per operator parallel
>>> task and not once per key.
>>> On my side I have two keyed states with same key (e.g., userId) in a
>>> CoFlatMapFunction:
>>>
>>>
>>>
>>>
>>> *// Control state partitioned by userId private ValueState
>>> controlState; // Data state partitioned by userId coming from the
>>> ser/deserialization of a custom system having a partitioned state private
>>> ValueState dataState;*
>>>
>>> and I would like to do something like that to update dataState in a
>>> keyed context for every key and every checkpoint:
>>>
>>>
>>>
>>> *public void snapshotState(FunctionSnapshotContext context) throws
>>> Exception {  dataState.update(customSystem.getSnapshot(context.getKey());
>>> // Not a keyed context here ! }*
>>>
>>> instead of saving dataState in the flatMap2 function for every received
>>> event:
>>>

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Marc LEGER
 Hello Nicolaus,

Unfortunately, I don't really have control over the custom state solution
since it is managed by an existing system which cannot easily be modified.

What I finally did for the "data state" in my CoFlatMapFunction is to use a
list-style operator state to store the partitioned state for a key per
element in the list, with a union redistribution scheme in case of
restore/redistribution.
Not sure if it's really efficient (need to do more tests), but all operators
then receive the same whole custom state, from which the partitioned
state for the assigned keys can be retrieved inside every operator
parallel task, besides the other keyed state (control state):




// Control state partitioned by userId (keyed state)
private ValueState controlState;
// Data state partitioned by userId (operator state)
private ListState dataState;
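For reference, a hedged sketch (the Data type and names are illustrative, not 
from this thread) of how such a union-redistributed operator state can be 
declared, so that every parallel task receives the complete list on restore:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    public class UnionStateHolder implements CheckpointedFunction {

        // "Data" stands in for the custom system's per-key snapshot type.
        private transient ListState<Data> dataState;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // getUnionListState: on restore/rescale, every parallel task gets
            // the union of all tasks' lists, not just its own slice.
            dataState = context.getOperatorStateStore().getUnionListState(
                    new ListStateDescriptor<>("dataState", Data.class));
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // dataState is updated during processing; nothing extra needed here.
        }
    }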

To avoid "state explosion", I also added a custom TTL-based cleanup
mechanism for this operator state to remove elements in the list which are
not used for some time.
However, I am still interested in any other better solution if available in
Flink.

Thank you for your help.

Best Regards,
Marc


Le mar. 12 oct. 2021 à 09:02, Nicolaus Weidner <
nicolaus.weid...@ververica.com> a écrit :

> Hi Marc,
>
> thanks for clarifying, I had misunderstood some parts.
> Unfortunately, I don't think there is a way to update keyed state (for
> multiple keys even) outside of a keyed context.
>
> I will ask if someone else has an idea, but allow me to ask one
> counter-question first: Did you actually run tests to verify that using the
> custom state solution is more efficient than using Flink's keyed state
> regularly (in the end, you would even have to include the state
> synchronization in the performance test)? Efficient stateful stream
> processing is one of the key features of Flink, and you are essentially
> trying to override a specific piece of it with custom logic.
>
> Best regards,
> Nico
>
> On Wed, Oct 6, 2021 at 5:50 PM Marc LEGER  wrote:
>
>> Hello Nicolaus,
>>
>> Thank you for your quick feedback, sorry if I am not clear enough.
>> Actually in the documented example, the state which is updated in the
>> snapshotState method is an operator state and not a keyed state:
>>
>> *public void initializeState(FunctionInitializationContext context)
>> throws Exception {*
>>
>>
>> *  [...]*
>>
>> *  countPerPartition =
>> context.getOperatorStateStore().getOperatorState(new
>> ListStateDescriptor<>("perPartitionCount", Long.class));*
>>
>>
>>
>>
>> *  [...] } public void snapshotState(FunctionSnapshotContext context)
>> throws Exception {*
>>
>>
>> *  [...]*
>>
>> *  countPerPartition.add(localCount);*
>>
>> *}*
>>
>>
>> It seems that the method is then only called once per operator parallel
>> task and not once per key.
>> On my side I have two keyed states with same key (e.g., userId) in a
>> CoFlatMapFunction:
>>
>>
>>
>>
>> *// Control state partitioned by userId private ValueState
>> controlState; // Data state partitioned by userId coming from the
>> ser/deserialization of a custom system having a partitioned state private
>> ValueState dataState;*
>>
>> and I would like to do something like that to update dataState in a keyed
>> context for every key and every checkpoint:
>>
>>
>>
>> *public void snapshotState(FunctionSnapshotContext context) throws
>> Exception {  dataState.update(customSystem.getSnapshot(context.getKey());
>> // Not a keyed context here ! }*
>>
>> instead of saving dataState in the flatMap2 function for every received
>> event:
>>
>>
>> *public void flatMap1(Control control, Collector out) {*
>>
>> *   controlState.update(control); *
>>
>> *}*
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *public void flatMap2(Event event, Collector out) {  //
>> Perform some event transformations based on controlState  ProcessedEvent
>> result = customSystem.process(controlState.value() , event);  // Save
>> internal custom system state after processing: can be costly if high event
>> throughput
>> dataState.update(customSystem.getSnapshot(controlState.value().getUserId());
>> // Output the processed event  out.collect(result); }*
>>
>>
>> So basically, I want to be able to synchronize the partitioned state of
>> my custom system with the checkpoints done by Flink.
>>
>>
>> Best Regards,
>> Marc
>>
>> On Wed, Oct 6, 2021 at 12:11 Nicolaus Weidner <
>>

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Nicolaus Weidner
Hi Marc,

thanks for clarifying, I had misunderstood some parts.
Unfortunately, I don't think there is a way to update keyed state (for
multiple keys even) outside of a keyed context.

I will ask if someone else has an idea, but allow me to ask one
counter-question first: Did you actually run tests to verify that using the
custom state solution is more efficient than using Flink's keyed state
regularly (in the end, you would even have to include the state
synchronization in the performance test)? Efficient stateful stream
processing is one of the key features of Flink, and you are essentially
trying to override a specific piece of it with custom logic.

Best regards,
Nico

On Wed, Oct 6, 2021 at 5:50 PM Marc LEGER  wrote:

> Hello Nicolaus,
>
> Thank you for your quick feedback, sorry if I am not clear enough.
> Actually in the documented example, the state which is updated in the
> snapshotState method is an operator state and not a keyed state:
>
> *public void initializeState(FunctionInitializationContext context) throws
> Exception {*
>
>
> *  [...]*
>
> *  countPerPartition =
> context.getOperatorStateStore().getOperatorState(new
> ListStateDescriptor<>("perPartitionCount", Long.class));*
>
>
>
>
> *  [...] } public void snapshotState(FunctionSnapshotContext context)
> throws Exception {*
>
>
> *  [...]*
>
> *  countPerPartition.add(localCount);*
>
> *}*
>
>
> It seems that the method is then only called once per operator parallel
> task and not once per key.
> On my side I have two keyed states with same key (e.g., userId) in a
> CoFlatMapFunction:
>
>
>
>
> *// Control state partitioned by userId private ValueState
> controlState; // Data state partitioned by userId coming from the
> ser/deserialization of a custom system having a partitioned state private
> ValueState dataState;*
>
> and I would like to do something like that to update dataState in a keyed
> context for every key and every checkpoint:
>
>
>
> *public void snapshotState(FunctionSnapshotContext context) throws
> Exception {  dataState.update(customSystem.getSnapshot(context.getKey());
> // Not a keyed context here ! }*
>
> instead of saving dataState in the flatMap2 function for every received
> event:
>
>
> *public void flatMap1(Control control, Collector out) {*
>
> *   controlState.update(control); *
>
> *}*
>
>
>
>
>
>
>
>
>
> *public void flatMap2(Event event, Collector out) {  //
> Perform some event transformations based on controlState  ProcessedEvent
> result = customSystem.process(controlState.value() , event);  // Save
> internal custom system state after processing: can be costly if high event
> throughput
> dataState.update(customSystem.getSnapshot(controlState.value().getUserId());
> // Output the processed event  out.collect(result); }*
>
>
> So basically, I want to be able to synchronize the partitioned state of my
> custom system with the checkpoints done by Flink.
>
>
> Best Regards,
> Marc
>
> Le mer. 6 oct. 2021 à 12:11, Nicolaus Weidner <
> nicolaus.weid...@ververica.com> a écrit :
>
>> Hi Marc,
>>
>> I think you can just use keyed state in a
>> CheckpointedFunction. FunctionInitializationContext gives you access to
>> both keyed state and operator state (your stream needs to be keyed, of
>> course). So you could just update your local custom state on regular
>> invocations and update keyed state on calls to snapshotState.
>> Check out the example in [1] where both types of state are used.
>>
>> Does that help? Not sure if I understood the problem correctly.
>>
>> Best regards,
>> Nico
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java#L74-L110
>>
>> On Tue, Oct 5, 2021 at 3:28 PM Marc LEGER  wrote:
>>
>>> Hello,
>>>
>>> Is there any method available in a RichFunction to be called by Flink
>>> with a keyed context each time a checkpoint is triggered please ?
>>>
>>> It seems that the CheckpointedFunction interface provides such a feature
>>> (snapshotState method) but only in case of operator state and it is called
>>> in a non-keyed context.
>>>
>>> Indeed, I am implementing a CoFlatMapFunction with:
>>> - a keyed state (state1) for a "control" stream (stream1) which is not
>>> often updated,
>>> - a keyed state (state2) for a "data" stream (stream2) with a high
>>> throughput and relying on a custom solution for internal state snapshot
>>> with some potential performance impact.
>>>
>>> Consequently, I don't want to trigger a state2 update for every event
>>> received in stream2 for efficiency reasons but rather update state2 based
>>> on checkpoints triggered by Flink.
>>>
>>> Best Regards,
>>> Marc
>>>
>>>


Re: Snapshot method for custom keyed state checkpointing ?

2021-10-06 Thread Marc LEGER
Hello Nicolaus,

Thank you for your quick feedback, sorry if I am not clear enough.
Actually in the documented example, the state which is updated in the
snapshotState method is an operator state and not a keyed state:

public void initializeState(FunctionInitializationContext context) throws Exception {
  [...]
  countPerPartition = context.getOperatorStateStore().getOperatorState(
      new ListStateDescriptor<>("perPartitionCount", Long.class));
  [...]
}

public void snapshotState(FunctionSnapshotContext context) throws Exception {
  [...]
  countPerPartition.add(localCount);
}


It seems that the method is then only called once per operator parallel
task and not once per key.
On my side I have two keyed states with the same key (e.g., userId) in a
CoFlatMapFunction:




// Control state partitioned by userId
private ValueState controlState;
// Data state partitioned by userId coming from the ser/deserialization
// of a custom system having a partitioned state
private ValueState dataState;

and I would like to do something like this to update dataState in a keyed
context for every key and every checkpoint:

public void snapshotState(FunctionSnapshotContext context) throws Exception {
  dataState.update(customSystem.getSnapshot(context.getKey()));  // Not a keyed context here!
}

instead of saving dataState in the flatMap2 function for every received
event:


public void flatMap1(Control control, Collector out) {
  controlState.update(control);
}

public void flatMap2(Event event, Collector out) {
  // Perform some event transformations based on controlState
  ProcessedEvent result = customSystem.process(controlState.value(), event);
  // Save internal custom system state after processing:
  // can be costly if high event throughput
  dataState.update(customSystem.getSnapshot(controlState.value().getUserId()));
  // Output the processed event
  out.collect(result);
}


So basically, I want to be able to synchronize the partitioned state of my
custom system with the checkpoints done by Flink.


Best Regards,
Marc

On Wed, Oct 6, 2021 at 12:11 Nicolaus Weidner <
nicolaus.weid...@ververica.com> wrote:

> Hi Marc,
>
> I think you can just use keyed state in a
> CheckpointedFunction. FunctionInitializationContext gives you access to
> both keyed state and operator state (your stream needs to be keyed, of
> course). So you could just update your local custom state on regular
> invocations and update keyed state on calls to snapshotState.
> Check out the example in [1] where both types of state are used.
>
> Does that help? Not sure if I understood the problem correctly.
>
> Best regards,
> Nico
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java#L74-L110
>
> On Tue, Oct 5, 2021 at 3:28 PM Marc LEGER  wrote:
>
>> Hello,
>>
>> Is there any method available in a RichFunction to be called by Flink
>> with a keyed context each time a checkpoint is triggered please ?
>>
>> It seems that the CheckpointedFunction interface provides such a feature
>> (snapshotState method) but only in case of operator state and it is called
>> in a non-keyed context.
>>
>> Indeed, I am implementing a CoFlatMapFunction with:
>> - a keyed state (state1) for a "control" stream (stream1) which is not
>> often updated,
>> - a keyed state (state2) for a "data" stream (stream2) with a high
>> throughput and relying on a custom solution for internal state snapshot
>> with some potential performance impact.
>>
>> Consequently, I don't want to trigger a state2 update for every event
>> received in stream2 for efficiency reasons but rather update state2 based
>> on checkpoints triggered by Flink.
>>
>> Best Regards,
>> Marc
>>
>>


Re: Snapshot method for custom keyed state checkpointing ?

2021-10-06 Thread Nicolaus Weidner
Hi Marc,

I think you can just use keyed state in a
CheckpointedFunction. FunctionInitializationContext gives you access to
both keyed state and operator state (your stream needs to be keyed, of
course). So you could just update your local custom state on regular
invocations and update keyed state on calls to snapshotState.
Check out the example in [1] where both types of state are used.

Does that help? Not sure if I understood the problem correctly.

Best regards,
Nico

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java#L74-L110
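To make the distinction concrete, here is a minimal self-contained sketch in the 
spirit of [1] (class and state names are illustrative): keyed state is 
registered in initializeState() and used per key inside flatMap(), while 
snapshotState() runs once per parallel task per checkpoint, with no keyed 
context available:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.util.Collector;

    import java.util.Collections;

    public class CountWithSnapshot extends RichFlatMapFunction<String, Long>
            implements CheckpointedFunction {

        private transient ValueState<Long> perKeyCount;  // keyed state
        private transient ListState<Long> perTaskCount;  // operator state
        private long localCount;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // Keyed state: registered here, but only usable inside flatMap(),
            // where a key is in scope (the stream must be keyed).
            perKeyCount = context.getKeyedStateStore().getState(
                    new ValueStateDescriptor<>("perKeyCount", Long.class));
            // Operator state: one list per parallel task.
            perTaskCount = context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<>("perTaskCount", Long.class));
            for (Long c : perTaskCount.get()) {
                localCount += c;
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Called once per parallel task per checkpoint -- NOT once per key.
            perTaskCount.update(Collections.singletonList(localCount));
        }

        @Override
        public void flatMap(String value, Collector<Long> out) throws Exception {
            Long current = perKeyCount.value();  // keyed context: the key is implicit
            long next = (current == null) ? 1L : current + 1L;
            perKeyCount.update(next);
            localCount++;
            out.collect(next);
        }
    }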

On Tue, Oct 5, 2021 at 3:28 PM Marc LEGER  wrote:

> Hello,
>
> Is there any method available in a RichFunction to be called by Flink with
> a keyed context each time a checkpoint is triggered please ?
>
> It seems that the CheckpointedFunction interface provides such a feature
> (snapshotState method) but only in case of operator state and it is called
> in a non-keyed context.
>
> Indeed, I am implementing a CoFlatMapFunction with:
> - a keyed state (state1) for a "control" stream (stream1) which is not
> often updated,
> - a keyed state (state2) for a "data" stream (stream2) with a high
> throughput and relying on a custom solution for internal state snapshot
> with some potential performance impact.
>
> Consequently, I don't want to trigger a state2 update for every event
> received in stream2 for efficiency reasons but rather update state2 based
> on checkpoints triggered by Flink.
>
> Best Regards,
> Marc
>
>


Snapshot method for custom keyed state checkpointing ?

2021-10-05 Thread Marc LEGER
Hello,

Is there any method available in a RichFunction that is called by Flink with
a keyed context each time a checkpoint is triggered, please?

It seems that the CheckpointedFunction interface provides such a feature
(the snapshotState method), but only for operator state, and it is called
in a non-keyed context.

Indeed, I am implementing a CoFlatMapFunction with:
- a keyed state (state1) for a "control" stream (stream1) which is not
often updated,
- a keyed state (state2) for a "data" stream (stream2) with a high
throughput and relying on a custom solution for internal state snapshot
with some potential performance impact.

Consequently, I don't want to trigger a state2 update for every event
received in stream2 for efficiency reasons but rather update state2 based
on checkpoints triggered by Flink.

Best Regards,
Marc


Re: State processor API very slow reading a keyed state with RocksDB

2021-09-10 Thread David Causse
Thank you all for the great insights and suggestions!

I understand that the underlying components used by the State Processor API
are sufficiently different that this may explain the slowness, and that this
behavior is not caused by the way we use the API.

David.

On Fri, Sep 10, 2021 at 5:27 AM Yun Tang  wrote:

> Hi David,
>
> I think Seth had shared some useful information.
>
> If you want to know what happened within RocksDB when you're reading, you
> can leverage async-profiler [1] to catch the RocksDB stacks and I guess
> that index block might be evicted too frequently during your read. And we
> could use new read option which disable fillCache [2] to speedup bulk scan
> in the future to help improve the performance.
>
>
> Best
> Yun Tang
>
> [1] https://github.com/jvm-profiling-tools/async-profiler
> [2]
> https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/ReadOptions.html#setFillCache(boolean)
> --
> *From:* Seth Wiesman 
> *Sent:* Friday, September 10, 2021 0:58
> *To:* David Causse ; user 
> *Cc:* Piotr Nowojski 
> *Subject:* Re: State processor API very slow reading a keyed state with
> RocksDB
>
> Hi David,
>
> I was also able to reproduce the behavior, but was able to get
> significant performance improvements by reducing the number of slots on
> each TM to 1.
>
> My suspicion, as Piotr alluded to, has to do with the different runtime
> execution of DataSet over DataStream. In particular, Flink's DataStream
> operators are aware of the resource requirements of the state backend and
> include RocksDB in its internal memory configurations. In the state
> processor api, the underlying input format is a blackbox.
>
> Another thing to know is that when running multiple RocksDB instances
> within the same JVM, you are actually running a single native process with
> multiple logical instances. I _think_ we are seeing contention amongst the
> logical RocksDB instances.
>
> Even with one slot, it is not as fast as I would like and will need to
> continue investigating. If my suspicion for the slowness is correct, we
> will need to migrate to the new Source API and improve this as part of
> DataStream integration. This migration is something we'd like to do
> regardless, but I don't have a timeline to share.
>
> *Aside: Why is writing still relatively fast? *
>
> Even with these factors accounted for, I do still expect writing to be
> faster than reading. This is due to both how RocksDB internal data
> structures work, along with some peculiarities of how to state processor
> API has to perform reads.
>
> 1. RocksDB internally uses a data structure called a log structured merge
> tree (or LSM). This means writes are always implemented as appends, so
> there is no seek required. Additionally, writes go into an in-memory data
> structure called a MemTable that is flushed to disk asynchronously.
> Because there may be multiple entries for a given key, RocksDB needs to
> search for the most recent value and potentially read from disk. This may
> be alleviated by enabling bloom filters but does have memory costs.
>
> 2. RocksDB is a key value store, so Flink represents each registered state
> (ValueState, ListState, etc) as its own column family (table). A key only
> exists in a table if it has a non-null value. This means not all keys exist
> in all column families for a given operator. The state-proc-api wants to
> make it appear as if each operator is composed of a single table with
> multiple columns. To do this, we perform a full table scan on one column
> family and then do point lookups of that key on the others. However, we
> still need to find the keys that may only exist in other tables. The trick
> we perform is to delete keys from rocksDB after each read, so we can do
> full table scans on all column families but never see any duplicates. This
> means the reader is performing multiple reads and writes on every call to
> `readKey` and is more expensive than it may appear.
>
> Seth
>
>
> On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski 
> wrote:
>
> Hi David,
>
> I can confirm that I'm able to reproduce this behaviour. I've tried
> profiling/flame graphs and I was not able to make much sense out of those
> results. There are no IO/Memory bottlenecks that I could notice, it looks
> indeed like the Job is stuck inside RocksDB itself. This might be an issue
> with for example memory configuration. Streaming jobs and State Processor
> API are running in very different environments as the latter one is using
> DataSet API under the hood, so maybe that can explain this? However I'm no
> expert in neither DataSet API nor the RocksDB, so it's hard for me to make
> progress here.
>

Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Yun Tang
Hi David,

I think Seth had shared some useful information.

If you want to know what happened within RocksDB when you're reading, you can 
leverage async-profiler [1] to catch the RocksDB stacks and I guess that index 
block might be evicted too frequently during your read. And we could use new 
read option which disable fillCache [2] to speedup bulk scan in the future to 
help improve the performance.


Best
Yun Tang

[1] https://github.com/jvm-profiling-tools/async-profiler
[2] 
https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/ReadOptions.html#setFillCache(boolean)
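For reference, a minimal sketch of the read option referenced in [2] (the 
database handle is assumed to be opened elsewhere): a bulk scan that skips 
populating the block cache, so a full-table read does not evict hot blocks such 
as index blocks:

    import org.rocksdb.ReadOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksIterator;

    public class NoFillCacheScan {
        // Iterate over all entries without polluting the block cache.
        static long countEntries(RocksDB db) {
            long n = 0;
            try (ReadOptions opts = new ReadOptions().setFillCache(false);
                 RocksIterator it = db.newIterator(opts)) {
                for (it.seekToFirst(); it.isValid(); it.next()) {
                    n++;  // process it.key() / it.value() here as needed
                }
            }
            return n;
        }
    }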

From: Seth Wiesman 
Sent: Friday, September 10, 2021 0:58
To: David Causse ; user 
Cc: Piotr Nowojski 
Subject: Re: State processor API very slow reading a keyed state with RocksDB

Hi David,

I was also able to reproduce the behavior, but was able to get significant 
performance improvements by reducing the number of slots on each TM to 1.

My suspicion, as Piotr alluded to, has to do with the different runtime 
execution of DataSet over DataStream. In particular, Flink's DataStream 
operators are aware of the resource requirements of the state backend and 
include RocksDB in its internal memory configurations. In the state processor 
api, the underlying input format is a blackbox.

Another thing to know is that when running multiple RocksDB instances within 
the same JVM, you are actually running a single native process with multiple 
logical instances. I _think_ we are seeing contention amongst the logical 
RocksDB instances.

Even with one slot, it is not as fast as I would like and will need to continue 
investigating. If my suspicion for the slowness is correct, we will need to 
migrate to the new Source API and improve this as part of DataStream 
integration. This migration is something we'd like to do regardless, but I 
don't have a timeline to share.

Aside: Why is writing still relatively fast?

Even with these factors accounted for, I do still expect writing to be faster 
than reading. This is due both to how RocksDB's internal data structures work 
and to some peculiarities of how the state processor API has to perform 
reads.

1. RocksDB internally uses a data structure called a log structured merge tree 
(or LSM). This means writes are always implemented as appends, so there is no 
seek required. Additionally, writes go into an in-memory data structure called 
a MemTable that is flushed to disk asynchronously.  Because there may be 
multiple entries for a given key, RocksDB needs to search for the most recent 
value and potentially read from disk. This may be alleviated by enabling bloom 
filters but does have memory costs.

2. RocksDB is a key value store, so Flink represents each registered state 
(ValueState, ListState, etc) as its own column family (table). A key only 
exists in a table if it has a non-null value. This means not all keys exist in 
all column families for a given operator. The state-proc-api wants to make it 
appear as if each operator is composed of a single table with multiple columns. 
To do this, we perform a full table scan on one column family and then do point 
lookups of that key on the others. However, we still need to find the keys that 
may only exist in other tables. The trick we perform is to delete keys from 
rocksDB after each read, so we can do full table scans on all column families 
but never see any duplicates. This means the reader is performing multiple 
reads and writes on every call to `readKey` and is more expensive than it may 
appear.

Seth


On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski 
mailto:pnowoj...@apache.org>> wrote:
Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried 
profiling/flame graphs and I was not able to make much sense out of those 
results. There are no IO/Memory bottlenecks that I could notice, it looks 
indeed like the Job is stuck inside RocksDB itself. This might be an issue with 
for example memory configuration. Streaming jobs and State Processor API are 
running in very different environments as the latter one is using DataSet API 
under the hood, so maybe that can explain this? However, I'm no expert in 
either the DataSet API or RocksDB, so it's hard for me to make progress here.

Maybe someone else can help here?

Piotrek


On Wed, Sep 8, 2021 at 14:45 David Causse 
mailto:dcau...@wikimedia.org>> wrote:
Hi,

I'm investigating why a job we use to inspect a flink state is a lot slower 
than the bootstrap job used to generate it.

I use RocksdbDB with a simple keyed value state mapping a string key to a long 
value. Generating the bootstrap state from a CSV file with 100M entries takes a 
couple minutes over 12 slots spread over 3 TM (4Gb allowed). But another job 
that does the opposite (converts this state into a CSV file) takes several 
hours. I would have expected these two job runtimes to be in the same ballpark.

I wrote a simple test case[1] to reproduce

Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Seth Wiesman
Hi David,

I was also able to reproduce the behavior, but was able to get
significant performance improvements by reducing the number of slots on
each TM to 1.

My suspicion, as Piotr alluded to, has to do with the different runtime
execution of DataSet over DataStream. In particular, Flink's DataStream
operators are aware of the resource requirements of the state backend and
include RocksDB in its internal memory configurations. In the state
processor api, the underlying input format is a blackbox.

Another thing to know is that when running multiple RocksDB instances
within the same JVM, you are actually running a single native process with
multiple logical instances. I _think_ we are seeing contention amongst the
logical RocksDB instances.

Even with one slot, it is not as fast as I would like and will need to
continue investigating. If my suspicion for the slowness is correct, we
will need to migrate to the new Source API and improve this as part of
DataStream integration. This migration is something we'd like to do
regardless, but I don't have a timeline to share.

Aside: Why is writing still relatively fast?

Even with these factors accounted for, I do still expect writing to be
faster than reading. This is due both to how RocksDB's internal data
structures work and to some peculiarities of how the state processor
API has to perform reads.

1. RocksDB internally uses a data structure called a log-structured merge
tree (or LSM tree). This means writes are always implemented as appends, so
there is no seek required. Additionally, writes go into an in-memory data
structure called a MemTable that is flushed to disk asynchronously. Reads,
by contrast, must do more work: because there may be multiple entries for a
given key, RocksDB needs to search for the most recent value and potentially
read from disk. This may be alleviated by enabling bloom filters but does
have memory costs.

2. RocksDB is a key value store, so Flink represents each registered state
(ValueState, ListState, etc) as its own column family (table). A key only
exists in a table if it has a non-null value. This means not all keys exist
in all column families for a given operator. The state-proc-api wants to
make it appear as if each operator is composed of a single table with
multiple columns. To do this, we perform a full table scan on one column
family and then do point lookups of that key on the others. However, we
still need to find the keys that may only exist in other tables. The trick
we perform is to delete keys from RocksDB after each read, so we can do
full table scans on all column families but never see any duplicates. This
means the reader is performing multiple reads and writes on every call to
`readKey` and is more expensive than it may appear.

Seth


On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski  wrote:

> Hi David,
>
> I can confirm that I'm able to reproduce this behaviour. I've tried
> profiling/flame graphs and I was not able to make much sense out of those
> results. There are no IO/Memory bottlenecks that I could notice, it looks
> indeed like the Job is stuck inside RocksDB itself. This might be an issue
> with for example memory configuration. Streaming jobs and State Processor
> API are running in very different environments as the latter one is using
> DataSet API under the hood, so maybe that can explain this? However I'm no
> expert in neither DataSet API nor the RocksDB, so it's hard for me to make
> progress here.
>
> Maybe someone else can help here?
>
> Piotrek
>
>
> Wed, Sep 8, 2021 at 14:45 David Causse  wrote:
>
>> Hi,
>>
>> I'm investigating why a job we use to inspect a Flink state is a lot
>> slower than the bootstrap job used to generate it.
>>
>> I use RocksDB with a simple keyed value state mapping a string key to a
>> long value. Generating the bootstrap state from a CSV file with 100M
>> entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
>> allowed). But another job that does the opposite (converts this state into
>> a CSV file) takes several hours. I would have expected these two job
>> runtimes to be in the same ballpark.
>>
>> I wrote a simple test case[1] to reproduce the problem. This program has
>> 3 jobs:
>> - CreateState: generate a keyed state (string->long) using the state
>> processor api
>> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
>> - ReadState: reads all the keys using the state processor api
>>
>> Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
>> StreamJob are done in less than a minute.
>> ReadState is much slower (> 30minutes) on my system. The RocksDB state
>> appears to be restored relatively quickly but after that the slots are
>> performing at very different speeds. Some slots finish quickly but some
others struggle to advance.
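
For context, the readKey calls discussed above belong to a user-supplied
KeyedStateReaderFunction (David's StateReader in the stack traces quoted
further down). A minimal sketch, with illustrative state name and types,
looks like this; each readKey invocation pays the point-lookup and
bookkeeping cost Seth describes:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

// Sketch: dump a keyed ValueState<Long> as (key, value) pairs.
public class CountReader extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        // One RocksDB point lookup (plus the delete described above) per key.
        out.collect(Tuple2.of(key, count.value()));
    }
}

The function is then handed to the readKeyedState call visible in David's
stack trace.
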

Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Piotr Nowojski
Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried
profiling/flame graphs and I was not able to make much sense out of those
results. There are no IO/Memory bottlenecks that I could notice, it looks
indeed like the Job is stuck inside RocksDB itself. This might be an issue
with for example memory configuration. Streaming jobs and State Processor
API are running in very different environments as the latter one is using
DataSet API under the hood, so maybe that can explain this? However, I'm no
expert in either the DataSet API or RocksDB, so it's hard for me to make
progress here.

Maybe someone else can help here?

Piotrek


Wed, Sep 8, 2021 at 14:45 David Causse  wrote:

> Hi,
>
> I'm investigating why a job we use to inspect a Flink state is a lot
> slower than the bootstrap job used to generate it.
>
> I use RocksDB with a simple keyed value state mapping a string key to a
> long value. Generating the bootstrap state from a CSV file with 100M
> entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
> allowed). But another job that does the opposite (converts this state into
> a CSV file) takes several hours. I would have expected these two job
> runtimes to be in the same ballpark.
>
> I wrote a simple test case[1] to reproduce the problem. This program has 3
> jobs:
> - CreateState: generate a keyed state (string->long) using the state
> processor api
> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
> - ReadState: reads all the keys using the state processor api
>
> Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
> StreamJob are done in less than a minute.
> ReadState is much slower (> 30minutes) on my system. The RocksDB state
> appears to be restored relatively quickly but after that the slots are
> performing at very different speeds. Some slots finish quickly but some
> others struggle to advance.
> Looking at the thread dumps I always see threads in
> org.rocksdb.RocksDB.get:
>
> "DataSource (at readKeyedState(ExistingSavepoint.java:314)
> (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
> RUNNABLE
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)
>
> It seems suspiciously slow to me and I'm wondering if I'm missing
> something in the way the state processor api works.
>
> Thanks for your help!
>
> David.
>
> 1: https://github.com/nomoa/rocksdb-state-processor-test
>


State processor API very slow reading a keyed state with RocksDB

2021-09-08 Thread David Causse
Hi,

I'm investigating why a job we use to inspect a Flink state is a lot slower
than the bootstrap job used to generate it.

I use RocksDB with a simple keyed value state mapping a string key to a
long value. Generating the bootstrap state from a CSV file with 100M
entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
allowed). But another job that does the opposite (converts this state into
a CSV file) takes several hours. I would have expected these two job
runtimes to be in the same ballpark.

I wrote a simple test case[1] to reproduce the problem. This program has 3
jobs:
- CreateState: generate a keyed state (string->long) using the state
processor api
- StreamJob: reads all the keys using a StreamingExecutionEnvironment
- ReadState: reads all the keys using the state processor api

Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
StreamJob are done in less than a minute.
ReadState is much slower (> 30minutes) on my system. The RocksDB state
appears to be restored relatively quickly but after that the slots are
performing at very different speeds. Some slots finish quickly but some
others struggle to advance.
Looking at the thread dumps I always see threads in org.rocksdb.RocksDB.get:

"DataSource (at readKeyedState(ExistingSavepoint.java:314)
(org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
RUNNABLE
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:2084)
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
at
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
at
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)

It seems suspiciously slow to me and I'm wondering if I'm missing something
in the way the state processor api works.

Thanks for your help!

David.

1: https://github.com/nomoa/rocksdb-state-processor-test


Re: Delete Keyed State outside of StateTTL

2021-08-31 Thread JING ZHANG
Hi,
After you call `clear()`, you can be sure the entries will not be returned
when you query the state under that key. The state is removed.
The implication for memory occupation depends on the state backend;
different state backends may behave differently here.
For example, if you use the heap state backend, the entry is removed from
the heap directly, so the memory occupation is reduced.

narasimha  wrote on Tue, Aug 31, 2021 at 12:16 PM:

> Thanks, JING.
>
> But I have a question here: what will happen to the keyed stream in that
> case? Will it be removed automatically, or will it be present with empty
> state? In that case, what is the implication for memory occupation?
>
> On Tue, Aug 31, 2021 at 8:14 AM JING ZHANG  wrote:
>
>> Hi,
>> All types of state also have a method clear() that clears the state for
>> the currently active key, i.e. the key of the input element.
>> Could we call the `clear()` method directly to remove the state under the
>> specified key?
>>
>> Best,
>> JING ZHANG
>>
>>
>> narasimha  wrote on Tue, Aug 31, 2021 at 9:44 AM:
>>
>>> Hi,
>>>
>>> I have a use case where the keyed state is managed (created, reset) by
>>> dynamically changing rules. A new action, "delete", has to be added.
>>> Delete should completely remove the keyed state, the same way StateTTL
>>> does after the expiration time.
>>>
>>> Use StateTTL?
>>>
>>> Initially I used StateTTL, but it ended up increasing the CPU usage
>>> even with low traffic.
>>> The state gets updated more than ~50 times a second, and read multiple times a
>>> second, so the timestamp kept against each state entry is updated often.
>>>
>>> So I didn't find it to be a viable solution.
>>>
>>> Can someone please help on how Keyed State can be removed outside of
>>> StateTTL.
>>>
>>> --
>>> A.Narasimha Swamy
>>>
>>
>
> --
> A.Narasimha Swamy
>


Re: Delete Keyed State outside of StateTTL

2021-08-30 Thread narasimha
Thanks, JING.

But I have a question here: what will happen to the keyed stream in that
case? Will it be removed automatically, or will it be present with empty
state? In that case, what is the implication for memory occupation?

On Tue, Aug 31, 2021 at 8:14 AM JING ZHANG  wrote:

> Hi,
> All types of state also have a method clear() that clears the state for
> the currently active key, i.e. the key of the input element.
> Could we call the `clear()` method directly to remove the state under the
> specified key?
>
> Best,
> JING ZHANG
>
>
> narasimha  wrote on Tue, Aug 31, 2021 at 9:44 AM:
>
>> Hi,
>>
>> I have a use case where the keyed state is managed (created, reset) by
>> dynamically changing rules. A new action, "delete", has to be added.
>> Delete should completely remove the keyed state, the same way StateTTL does
>> after the expiration time.
>>
>> Use StateTTL?
>>
>> Initially I used StateTTL, but it ended up increasing the CPU usage even
>> with low traffic.
>> The state gets updated more than ~50 times a second, and read multiple times a
>> second, so the timestamp kept against each state entry is updated often.
>>
>> So I didn't find it to be a viable solution.
>>
>> Can someone please help on how Keyed State can be removed outside of
>> StateTTL.
>>
>> --
>> A.Narasimha Swamy
>>
>

-- 
A.Narasimha Swamy


Re: Delete Keyed State outside of StateTTL

2021-08-30 Thread JING ZHANG
Hi,
All types of state also have a method clear() that clears the state for the
currently active key, i.e. the key of the input element.
Could we call the `clear()` method directly to remove the state under the
specified key?

Best,
JING ZHANG


narasimha  wrote on Tue, Aug 31, 2021 at 9:44 AM:

> Hi,
>
> I have a use case where the keyed state is managed (created, reset) by
> dynamically changing rules. A new action, "delete", has to be added.
> Delete should completely remove the keyed state, the same way StateTTL does
> after the expiration time.
>
> Use StateTTL?
>
> Initially I used StateTTL, but it ended up increasing the CPU usage even
> with low traffic.
> The state gets updated more than ~50 times a second, and read multiple times a
> second, so the timestamp kept against each state entry is updated often.
>
> So I didn't find it to be a viable solution.
>
> Can someone please help on how Keyed State can be removed outside of
> StateTTL.
>
> --
> A.Narasimha Swamy
>
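
A minimal sketch of the clear() approach JING ZHANG describes, with a
hypothetical Event type and the rule handling reduced to a single branch:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: a "delete" action removes all state kept for the current key.
public class RuleStateFunction extends KeyedProcessFunction<String, Event, Event> {

    private transient MapState<String, Long> ruleState;

    @Override
    public void open(Configuration parameters) {
        ruleState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("rules", String.class, Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        if ("delete".equals(event.action)) {
            ruleState.clear(); // clears the state of the currently active key only
            return;
        }
        ruleState.put(event.ruleId, event.value); // create/reset as before
        out.collect(event);
    }
}
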


Delete Keyed State outside of StateTTL

2021-08-30 Thread narasimha
Hi,

I have a use case where the keyed state is managed (created, reset) by
dynamically changing rules. A new action, "delete", has to be added.
Delete should completely remove the keyed state, the same way StateTTL does
after the expiration time.

Use StateTTL?

Initially I used StateTTL, but it ended up increasing the CPU usage even
with low traffic.
The state gets updated more than ~50 times a second, and read multiple times a
second, so the timestamp kept against each state entry is updated often.

So I didn't find it to be a viable solution.

Can someone please help on how Keyed State can be removed outside of
StateTTL.

-- 
A.Narasimha Swamy


Re: Is keyed state supported in PyFlink?

2021-05-05 Thread Sumeet Malhotra
Thanks Dian. Yes, I hadn't looked at the 1.13.0 documentation earlier.

On Wed, May 5, 2021 at 1:46 PM Dian Fu  wrote:

> Hi Sumeet,
>
> This feature is supported in 1.13.0 which was just released and so there
> is no documentation about it in 1.12.
>
> Regards,
> Dian
>
> On May 4, 2021 at 2:09 AM, Sumeet Malhotra  wrote:
>
> Hi,
>
> Is keyed state [1] supported by PyFlink yet? I can see some code for it in
> the Flink master branch, but there's no mention of it in the 1.12 Python
> documentation.
>
> Thanks,
> Sumeet
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
>
>
>


Re: Is keyed state supported in PyFlink?

2021-05-05 Thread Dian Fu
Hi Sumeet,

This feature is supported in 1.13.0 which was just released and so there is no 
documentation about it in 1.12.

Regards,
Dian

> On May 4, 2021 at 2:09 AM, Sumeet Malhotra  wrote:
> 
> Hi,
> 
> Is keyed state [1] supported by PyFlink yet? I can see some code for it in 
> the Flink master branch, but there's no mention of it in the 1.12 Python 
> documentation.
> 
> Thanks,
> Sumeet
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
> 



Is keyed state supported in PyFlink?

2021-05-03 Thread Sumeet Malhotra
Hi,

Is keyed state [1] supported by PyFlink yet? I can see some code for it in
the Flink master branch, but there's no mention of it in the 1.12 Python
documentation.

Thanks,
Sumeet

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html


Re: How to report metric based on keyed state piece

2021-02-17 Thread Salva Alcántara
Awesome Piotr!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to report metric based on keyed state piece

2021-02-17 Thread Salva Alcántara
Many thanks Kezhu for pointing me on that direction!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to report metric based on keyed state piece

2021-02-17 Thread Piotr Nowojski
Hi Salva,

I'm not sure, but I think you cannot access the state (especially the
keyed state) from within a metric, as metrics are evaluated outside
of the keyed context, and also from another thread. Also, things like
`ValueState`/`MapState` do not expose any size.

So probably you would have to follow Kezhu's suggestion. Whenever you are
updating your state value, you can also update a shared variable to track
the combined size (`AtomicLong`?). Upon recovery you would need to
reinitialize it (maybe indeed `KeyedStateBackend.applyToAllKeys`).

Piotrek



Wed, Feb 17, 2021 at 14:13 Kezhu Wang  wrote:

> With an initial `y`, I think you could compute new `y` on new stream
> value. Upon recovering from a checkpoint, maybe
> `KeyedStateBackend.applyToAllKeys` could help you to rebuild an initial `y`.
>
> Best,
> Kezhu Wang
>
> On February 17, 2021 at 13:09:39, Salva Alcántara (salcantara...@gmail.com)
> wrote:
>
> I wonder what is the canonical way to accomplish the following:
>
> Given a Flink UDF, how to report a metric `y` which is a function of some
> (keyed) state `X`? That is, `y=f(X)`. Most commonly, we are interested in
> the size of the state X.
>
> For instance, consider a `CoFlatMap` function with:
>
> - `X` being a `MapState`
> - `y` (the metric) consisting of the aggregated size (i.e., the total size
> of the `MapState`, for all keys)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
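
A rough sketch of the counter-plus-gauge approach Piotr and Kezhu outline;
the Event type and the metric name are illustrative, and the
re-initialization after recovery (e.g. via applyToAllKeys) is deliberately
omitted:

import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.util.Collector;

// Sketch: y = total number of MapState entries across all keys of this subtask.
public class SizeTrackingFunction extends RichFlatMapFunction<Event, Event> {

    private transient MapState<String, Long> mapState;
    private transient AtomicLong totalEntries;

    @Override
    public void open(Configuration parameters) {
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("X", String.class, Long.class));
        totalEntries = new AtomicLong();
        // The gauge is read from the metrics thread; AtomicLong keeps this safe.
        getRuntimeContext().getMetricGroup()
                .gauge("mapStateEntries", (Gauge<Long>) totalEntries::get);
    }

    @Override
    public void flatMap(Event e, Collector<Event> out) throws Exception {
        if (!mapState.contains(e.id)) {
            totalEntries.incrementAndGet(); // keep the counter in sync with the state
        }
        mapState.put(e.id, e.value);
        out.collect(e);
    }
}
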


Re: How to report metric based on keyed state piece

2021-02-17 Thread Kezhu Wang
With an initial `y`, I think you could compute the new `y` on each new stream value.
Upon recovering from a checkpoint, maybe `KeyedStateBackend.applyToAllKeys`
could help you rebuild the initial `y`.

Best,
Kezhu Wang

On February 17, 2021 at 13:09:39, Salva Alcántara (salcantara...@gmail.com)
wrote:

I wonder what is the canonical way to accomplish the following:

Given a Flink UDF, how to report a metric `y` which is a function of some
(keyed) state `X`? That is, `y=f(X)`. Most commonly, we are interested in
the size of the state X.

For instance, consider a `CoFlatMap` function with:

- `X` being a `MapState`
- `y` (the metric) consisting of the aggregated size (i.e., the total size
of the `MapState`, for all keys)



-- 
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


How to report metric based on keyed state piece

2021-02-16 Thread Salva Alcántara
I wonder what is the canonical way to accomplish the following:

Given a Flink UDF, how to report a metric `y` which is a function of some
(keyed) state `X`? That is, `y=f(X)`. Most commonly, we are interested in
the size of the state X.

For instance, consider a `CoFlatMap` function with:

- `X` being a `MapState`
- `y` (the metric) consisting of the aggregated size (i.e., the total size
of the `MapState`, for all keys)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How long will keyed state exist if no TTL given?

2020-12-08 Thread Marco Villalobos
Thank you for the clarification.

On Tue, Dec 8, 2020 at 8:14 AM Khachatryan Roman
 wrote:
>
> Hi Marco,
>
> Yes, if TTL is not configured then the state will never expire (will stay 
> forever until deleted explicitly).
>
> Regards,
> Roman
>
>
> On Tue, Dec 8, 2020 at 5:09 PM Marco Villalobos  
> wrote:
>>
>> After reading
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html
>>
>> It is unclear to me how long keyed state will exist if it has no TTL.
>> Is it cached forever, unless explicitly cleared or overwritten?
>>
>> can somebody please explain to me?
>>
>> Thank you.


Re: How long will keyed state exist if no TTL given?

2020-12-08 Thread Khachatryan Roman
Hi Marco,

Yes, if TTL is not configured then the state will never expire (will stay
forever until deleted explicitly).

Regards,
Roman


On Tue, Dec 8, 2020 at 5:09 PM Marco Villalobos 
wrote:

> After reading
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html
>
> It is unclear to me how long keyed state will exist if it has no TTL.
> Is it cached forever, unless explicitly cleared or overwritten?
>
> can somebody please explain to me?
>
> Thank you.
>


How long will keyed state exist if no TTL given?

2020-12-08 Thread Marco Villalobos
After reading

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html

It is unclear to me how long keyed state will exist if it has no TTL.
Is it cached forever, unless explicitly cleared or overwritten?

can somebody please explain to me?

Thank you.
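
For completeness, expiry has to be opted into on the state descriptor; a
minimal sketch of the TTL configuration from the linked docs (the 30-day
value is illustrative):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Sketch: entries expire 30 days after creation or last write.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(30))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
descriptor.enableTimeToLive(ttlConfig); // without this, the state lives forever
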


Re: Adding keyed state to test harness before calling process function.

2020-11-15 Thread Guowei Ma
Hi, Marco
I did not find a direct way of doing this (maybe others know one).

A possible way might be:
1. Build the expected keyed state and get the `OperatorSubtaskState` from
an `xxOperatorTestHarness`.
2. Use the `OperatorSubtaskState` to initialize the `xxOperatorTestHarness`
that needs to be tested.

Best,
Guowei


On Fri, Nov 13, 2020 at 12:43 PM Marco Villalobos 
wrote:

> Thank you. I looked into that, but it does not initialize any values in
> keyed state; instead, it uses keyed state, and lines 407-412 show that it is
> not setting keyed state values in advance, handling null values when the
> state is not set in advance.
>
> public void processElement(String value, Context ctx, Collector
> out) throws Exception {
> Integer oldCount = counterState.value();
> Integer newCount = oldCount != null ? oldCount + 1 : 1;
> counterState.update(newCount);
> out.collect(newCount);
> }
>
> What I mean by initializing keyed state is that I want to call
> processElement with values already existing in it.
>
> On Thu, Nov 12, 2020 at 7:52 PM Guowei Ma  wrote:
>
>> Hi, Marco
>> I think you could look at testScalingUp() at
>> flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
>> Best,
>> Guowei
>>
>>
>> On Fri, Nov 13, 2020 at 10:36 AM Marco Villalobos <
>> mvillalo...@kineteque.com> wrote:
>>
>>> Hi,
>>>
>>> I would like to add keyed state to the test harness before calling the
>>> process function.
>>>
>>> I am using the OneInputStreamOperatorTestHarness.
>>>
>>> I can't find any examples online on how to do that, and I am struggling
>>> to figure this out.
>>>
>>> Can somebody please provide guidance?  My test case has keyed state
>>> pre-populated as one of its pre-conditions.
>>>
>>> Thank you.  Sincerely,
>>>
>>> Marco
>>
>>
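
A compact sketch of Guowei's two-step idea using Flink's test utilities,
intended to live inside a test method that throws Exception; MyFunction
stands in for the KeyedProcessFunction under test, and constructor
signatures may differ slightly between Flink versions:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;

// Step 1: build the expected keyed state in a throwaway harness and snapshot it.
KeyedOneInputStreamOperatorTestHarness<String, String, Integer> seed =
        new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(new MyFunction()), e -> e, Types.STRING);
seed.open();
seed.processElement("a", 1L); // populates keyed state for key "a"
OperatorSubtaskState snapshot = seed.snapshot(0L, 1L);
seed.close();

// Step 2: initialize a fresh harness from that snapshot before running the test.
KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(new MyFunction()), e -> e, Types.STRING);
harness.initializeState(snapshot); // must happen before open()
harness.open();
harness.processElement("a", 2L);   // processElement now sees pre-populated state
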


Re: Adding keyed state to test harness before calling process function.

2020-11-12 Thread Marco Villalobos
Thank you. I looked into that, but it does not initialize any values in
keyed state; instead, it uses keyed state, and lines 407-412 show that it is
not setting keyed state values in advance, handling null values when the
state is not set in advance.

public void processElement(String value, Context ctx, Collector
out) throws Exception {
Integer oldCount = counterState.value();
Integer newCount = oldCount != null ? oldCount + 1 : 1;
counterState.update(newCount);
out.collect(newCount);
}

What I mean by initializing keyed state is that I want to call
processElement with values already existing in it.

On Thu, Nov 12, 2020 at 7:52 PM Guowei Ma  wrote:

> Hi, Marco
> I think you could look at testScalingUp() at
> flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
> Best,
> Guowei
>
>
> On Fri, Nov 13, 2020 at 10:36 AM Marco Villalobos <
> mvillalo...@kineteque.com> wrote:
>
>> Hi,
>>
>> I would like to add keyed state to the test harness before calling the process
>> function.
>>
>> I am using the OneInputStreamOperatorTestHarness.
>>
>> I can't find any examples online on how to do that, and I am struggling
>> to figure this out.
>>
>> Can somebody please provide guidance?  My test case has keyed state
>> pre-populated as one of its pre-conditions.
>>
>> Thank you.  Sincerely,
>>
>> Marco
>
>


Re: Adding keyed state to test harness before calling process function.

2020-11-12 Thread Guowei Ma
Hi, Marco
I think you could look at testScalingUp() at
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
Best,
Guowei


On Fri, Nov 13, 2020 at 10:36 AM Marco Villalobos 
wrote:

> Hi,
>
> I would like to add keyed state to the test harness before calling the process
> function.
>
> I am using the OneInputStreamOperatorTestHarness.
>
> I can't find any examples online on how to do that, and I am struggling to
> figure this out.
>
> Can somebody please provide guidance?  My test case has keyed state
> pre-populated as one of its pre-conditions.
>
> Thank you.  Sincerely,
>
> Marco


Adding keyed state to test harness before calling process function.

2020-11-12 Thread Marco Villalobos
Hi,

I would like to add keyed state to the test harness before calling the process
function.

I am using the OneInputStreamOperatorTestHarness.

I can't find any examples online on how to do that, and I am struggling to 
figure this out.

Can somebody please provide guidance?  My test case has keyed state 
pre-populated as one of its pre-conditions.

Thank you.  Sincerely, 

Marco

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-13 Thread 大森林
So:
State: stores the result of some operator (such as keyBy or map).

Checkpoint: stores the last result from when the program was running OK.

Am I right?
Thanks for your help~!




-- Original message --
From: "Congxian Qiu"
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html
Best,
Congxian

https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions

On Wed, Oct 7, 2020 at 6:51 AM 大森林 wrote:

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-12 Thread Congxian Qiu
Hi
As others said, state is different from a checkpoint. A checkpoint is just
a **snapshot** of the state, and you can restore from the previous
checkpoint if the job crashes.

State is for stateful computation, and checkpointing is for
fault tolerance [1].

The state keeps the information you'll need in the future. Take
wordcount as an example: the count of a word depends on the total count
of the word we have seen, so we need to keep the "total count of the word
seen before" somewhere; in Flink you can keep it in state.
A checkpoint/savepoint contains the **snapshot** of all the state; if
there is no state, then the checkpoint will be *empty*. You can restore
from it, but the content is empty.

PS: maybe you don't create state explicitly, but some operators in
Flink contain state internally (such as WindowOperator).

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html
Best,
Congxian


大森林  wrote on Mon, Oct 12, 2020 at 9:26 PM:

> Thanks for your replies.
> When I use no state-relevant code in my program, the checkpoint can be
> saved and resumed.❶
>
> So then why do we need *Keyed State/Operator State/Stateful Functions*?❷
> *"the operators are reset to the time of the respective checkpoint."*
> We have already met the requirement *"resume from checkpoint (the last state
> of each operator, which stores the result)"*❶,
> so why do we still need ❷?
> Thanks for your help~!
>
>
>
> -- Original message --
> *From:* "Arvid Heise" ;
> *Sent:* Monday, October 12, 2020, 2:53 PM
> *To:* "大森林";
> *Cc:* "Shengkai Fang";"user";
> *Subject:* Re: why we need keyed state and operator state when we already have
> checkpoint?
>
> Hi 大森林,
>
> You can always resume from checkpoints independent of the usage of keyed
> or non-keyed state of operators.
> 1 checkpoint contains the state of all operators at a given point in time.
> Each operator may have keyed state, raw state, or non-keyed state.
> As long as you are not changing the operators (too much) before
> restarting, you can always restart.
>
> During (automatic) restart of a Flink application, the state of a given
> checkpoint is restored to the operators, such that it looks like the
> operator never failed. However, the operators are reset to the time of the
> respective checkpoint.
>
> I have no clue what you mean with "previous variable temporary result".
>
> On Wed, Oct 7, 2020 at 9:13 AM 大森林  wrote:
>
>> Thanks for your replies; I think I understand now.
>>
>> There are two cases:
>> 1. If I use no keyed state in the program, when it's killed I can only resume
>> from the previous result.
>> 2. If I use keyed state in the program, when it's killed I can
>> resume from the previous result and the previous temporary variable results.
>>
>> Am I right?
>> Thanks for your guide.
>>
>>
>> -- Original message --
>> *From:* "Arvid Heise" ;
>> *Sent:* Wednesday, October 7, 2020, 2:25 PM
>> *To:* "大森林";
>> *Cc:* "Shengkai Fang";"user";
>> *Subject:* Re: why we need keyed state and operator state when we already have
>> checkpoint?
>>
>> I think there is some misunderstanding here: a checkpoint IS (a snapshot
>> of) the keyed state and operator state (among a few more things). [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions
>>
>> On Wed, Oct 7, 2020 at 6:51 AM 大森林  wrote:
>>
>>> When the job is killed, state is also missing.
>>> So why do we need keyed state? Is keyed state useful when we try to resume
>>> the killed job?
>>>
>>>
>>> -- Original message --
>>> *From:* "Shengkai Fang" ;
>>> *Sent:* Wednesday, October 7, 2020, 12:43 PM
>>> *To:* "大森林";
>>> *Cc:* "user";
>>> *Subject:* Re: why we need keyed state and operator state when we already
>>> have checkpoint?
>>>
>>> The checkpoint is a snapshot of the job, and we can resume the job if
>>> the job is killed unexpectedly. State is a different thing: it memorizes the
>>> intermediate results of the computation. I don't think the checkpoint can
>>> replace state.
>>>
>>> 大森林  wrote on Wed, Oct 7, 2020 at 12:26 PM:
>>>
>>>> Could you tell me:
>>>>
>>>> why we need keyed state and operator state when we already have
>>>> checkpoint?
>>>>
>>>> When a running jar crashes, we can resume from the checkpoint
>>>> automatically/manually.
>>>> So why do we still need keyed state and operator state?
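
Congxian's wordcount example as a minimal sketch; the ValueState below is
exactly what a checkpoint snapshots and what a restore brings back (names
are illustrative):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: per-word running count kept in keyed state.
public class CountWords extends KeyedProcessFunction<String, String, Tuple2<String, Long>> {

    private transient ValueState<Long> total;

    @Override
    public void open(Configuration parameters) {
        total = getRuntimeContext().getState(
                new ValueStateDescriptor<>("total", Long.class));
    }

    @Override
    public void processElement(String word, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = total.value();
        long newTotal = (current == null ? 0L : current) + 1;
        total.update(newTotal); // this value is part of every checkpoint snapshot
        out.collect(Tuple2.of(word, newTotal));
    }
}
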

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-12 Thread 大森林
Thanks for your replies.
When I use no state-relevant code in my program, the checkpoint can be saved
and resumed.❶


So then why do we need Keyed State/Operator State/Stateful Functions?❷
"the operators are reset to the time of the respective checkpoint."
We have already met the requirement "resume from checkpoint (the last state of each
operator, which stores the result)"❶,
so why do we still need ❷?
Thanks for your help~!






-- Original message --
From: "Arvid Heise"
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions

On Wed, Oct 7, 2020 at 6:51 AM 大森林 wrote:

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-12 Thread Arvid Heise
Hi 大森林,

You can always resume from checkpoints independent of the usage of keyed or
non-keyed state of operators.
1 checkpoint contains the state of all operators at a given point in time.
Each operator may have keyed state, raw state, or non-keyed state.
As long as you are not changing the operators (too much) before restarting,
you can always restart.

During (automatic) restart of a Flink application, the state of a given
checkpoint is restored to the operators, such that it looks like the
operator never failed. However, the operators are reset to the time of the
respective checkpoint.

I have no clue what you mean with "previous variable temporary result".

On Wed, Oct 7, 2020 at 9:13 AM 大森林  wrote:

> Thanks for your replies; I think I understand now.
>
> There are two cases:
> 1. If I use no keyed state in the program, when it's killed I can only resume
> from the previous result.
> 2. If I use keyed state in the program, when it's killed I can
> resume from the previous result and the previous temporary variable results.
>
> Am I right?
> Thanks for your guide.
>
>
> -- Original message --
> *From:* "Arvid Heise" ;
> *Sent:* Wednesday, October 7, 2020, 2:25 PM
> *To:* "大森林";
> *Cc:* "Shengkai Fang";"user";
> *Subject:* Re: why we need keyed state and operator state when we already have
> checkpoint?
>
> I think there is some misunderstanding here: a checkpoint IS (a snapshot
> of) the keyed state and operator state (among a few more things). [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions
>
> On Wed, Oct 7, 2020 at 6:51 AM 大森林  wrote:
>
>> When the job is killed, state is also missing.
>> So why do we need keyed state? Is keyed state useful when we try to resume
>> the killed job?
>>
>>
>> -- Original message --
>> *From:* "Shengkai Fang" ;
>> *Sent:* Wednesday, October 7, 2020, 12:43 PM
>> *To:* "大森林";
>> *Cc:* "user";
>> *Subject:* Re: why we need keyed state and operator state when we already have
>> checkpoint?
>>
>> The checkpoint is a snapshot of the job, and we can resume the job if the
>> job is killed unexpectedly. State is a different thing: it memorizes the
>> intermediate results of the computation. I don't think the checkpoint can
>> replace state.
>>
>> 大森林  wrote on Wed, Oct 7, 2020 at 12:26 PM:
>>
>>> Could you tell me:
>>>
>>> why we need keyed state and operator state when we already have
>>> checkpoint?
>>>
>> When a running jar crashes, we can resume from the checkpoint
>> automatically/manually.
>> So why do we still need keyed state and operator state?
>>>
>>> Thanks
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-07 Thread 大森林
Thanks for your replies; I think I understand now.


There are two cases:
1. If I use no keyed state in the program, when it's killed I can only resume from
the previous result.
2. If I use keyed state in the program, when it's killed I
can resume from the previous result and the previous
temporary variable results.


Am I right?
Thanks for your guidance.




-- Original message --
From: "Arvid Heise"
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions

On Wed, Oct 7, 2020 at 6:51 AM 大森林 wrote:

Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-07 Thread Arvid Heise
I think there is some misunderstanding here: a checkpoint IS (a snapshot
of) the keyed state and operator state (among a few more things). [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions

On Wed, Oct 7, 2020 at 6:51 AM 大森林  wrote:

> When the job is killed, state is also missing.
> So why do we need keyed state? Is keyed state useful when we try to resume
> the killed job?
>
>
> -- Original message --
> *From:* "Shengkai Fang" ;
> *Sent:* Wednesday, October 7, 2020, 12:43 PM
> *To:* "大森林";
> *Cc:* "user";
> *Subject:* Re: why we need keyed state and operator state when we already have
> checkpoint?
>
> The checkpoint is a snapshot of the job, and we can resume the job if the
> job is killed unexpectedly. State is a different thing: it memorizes the
> intermediate results of the computation. I don't think the checkpoint can
> replace state.
>
> 大森林  wrote on Wed, Oct 7, 2020 at 12:26 PM:
>
>> Could you tell me:
>>
>> why we need keyed state and operator state when we already have
>> checkpoint?
>>
>> When a running jar crashes, we can resume from the checkpoint
>> automatically/manually.
>> So why do we still need keyed state and operator state?
>>
>> Thanks
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-06 Thread 大森林
When the job is killed, state is also missing.
So why do we need keyed state? Is keyed state useful when we try to resume the
killed job?




-- Original message --
From: "Shengkai Fang"


Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-06 Thread Shengkai Fang
The checkpoint is a snapshot of the job, and we can resume the job if the
job is killed unexpectedly. State is a different thing: it memorizes the
intermediate results of the computation. I don't think the checkpoint can
replace state.

大森林  wrote on Wed, Oct 7, 2020 at 12:26 PM:

> Could you tell me:
>
> why we need keyed state and operator state when we already have checkpoint?
>
> When a running jar crashes, we can resume from the checkpoint
> automatically/manually.
> So why do we still need keyed state and operator state?
>
> Thanks
>


why we need keyed state and operator state when we already have checkpoint?

2020-10-06 Thread 大森林
Could you tell me:


why we need keyed state and operator state when we already have checkpoint?

When a running jar crashes, we can resume from the checkpoint
automatically/manually.
So why do we still need keyed state and operator state?


Thanks

Re: Monitor the usage of keyed state

2020-08-26 Thread Yun Tang
Hi Mu

I want to share something more about the memory usage of RocksDB.

If you enable managed memory for RocksDB (which is enabled by default) [1], you
should refer to the block cache usage, as index & filter blocks are put into the
cache and write buffer usage is counted against the cache.
The block cache usage [2] therefore reflects the overall memory usage of RocksDB.
BTW, since the same cache is used for all RocksDB instances within one slot, all
RocksDB instances in the same slot will report the same block cache usage; please
do not sum them up.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#state-backend-rocksdb-memory-managed
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#state-backend-rocksdb-metrics-block-cache-usage

Best,
Yun Tang

From: Andrey Zagrebin 
Sent: Tuesday, August 25, 2020 22:12
To: Mu Kong 
Cc: flink-u...@apache.org 
Subject: Re: Monitor the usage of keyed state

Hi Mu,

I would suggest to look into RocksDB metrics which you can enable as Flink 
metrics [1]

Best,
Andrey

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#rocksdb-native-metrics

On Fri, Aug 21, 2020 at 4:27 AM Mu Kong  wrote:
Hi community,

I have a Flink job running with RichMapFunction that uses keyed state.
Although the TTL is enabled, I wonder if there is a way that I can monitor the 
memory usage of the keyed state. I'm using RocksDB as the state backend.

Best regards,
Mu
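
Both suggestions boil down to configuration switches; a minimal
flink-conf.yaml sketch (block-cache-usage is one of several
state.backend.rocksdb.metrics.* options):

# Keep RocksDB memory under Flink's managed memory budget (the default).
state.backend.rocksdb.memory.managed: true

# Expose the shared block cache usage as a Flink metric.
state.backend.rocksdb.metrics.block-cache-usage: true
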


Re: Monitor the usage of keyed state

2020-08-25 Thread Andrey Zagrebin
Hi Mu,

I would suggest to look into RocksDB metrics which you can enable as Flink
metrics [1]

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#rocksdb-native-metrics

On Fri, Aug 21, 2020 at 4:27 AM Mu Kong  wrote:

> Hi community,
>
> I have a Flink job running with RichMapFunction that uses keyed state.
> Although the TTL is enabled, I wonder if there is a way that I can monitor
> the memory usage of the keyed state. I'm using RocksDB as the state backend.
>
> Best regards,
> Mu
>


Monitor the usage of keyed state

2020-08-20 Thread Mu Kong
Hi community,

I have a Flink job running with RichMapFunction that uses keyed state.
Although the TTL is enabled, I wonder if there is a way that I can monitor
the memory usage of the keyed state. I'm using RocksDB as the state backend.

Best regards,
Mu


Re: Using managed keyed state with AsyncIO

2020-08-14 Thread Arvid Heise
Hi Kristoff,

the answer to your big questions is unfortunately no, two times. I see two
options in general:

1) process function (as you proposed). On processElement, you'd read the
state and invoke your async operation. You enqueue your result in some
result queue where you emit it in the next call of processElement. To deal
with rare keys, you'd probably also want to use a timer to flush the
outputs instead. In the timer/next processElement, you can also access the
key state. However, you also need to ensure that these pending results are
snapshotted, such that they are not lost on crash. I'd expect that you can
mix ProcessFunction and CheckpointedFunction, but haven't done it yet
myself.

2) implement your own operator, where you can start by copying or
subclassing the existing AsyncWaitOperator [1]. One thing that you need to
look out for is to access the state and output collector only in the mailbox
thread (= the main task thread). You can use the mailboxExecutor to execute a piece
of code in the mailbox.

Even if you go by 1), have a look at the AsyncWaitOperator as it should
serve as a good template.

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java

On Fri, Aug 14, 2020 at 12:14 PM KristoffSC 
wrote:

> Thanks Arvid,
> I like your propositions. In my case I wanted to use the state value to
> decide if I should do the async call to the external system. The result of this
> call would be a state input. So having this:
>
> Process1(calculateValue or take it from state) -> AsyncCall to external
> system to persist/validate the value -> Process2(feedback loop via a
> messaging queue to Process1).
>
> Apart from that, Process1 would have to consume two streams, which is OK; I
> would actually have a delay. I wanted to avoid unnecessary calls to the
> external
> system by having the cached/validated value in state.
>
> And this would be done without the delay if I could use state in async
> operators.
>
>
> I'm thinking about manufacturing my own semi-async operator. My idea is that
> I
> would have a normal KeyedProcessFunction that wraps a list of
> SingleThreadExecutors.
>
> In the processElement method I will use the key to calculate an index into
> that array
> to make sure that messages for the same key go to the same thread executor.
> I
> do want to keep the message order.
>
> I will submit a task like:
> executor.submit(() -> {
>     MyResult result = rservice.process(message, mapState.get(key));
>     mapState.put(key, result);
>     out.collect(newMessage);
> });
>
>
>
> Big questions:
> 1. In my solution out.collect(newMessage) will be called from a few threads
> (each with a different message). Is it thread safe?
> 2. Is using the MapState in a multi-threaded environment like this thread
> safe?
> Alternatively, I can have an associated list of mapStates, one for each
> SingleThreadExecutor, so each will be used by only one thread.
>
> With this setup I will not block my pipeline and I will be able to use
> state. I agree that the size of the SingleThreadExecutors list will be a
> limiting
> factor.
>
>
> Is this setup possible with Flink?
>
>
> Btw, I will use the RocksDBStateBackend
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
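
A very rough sketch of option 1; it deliberately omits the snapshotting of
pending results that Arvid flags as required for correctness, and Event,
Result, and callExternalSystem are illustrative stand-ins:

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AsyncLikeFunction extends KeyedProcessFunction<String, Event, Result> {

    private transient ValueState<Long> state;
    private transient Queue<Result> completed; // filled by callback threads

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("v", Long.class));
        completed = new ConcurrentLinkedQueue<>();
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
        drain(out); // emit results completed since the last call

        Long current = state.value();
        long seed = current == null ? 0L : current;
        CompletableFuture
                .supplyAsync(() -> callExternalSystem(e, seed)) // async call sees the keyed state value
                .thenAccept(completed::add); // the callback thread only touches the queue

        // Timer so results for rare keys are flushed even without further elements.
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) {
        drain(out);
    }

    private void drain(Collector<Result> out) {
        Result r;
        while ((r = completed.poll()) != null) {
            out.collect(r);
        }
    }

    private Result callExternalSystem(Event e, long seed) {
        return null; // stub for the blocking client call
    }
}
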


Re: Using managed keyed state with AsyncIO

2020-08-14 Thread KristoffSC
Thanks Arvid,
I like your propositions. In my case I wanted to use the state value to
decide if I should do the async call to the external system. The result of this
call would be a state input. So having this:

Process1(calculateValue or take it from state) -> AsyncCall to external
system to persist/validate the value -> Process2(feedback loop via a
messaging queue to Process1).

Apart from that, Process1 would have to consume two streams, which is OK; I
would actually have a delay. I wanted to avoid unnecessary calls to the external
system by having the cached/validated value in state.

And this would be done without the delay if I could use state in async
operators.


I'm thinking about manufacturing my own semi-async operator. My idea is that I
would have a normal KeyedProcessFunction that wraps a list of
SingleThreadExecutors.

In the processElement method I will use the key to calculate an index into that
array to make sure that messages for the same key go to the same thread executor. I
do want to keep the message order.

I will submit a task like:
executor.submit(() -> {
    MyResult result = rservice.process(message, mapState.get(key));
    mapState.put(key, result);
    out.collect(newMessage);
});



Big questions:
1. In my solution out.collect(newMessage) will be called from a few threads
(each with a different message). Is it thread safe?
2. Is using the MapState in a multi-threaded environment like this thread
safe?
Alternatively, I can have an associated list of mapStates, one for each
SingleThreadExecutor, so each will be used by only one thread.

With this setup I will not block my pipeline and I will be able to use
state. I agree that the size of the SingleThreadExecutors list will be a limiting
factor.


Is this setup possible with Flink?


Btw, I will use the RocksDBStateBackend






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Using managed keyed state with AsyncIO

2020-08-14 Thread Arvid Heise
Hi KristoffSC,

you are right that state is not shared across operators - I forgot about
that. So the approach would only be valid as is if the state can be
properly separated into two independent subtasks. For example, you need the
state to find the database key and you store the full entry in Flink state
afterwards. Then you could fetch the key in the map before async IO and
keep the full record in the map after async IO.

Another approach is to perform some kind of feedback from async IO to the
first map. There is usually a tradeoff between performance (use a Kafka
topic for feedback) and complexity (write some TCP socket magic). I'd
rather recommend to have a look at statefun though [1], which implements
this feedback in an efficient way and provides a good abstraction for
everything that is state-related. Unfortunately, mixing Flink jobs and
statefun applications is still not easily possible - I'm assuming it would
happen in the next major release. But maybe, you can express everything in
statefun, at which point, it's the best choice.

For your question: it shouldn't make any difference, as the function gets
serialized in the main() and deserialized at each JM/TM resulting in many
copies. The only difference is that in your main(), you have one fewer
copy. Since Flink state is only touched in TM, the function instances are
different anyways.

[1] https://github.com/apache/flink-statefun

On Thu, Aug 13, 2020 at 2:53 PM KristoffSC 
wrote:

> Hi Arvid,
> thank you for the response.
> Yeah I tried to run my job shortly after posting my message and I got
> "State
> is not supported in rich async function" ;)
>
> I came up with a solution that would solve my initial problem (the
> concurrent/async processing of messages with the same key), but
> unfortunately state is not supported here.
>
> Thank you for the proposition
> source -> keyby -> map (retrieve state) -> async IO (use state) -> map
> (update state)
>
> However, I'm a little bit surprised. I thought that state on a keyed stream cannot
> be shared between operators, and here you are suggesting doing that. Is it
> possible then?
>
>
> Using this occasion, I have an additional question: is there any difference
> from
> a Flink perspective between these two approaches:
>
> MyProcessFunction pf = new MyProcessFunction(); MyProcessFunction is a
> stateless object, but it uses Flink keyed state.
>
> Setup 1:
>
> source -> keyBy(key) ->  proces(pf) -> map() -> process(pf) -> sink
>
> Setup 2:
> source -> keyBy(key) ->  proces(new MyProcessFunction()) -> map() ->
> process(new MyProcessFunction()) -> sink
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Using managed keyed state with AsyncIO

2020-08-13 Thread KristoffSC
Hi Arvid,
thank you for the response.
Yeah I tried to run my job shortly after posting my message and I got "State
is not supported in rich async function" ;)

I came up with a solution that would solve my initial problem (the
concurrent/async processing of messages with the same key), but
unfortunately state is not supported here.

Thank you for the proposition
source -> keyby -> map (retrieve state) -> async IO (use state) -> map
(update state)

However, I'm a little bit surprised. I thought that state on a keyed stream cannot
be shared between operators, and here you are suggesting doing that. Is it
possible then?


Using this occasion, I have an additional question: is there any difference from
a Flink perspective between these two approaches:

MyProcessFunction pf = new MyProcessFunction(); MyProcessFunction is a
stateless object, but it uses Flink keyed state.

Setup 1:

source -> keyBy(key) ->  proces(pf) -> map() -> process(pf) -> sink

Setup 2:
source -> keyBy(key) ->  proces(new MyProcessFunction()) -> map() ->
process(new MyProcessFunction()) -> sink 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Using managed keyed state with AsyncIO

2020-08-13 Thread Arvid Heise
Hi KristoffSC,

Afaik asyncIO does not support state operations at all because of your
mentioned issues (RichAsyncFunction fails if you access state).

I'd probably solve it by having a map or process function before and after
the asyncIO for the state operations. If you enable object reuse,
performance should be pretty much the same as if async I/O would support
it, but the threading model becomes much easier.

So, the pipeline is source -> keyby -> map (retrieve state) -> async IO
(use state) -> map (update state). You might need to return a Tuple carrying
both the element and the state value from map and asyncIO to have the full
context information in the subsequent operators.

On Mon, Aug 10, 2020 at 4:24 PM KristoffSC 
wrote:

> Hi guys,
> I'm using Flink 1.9.2
>
> I have a question about uses case where I would like to use FLink's managed
> keyed state with Async IO [1]
>
>
> Lets take as a base line below example taken from [1] and lets assume that
> we are executing this on a keyed stream.
>
> final Future<String> result = client.query(key);
>
> CompletableFuture.supplyAsync(new Supplier<String>() {
>
> @Override
> public String get() {
> try {
> return result.get();
> } catch (InterruptedException | ExecutionException e) {
> // Normally handled explicitly.
> return null;
> }
> }
> }).thenAccept( (String dbResult) -> {
> resultFuture.complete(Collections.singleton(new Tuple2<>(key,
> dbResult)));
> });
>
>
> Imagine that instead of passing the key to client.query(..) we pass some
> value
> taken from Flink's managed keyed state. Later the supplier's get method
> will
> return a value that should be stored in that state. In other words, we use
> previous results as inputs for the next computations.
>
> Is this achievable with Flink's AsyncIO? I can have many pending requests on
> client.query which can finish in a random order. The
> AsyncDataStream.orderedWait will not help he here since this affects only
> the way how Flink "releases" the messages from it's internal queue for
> Async
> operators.
>
>
> What is more, this scenario can result in multiple concurrent
> writes/reads
> to/from Flink's managed state for the same key values. Is this thread safe?
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
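
A rough sketch of the suggested topology; source is assumed to be a
DataStream<Event>, and Event, StateReadingMap, StateUpdatingMap, and
MyAsyncFunction are illustrative stand-ins. Note the second keyBy: async IO
is not a keyed operator, so the trailing map must be re-keyed before it can
access keyed state:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

// map (retrieve state) -> async IO (use state) -> map (update state)
DataStream<Tuple2<Event, Long>> withState = source
        .keyBy(e -> e.key)
        .map(new StateReadingMap());   // RichMapFunction: attach the current state value

DataStream<Tuple2<Event, Long>> enriched = AsyncDataStream
        .orderedWait(withState, new MyAsyncFunction(), 1000, TimeUnit.MILLISECONDS, 100);

DataStream<Event> result = enriched
        .keyBy(t -> t.f0.key)          // re-key before touching keyed state again
        .map(new StateUpdatingMap());  // RichMapFunction: write the result back into state
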


Re: State Processor API to bootstrap keyed state for Stream Application.

2020-08-13 Thread Arvid Heise
For future readers: this thread has been resolved in "Please help, I need
to bootstrap keyed state into a stream" on the user mailing list asked by
Marco.

On Fri, Aug 7, 2020 at 11:52 PM Marco Villalobos 
wrote:

> I have read the documentation and various blogs that state that it is
> possible to load data into a data-set and use that data to bootstrap a
> stream application.
>
> The documentation literally says this, "...you can read a batch of data
> from any store, preprocess it, and write the result to a savepoint that you
> use to bootstrap the state of a streaming application." (source:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html
> ).
>
> Another blog states, "You can create both Batch and Stream environment in
> a single job." (source:
> https://www.kharekartik.dev/2019/12/14/bootstrap-your-flink-jobs/)
>
> I want to try this approach, but I cannot find any real examples online.
>
> I have failed on numerous attempts.
>
> I have a few questions:
>
> 1) is there an example that demonstrates this feature?
> 2) how can you launch batch and stream environment from a single job?
> 3) does this require two jobs?
>
> Anybody, please help.
>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Please help, I need to bootstrap keyed state into a stream

2020-08-12 Thread Marco Villalobos
Hi Seth,

Thank you for the advice. The solution you mentioned is exactly what I did.

I wrote a small tutorial that explains how to repeat that pattern.

You can read about my solution at
https://github.com/minmay/flink-patterns/tree/master/bootstrap-keyed-state-into-stream

Regarding the NullPointerException when running locally, thank you for filing a 
ticket. It would be very nice to get that fixed.

Sincerely, 

Marco A. Villalobos



> On Aug 12, 2020, at 9:40 AM, Seth Wiesman  wrote:
> 
> Just to summarize the conversation so far: 
> 
> The state processor api reads data from a 3rd party system - such as JDBC in 
> this example - and generates a savepoint file that is written out to some 
> DFS.  This savepoint can then be used when starting a Flink streaming
> application. It is a two-step process, creating the savepoint in one job and 
> then starting a streaming application from that savepoint in another. 
> 
> These jobs do not have to be a single application, and in general, I 
> recommend they be developed as two separate jobs. The reason being, 
> bootstrapping state is a one-time process while your streaming application 
> runs forever. It will simplify your development and operations in the long 
> term if you do not mix concerns. 
> 
> Concerning the NullPointerException:
> 
> The max parallelism must be at least 128. I've opened a ticket to track and 
> resolve this issue. 
> 
> Seth 
> 
> On Mon, Aug 10, 2020 at 6:38 PM Marco Villalobos  <mailto:mvillalo...@kineteque.com>> wrote:
> I think there is a bug in Flink when running locally without a cluster.
> 
> My code worked in a cluster, but failed when run locally.
> 
> My code does not save null values in Map State.
> 
> > On Aug 9, 2020, at 11:27 PM, Tzu-Li Tai  > <mailto:tzuli...@gmail.com>> wrote:
> > 
> > Hi,
> > 
> > For the NullPointerException, what seems to be happening is that you are
> > setting NULL values in your MapState, which is not allowed by the API.
> > 
> > Otherwise, the code that you showed for bootstrapping state seems to be
> > fine.
> > 
> >> I have yet to find a working example that shows how to do both
> >> (bootstrapping state and start a streaming application with that state)
> > 
> > Not entirely sure what you mean here by "doing both".
> > The savepoint written using the State Processor API (what you are doing in
> > the bootstrap() method) is a savepoint that may be restored from as you
> > would with a typical Flink streaming job restore.
> > So, usually the bootstrapping part happens as a batch "offline" job, while
> > you keep your streaming job as a separate job. What are you trying to
> > achieve with having both written within the same job?
> > 
> > Cheers,
> > Gordon
> > 
> > 
> > 
> > --
> > Sent from: 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
> 



Re: Please help, I need to bootstrap keyed state into a stream

2020-08-12 Thread Seth Wiesman
Just to summarize the conversation so far:

The state processor api reads data from a 3rd party system - such as JDBC
in this example - and generates a savepoint file that is written out to
some DFS.  This savepoint can then be used when starting a Flink
streaming application. It is a two-step process: creating the savepoint in
one job and then starting a streaming application from that savepoint in
another.

These jobs do not have to be a single application, and in general, I
recommend they be developed as two separate jobs. The reason is that
bootstrapping state is a one-time process, while your streaming application
runs forever. It will simplify your development and operations in the long
term if you do not mix concerns.
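
A minimal sketch of that two-step flow, using the 1.9/1.10-era State
Processor API. The uid "my-keyed-operator", the Tuple2 input data, and
TotalBootstrapper are illustrative stand-ins, not code from this thread:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapJob {

    // Writes one keyed ValueState entry per input record.
    static class TotalBootstrapper
            extends KeyedStateBootstrapFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> total;

        @Override
        public void open(Configuration parameters) {
            total = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("total", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
            total.update(value.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Long>> data =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L)); // stand-in for JDBC input

        BootstrapTransformation<Tuple2<String, Long>> transformation = OperatorTransformation
                .bootstrapWith(data)
                .keyBy(t -> t.f0)
                .transform(new TotalBootstrapper());

        Savepoint.create(new MemoryStateBackend(), 128) // max parallelism; see the NPE note below
                .withOperator("my-keyed-operator", transformation)
                .write("file:///tmp/bootstrap-savepoint");

        env.execute("bootstrap");
    }
}

The separate streaming job then gives its stateful operator
.uid("my-keyed-operator") and is started with
flink run -s file:///tmp/bootstrap-savepoint job.jar.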

Concerning the NullPointerException:

The max parallelism must be at least 128. I've opened a ticket to track and
resolve this issue.

Seth

On Mon, Aug 10, 2020 at 6:38 PM Marco Villalobos 
wrote:

> I think there is a bug in Flink when running locally without a cluster.
>
> My code worked in a cluster, but failed when run locally.
>
> My code does not save null values in Map State.
>
> > On Aug 9, 2020, at 11:27 PM, Tzu-Li Tai  wrote:
> >
> > Hi,
> >
> > For the NullPointerException, what seems to be happening is that you are
> > setting NULL values in your MapState, which is not allowed by the API.
> >
> > Otherwise, the code that you showed for bootstrapping state seems to be
> > fine.
> >
> >> I have yet to find a working example that shows how to do both
> >> (bootstrapping state and start a streaming application with that state)
> >
> > Not entirely sure what you mean here by "doing both".
> > The savepoint written using the State Processor API (what you are doing
> in
> > the bootstrap() method) is a savepoint that may be restored from as you
> > would with a typical Flink streaming job restore.
> > So, usually the bootstrapping part happens as a batch "offline" job,
> while
> > you keep your streaming job as a separate job. What are you trying to
> > achieve with having both written within the same job?
> >
> > Cheers,
> > Gordon
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: Please help, I need to bootstrap keyed state into a stream

2020-08-10 Thread Marco Villalobos
I think there is a bug in Flink when running locally without a cluster.

My code worked in a cluster, but failed when run locally.

My code does not save null values in Map State.

> On Aug 9, 2020, at 11:27 PM, Tzu-Li Tai  wrote:
> 
> Hi,
> 
> For the NullPointerException, what seems to be happening is that you are
> setting NULL values in your MapState, which is not allowed by the API.
> 
> Otherwise, the code that you showed for bootstrapping state seems to be
> fine.
> 
>> I have yet to find a working example that shows how to do both
>> (bootstrapping state and start a streaming application with that state)
> 
> Not entirely sure what you mean here by "doing both".
> The savepoint written using the State Processor API (what you are doing in
> the bootstrap() method) is a savepoint that may be restored from as you
> would with a typical Flink streaming job restore.
> So, usually the bootstrapping part happens as a batch "offline" job, while
> you keep your streaming job as a separate job. What are you trying to
> achieve with having both written within the same job?
> 
> Cheers,
> Gordon
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Please help, I need to bootstrap keyed state into a stream

2020-08-10 Thread Marco Villalobos
Thank you. Your instructions were helpful in solving this.

You can read about my solution at 
https://github.com/minmay/flink-patterns/tree/master/bootstrap-keyed-state-into-stream

> On Aug 10, 2020, at 4:07 AM, orionemail  wrote:
> 
> I recently was in the same situation as Marco; the docs do explain what you 
> need to do, but without experience with Flink it might still not be obvious 
> how.
> 
> What I did initially:
> 
> Set up the job to run in a 'write a savepoint' mode by implementing a command 
> line switch I could use when running the job:
> 
> flink run somejob.jar -d /some/path
> 
> When run with this switch, the job ran *only* the code required to set up 
> a version of the state and write it to a savepoint.
> 
> This worked and I was on my way.
> 
> However, I then decided to split this out into a new Flink 'jar' with the 
> sole purpose of creating a savepoint.  This is a cleaner approach in my case 
> and also removes dependencies (my state was loaded from DynamoDB) that were 
> only required in this one instance.
> 
> Rebuilding the state from this application is intended to be done only 
> once, with checkpoints/savepoints as the main approach going forward.
> 
> Just remember to name your operators with the same ID/name to make sure the 
> state is compatible.
> 
> Sent with ProtonMail Secure Email.
> 
> ‐‐‐ Original Message ‐‐‐
> On Monday, 10 August 2020 07:27, Tzu-Li Tai  wrote:
> 
>> Hi,
>> 
>> For the NullPointerException, what seems to be happening is that you are
>> setting NULL values in your MapState, which is not allowed by the API.
>> 
>> Otherwise, the code that you showed for bootstrapping state seems to be
>> fine.
>> 
>>> I have yet to find a working example that shows how to do both
>>> (bootstrapping state and start a streaming application with that state)
>> 
>> Not entirely sure what you mean here by "doing both".
>> The savepoint written using the State Processor API (what you are doing in
>> the bootstrap() method) is a savepoint that may be restored from as you
>> would with a typical Flink streaming job restore.
>> So, usually the bootstrapping part happens as a batch "offline" job, while
>> you keep your streaming job as a separate job. What are you trying to
>> achieve with having both written within the same job?
>> 
>> Cheers,
>> Gordon
>> 
>> 
>> -
>> 
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 
> 



Re: Please help, I need to bootstrap keyed state into a stream

2020-08-10 Thread Marco Villalobos
First, thank you.

I want to believe you, but I don't see how that is possible.

All of the code is self-contained, and at the bottom of all the code, I print 
out the non-null values before I attempt to put them in the map state.

All of the debug output before and after indicates that there is no null value 
in there.

I think something else is wrong.  I am going to run this in a distributed 
environment and see what happens.

> On Aug 9, 2020, at 11:27 PM, Tzu-Li Tai  wrote:
> 
> Hi,
> 
> For the NullPointerException, what seems to be happening is that you are
> setting NULL values in your MapState, which is not allowed by the API.
> 
> Otherwise, the code that you showed for bootstrapping state seems to be
> fine.
> 
>> I have yet to find a working example that shows how to do both
>> (bootstrapping state and start a streaming application with that state)
> 
> Not entirely sure what you mean here by "doing both".

By "doing both" I meant, no examples show how to create both Batch and Stream 
environment in a single job.  Another blog states that is possible.

> The savepoint written using the State Processor API (what you are doing in
> the bootstrap() method) is a savepoint that may be restored from as you
> would with a typical Flink streaming job restore.
> So, usually the bootstrapping part happens as a batch "offline" job, while
> you keep your streaming job as a separate job. What are you trying to
> achieve with having both written within the same job?

Again, thank you.

Marco A. Villalobos

Using managed keyed state with AsyncIO

2020-08-10 Thread KristoffSC
Hi guys,
I'm using Flink 1.9.2.

I have a question about a use case where I would like to use Flink's managed
keyed state with Async I/O [1].


Let's take as a baseline the example below, taken from [1], and let's assume
that we are executing this on a keyed stream.

final Future<String> result = client.query(key);

CompletableFuture.supplyAsync(new Supplier<String>() {

    @Override
    public String get() {
        try {
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            // Normally handled explicitly.
            return null;
        }
    }
}).thenAccept((String dbResult) -> {
    resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});


Imagine that instead of passing the key to client.query(..) we pass some value
taken from Flink's managed, keyed state. Later the supplier's get method will
return a value that should be stored in that state. In other words, we use
previous results as inputs for the next computations.

Is this achievable with Flink's Async I/O? I can have many pending requests on
client.query which can finish in a random order. The
AsyncDataStream.orderedWait will not help here since it affects only
the way Flink "releases" the messages from its internal queue for Async
operators.


What is more, this scenario can result in multiple concurrent writes/reads
to/from Flink's managed state for the same key values. Is this thread safe?


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
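
For future readers, a common workaround is to sandwich the async stage
between two keyed stages, since an AsyncFunction itself has no access to
keyed state: read the state and attach it to the element before the async
call, then key by the same field again and write the result back. A hedged
sketch follows; all class names are made up, and note the caveat in the
comments: the two keyed operators hold separate state instances.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.util.Collector;

public class AsyncStateSandwich {

    // Stage 1: runs keyed, so it may read keyed state; emits (key, stateValue).
    static class AttachStateFn
            extends KeyedProcessFunction<String, String, Tuple2<String, String>> {
        private transient ValueState<String> lastResult;

        @Override
        public void open(Configuration parameters) {
            lastResult = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("last-result", String.class));
        }

        @Override
        public void processElement(String key, Context ctx,
                                   Collector<Tuple2<String, String>> out) throws Exception {
            out.collect(Tuple2.of(key, lastResult.value()));
        }
    }

    // Stage 2: the async stage works only on the element; no keyed state here.
    static class QueryFn
            extends RichAsyncFunction<Tuple2<String, String>, Tuple2<String, String>> {
        @Override
        public void asyncInvoke(Tuple2<String, String> in,
                                ResultFuture<Tuple2<String, String>> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> "queried-with-" + in.f1) // stand-in for client.query(...)
                    .thenAccept(r -> resultFuture.complete(
                            Collections.singleton(Tuple2.of(in.f0, r))));
        }
    }

    // Stage 3: keyed again; writes the result into its own state. Caveat: this
    // is a *different* state instance than stage 1's, so feeding results back
    // into stage 1 needs an external store or an iteration, with care.
    static class UpdateStateFn
            extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>> {
        private transient ValueState<String> lastResult;

        @Override
        public void open(Configuration parameters) {
            lastResult = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("last-result", String.class));
        }

        @Override
        public void processElement(Tuple2<String, String> in, Context ctx,
                                   Collector<Tuple2<String, String>> out) throws Exception {
            lastResult.update(in.f1); // per-key serial execution: no concurrent writes
            out.collect(in);
        }
    }

    static DataStream<Tuple2<String, String>> wire(DataStream<String> keys) {
        DataStream<Tuple2<String, String>> withState =
                keys.keyBy(k -> k).process(new AttachStateFn());
        DataStream<Tuple2<String, String>> results =
                AsyncDataStream.orderedWait(withState, new QueryFn(), 1, TimeUnit.SECONDS, 100);
        return results.keyBy(t -> t.f0).process(new UpdateStateFn());
    }
}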


Re: Please help, I need to bootstrap keyed state into a stream

2020-08-10 Thread orionemail
I recently was in the same situation as Marco; the docs do explain what you 
need to do, but without experience with Flink it might still not be obvious 
how.

What I did initially:

Set up the job to run in a 'write a savepoint' mode by implementing a command 
line switch I could use when running the job:

flink run somejob.jar -d /some/path

When run with this switch, the job ran *only* the code required to set up a 
version of the state and write it to a savepoint.

This worked and I was on my way.
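
A hedged sketch of that command-line switch; the option and method names
below are invented for illustration, and the real job's wiring will differ:

import org.apache.flink.api.java.utils.ParameterTool;

public class JobEntryPoint {
    public static void main(String[] args) throws Exception {
        // `flink run somejob.jar -d /some/path` puts "-d /some/path" into args.
        ParameterTool params = ParameterTool.fromArgs(args);
        if (params.has("d")) {
            writeBootstrapSavepoint(params.get("d")); // one-off: build state, write savepoint
        } else {
            runStreamingJob(); // normal mode; restore with `flink run -s <savepoint>`
        }
    }

    static void writeBootstrapSavepoint(String path) throws Exception { /* hypothetical */ }

    static void runStreamingJob() throws Exception { /* hypothetical */ }
}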

However, I then decided to split this out into a new Flink 'jar' with the sole 
purpose of creating a savepoint.  This is a cleaner approach in my case and 
also removes dependencies (my state was loaded from DynamoDB) that were only 
required in this one instance.

Rebuilding the state from this application is intended to be done only once, 
with checkpoints/savepoints as the main approach going forward.

Just remember to name your operators with the same ID/name to make sure the 
state is compatible.
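
The operator-naming advice maps to .uid(). A fragment sketch, under the
assumption of a hypothetical MySource/MyKeyedFunction pair:

// The streaming job locates bootstrapped state by operator uid, so the uid
// must match the one the savepoint was written with.
env.addSource(new MySource())
   .keyBy(r -> r.getKey())
   .process(new MyKeyedFunction())
   .uid("my-keyed-operator")   // stable across deployments
   .name("my keyed function"); // display name only; the uid is what matters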

Sent with ProtonMail Secure Email.

‐‐‐ Original Message ‐‐‐
On Monday, 10 August 2020 07:27, Tzu-Li Tai  wrote:

> Hi,
>
> For the NullPointerException, what seems to be happening is that you are
> setting NULL values in your MapState, which is not allowed by the API.
>
> Otherwise, the code that you showed for bootstrapping state seems to be
> fine.
>
> > I have yet to find a working example that shows how to do both
> > (bootstrapping state and start a streaming application with that state)
>
> Not entirely sure what you mean here by "doing both".
> The savepoint written using the State Processor API (what you are doing in
> the bootstrap() method) is a savepoint that may be restored from as you
> would with a typical Flink streaming job restore.
> So, usually the bootstrapping part happens as a batch "offline" job, while
> you keep your streaming job as a separate job. What are you trying to
> achieve with having both written within the same job?
>
> Cheers,
> Gordon
>
>
> -
>
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/




Re: Please help, I need to bootstrap keyed state into a stream

2020-08-10 Thread Tzu-Li Tai
Hi,

For the NullPointerException, what seems to be happening is that you are
setting NULL values in your MapState, which is not allowed by the API.

Otherwise, the code that you showed for bootstrapping state seems to be
fine.
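
A hedged illustration of that constraint: with the heap backend, a null
value typically surfaces as an NPE from StateTable.put, so guard the put.
The class and state names below are made up:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class NullSafePut extends KeyedProcessFunction<String, Tuple2<String, String>, Void> {
    private transient MapState<String, String> values;

    @Override
    public void open(Configuration parameters) {
        values = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("values", String.class, String.class));
    }

    @Override
    public void processElement(Tuple2<String, String> in, Context ctx,
                               Collector<Void> out) throws Exception {
        if (in.f1 != null) {
            values.put(in.f0, in.f1);  // MapState rejects null values
        } else {
            values.remove(in.f0);      // treat null as a delete instead
        }
    }
}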

> I have yet to find a working example that shows how to do both
> (bootstrapping state and start a streaming application with that state)

Not entirely sure what you mean here by "doing both".
The savepoint written using the State Processor API (what you are doing in
the bootstrap() method) is a savepoint that may be restored from as you
would with a typical Flink streaming job restore.
So, usually the bootstrapping part happens as a batch "offline" job, while
you keep your streaming job as a separate job. What are you trying to
achieve with having both written within the same job?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Please help, I need to bootstrap keyed state into a stream

2020-08-08 Thread Marco Villalobos
According to the documentation, and various blogs, it is possible to use the 
Batch Execution Environment to bootstrap state into a savepoint, and then load 
that state in a Stream Execution Environment.

I am trying to use that feature.

State Processor API documentation states that "you can read a batch of data 
from any store, preprocess it, and write the result to a savepoint that you use 
to bootstrap the state of a streaming application."

(source: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html)

Kartik Khare in his blog wrote that "You can create both Batch and Stream 
environment in a single job."

(source: https://www.kharekartik.dev/2019/12/14/bootstrap-your-flink-jobs)

However, I have yet to find a working example that shows how to do both.

I am reaching out to the community to help solve and document this common 
problem.

My progress is at 
https://github.com/minmay/flink-patterns/blob/master/bootstrap-keyed-state-into-stream/README.md

I am open to contributions.

Please help me solve this, my work and livelihood depend on me solving this.

I keep on getting a NullPointerException deep within the Flink API that makes 
no sense to me:

Caused by: java.lang.NullPointerException
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
    at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:100)
    at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
    at mvillalobos.flink.patterns.bootstrap.stream.BootstrapKeyedStateIntoStreamApp$ConfigurationKeyedStateBootstrapFunction.processElement(BootstrapKeyedStateIntoStreamApp.java:180)
    at mvillalobos.flink.patterns.bootstrap.stream.BootstrapKeyedStateIntoStreamApp$ConfigurationKeyedStateBootstrapFunction.processElement(BootstrapKeyedStateIntoStreamApp.java:159)


Can somebody please explain what I am doing wrong?

I have my code below just for convenience:

package mvillalobos.flink.patterns.bootstrap.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

import java.io.File;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.concurrent.Callable;

@CommandLine.Command(name = "Boot Strap Keyed State into Stream",
        mixinStandardHelpOptions = true,
        description = "This demo will attempt to boot strap a dataset into a "
                + "save point that will be read by a stream.")
public class BootstrapKeyedStateIntoStreamApp implements Callable<Integer> {

    private final static Logger logger =
            LoggerFactory.getLogger(BootstrapKeyedStateIntoStreamApp.class);

    @CommandLine.Option(names = {"-s", "--save-point-path"},
            description = "The save point path.", required = true)
    private transient File savePointPath;

    public Integer call() throws Exception {
        bootstrap();
        stream();
        return 0;
    }

    // writes the dataset into a savepoint
    public void bootstrap() throws Exception {

        logger.info("Starting boot strap demo with save-point-path: file://{}",
                savePointPath);

        final ExecutionEnvironment batchEnv =
                ExecutionEnvironment.getExecutionEnvironment();
        final JDBCInputF

State Processor API to boot strap keyed state for Stream Application.

2020-08-07 Thread Marco Villalobos
I have read the documentation and various blogs that state that it is
possible to load data into a data-set and use that data to bootstrap a
stream application.

The documentation literally says this, "...you can read a batch of data
from any store, preprocess it, and write the result to a savepoint that you
use to bootstrap the state of a streaming application." (source:
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html).

Another blog states, "You can create both Batch and Stream environment in a
single job." (source:
https://www.kharekartik.dev/2019/12/14/bootstrap-your-flink-jobs/)

I want to try this approach, but I cannot find any real examples online.

I have failed on numerous attempts.

I have a few questions:

1) is there an example that demonstrates this feature?
2) how can you launch a batch and a stream environment from a single job?
3) does this require two jobs?

Anybody, please help.


Re: Using keyed state across different operators

2020-06-10 Thread Congxian Qiu
Hi,
  Currently KeyedState cannot be used across operators; that is, different
operators each use their own state.
Best,
Congxian


Z-Z wrote on Thu, Jun 11, 2020 at 10:11 AM:

> If two operators are keyed by the same field with keyBy, can they use the
> same StateDescriptor and thereby use the same keyed state?
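
A small Java sketch of the point above (hedged; CountFn is illustrative):
two operators keyed by the same field each own their state, so the usual
pattern is to emit the value downstream as part of the element.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ForwardStateDownstream {

    static class CountFn extends KeyedProcessFunction<String, String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(String key, Context ctx,
                                   Collector<Tuple2<String, Long>> out) throws Exception {
            long next = (count.value() == null ? 0L : count.value()) + 1;
            count.update(next);
            out.collect(Tuple2.of(key, next)); // forward the state value as data
        }
    }

    static DataStream<Tuple2<String, Long>> wire(DataStream<String> keys) {
        // A downstream operator keyed the same way would still see its own,
        // initially empty "count" state; it must read the forwarded tuple.
        return keys.keyBy(k -> k).process(new CountFn());
    }
}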


Using keyed state across different operators

2020-06-10 Thread Z-Z
If two operators are keyed by the same field with keyBy, can they use the same
StateDescriptor and thereby use the same keyed state?

Re: Flink: For terabytes of keyed state.

2020-05-06 Thread Congxian Qiu
Hi Gowri

Please let us know if you run into any problems.

Best,
Congxian


Gowri Sundaram wrote on Wed, May 6, 2020 at 1:53 PM:

> Hi Congxian,
> Thank you so much for your response! We will go ahead and do a POC to test
> out how Flink performs at scale.
>
> Regards,
> - Gowri
>
> On Wed, May 6, 2020 at 8:34 AM Congxian Qiu 
> wrote:
>
>> Hi
>>
>> From my experience, you should care about the state size of a single task (not
>> the whole job's state size); the download speed for a single thread is almost
>> 100 MB/s (this may vary in different environments), and I do not have much
>> performance data for loading state into RocksDB (we use an internal KV store in
>> my company), but loading state into RocksDB will not be slower than
>> downloading, from my experience.
>>
>> Best,
>> Congxian
>>
>>
>> Gowri Sundaram wrote on Sun, May 3, 2020 at 11:25 PM:
>>
>>> Hi Congxian,
>>> Thank you so much for your response, that really helps!
>>>
>>> From your experience, how long does it take for Flink to redistribute
>>> terabytes of state data on node addition / node failure.
>>>
>>> Thanks!
>>>
>>> On Sun, May 3, 2020 at 6:56 PM Congxian Qiu 
>>> wrote:
>>>
>>>> Hi
>>>>
>>>> 1. From my experience, Flink can support such big state; you can set an
>>>> appropriate parallelism for the stateful operator. For RocksDB you may need
>>>> to care about the disk performance.
>>>> 2. Inside Flink, the state is separated by key-group, and each
>>>> task/parallelism contains multiple key-groups.  Flink does not need to
>>>> restart when you add a node to the cluster, but every time it restarts from
>>>> a savepoint/checkpoint (or fails over), Flink needs to redistribute the
>>>> checkpoint data; this can be omitted on failover if local recovery[1]
>>>> is enabled.
>>>> 3. For uploading/downloading state, you can refer to the multi-threaded
>>>> upload/download[2][3] to speed them up.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#task-local-recovery
>>>> [2] https://issues.apache.org/jira/browse/FLINK-10461
>>>> [3] https://issues.apache.org/jira/browse/FLINK-11008
>>>>
>>>> Best,
>>>> Congxian
>>>>
>>>>
>>>> Gowri Sundaram wrote on Fri, May 1, 2020 at 6:29 PM:
>>>>
>>>>> Hello all,
>>>>> We have read in multiple
>>>>> <https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html>
>>>>> sources <https://flink.apache.org/usecases.html> that Flink has been
>>>>> used for use cases with terabytes of application state.
>>>>>
>>>>> We are considering using Flink for a similar use case with *keyed
>>>>> state in the range of 20 to 30 TB*. We had a few questions regarding
>>>>> the same.
>>>>>
>>>>>
>>>>>- *Is Flink a good option for this kind of scale of data* ? We are
>>>>>considering using RocksDB as the state backend.
>>>>>- *What happens when we want to add a node to the cluster *?
>>>>>   - As per our understanding, if we have 10 nodes in our cluster,
>>>>>   with 20TB of state, this means that adding a node would require the entire
>>>>>   20TB of data to be shipped again from the external checkpoint remote
>>>>>   storage to the taskmanager nodes.
>>>>>   - Assuming 1Gb/s network speed, and assuming all nodes can read
>>>>>   their respective 2TB state in parallel, this would mean a *minimum
>>>>>   downtime of half an hour*. And this is assuming the throughput
>>>>>   of the remote storage does not become the bottleneck.
>>>>>   - Is there any way to reduce this estimated downtime ?
>>>>>
>>>>>
>>>>> Thank you!
>>>>>
>>>>


Re: Flink: For terabytes of keyed state.

2020-05-05 Thread Gowri Sundaram
Hi Congxian,
Thank you so much for your response! We will go ahead and do a POC to test
out how Flink performs at scale.

Regards,
- Gowri

On Wed, May 6, 2020 at 8:34 AM Congxian Qiu  wrote:

> Hi
>
> From my experience, you should care about the state size of a single task (not
> the whole job's state size); the download speed for a single thread is almost
> 100 MB/s (this may vary in different environments), and I do not have much
> performance data for loading state into RocksDB (we use an internal KV store in
> my company), but loading state into RocksDB will not be slower than
> downloading, from my experience.
>
> Best,
> Congxian
>
>
> Gowri Sundaram wrote on Sun, May 3, 2020 at 11:25 PM:
>
>> Hi Congxian,
>> Thank you so much for your response, that really helps!
>>
>> From your experience, how long does it take for Flink to redistribute
>> terabytes of state data on node addition / node failure.
>>
>> Thanks!
>>
>> On Sun, May 3, 2020 at 6:56 PM Congxian Qiu 
>> wrote:
>>
>>> Hi
>>>
>>> 1. From my experience, Flink can support such big state; you can set an
>>> appropriate parallelism for the stateful operator. For RocksDB you may need
>>> to care about the disk performance.
>>> 2. Inside Flink, the state is separated by key-group, and each
>>> task/parallelism contains multiple key-groups.  Flink does not need to
>>> restart when you add a node to the cluster, but every time it restarts from
>>> a savepoint/checkpoint (or fails over), Flink needs to redistribute the
>>> checkpoint data; this can be omitted on failover if local recovery[1]
>>> is enabled.
>>> 3. For uploading/downloading state, you can refer to the multi-threaded
>>> upload/download[2][3] to speed them up.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#task-local-recovery
>>> [2] https://issues.apache.org/jira/browse/FLINK-10461
>>> [3] https://issues.apache.org/jira/browse/FLINK-11008
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Gowri Sundaram wrote on Fri, May 1, 2020 at 6:29 PM:
>>>
>>>> Hello all,
>>>> We have read in multiple
>>>> <https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html>
>>>> sources <https://flink.apache.org/usecases.html> that Flink has been
>>>> used for use cases with terabytes of application state.
>>>>
>>>> We are considering using Flink for a similar use case with *keyed
>>>> state in the range of 20 to 30 TB*. We had a few questions regarding
>>>> the same.
>>>>
>>>>
>>>>- *Is Flink a good option for this kind of scale of data* ? We are
>>>>considering using RocksDB as the state backend.
>>>>- *What happens when we want to add a node to the cluster *?
>>>>   - As per our understanding, if we have 10 nodes in our cluster,
>>>>   with 20TB of state, this means that adding a node would require the entire
>>>>   20TB of data to be shipped again from the external checkpoint remote
>>>>   storage to the taskmanager nodes.
>>>>   - Assuming 1Gb/s network speed, and assuming all nodes can read
>>>>   their respective 2TB state in parallel, this would mean a *minimum
>>>>   downtime of half an hour*. And this is assuming the throughput
>>>>   of the remote storage does not become the bottleneck.
>>>>   - Is there any way to reduce this estimated downtime ?
>>>>
>>>>
>>>> Thank you!
>>>>
>>>


Re: Flink: For terabytes of keyed state.

2020-05-05 Thread Congxian Qiu
Hi

From my experience, you should care about the state size of a single task (not
the whole job's state size); the download speed for a single thread is almost
100 MB/s (this may vary in different environments), and I do not have much
performance data for loading state into RocksDB (we use an internal KV store in
my company), but loading state into RocksDB will not be slower than
downloading, from my experience.

Best,
Congxian


Gowri Sundaram wrote on Sun, May 3, 2020 at 11:25 PM:

> Hi Congxian,
> Thank you so much for your response, that really helps!
>
> From your experience, how long does it take for Flink to redistribute
> terabytes of state data on node addition / node failure.
>
> Thanks!
>
> On Sun, May 3, 2020 at 6:56 PM Congxian Qiu 
> wrote:
>
>> Hi
>>
>> 1. From my experience, Flink can support such big state; you can set an
>> appropriate parallelism for the stateful operator. For RocksDB you may need
>> to care about the disk performance.
>> 2. Inside Flink, the state is separated by key-group, and each
>> task/parallelism contains multiple key-groups.  Flink does not need to
>> restart when you add a node to the cluster, but every time it restarts from
>> a savepoint/checkpoint (or fails over), Flink needs to redistribute the
>> checkpoint data; this can be omitted on failover if local recovery[1]
>> is enabled.
>> 3. For uploading/downloading state, you can refer to the multi-threaded
>> upload/download[2][3] to speed them up.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#task-local-recovery
>> [2] https://issues.apache.org/jira/browse/FLINK-10461
>> [3] https://issues.apache.org/jira/browse/FLINK-11008
>>
>> Best,
>> Congxian
>>
>>
>> Gowri Sundaram wrote on Fri, May 1, 2020 at 6:29 PM:
>>
>>> Hello all,
>>> We have read in multiple
>>> <https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html>
>>> sources <https://flink.apache.org/usecases.html> that Flink has been
>>> used for use cases with terabytes of application state.
>>>
>>> We are considering using Flink for a similar use case with *keyed state
>>> in the range of 20 to 30 TB*. We had a few questions regarding the same.
>>>
>>>
>>>- *Is Flink a good option for this kind of scale of data* ? We are
>>>considering using RocksDB as the state backend.
>>>- *What happens when we want to add a node to the cluster *?
>>>   - As per our understanding, if we have 10 nodes in our cluster,
>>>   with 20TB of state, this means that adding a node would require the entire
>>>   20TB of data to be shipped again from the external checkpoint remote
>>>   storage to the taskmanager nodes.
>>>   - Assuming 1Gb/s network speed, and assuming all nodes can read
>>>   their respective 2TB state in parallel, this would mean a *minimum
>>>   downtime of half an hour*. And this is assuming the throughput of
>>>   the remote storage does not become the bottleneck.
>>>   - Is there any way to reduce this estimated downtime ?
>>>
>>>
>>> Thank you!
>>>
>>


Re: Flink: For terabytes of keyed state.

2020-05-03 Thread Gowri Sundaram
Hi Congxian,
Thank you so much for your response, that really helps!

From your experience, how long does it take for Flink to redistribute
terabytes of state data on node addition / node failure?

Thanks!

On Sun, May 3, 2020 at 6:56 PM Congxian Qiu  wrote:

> Hi
>
> 1. From my experience, Flink can support such big state; you can set an
> appropriate parallelism for the stateful operator. For RocksDB you may need
> to care about the disk performance.
> 2. Inside Flink, the state is separated by key-group, and each
> task/parallelism contains multiple key-groups.  Flink does not need to
> restart when you add a node to the cluster, but every time it restarts from
> a savepoint/checkpoint (or fails over), Flink needs to redistribute the
> checkpoint data; this can be omitted on failover if local recovery[1]
> is enabled.
> 3. For uploading/downloading state, you can refer to the multi-threaded
> upload/download[2][3] to speed them up.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#task-local-recovery
> [2] https://issues.apache.org/jira/browse/FLINK-10461
> [3] https://issues.apache.org/jira/browse/FLINK-11008
>
> Best,
> Congxian
>
>
> Gowri Sundaram wrote on Fri, May 1, 2020 at 6:29 PM:
>
>> Hello all,
>> We have read in multiple
>> <https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html>
>> sources <https://flink.apache.org/usecases.html> that Flink has been
>> used for use cases with terabytes of application state.
>>
>> We are considering using Flink for a similar use case with *keyed state
>> in the range of 20 to 30 TB*. We had a few questions regarding the same.
>>
>>
>>- *Is Flink a good option for this kind of scale of data* ? We are
>>considering using RocksDB as the state backend.
>>- *What happens when we want to add a node to the cluster *?
>>   - As per our understanding, if we have 10 nodes in our cluster,
>>   with 20TB of state, this means that adding a node would require the entire
>>   20TB of data to be shipped again from the external checkpoint remote
>>   storage to the taskmanager nodes.
>>   - Assuming 1Gb/s network speed, and assuming all nodes can read
>>   their respective 2TB state in parallel, this would mean a *minimum
>>   downtime of half an hour*. And this is assuming the throughput of
>>   the remote storage does not become the bottleneck.
>>   - Is there any way to reduce this estimated downtime ?
>>
>>
>> Thank you!
>>
>


Re: Flink: For terabytes of keyed state.

2020-05-03 Thread Congxian Qiu
Hi

1. From my experience, Flink can support such big state; you can set an
appropriate parallelism for the stateful operator. For RocksDB you may need
to care about the disk performance.
2. Inside Flink, the state is separated by key-group, and each
task/parallelism contains multiple key-groups.  Flink does not need to
restart when you add a node to the cluster, but every time it restarts from
a savepoint/checkpoint (or fails over), Flink needs to redistribute the
checkpoint data; this can be omitted on failover if local recovery[1]
is enabled.
3. For uploading/downloading state, you can refer to the multi-threaded
upload/download[2][3] to speed them up.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#task-local-recovery
[2] https://issues.apache.org/jira/browse/FLINK-10461
[3] https://issues.apache.org/jira/browse/FLINK-11008
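
Points 2 and 3 map to configuration roughly as below (option keys as in
the Flink 1.10 docs; verify them against your version):

# flink-conf.yaml sketch
# 2) reuse local state on failover instead of re-downloading checkpoints
state.backend.local-recovery: true
# 3) transfer RocksDB checkpoint files with several threads (FLINK-10461/11008)
state.backend.rocksdb.checkpoint.transfer.thread.num: 4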

Best,
Congxian


Gowri Sundaram wrote on Fri, May 1, 2020 at 6:29 PM:

> Hello all,
> We have read in multiple
> <https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html>
> sources <https://flink.apache.org/usecases.html> that Flink has been used
> for use cases with terabytes of application state.
>
> We are considering using Flink for a similar use case with *keyed
> in the range of 20 to 30 TB*. We had a few questions regarding the same.
>
>
>- *Is Flink a good option for this kind of scale of data* ? We are
>considering using RocksDB as the state backend.
>- *What happens when we want to add a node to the cluster *?
>   - As per our understanding, if we have 10 nodes in our cluster,
>   with 20TB of state, this means that adding a node would require the entire
>   20TB of data to be shipped again from the external checkpoint remote
>   storage to the taskmanager nodes.
>   - Assuming 1Gb/s network speed, and assuming all nodes can read
>   their respective 2TB state in parallel, this would mean a *minimum
>   downtime of half an hour*. And this is assuming the throughput of
>   the remote storage does not become the bottleneck.
>   - Is there any way to reduce this estimated downtime ?
>
>
> Thank you!
>


Flink: For terabytes of keyed state.

2020-05-01 Thread Gowri Sundaram
Hello all,
We have read in multiple
<https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html>
sources <https://flink.apache.org/usecases.html> that Flink has been used
for use cases with terabytes of application state.

We are considering using Flink for a similar use case with *keyed state in
the range of 20 to 30 TB*. We had a few questions regarding the same.


   - *Is Flink a good option for this kind of scale of data*? We are
   considering using RocksDB as the state backend.
   - *What happens when we want to add a node to the cluster *?
  - As per our understanding, if we have 10 nodes in our cluster, with
   20TB of state, this means that adding a node would require the entire 20TB
   of data to be shipped again from the external checkpoint remote storage to
   the taskmanager nodes.
  - Assuming 1Gb/s network speed, and assuming all nodes can read their
   respective 2TB state in parallel, this would mean a *minimum downtime
  of half an hour*. And this is assuming the throughput of the remote
  storage does not become the bottleneck.
  - Is there any way to reduce this estimated downtime ?


Thank you!
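
As a hedged back-of-the-envelope check on the estimate above (assuming the
full line rate is usable and ignoring decompression and RocksDB load time):

public class RestoreTime {
    public static void main(String[] args) {
        long bytesPerNode = 2_000_000_000_000L;   // 2 TB of state per node
        double lineRate = 125_000_000.0;          // 1 Gb/s ~= 125 MB/s
        double seconds = bytesPerNode / lineRate; // ~16,000 s, i.e. ~4.4 hours
        System.out.printf("restore ~ %.0f s (~%.1f h)%n", seconds, seconds / 3600);
        // A half-hour restore (~2,000 s) corresponds to roughly 1 GB/s per
        // node, i.e. ~10 Gb/s links, rather than 1 Gb/s.
    }
}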


Re: Re: Re: With RocksDB as the state backend, restoring the job fails on restart: Could not restore keyed state backend for KeyedProcessOperator

2020-04-14 Thread chenxyz
tory{configuredOptions={}}.

2020-04-14 11:42:45,994 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) 
switched from DEPLOYING to RUNNING.

2020-04-14 11:42:45,994 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Initializing 
Source: Custom Source (1/1).

2020-04-14 11:42:45,994 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask   - Loading state 
backend via factory 
org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

2020-04-14 11:42:45,995 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using 
predefined options: DEFAULT.

2020-04-14 11:42:45,995 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using default 
options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.

2020-04-14 11:42:46,033 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Invoking 
Source: Custom Source (1/1)

2020-04-14 11:42:46,042 DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating 
operator state backend for StreamSource_bc764cd8ddf7a0cff126f51c16239658_(1/1) 
with empty state.

2020-04-14 11:42:46,057 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Invoking 
KeyedProcess -> Sink: Unnamed (1/1)

2020-04-14 11:42:46,060 DEBUG 
org.apache.flink.runtime.state.TaskStateManagerImpl   - Operator 
c09dc291fad93d575e015871097bfc60 has remote state 
SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, 
operatorStateFromStream=StateObjectCollection{[]}, 
keyedStateFromBackend=StateObjectCollection{[]}, 
keyedStateFromStream=StateObjectCollection{[]}, stateSize=0} from job manager 
and local state alternatives [] from local state store 
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.

2020-04-14 11:42:46,060 DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating 
operator state backend for StreamSink_c09dc291fad93d575e015871097bfc60_(1/1) 
with empty state.

2020-04-14 11:42:46,069 DEBUG 
org.apache.flink.runtime.state.TaskStateManagerImpl   - Operator 
20ba6b65f97481d5570070de90e4e791 has remote state 
SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, 
operatorStateFromStream=StateObjectCollection{[]}, 
keyedStateFromBackend=StateObjectCollection{[IncrementalRemoteKeyedStateHandle{backendIdentifier=04ac09d6-1f1f-4a6c-a78d-74090c83b3c7,
 keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, checkpointId=1, 
sharedState={}, 
privateState={MANIFEST-06=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/2ff261b8-f51c-42bf-9fab-93c6b119dcff',
 dataBytes=206}, OPTIONS-10=File State: 
hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/426c66a6-d32e-43c8-9873-550237ee0963
 [10379 bytes], 
CURRENT=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/bbbce7c9-ea02-4590-9b18-d7a322deb2f4',
 dataBytes=16}}, metaStateHandle=File State: 
hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/chk-1/9215630d-632e-48f6-b668-7dc235a8ff7a
 [1163 bytes], registered=false}]}, 
keyedStateFromStream=StateObjectCollection{[]}, stateSize=11764} from job 
manager and local state alternatives [] from local state store 
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.

2020-04-14 11:42:46,070 DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating 
keyed state backend for 
KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) and restoring with 
state from alternative (1/1).

2020-04-14 11:42:46,071 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to 
load RocksDB native library and store it under '/data/flink1_10/tmp'

2020-04-14 11:42:46,071 DEBUG 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to 
create RocksDB native library folder 
/data/flink1_10/tmp/rocksdb-lib-a5f35d4dd06539876a20dbabc82a7f33

2020-04-14 11:42:46,078 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf  - 
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkAccessible: true

2020-04-14 11:42:46,079 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf  - 
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkBounds: true

2020-04-14 11:42:46,080 DEBUG 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory  - 
Loaded default ResourceLeakDetector: 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector@28a9bbee

2020-04-14 11:42:46,150 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Successfully 
loaded RocksDB native library

2020-04-14 11:42:46,154 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Getting managed 
me

Re: Is it possible to emulate keyed state with operator state?

2020-04-10 Thread David Anderson
Hypothetically, yes, I think this is possible to some extent. You would
have to give up all the things that require a KeyedStream, such as timers,
and the RocksDB state backend. And performance would suffer.

As for the question of determining which key groups (and ultimately, which
keys) are handled by a specific instance, see [1] and [2].

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-can-I-find-out-which-key-group-belongs-to-which-subtask-td32032.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-keyBy-to-deterministically-hash-each-record-to-a-processor-task-slot-td16483.html
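
For reference, the key-group assignment from [1]/[2] can be recomputed with
Flink's own utility; a hedged sketch for filtering restored union-list-state
entries down to this subtask's keys (the helper class is illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupFilter {

    // Keeps only the entries whose key Flink would route to this subtask.
    public static <K, V> Map<K, V> keepLocalEntries(
            RuntimeContext ctx, Iterable<Map.Entry<K, V>> restored) {

        int maxParallelism = ctx.getMaxNumberOfParallelSubtasks();
        KeyGroupRange myRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                maxParallelism, ctx.getNumberOfParallelSubtasks(), ctx.getIndexOfThisSubtask());

        Map<K, V> local = new HashMap<>();
        for (Map.Entry<K, V> e : restored) {
            int group = KeyGroupRangeAssignment.assignToKeyGroup(e.getKey(), maxParallelism);
            if (myRange.contains(group)) {
                local.put(e.getKey(), e.getValue());
            }
        }
        return local;
    }
}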


On Wed, Apr 8, 2020 at 9:10 AM Salva Alcántara 
wrote:
>
> Just for the sake of experimenting and learning. Let's assume that we have a
> keyed process function using keyed state and we want to rewrite it using
> operator state. The question is: would it be possible to keep the exact
> same behaviour? For example, one could use operator union list state and
> then set up a timer to automatically remove the state not used within a given
> time... that would probably work, but I'd prefer a way to know which
> elements of the union list state to use right after a recovery/restore,
> discarding the others, depending on the set of keys the current operator
> instance has been assigned. Is it possible to achieve this?
>
>
>
> --
> Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Is it possible to emulate keyed state with operator state?

2020-04-08 Thread Salva Alcántara
Just for the sake of experimenting and learning. Let's assume that we have a
keyed process function using keyed state and we want to rewrite it using
operator state. The question is: would it be possible to keep the exact
same behaviour? For example, one could use operator union list state and
then set up a timer to automatically remove the state not used within a given
time... that would probably work, but I'd prefer a way to know which
elements of the union list state to use right after a recovery/restore,
discarding the others, depending on the set of keys the current operator
instance has been assigned. Is it possible to achieve this?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

