After setting this configuration, I get the following exception:
java.lang.Exception: Could not restore checkpointed state to operators and functions
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor for field
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
    at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
    ... 3 more
Caused by: java.lang.IllegalArgumentException: illegal signature
    at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
    ... 13 more
If I run the application in non-HA mode, there is no problem.
What could cause this kind of error?
Thanks
Thomas
________________________________________
From: Thomas Lamirault [thomas.lamira...@ericsson.com]
Sent: Friday, 19 February 2016 09:39
To: user@flink.apache.org
Subject: RE: Flink HA

Thanks for the quick reply!

> state.backend.fs.checkpointdir

It is actually pointing to an HDFS directory, but I will modify the
recovery.zookeeper.path.root.

> This is only relevant if you are using YARN. From your complete

Yes, I forgot to mention that we will use YARN.

> Does this help?

Yes, a lot :-)

Thomas
________________________________________
From: Ufuk Celebi [u...@apache.org]
Sent: Thursday, 18 February 2016 19:19
To: user@flink.apache.org
Subject: Re: Flink HA

On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault
<thomas.lamira...@ericsson.com> wrote:
> We are trying flink in HA mode.

Great to hear!

> We set in the flink yaml :
>
> state.backend: filesystem
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum: <Our quorum>
>
> recovery.zookeeper.path.root: <path>
>
> recovery.zookeeper.storageDir: <storageDir>
>
> recovery.backend.fs.checkpointdir: <pathcheckpoint>

It should be state.backend.fs.checkpointdir.

Just to check: Both state.backend.fs.checkpointdir and
recovery.zookeeper.path.root should point to a file system path.

> yarn.application-attempts: 100

This is only relevant if you are using YARN. From your complete

> We want in case of application crash, the pending window has to be restore
> when the application restart.
>
> Pending data are store into the <storageDir>/blob directory ?
>
> Also, we try to write a script who restart the application after exceed the
> max attempts, with the last pending window.
>
> How can I do that ? A simple restart of the application is enough, or do I
> have to "clean" the recovery.zookeeper.path.root ?

Restore happens automatically to the most recently checkpointed state.

Everything under <storageDir> contains the actual state (including
JARs and JobGraph). ZooKeeper contains pointers to this state.
Therefore, you must not delete the ZooKeeper root path.

For the automatic restart, I would recommend using YARN. If you want
to do it manually, you need to restart the JobManager/TaskManager
instances. The application will be recovered automatically from
ZooKeeper/state backend.

Does this help?

– Ufuk
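
For reference, the settings discussed in this thread can be gathered into a single flink-conf.yaml fragment. This is only a sketch: it uses the key names exactly as they appear in the thread (other Flink releases may name them differently), it applies the one correction suggested above (state.backend.fs.checkpointdir in place of recovery.backend.fs.checkpointdir), and every <...> placeholder is left as posted because the real values were never given.

```yaml
# flink-conf.yaml: HA settings as quoted in this thread (sketch, placeholders unchanged)

state.backend: filesystem
# Corrected key; the original post used recovery.backend.fs.checkpointdir.
# Per the thread, this points to an HDFS directory.
state.backend.fs.checkpointdir: <pathcheckpoint>

recovery.mode: zookeeper
recovery.zookeeper.quorum: <Our quorum>
# Must not be deleted between restarts: ZooKeeper holds the pointers to the state.
recovery.zookeeper.path.root: <path>
# Holds the actual recovery state (including JARs and JobGraph).
recovery.zookeeper.storageDir: <storageDir>

# Only relevant when running on YARN.
yarn.application-attempts: 100
```

With a configuration along these lines, a restart of the JobManager/TaskManager instances should recover the job from the most recent checkpoint without any manual cleanup of the ZooKeeper root path, as described above.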