[jira] [Created] (FLINK-25958) OOME Checkpoints & Savepoints were shown as COMPLETE in Flink UI

2022-02-04 Thread Victor Xu (Jira)
Victor Xu created FLINK-25958:
-

 Summary: OOME Checkpoints & Savepoints were shown as COMPLETE in 
Flink UI
 Key: FLINK-25958
 URL: https://issues.apache.org/jira/browse/FLINK-25958
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.13.5
 Environment: Ververica Platform 2.6.2

Flink 1.13.5
Reporter: Victor Xu
 Attachments: JIRA-1.jpg

The Flink job was running, but its checkpoints & savepoints kept failing with an OutOfMemoryError. However, the Flink UI showed those checkpoints & savepoints as COMPLETE.

For example (checkpoint 39 & 40):
{noformat}
2022-01-27 02:41:39,969 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 39 (type=CHECKPOINT) @ 1643251299952 for job ab2217e5ce144087bbddf6bd6c3668eb.
2022-01-27 02:43:19,678 WARN  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Error while processing AcknowledgeCheckpoint message
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete the pending checkpoint 39. Failure reason: Failure to finalize checkpoint.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1227) ~[flink-dist_2.12-1.13.5-stream2.jar:1.13.5-stream2]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1072) ~[flink-dist_2.12-1.13.5-stream2.jar:1.13.5-stream2]
        at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist_2.12-1.13.5-stream2.jar:1.13.5-stream2]
        at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist_2.12-1.13.5-stream2.jar:1.13.5-stream2]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.IllegalArgumentException: Self-suppression not permitted
        at java.lang.Throwable.addSuppressed(Throwable.java:1054) ~[?:?]
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:627) ~[flink-dist_2.12-1.13.5-stream2.jar:1.13.5-stream2]
        at com.ververica.platform.flink.ha.kubernetes.KubernetesHaCheckpointStore.serializeCheckpoint(KubernetesHaCheckpointStore.java:204) ~[vvp-flink-ha-kubernetes-flink113-1.4-20211013.091138-2.jar:?]
        at com.ververica.platform.flink.ha.kubernetes.KubernetesHaCheckpointStore.addCheckpoint(KubernetesHaCheckpointStore.java:83) ~[vvp-flink-ha-kubernetes-flink113-1.4-20211013.091138-2.jar:?]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1209) ~[flink-dist_2.12-1.13.5-stream2.jar:1.13.5-stream2]
        ... 9 more
Caused by: java.lang.OutOfMemoryError: Java heap space
2022-01-27 03:41:39,970 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 40 (type=CHECKPOINT) @ 1643254899952 for job ab2217e5ce144087bbddf6bd6c3668eb.
2022-01-27 03:43:22,326 WARN  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Error while processing AcknowledgeCheckpoint message
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete the pending checkpoint 40. Failure reason: Failure to finalize checkpoint.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1227) ~[flink-dist_2.12-1.13.5-stream2.jar:1.13.5-stream2]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1072) ~[flink-dist_2.12-1.13.5-stream2.jar:1.13.5-stream2]
        at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist_2.12-1.13.5-stream2.jar:1.13.5-stream2]
        at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist_2.12-1.13.5-stream2.jar:1.13.5-stream2]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at

[jira] [Commented] (FLINK-24543) Zookeeper connection issue causes inconsistent state in Flink

2021-10-14 Thread Victor Xu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17428726#comment-17428726 ]

Victor Xu commented on FLINK-24543:
---

After the KeeperException$NodeExistsException was thrown, *ZooKeeperStateHandleStore.indicatesPossiblyInconsistentState(e)* returned *false*, so the state cleanup logic removed both the metadata file and the corresponding checkpoint data files, which then led to the FileNotFoundException:
{noformat}
Caused by: java.io.FileNotFoundException: /mnt/flink/.flink/ha/flink-.../completedCheckpoint42683d1121c7 (No such file or directory)
{noformat}
The reason is that *NodeExistsException* is included in PRE_COMMIT_EXCEPTIONS. But shouldn't we treat this NodeExistsException as indicating a possibly inconsistent state and remove it from PRE_COMMIT_EXCEPTIONS? It is different from the other ZK commit exceptions (e.g. AuthFailedException, BadVersionException, etc.): those mean the ZK write has failed, whereas this one means the ZK node is already there, though we don't know whether it holds the correct data. So I think we should mark it as an inconsistent state and not remove the corresponding metadata and checkpoint data files.
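To make the proposal concrete, here is a minimal, illustrative sketch of the classification change I have in mind. It is not the actual ZooKeeperStateHandleStore code: the class name PreCommitClassification is made up for the example, and the set below only lists the exception types already named in this comment.
{code:java}
import org.apache.zookeeper.KeeperException;

import java.util.EnumSet;
import java.util.Set;

final class PreCommitClassification {

    // Codes for which we know the ZooKeeper write never took effect.
    // NODEEXISTS is intentionally NOT listed here anymore, so an existing
    // node is treated as a possibly inconsistent state.
    private static final Set<KeeperException.Code> PRE_COMMIT_EXCEPTIONS =
            EnumSet.of(
                    KeeperException.Code.AUTHFAILED,
                    KeeperException.Code.BADVERSION);

    static boolean indicatesPossiblyInconsistentState(KeeperException e) {
        // Possibly inconsistent => cleanup must not delete the metadata file
        // or the underlying checkpoint data files.
        return !PRE_COMMIT_EXCEPTIONS.contains(e.code());
    }

    private PreCommitClassification() {}
}
{code}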

 

 

> Zookeeper connection issue causes inconsistent state in Flink
> -
>
> Key: FLINK-24543
> URL: https://issues.apache.org/jira/browse/FLINK-24543
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.13.2
>Reporter: Jun Qin
>Priority: Major
>
> Env: Flink 1.13.2 with Zookeeper HA
> Here is what happened:
> {code:bash}
> # checkpoint 1116 was triggered
> 2021-10-09 00:16:49,555 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering checkpoint 1116 (type=CHECKPOINT) @ 1633738609538 for job a8a4fb85b681a897ba118db64333c9e5.
> # a few seconds later, the zookeeper connection was suspended (it turned out to be a disk issue on the zookeeper side that caused slow fsync and commit)
> 2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Connection to ZooKeeper suspended. The contender LeaderContender: DefaultDispatcherRunner no longer participates in the leader election.
> # job was switching to suspended
> 2021-10-09 00:16:58,564 [flink-akka.actor.default-dispatcher-61] INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager b79b79fe513fb5f47e7bf447b7d94...@akka.tcp://flink@flink-...-jobmanager:50010/user/rpc/jobmanager_3 for job a8a4fb85b681a897ba118db64333c9e5 from the resource manager.
> 2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-92] INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager b79b79fe513fb5f47e7bf447b7d94...@akka.tcp://flink@flink-...-jobmanager:50010/user/rpc/jobmanager_3 for job a8a4fb85b681a897ba118db64333c9e5.
> 2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job Flink ...(a8a4fb85b681a897ba118db64333c9e5).
> 2021-10-09 00:16:58,567 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job Flink ... (a8a4fb85b681a897ba118db64333c9e5) switched from state RUNNING to SUSPENDED.
> 2021-10-09 00:16:58,570 [flink-akka.actor.default-dispatcher-86] INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/a8a4fb85b681a897ba118db64333c9e5/job_manager_lock'}.
> 2021-10-09 00:16:58,667 [flink-akka.actor.default-dispatcher-92] INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job a8a4fb85b681a897ba118db64333c9e5 reached terminal state SUSPENDED.
> # zookeeper connection restored
> 2021-10-09 00:17:08,225 [Curator-ConnectionStateManager-0] INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Connection to ZooKeeper was reconnected. Leader election can be restarted.
> # received checkpoint acknowledgement, trying to finalize it, then failed to add it to zookeeper due to KeeperException$NodeExistsException
> 2021-10-09 00:17:14,382 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: ... (1/5) (09d25852e3e206d6b7fe0d6bc965870f) switched from RUNNING to

[jira] [Created] (FLINK-24460) Rocksdb Iterator Error Handling Improvement

2021-10-06 Thread Victor Xu (Jira)
Victor Xu created FLINK-24460:
-

 Summary: Rocksdb Iterator Error Handling Improvement
 Key: FLINK-24460
 URL: https://issues.apache.org/jira/browse/FLINK-24460
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.14.0
Reporter: Victor Xu


In FLINK-9373, we introduced RocksIteratorWrapper, a wrapper around RocksIterator that checks the iterator status in all of its methods. At the time this was required because the iterator could skip blocks or files it had difficulty reading (because of IO errors, data corruption, or other issues) and continue with the next available keys. *The status flag may not be OK, even if the iterator is valid.*

However, the above behaviour changed after 
[3810|https://github.com/facebook/rocksdb/pull/3810] was merged on May 17, 2018:

 *- If the iterator is valid, the status() is guaranteed to be OK;*

 *- If the iterator is not valid, there are two possibilities:*

    *1) We have reached the end of the data. And in this case, status() is OK;*

    *2) There is an error. In this case, status() is not OK;*

More information can be found here: 
https://github.com/facebook/rocksdb/wiki/Iterator#error-handling

Thus, it should be safe to proceed with the other operations (e.g. seek, next, seekToFirst, seekToLast, seekForPrev, and prev) without checking status(), and we only need to check the status when the iterator is invalid. After the change there will be fewer status() native calls, which could theoretically improve performance. A sketch of the resulting pattern is shown below.
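The following is only a minimal, illustrative sketch of that pattern, not the actual RocksIteratorWrapper: the class name LazyStatusCheckIterator and the IOException wrapping are assumptions made for the example. It shows status() being consulted only when isValid() returns false.
{code:java}
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

import java.io.IOException;

// Illustrative wrapper: movement methods no longer call status() after every
// step; the status is only checked when the iterator reports it is not valid.
public final class LazyStatusCheckIterator implements AutoCloseable {

    private final RocksIterator iterator;

    public LazyStatusCheckIterator(RocksIterator iterator) {
        this.iterator = iterator;
    }

    public boolean isValid() throws IOException {
        boolean valid = iterator.isValid();
        if (!valid) {
            // Invalid means either "end of data" (status OK) or an error (status not OK).
            checkStatus();
        }
        return valid;
    }

    public void seekToFirst() { iterator.seekToFirst(); }
    public void seekToLast()  { iterator.seekToLast(); }
    public void next()        { iterator.next(); }
    public void prev()        { iterator.prev(); }
    public byte[] key()       { return iterator.key(); }
    public byte[] value()     { return iterator.value(); }

    private void checkStatus() throws IOException {
        try {
            iterator.status();
        } catch (RocksDBException e) {
            throw new IOException("Error accessing RocksDB iterator", e);
        }
    }

    @Override
    public void close() {
        iterator.close();
    }
}
{code}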

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24432) RocksIteratorWrapper.seekToLast() calls the wrong RocksIterator method

2021-09-30 Thread Victor Xu (Jira)
Victor Xu created FLINK-24432:
-

 Summary: RocksIteratorWrapper.seekToLast() calls the wrong 
RocksIterator method
 Key: FLINK-24432
 URL: https://issues.apache.org/jira/browse/FLINK-24432
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.14.0
Reporter: Victor Xu


RocksIteratorWrapper is a wrapper around RocksIterator that performs an additional status check in all of its methods. However, there is a typo: RocksIteratorWrapper.*seekToLast*() calls RocksIterator's *seekToFirst*(), which is obviously wrong. I guess this issue wasn't found earlier because the method is only referenced from RocksTransformingIteratorWrapper.seekToLast() and nowhere else. A sketch of the intended fix follows the snippet below.
{code:java}
@Override
public void seekToFirst() {
 iterator.seekToFirst();
 status();
}

@Override
public void seekToLast() {
 iterator.seekToFirst();
 status();
}{code}
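For clarity, a minimal sketch of the intended fix, assuming the same field name {{iterator}} and status-check helper {{status()}} as in the snippet above:
{code:java}
@Override
public void seekToLast() {
 // Delegate to RocksIterator#seekToLast() instead of seekToFirst(),
 // then run the same status check as the other methods.
 iterator.seekToLast();
 status();
}{code}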



--
This message was sent by Atlassian Jira
(v8.3.4#803005)