On efficient checkpoints with dynamic (self-evolving) keyed state

2020-04-06 Thread Salva Alcántara
In a KeyedCoProcessFunction, I am managing keyed state that consists of third-party library models. These models are created when new data arrives on the control stream in `processElement1`. Because the models are self-evolving, in the sense that they have their own internal state, I need

Re: Re: With RocksDB as the state backend, restoring the job fails when the task restarts: Could not restore keyed state backend for KeyedProcessOperator

2020-04-03 Thread Congxian Qiu
/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst Please double-check whether this file exists. If the download failed, you need to find out why it failed. Best, Congxian. chenxyz wrote on Wednesday, April 1, 2020 at 3:02 PM:

Re: Re: With RocksDB as the state backend, restoring the job fails when the task restarts: Could not restore keyed state backend for KeyedProcessOperator

2020-04-01 Thread chenxyz
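.sst Please double-check whether this file exists. If the download failed, you need to find out why it failed. Best, Congxian. chenxyz wrote on Wednesday, April 1, 2020 at 3:02 PM: When the job uses RocksDB as the state backend, restarting after an exception often fails with "Could not restore keyed state backend for KeyedProcessOperator". How can I fix this? Version: 1.10 standalone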

Re: With RocksDB as the state backend, restoring the job fails when the task restarts: Could not restore keyed state backend for KeyedProcessOperator

2020-04-01 Thread Congxian Qiu
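check. If the download failed, you need to find out why it failed. Best, Congxian. chenxyz wrote on Wednesday, April 1, 2020 at 3:02 PM: When the job uses RocksDB as the state backend, restarting after an exception often fails with "Could not restore keyed state backend for KeyedProcessOperator". How can I fix this? Version: 1.10 standalone. Configuration: state.backend: rocksdb state.checkp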

With RocksDB as the state backend, restoring the job fails when the task restarts: Could not restore keyed state backend for KeyedProcessOperator

2020-04-01 Thread chenxyz
When the job uses RocksDB as the state backend, restarting after an exception often fails with "Could not restore keyed state backend for KeyedProcessOperator". How can I fix this? Version: 1.10 standalone. Configuration: state.backend: rocksdb state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint state.backend.incremental: true jobmanager.execution.failover

Re: State Processor API Keyed State

2020-02-18 Thread Tzu-Li (Gordon) Tai
There might be a workaround for this for now. Basically, the trick is to explicitly tell the State Processor API to use a specific type information to access the keyed state. You can do that with the `ExistingSavepoint#readKeyedState(String uid, KeyedStateReaderFunction function

Re: State Processor API Keyed State

2020-02-18 Thread Tzu-Li (Gordon) Tai
a reference repository that will demonstrate the issue (repository: https://github.com/segmentio/flink-state-management). The current implementation of the pipeline has left us with keyed state that we no longer need, and we don't have references to some of the old keys. My plan w

State Processor API Keyed State

2020-02-18 Thread Mark Niehe
with keyed state that we no longer need, and we don't have references to some of the old keys. My plan was to: 1. create a savepoint; 2. read the keys from each operator (using the State Processor API); 3. filter out all the keys that are no longer used; 4. bootstrap a new savepoint that contains the filtered
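Step 3 of this plan is a plain filter over the (key, state) pairs read in step 2. The sketch below models only that step in plain Java. A Map stands in for the keyed state read from the savepoint, and the names `KeyFilterSketch` and `retainActive` are hypothetical. The State Processor API calls for steps 2 and 4 are not shown.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class KeyFilterSketch {
    // Hypothetical helper for step 3: keep only the (key, state) pairs whose key
    // is still in use. `stateByKey` stands in for what step 2 reads from the
    // savepoint; the returned map is what step 4 would write to the new one.
    static <K, V> Map<K, V> retainActive(Map<K, V> stateByKey, Set<K> activeKeys) {
        Map<K, V> kept = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : stateByKey.entrySet()) {
            if (activeKeys.contains(entry.getKey())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Long> stateByKey = new LinkedHashMap<>();
        stateByKey.put("user-1", 10L);
        stateByKey.put("user-2", 20L);
        stateByKey.put("user-3", 30L);
        Set<String> activeKeys = new HashSet<>(Arrays.asList("user-1", "user-3"));
        System.out.println(retainActive(stateByKey, activeKeys)); // {user-1=10, user-3=30}
    }
}
```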

Re: State Processor API: StateMigrationException for keyed state

2019-12-13 Thread Peter Westermann
: Could not restore keyed state backend for ebe1b56bc6601c8bccba93887bec8059_ebe1b56bc6601c8bccba93887bec8059_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135

Re: State Processor API: StateMigrationException for keyed state

2019-12-12 Thread vino yang
Hi pwestermann, can you share the detailed exception message? Best, Vino. pwestermann wrote on Friday, December 13, 2019 at 2:00 AM: I am trying to use the new State Processor API, but I am having trouble with keyed state (this is for Flink 1.9.1 with RocksDB on S3 as the backend). I can r

State Processor API: StateMigrationException for keyed state

2019-12-12 Thread pwestermann
I am trying to use the new State Processor API, but I am having trouble with keyed state (this is for Flink 1.9.1 with RocksDB on S3 as the backend). I can read keyed state for simple key types such as Strings, but whenever I try to read state with a more complex key type, such as a named tuple

Re: Custom Partitioning with keyed state

2019-10-28 Thread Congxian Qiu
more keyed processing using keyed state. What happened: I implemented my logic using keyBy so that I can use keyed state, but it suffers from severe skew: some of the nodes received no records and others received more than one. I have tried to use cus

Custom Partitioning with keyed state

2019-10-27 Thread Heidi Hazem Mohamed
to the same node to do further keyed processing using keyed state. What happened: I implemented my logic using keyBy so that I can use keyed state, but it suffers from severe skew: some of the nodes received no records and others received more than one. I have tried to use

Re: Manually clean SQL keyed state

2018-11-09 Thread shkob1
Thanks Fabian!

Re: Manually clean SQL keyed state

2018-11-09 Thread Fabian Hueske
Hi Shahar, That's not possible at the moment. The SQL API does not provide any knobs to control state size besides the idle state retention. The reason is that it aims to be as accurate as possible. In the future it might be possible to provide more information to the system (like constraints in

Manually clean SQL keyed state

2018-11-08 Thread shkob1
I have a scenario in which I do a non-windowed GROUP BY using SQL, something like "Select count(*) as events, shouldTrigger(..) as shouldTrigger from source group by sessionId". I'm then converting to a retract stream, filtering by "add" messages, then further filtering by "shouldTrigger" field
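To make "idle state retention" concrete, here is a conceptual plain-Java model of it. It is not Flink's implementation. Each key's state (a per-session counter, like the count(*) in the query above) remembers when it was last touched, and a cleanup pass drops keys that have been idle longer than the retention interval. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model of idle state retention (not Flink's implementation):
// per-key state records when it was last touched, and a cleanup pass drops
// keys that have been idle for longer than the retention interval.
class IdleRetentionSketch {
    private static final class CountState {
        long count;
        long lastAccessMillis;
    }

    private final long retentionMillis;
    private final Map<String, CountState> stateByKey = new HashMap<>();

    IdleRetentionSketch(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Models "count(*) ... group by sessionId": one counter per key.
    long increment(String key, long nowMillis) {
        CountState s = stateByKey.computeIfAbsent(key, k -> new CountState());
        s.count++;
        s.lastAccessMillis = nowMillis;
        return s.count;
    }

    // Drops every key that has not been accessed within the retention interval.
    void cleanup(long nowMillis) {
        stateByKey.values().removeIf(s -> nowMillis - s.lastAccessMillis > retentionMillis);
    }

    int size() {
        return stateByKey.size();
    }
}
```

Once a key has been dropped, its next record starts a fresh count at 1. This shows why bounding state this way can cost accuracy.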

Re: Fail to recover Keyed State after ReinterpretAsKeyedStream

2018-10-24 Thread Tzu-Li (Gordon) Tai
Hi Jose, As far as I know, you should be able to use keyed state on a stream returned by DataStreamUtils.reinterpretAsKeyedStream function. That shouldn’t be the issue here. Have you looked into the logs for any meaningful exceptions of why the restore failed? That would be helpful here

Fail to recover Keyed State after ReinterpretAsKeyedStream

2018-10-24 Thread Jose Cisneros
Hi, to avoid reshuffling in my job, I started using DataStreamUtils.reinterpretAsKeyedStream so that I don't have to do another keyBy on the same key. The state backend is RocksDB. When the job recovers after a failure, the ProcessFunction after the keyBy restores its keyed state correctly

Re: Using Managed Keyed State

2018-08-31 Thread Andrey Zagrebin
Hi, if you are asking about keyed state, you probably mean ListState, because a List is in any case just a Java object holding a concrete state value. Like ValueState, ListState is scoped to the key of the current record, but it adds some list-specific functionality. Both are state object handles. Keyed state is also

Using Managed Keyed State

2018-08-31 Thread Boris Lublinsky
The documentation https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#using-managed-keyed-state lists ValueState and List, but their sem
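Andrey's point can be modelled in plain Java. ListState is a handle scoped to the current record's key, while a List is only a value. The model assumes the runtime sets the current key before each call. The handle holds no data itself, and every add() or get() resolves to the list for the current key. `KeyedListStateSketch` is a hypothetical class, not Flink's ListState.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyed ListState (not Flink's classes): the handle holds
// no data of its own; every call is resolved against the current key.
class KeyedListStateSketch<K, V> {
    private final Map<K, List<V>> backend = new HashMap<>();
    private K currentKey;

    // Assumption: the runtime sets the current key before invoking user code.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Appends only to the list that belongs to the current key.
    void add(V value) {
        backend.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }

    // Returns a read-only view of the current key's values (empty if none).
    List<V> get() {
        return Collections.unmodifiableList(backend.getOrDefault(currentKey, Collections.emptyList()));
    }

    // Removes the state of the current key only.
    void clear() {
        backend.remove(currentKey);
    }

    public static void main(String[] args) {
        KeyedListStateSketch<String, String> state = new KeyedListStateSketch<>();
        state.setCurrentKey("alice");
        state.add("login");
        state.setCurrentKey("bob");
        state.add("click");
        state.setCurrentKey("alice");
        state.add("logout");
        System.out.println(state.get()); // [login, logout]
    }
}
```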

Re: Managed Keyed state update

2018-08-14 Thread Fabian Hueske
ay, August 13, 2018 4:33 PM To: Alexey Trenikhun Cc: user@flink.apache.org Subject: Re: Managed Keyed state update. Hi Alexey, it depends on the state backend you use. If you use the heap memory backend, you don't need to call put() again. However, if you use RocksDB

Re: Managed Keyed state update

2018-08-13 Thread Alexey Trenikhun
Clear. Thank you. From: Renjie Liu Sent: Monday, August 13, 2018 4:33 PM To: Alexey Trenikhun Cc: user@flink.apache.org Subject: Re: Managed Keyed state update Hi, Alexey: It depends on the state backend you use.

Re: Managed Keyed state update

2018-08-13 Thread Renjie Liu
wrote: Let's say I have managed keyed state, a MapState x, and I initialize the state for "k0" with x.put("k0", new Tuple2<>("a", "b"));. Later I retrieve the state with Tuple2 v = x.get("k0"); and change the value with v.f0 = "U";. Does that make the state 'dirty'? In other words, do I need to

Managed Keyed state update

2018-08-13 Thread Alexey Trenikhun
Let's say I have managed keyed state, a MapState x, and I initialize the state for "k0" with x.put("k0", new Tuple2<>("a", "b"));. Later I retrieve the state with Tuple2 v = x.get("k0"); and change the value with v.f0 = "U";. Does that make the state 'dirty'? In other words, do I need to call x.put("k0", v) again
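The difference in Renjie's answer can be modelled in plain Java. The model assumes the heap backend keeps object references, while RocksDB stores serialized values and deserializes a fresh copy on every read. In the sketch, `Pair` is a hypothetical stand-in for Tuple2, and a copied String[] stands in for RocksDB's serialized bytes. None of these classes are Flink's.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Tuple2<String, String>.
class Pair {
    String f0;
    String f1;

    Pair(String f0, String f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Heap-like backend: the map holds the very object that was put, so mutating
// the object returned by get() changes the stored state without another put().
class HeapLikeMapState {
    private final Map<String, Pair> entries = new HashMap<>();

    void put(String key, Pair value) { entries.put(key, value); }

    Pair get(String key) { return entries.get(key); }
}

// RocksDB-like backend: the map holds an encoded copy (a String[] standing in
// for serialized bytes), so get() returns a fresh object and changes to it are
// lost unless the object is put() back.
class SerializedLikeMapState {
    private final Map<String, String[]> entries = new HashMap<>();

    void put(String key, Pair value) {
        entries.put(key, new String[] {value.f0, value.f1});
    }

    Pair get(String key) {
        String[] encoded = entries.get(key);
        return encoded == null ? null : new Pair(encoded[0], encoded[1]);
    }
}

class UpdateSemanticsDemo {
    public static void main(String[] args) {
        HeapLikeMapState heap = new HeapLikeMapState();
        heap.put("k0", new Pair("a", "b"));
        heap.get("k0").f0 = "U";
        System.out.println(heap.get("k0").f0); // U

        SerializedLikeMapState rocks = new SerializedLikeMapState();
        rocks.put("k0", new Pair("a", "b"));
        Pair v = rocks.get("k0");
        v.f0 = "U";
        System.out.println(rocks.get("k0").f0); // a (the change was lost)
        rocks.put("k0", v); // write the modified copy back
        System.out.println(rocks.get("k0").f0); // U
    }
}
```

Calling put() after mutating is correct in both models. That makes it the safer habit when the state backend might change.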

Using CheckpointedFunction interface with Keyed state

2018-06-10 Thread Jayant Ameta
Hi, I'm trying to understand the state functionality. Is there any case where using CheckpointedFunction with keyed state makes sense? Or is CheckpointedFunction only meant to be used with operator state? I'm also trying to understand the BufferingSink example here: https://ci.apache.org/projects

Re: ConcurrentModificationException while accessing managed keyed state

2018-06-02 Thread sihua zhou
/10db9d4b6eb41135332fba13d908e36c I will reply with the stack trace soon. Thanks, On Sat, Jun 2, 2018 at 6:49 PM, aitozi wrote: Hi Garvit Sharma, Flink runs each parallel instance in a single thread. Can you share a bit of code that shows how you use the keyed state in your ProcessFunction? Garvit

Re: ConcurrentModificationException while accessing managed keyed state

2018-06-02 Thread aitozi
Flink runs each parallel instance in a single thread. Can you share a bit of code that shows how you use the keyed state in your ProcessFunction? Garvit Sharma wrote: Hi, I have a use case where I am keeping the keyed state

Re: ConcurrentModificationException while accessing managed keyed state

2018-06-02 Thread Garvit Sharma
parallel instance in a single thread. Can you share a bit of code that shows how you use the keyed state in your ProcessFunction? Garvit Sharma wrote: Hi, I have a use case where I am keeping the keyed state in ProcessFunction. Key: Intege

Re: ConcurrentModificationException while accessing managed keyed state

2018-06-02 Thread aitozi
Hi Garvit Sharma, Flink runs each parallel instance in a single thread. Can you share a bit of code that shows how you use the keyed state in your ProcessFunction? Garvit Sharma wrote: Hi, I have a use case where I am keeping the keyed state in ProcessFunction. Key: I

Re: ConcurrentModificationException while accessing managed keyed state

2018-06-01 Thread sihua zhou
the state, and the exception log would also definitely help a lot if you could share them with us. Best, Sihua. On 06/02/2018 12:08, Garvit Sharma wrote: Hi, I have a use case where I am keeping the keyed state in ProcessFunction. Key: Integer personId; /** * The data type stored in the state

ConcurrentModificationException while accessing managed keyed state

2018-06-01 Thread Garvit Sharma
Hi, I have a use case where I am keeping the keyed state in ProcessFunction. Key: Integer personId; /** * The data type stored in the state */ public class PersonDetails { public long count; public long lastModified; } I have encountered a lot of ConcurrentModificationExceptions. I

Re: Regarding Keyed State of Connected Stream

2018-05-25 Thread sihua zhou
n the same thread synchronously. Best, Sihua. On 2018-05-25 10:26, Garvit Sharma wrote: Hi, suppose I have two keyed streams, one for rules and another for data, and I have connected them. I am maintaining managed keyed state (ValueState); the rules stream will keep updating the state a

Re: Regarding Keyed State of Connected Stream

2018-05-25 Thread Garvit Sharma
e thread synchronously. Best, Sihua. On 2018-05-25 10:26, Garvit Sharma <garvit...@gmail.com> wrote: Hi, suppose I have two keyed streams, one for rules and another for data, and I have connected them. I am mai

Re: Regarding Keyed State of Connected Stream

2018-05-24 Thread sihua zhou
keyed state (ValueState); the rules stream will keep updating the state and the data stream will keep reading from it. Do I need to take a lock explicitly while updating the state? I need to understand this. Please let me know. Thanks, Garvit Sharma

Regarding Keyed State of Connected Stream

2018-05-24 Thread Garvit Sharma
Hi, suppose I have two keyed streams, one for rules and another for data, and I have connected them. I am maintaining managed keyed state (ValueState); the rules stream will keep updating the state and the data stream will keep reading from it. Do I need to take a lock explicitly
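Sihua says the two methods are called in the same thread, synchronously. This can be modelled in plain Java with a single driver loop. The loop takes the next record from either input and calls the matching method. A rule update and a data read therefore never overlap, and the shared per-key state needs no lock. The class, the method bodies and the event encoding are hypothetical, and a HashMap stands in for keyed ValueState.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (hypothetical names) of one parallel instance of a
// connected-stream function driven by a single task thread.
class RulesAndDataSketch {
    // Stands in for keyed ValueState<String>: one rule per key.
    private final Map<String, String> ruleByKey = new HashMap<>();
    final List<String> output = new ArrayList<>();

    // Rules input: updates the state for the record's key.
    void processElement1(String key, String rule) {
        ruleByKey.put(key, rule);
    }

    // Data input: reads the state for the record's key.
    void processElement2(String key, String data) {
        output.add(data + " -> " + ruleByKey.getOrDefault(key, "<no rule>"));
    }

    // Single-threaded driver: each event is {input, key, value}, and each call
    // completes before the next event is taken, so no explicit lock is needed.
    void run(List<String[]> events) {
        for (String[] e : events) {
            if (e[0].equals("rule")) {
                processElement1(e[1], e[2]);
            } else {
                processElement2(e[1], e[2]);
            }
        }
    }
}
```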

Re: Cannot use managed keyed state in sink

2018-02-25 Thread Tzu-Li (Gordon) Tai
Hi, are you using a `RichSinkFunction`? There you should have access to the runtime context, which you can use to access keyed state. Cheers, Gordon. On 24 February 2018 at 3:04:55 PM, Kien Truong (duckientru...@gmail.com) wrote: Hi, it seems that I can't use managed keyed state inside

Cannot use managed keyed state in sink

2018-02-23 Thread Kien Truong
Hi, it seems that I can't use managed keyed state inside sink functions. Is this unsupported in Flink 1.4, or am I doing something wrong? Regards, Kien

Re: Keyed State

2018-01-14 Thread Fabian Hueske
Sure. A CoProcessFunction is executed in parallel by running multiple instances of the CoProcessFunction. Each instance runs in a separate TaskManager slot and is responsible for a subset of all keys. Keys are assigned by hash partitioning to function instances. All calls to methods of an

Re: Keyed State

2018-01-13 Thread Boris Lublinsky
Thanks Fabian. Can you also explain the threading model? How is the work parallelized across multiple keys? Is it hash-based? Also, are processElement1 and processElement2 executed on different threads? More specifically, if processElement1 is an order of magnitude slower than processElement2, will it impact processElement

Re: Keyed State

2018-01-13 Thread Boris Lublinsky
Thanks Fabian. On Jan 13, 2018, at 11:06 AM, Fabian Hueske wrote: Yes, that is correct. You can treat keyed ValueState like a distributed hashmap and Flink routes all state

Re: Keyed State

2018-01-13 Thread Fabian Hueske
Yes, that is correct. You can treat keyed ValueState like a distributed hashmap and Flink routes all state accesses to the entry for the key of the current record. 2018-01-13 17:07 GMT+01:00 Boris Lublinsky: Can you please confirm that my understanding is

Keyed State

2018-01-13 Thread Boris Lublinsky
Can you please confirm that my understanding is correct? I am looking at the documentation on low-level joins https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#low-level-joins
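Fabian describes hash-based assignment of keys to CoProcessFunction instances, and this can be sketched in plain Java. The sketch is loosely modelled on Flink's key groups. A key is hashed into one of maxParallelism key groups, and each parallel instance owns a contiguous range of them. It uses a plain floorMod of hashCode() instead of the hash Flink actually applies, and the class and method names are hypothetical.

```java
// Simplified, hypothetical model of hash partitioning keys to parallel instances.
class KeyAssignmentSketch {
    // Step 1: map a key to one of maxParallelism key groups.
    // (Simplification: a plain floorMod of hashCode(), not Flink's actual hash.)
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Step 2: each instance owns a contiguous range of key groups, so the
    // key group is scaled proportionally down to an instance index.
    static int instanceFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = keyGroupFor(key, maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : new String[] {"alice", "bob", "carol", "dave"}) {
            System.out.println(key + " -> instance " + instanceFor(key, maxParallelism, parallelism));
        }
    }
}
```

Because the mapping depends only on the key, every record with the same key reaches the same instance. With only a handful of distinct keys, some instances can end up with no keys at all, which matches the skew reported in the "Custom Partitioning with keyed state" thread.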

Re: How to deal with java.lang.IllegalStateException: Could not initialize keyed state backend after a code update

2017-11-28 Thread Federico D'Ambrosio
p functions. So I guess the culprit was either the window/maxBy operator or the Cassandra sink. I suspect the window/maxBy operator, since the error specifically mentions initializing keyed state. I'm attaching the complete log. Cheers, Federico. 2017-11-28 15:32 GMT+01:00 Tzu-Li (Gordon) Tai <tzuli.

Re: How to deal with java.lang.IllegalStateException: Could not initialize keyed state backend after a code update

2017-11-28 Thread Tzu-Li (Gordon) Tai
not initialize keyed state backend. Caused by: java.io.InvalidClassException: lab.vardata.events.Event; local class incompatible: stream classdesc serialVersionUID = 8728793377341765980, local class serialVersionUID = -4253404384162522764 because I have

How to deal with java.lang.IllegalStateException: Could not initialize keyed state backend after a code update

2017-11-28 Thread Federico D'Ambrosio
Hi, I recently had to update the code of a long-running Flink streaming job (1.3.2), and on restarting from the savepoint I had to deal with: java.lang.IllegalStateException: Could not initialize keyed state backend. Caused by: java.io.InvalidClassException: lab.vardata.events.Event; local class
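The exception says the serialVersionUID recorded in the savepoint no longer matches the one Java computes for the recompiled class. A common remedy is to declare serialVersionUID explicitly, here pinned to the "stream classdesc" value from the exception. The sketch assumes the class is written with Java serialization and that the changes are serialization-compatible. `Event` is a hypothetical stand-in for lab.vardata.events.Event, and `SerialVersionDemo` is also hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the user's event class.
class Event implements Serializable {
    // An explicit UID stays stable across recompiles. It is pinned here to the
    // "stream classdesc" value from the exception so that data written before
    // the code update still matches; this only helps if the class changes are
    // serialization-compatible (e.g. a field was added).
    private static final long serialVersionUID = 8728793377341765980L;

    String id;
    long timestamp;

    Event(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

class SerialVersionDemo {
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The UID that deserialization compares against the one in the stream.
        System.out.println(ObjectStreamClass.lookup(Event.class).getSerialVersionUID());
        Event copy = (Event) deserialize(serialize(new Event("e1", 42L)));
        System.out.println(copy.id + " " + copy.timestamp); // e1 42
    }
}
```

Declaring the field from the first version of a class avoids the mismatch, because the UID no longer depends on the class's shape.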

Could not initialize keyed state backend on restart from checkpoint

2017-10-24 Thread Federico D'Ambrosio
Hello everyone, while trying to restart a flink job from an externalized checkpoint I'm getting the following exception: java.lang.IllegalStateException: Could not initialize keyed state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState

Re: Could not initialize keyed state backend.

2017-09-18 Thread Stefan Richter
Hi, that is not the case, and it also would not make much sense if you think about restoring from a checkpoint after a machine failure. Is there a section in the Flink documentation that was confusing and led you to this assumption? Best, Stefan. On 18.09.2017 at 15:56

Re: Could not initialize keyed state backend.

2017-09-18 Thread PedroMrChaves
Hello, I thought that the checkpoints would be propagated to all the machines in the cluster when using a local filesystem. Thank you. Best regards, Pedro Chaves

Re: Could not initialize keyed state backend.

2017-09-18 Thread Stefan Richter
teException: Could not initialize keyed state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.in

Re: Flink 1.2-SNAPSHOT fails to initialize keyed state backend

2016-09-28 Thread Stephan Ewen
Walther <twal...@apache.org> wrote: Hi Seth, the 1.2-SNAPSHOT is very fragile at the moment because of multiple big changes for dynamic scaling. Maybe Stefan (in CC) has an idea what is happening with the keyed state backend here? Timo. On 27/

Re: Flink 1.2-SNAPSHOT fails to initialize keyed state backend

2016-09-27 Thread Timo Walther
Hi Seth, the 1.2-SNAPSHOT is very fragile at the moment because of multiple big changes for dynamic scaling. Maybe Stefan (in CC) has an idea what is happening with the keyed state backend here? Timo. On 27/09/16 at 16:14, swiesman wrote: Hi all, I am working on an analytics project

Flink 1.2-SNAPSHOT fails to initialize keyed state backend

2016-09-27 Thread swiesman
. Whenever the application runs for more than approximately 5-10 minutes, I get a RuntimeException: "Could not initialize keyed state backend", caused by a class-not-found exception; the full stack trace is posted below. This confuses me because for the program to have run and output data for the first 5-10
