Re: [PR] [FLINK-20886] Support printing the thread dump when checkpoint expired [flink]

2025-01-16 Thread via GitHub


Zakelly closed pull request #23347: [FLINK-20886] Support printing the thread 
dump when checkpoint expired
URL: https://github.com/apache/flink/pull/23347


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20886] Support printing the thread dump when checkpoint expired [flink]

2025-01-16 Thread via GitHub


github-actions[bot] commented on PR #23347:
URL: https://github.com/apache/flink/pull/23347#issuecomment-2596380472

   This PR is being marked as stale since it has not had any activity in the last 180 days.
   If you would like to keep this PR alive, please leave a comment asking for a review.
   If the PR has merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out to the community; contact details can be found here: https://flink.apache.org/what-is-flink/community/
   
   If this PR is no longer valid or desired, please feel free to close it.
   If no activity occurs in the next 90 days, it will be automatically closed.





Re: [PR] [FLINK-20886] Support printing the thread dump when checkpoint expired [flink]

2023-12-16 Thread via GitHub


Zakelly commented on PR #23347:
URL: https://github.com/apache/flink/pull/23347#issuecomment-1858831032

   @Myasuka Sorry to bother you, it's just a ping...





Re: [PR] [FLINK-20886] Support printing the thread dump when checkpoint expired [flink]

2023-10-31 Thread via GitHub


Zakelly commented on PR #23347:
URL: https://github.com/apache/flink/pull/23347#issuecomment-1786722061

   Hi @Myasuka, sorry for the late reply.
   
   I have updated the code. It now provides two options: `cluster.thread-dump.log-level` and `execution.checkpointing.thread-dump-when-checkpoint-expired.enabled`. WDYT?
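The following is a minimal sketch of how the two options might be combined when a checkpoint expires. It is not the PR's code. It assumes a plain `Map<String, String>` in place of Flink's `Configuration`, and it assumes defaults (`INFO` level, switch disabled) that the thread does not state. The class name `ThreadDumpOptionsSketch` and the method `levelForExpiredCheckpoint` are hypothetical.

~~~java
import java.util.Locale;
import java.util.Map;

final class ThreadDumpOptionsSketch {

    // Mirrors the ThreadDumpLogLevel enum from the PR diff.
    enum ThreadDumpLogLevel { ERROR, WARN, INFO, DEBUG, TRACE, OFF }

    // Option keys as named in the comment above.
    static final String LOG_LEVEL_KEY = "cluster.thread-dump.log-level";
    static final String EXPIRED_ENABLED_KEY =
            "execution.checkpointing.thread-dump-when-checkpoint-expired.enabled";

    // Assumed defaults for this sketch only; the PR may use different ones.
    static final ThreadDumpLogLevel DEFAULT_LEVEL = ThreadDumpLogLevel.INFO;
    static final boolean DEFAULT_ENABLED = false;

    /**
     * Returns the level at which to log a thread dump for an expired checkpoint.
     * A single shared level option serves every scenario. The per-scenario
     * boolean switch decides whether this scenario dumps at all.
     */
    static ThreadDumpLogLevel levelForExpiredCheckpoint(Map<String, String> conf) {
        boolean enabled =
                Boolean.parseBoolean(
                        conf.getOrDefault(EXPIRED_ENABLED_KEY, String.valueOf(DEFAULT_ENABLED)));
        if (!enabled) {
            return ThreadDumpLogLevel.OFF;
        }
        String raw = conf.get(LOG_LEVEL_KEY);
        // valueOf throws IllegalArgumentException for unknown level names.
        return raw == null
                ? DEFAULT_LEVEL
                : ThreadDumpLogLevel.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(levelForExpiredCheckpoint(Map.of())); // OFF
        System.out.println(levelForExpiredCheckpoint(Map.of(EXPIRED_ENABLED_KEY, "true"))); // INFO
        System.out.println(
                levelForExpiredCheckpoint(
                        Map.of(EXPIRED_ENABLED_KEY, "true", LOG_LEVEL_KEY, "warn"))); // WARN
    }
}
~~~

With this split, a new scenario needs only one new boolean switch, and the log level stays a single cluster-wide setting.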





Re: [PR] [FLINK-20886] Support printing the thread dump when checkpoint expired [flink]

2023-10-16 Thread via GitHub


Myasuka commented on code in PR #23347:
URL: https://github.com/apache/flink/pull/23347#discussion_r1360198299


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##
@@ -314,4 +317,28 @@ public class ExecutionCheckpointingOptions {
 "Defines the maximum number of subtasks that share the same channel state file. "
 + "It can reduce the number of small files when enable unaligned checkpoint. "
 + "Each subtask will create a new channel state file when this is configured to 1.");
+
+public static final ConfigOption
+THREAD_DUMP_LOG_LEVEL_WHEN_CHECKPOINT_TIMEOUT =
+key("execution.checkpointing.thread-dump-log-level-when-checkpoint-timeout")

Review Comment:
   First of all, I think not all scenarios fit under the `execution.checkpointing` prefix. Also, I think defining a different log level for each scenario would be a bit too complex. In my proposal, we only need two parameters no matter how many scenarios are supported.






Re: [PR] [FLINK-20886] Support printing the thread dump when checkpoint expired [flink]

2023-10-13 Thread via GitHub


Zakelly commented on code in PR #23347:
URL: https://github.com/apache/flink/pull/23347#discussion_r1357951738


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java:
##
@@ -431,4 +432,9 @@ public void setCheckpointResponder(CheckpointResponder checkpointResponder) {
 public ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory() {
 return channelStateExecutorFactory;
 }
+
+@Override
+public CheckpointExpiredThreadDumper getCheckpointExpiredThreadDumper() {
+return null;

Review Comment:
   I removed all the explicit `null` returns.






Re: [PR] [FLINK-20886] Support printing the thread dump when checkpoint expired [flink]

2023-10-13 Thread via GitHub


Zakelly commented on code in PR #23347:
URL: https://github.com/apache/flink/pull/23347#discussion_r1357949989


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##
@@ -314,4 +317,28 @@ public class ExecutionCheckpointingOptions {
 "Defines the maximum number of subtasks that share the same channel state file. "
 + "It can reduce the number of small files when enable unaligned checkpoint. "
 + "Each subtask will create a new channel state file when this is configured to 1.");
+
+public static final ConfigOption
+THREAD_DUMP_LOG_LEVEL_WHEN_CHECKPOINT_TIMEOUT =
+key("execution.checkpointing.thread-dump-log-level-when-checkpoint-timeout")

Review Comment:
   That's a very good point. Is it also meaningful to define a different log level for each scenario? If so, I'd prefer to rename the option to `execution.checkpointing.thread-dump-log-level.checkpoint-expired` and provide a different suffix for each scenario. WDYT?






Re: [PR] [FLINK-20886] Support printing the thread dump when checkpoint expired [flink]

2023-10-07 Thread via GitHub


Myasuka commented on code in PR #23347:
URL: https://github.com/apache/flink/pull/23347#discussion_r1349497297


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointExpiredThreadDumper.java:
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.util.JvmUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * A worker that can print thread dump to log when checkpoint aborted. There is a throttling
+ * strategy that it can only output once for each job and each checkpoint abortion.
+ */
+public class CheckpointExpiredThreadDumper {
+
+private static final Logger LOG = LoggerFactory.getLogger(CheckpointExpiredThreadDumper.class);
+
+/**
+ * When a checkpoint is notified timeout, the task manager could produce a thread dump to log
+ * for debugging purpose. This enum defines the log level of this thread dump.
+ */
+public enum ThreadDumpLogLevel {
+ERROR,
+WARN,
+INFO,
+DEBUG,
+TRACE,
+OFF
+}
+
+/** A map records the last dumped checkpoint for each job. */
+private final Map<JobID, Long> jobIdLastDumpCheckpointIdMap;
+
+public CheckpointExpiredThreadDumper() {
+this.jobIdLastDumpCheckpointIdMap = new HashMap<>();
+}
+
+/**
+ * Print the thread dump to log if needed. A thread dump is captured and print only when:
+ *
+ * 1. It is the first time to request a thread dump for this checkpoint within current job.
+ *
+ * 2. The configured log level is enabled.
+ *
+ * @param jobId id of the current job.
+ * @param checkpointId id of the aborted checkpoint.
+ * @param threadDumpLogLevelWhenAbort the configured log level for thread dump.
+ * @param maxDepthOfThreadDump the max stack depth for thread dump.
+ * @return true if really print the log.
+ */
+public boolean threadDumpIfNeeded(
+JobID jobId,
+long checkpointId,
+ThreadDumpLogLevel threadDumpLogLevelWhenAbort,
+int maxDepthOfThreadDump) {
+synchronized (this) {
+long lastCheckpointId = jobIdLastDumpCheckpointIdMap.computeIfAbsent(jobId, (k) -> -1L);
+if (lastCheckpointId >= checkpointId) {
+return false;
+}
+jobIdLastDumpCheckpointIdMap.put(jobId, checkpointId);
+}

Review Comment:
   I think we can use a `ConcurrentHashMap` for `jobIdLastDumpCheckpointIdMap` here to avoid the expensive `synchronized (this)`.
   How about the code below?
   
   ~~~java
   AtomicBoolean needDump = new AtomicBoolean(false);
   jobIdLastDumpCheckpointIdMap.compute(
           jobId,
           (k, v) -> {
               long lastCheckpointId = v == null ? -1L : v;
               needDump.set(lastCheckpointId < checkpointId);
               return Math.max(lastCheckpointId, checkpointId);
           });
   if (needDump.get()) {
       // ... capture and log the thread dump
   }
   ~~~
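The suggestion above can be expanded into a self-contained sketch that also captures the dump. Several parts of it are assumptions, not the PR's implementation:

- It uses `String` job IDs instead of Flink's `JobID`.
- It captures the dump with `java.lang.management.ThreadMXBean`.
- It returns the formatted text instead of logging through SLF4J at the configured level.
- The names `ThrottledThreadDumper`, `tryAcquire`, and `formatThreadDump` are hypothetical.

~~~java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

final class ThrottledThreadDumper {

    // Last checkpoint id for which a dump was granted, per job (hypothetical String ids).
    private final ConcurrentHashMap<String, Long> lastDumpedCheckpoint = new ConcurrentHashMap<>();

    /**
     * Returns true only for the first request whose checkpoint id is newer than any
     * previously granted one for the same job. compute() is atomic per key, so no
     * synchronized block is needed.
     */
    boolean tryAcquire(String jobId, long checkpointId) {
        AtomicBoolean needDump = new AtomicBoolean(false);
        lastDumpedCheckpoint.compute(
                jobId,
                (k, v) -> {
                    long last = v == null ? -1L : v;
                    needDump.set(last < checkpointId);
                    return Math.max(last, checkpointId);
                });
        return needDump.get();
    }

    /** Captures all live threads, truncating each stack trace to at most maxDepth frames. */
    static String formatThreadDump(int maxDepth) {
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            if (info == null) {
                continue;
            }
            sb.append('"').append(info.getThreadName()).append("\" ")
                    .append(info.getThreadState()).append('\n');
            StackTraceElement[] frames = info.getStackTrace();
            int depth = Math.min(maxDepth, frames.length);
            for (int i = 0; i < depth; i++) {
                sb.append("    at ").append(frames[i]).append('\n');
            }
            if (frames.length > depth) {
                sb.append("    ...\n");
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ThrottledThreadDumper dumper = new ThrottledThreadDumper();
        System.out.println(dumper.tryAcquire("job-a", 5)); // true
        System.out.println(dumper.tryAcquire("job-a", 5)); // false: already dumped
        System.out.println(dumper.tryAcquire("job-a", 4)); // false: older checkpoint
        System.out.println(dumper.tryAcquire("job-b", 4)); // true: different job
        if (dumper.tryAcquire("job-a", 6)) {
            System.out.print(formatThreadDump(8));
        }
    }
}
~~~

The `AtomicBoolean` carries the decision out of the remapping function, because a lambda cannot assign to a local variable. Keeping `Math.max` in the map means a late request for an older checkpoint can never move the watermark backwards.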
   



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java:
##
@@ -101,6 +101,14 @@ void notifyCheckpointSubsumed(
 /** Waits for all the pending checkpoints to finish their asynchronous step. */
 void waitForPendingCheckpoints() throws Exception;
 
+/**
+ * Check whether a checkpoint is registered (running) in this coordinator.
+ *
+ * @param checkpointId of the target checkpoint
+ * @return true if the checkpoint is running.
+ */
+boolean checkCheckpointRegistered(long checkpointId);

Review Comment:
   I think the `check-check` naming looks a bit weird. How about `isCheckpointRegistered`?
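For illustration, a registry exposing the suggested `isCheckpointRegistered` name might look like the sketch below. The class `RunningCheckpointRegistry` and its `register`/`unregister` methods are hypothetical. The PR's actual `SubtaskCheckpointCoordinator` implementation is not shown in this thread. The sketch only demonstrates the boolean-query naming and a thread-safe set of running checkpoint ids.

~~~java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class RunningCheckpointRegistry {

    // Thread-safe set of checkpoint ids currently in progress (hypothetical bookkeeping).
    private final Set<Long> running = ConcurrentHashMap.newKeySet();

    void register(long checkpointId) {
        running.add(checkpointId);
    }

    void unregister(long checkpointId) {
        running.remove(checkpointId);
    }

    /** Reads naturally at call sites, e.g. if (registry.isCheckpointRegistered(id)) { ... } */
    boolean isCheckpointRegistered(long checkpointId) {
        return running.contains(checkpointId);
    }

    public static void main(String[] args) {
        RunningCheckpointRegistry registry = new RunningCheckpointRegistry();
        registry.register(42L);
        System.out.println(registry.isCheckpointRegistered(42L)); // true
        registry.unregister(42L);
        System.out.println(registry.isCheckpointRegistered(42L)); // false
    }
}
~~~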



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -1316,6 +1334,24 @@ public void abortCheckpointOnBarrier(long checkpointId, CheckpointException caus
 subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
 }
 
+@Overrid

Re: [PR] [FLINK-20886] Support printing the thread dump when checkpoint expired [flink]

2023-10-07 Thread via GitHub


Zakelly commented on PR #23347:
URL: https://github.com/apache/flink/pull/23347#issuecomment-1751636305

   @Myasuka Could you please review this implementation? Alternatively, I am 
open to exploring another approach.

