Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-05 Thread via GitHub


pnowojski merged PR #23908:
URL: https://github.com/apache/flink/pull/23908


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-05 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1442770858


##
docs/content/docs/ops/traces.md:
##
@@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint 
once checkpoint reach
   
   
 
-  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
+  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker

Review Comment:
   I've added line breaks to the scope. More readable and the effect is the 
same.






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-05 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1442749273


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -155,7 +167,12 @@ public CheckpointStatsSnapshot createSnapshot() {
                 counts.createSnapshot(),
                 summary.createSnapshot(),
                 history.createSnapshot(),
-                latestRestoredCheckpoint);
+                jobInitializationMetricsBuilder
+                        .map(
+                                JobInitializationMetricsBuilder
+                                        ::buildRestoredCheckpointStats)
+                        .orElse(Optional.empty())
+                        .orElse(null));

Review Comment:
   Ahh, nice!






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-05 Thread via GitHub


StefanRRichter commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1442679306


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -730,57 +739,87 @@ void restoreInternal() throws Exception {
 
         getEnvironment().getMetricGroup().getIOMetricGroup().markTaskInitializationStarted();
         LOG.debug("Initializing {}.", getName());
 
-        operatorChain =
-                getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
-                        ? new FinishedOperatorChain<>(this, recordWriter)
-                        : new RegularOperatorChain<>(this, recordWriter);
-        mainOperator = operatorChain.getMainOperator();
+        SubTaskInitializationMetricsBuilder initializationMetrics =
+                new SubTaskInitializationMetricsBuilder(
+                        SystemClock.getInstance().absoluteTimeMillis());
+        try {
+            operatorChain =
+                    getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
+                            ? new FinishedOperatorChain<>(this, recordWriter)
+                            : new RegularOperatorChain<>(this, recordWriter);
+            mainOperator = operatorChain.getMainOperator();
 
-        getEnvironment()
-                .getTaskStateManager()
-                .getRestoreCheckpointId()
-                .ifPresent(restoreId -> latestReportCheckpointId = restoreId);
+            getEnvironment()

Review Comment:
   Argh, yes, sorry. Reviewing in GH sometimes stinks.






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-05 Thread via GitHub


StefanRRichter commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1442676217


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -155,7 +167,12 @@ public CheckpointStatsSnapshot createSnapshot() {
                 counts.createSnapshot(),
                 summary.createSnapshot(),
                 history.createSnapshot(),
-                latestRestoredCheckpoint);
+                jobInitializationMetricsBuilder
+                        .map(
+                                JobInitializationMetricsBuilder
+                                        ::buildRestoredCheckpointStats)
+                        .orElse(Optional.empty())
+                        .orElse(null));

Review Comment:
   Ah I see, makes sense. You could probably rewrite it as flatMap().orElse() 
though...
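
To illustrate the suggestion, here is a minimal, self-contained sketch. It is not the PR's code: `Builder` and its `String` payload are hypothetical stand-ins for `JobInitializationMetricsBuilder` and `RestoredCheckpointStats`. It compares the `map(...).orElse(Optional.empty()).orElse(null)` chain with the shorter `flatMap(...).orElse(null)` form; both should return `null` when either the builder or its result is absent.

```java
import java.util.Optional;

class NestedOptionalDemo {

    // Hypothetical stand-in for JobInitializationMetricsBuilder (assumption, not Flink code).
    static final class Builder {
        private final String stats; // null models "no restored checkpoint stats"

        Builder(String stats) {
            this.stats = stats;
        }

        Optional<String> buildRestoredCheckpointStats() {
            return Optional.ofNullable(stats);
        }
    }

    // Shape used in the diff: map() yields Optional<Optional<String>>,
    // so two orElse() calls are needed to unwrap both layers.
    static String viaMap(Optional<Builder> builder) {
        return builder
                .map(Builder::buildRestoredCheckpointStats)
                .orElse(Optional.empty())
                .orElse(null);
    }

    // Shape suggested in the review: flatMap() flattens the nesting,
    // so a single orElse() is enough.
    static String viaFlatMap(Optional<Builder> builder) {
        return builder.flatMap(Builder::buildRestoredCheckpointStats).orElse(null);
    }

    public static void main(String[] args) {
        Optional<Builder> noBuilder = Optional.empty();
        Optional<Builder> emptyResult = Optional.of(new Builder(null));
        Optional<Builder> withResult = Optional.of(new Builder("stats"));

        System.out.println(viaMap(noBuilder) + " / " + viaFlatMap(noBuilder));
        System.out.println(viaMap(emptyResult) + " / " + viaFlatMap(emptyResult));
        System.out.println(viaMap(withResult) + " / " + viaFlatMap(withResult));
    }
}
```

Under these assumptions the two forms agree in all three cases, so the choice between them is purely about readability.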






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


rkhachatryan commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1442110980


##
docs/content/docs/ops/traces.md:
##
@@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint 
once checkpoint reach
   
   
 
-  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
+  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker

Review Comment:
   Good idea, should work!






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441999383


##
docs/content/docs/ops/traces.md:
##
@@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint 
once checkpoint reach
   
   
 
-  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
+  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker

Review Comment:
   Use an abbreviation `o.a.f`? 🤔 Could we use a tooltip to expand the scope? 
For example, display `(...).CheckpointStatsTracker`, which expands in the tooltip 
to the fully qualified name?






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


rkhachatryan commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441837132


##
docs/content/docs/ops/traces.md:
##
@@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint 
once checkpoint reach
   
   
 
-  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
+  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker

Review Comment:
   NIT: this field is so wide that I don't see the Description column; I can 
scroll horizontally, but would only do that if I knew there was a column on the 
right.
   
   I don't know how to fix that easily, though 🤷‍♂️






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441683484


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -155,7 +167,12 @@ public CheckpointStatsSnapshot createSnapshot() {
                 counts.createSnapshot(),
                 summary.createSnapshot(),
                 history.createSnapshot(),
-                latestRestoredCheckpoint);
+                jobInitializationMetricsBuilder
+                        .map(
+                                JobInitializationMetricsBuilder
+                                        ::buildRestoredCheckpointStats)
+                        .orElse(Optional.empty())
+                        .orElse(null));

Review Comment:
   Because:
   1. the builder can be empty
   2. if it's not empty, `buildRestoredCheckpointStats` can also return an empty 
result
   
   This is equivalent to
   ```
   if (builder.isEmpty()) {
       return null;
   }
   Optional<RestoredCheckpointStats> stats = builder.get().buildRestoredCheckpointStats();
   return stats.orElse(null);
   ```
   
   Because of that, `map` here returns `Optional<Optional<RestoredCheckpointStats>>`.






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441782436


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##
@@ -159,6 +160,11 @@ public ExecutionGraph createAndRestoreExecutionGraph(
                     }
                 };
 
+        int totalNumberOfSubTasks =
+                StreamSupport.stream(jobGraph.getVertices().spliterator(), false)
+                        .mapToInt(jobVertex -> jobVertex.getParallelism())

Review Comment:
   Thanks! Fixed, and I've added a new test 
`org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactoryTest#testCheckpointStatsTrackerUpdatedWithNewParallelism`






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441711709


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -730,57 +739,87 @@ void restoreInternal() throws Exception {
 
         getEnvironment().getMetricGroup().getIOMetricGroup().markTaskInitializationStarted();
         LOG.debug("Initializing {}.", getName());
 
-        operatorChain =
-                getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
-                        ? new FinishedOperatorChain<>(this, recordWriter)
-                        : new RegularOperatorChain<>(this, recordWriter);
-        mainOperator = operatorChain.getMainOperator();
+        SubTaskInitializationMetricsBuilder initializationMetrics =
+                new SubTaskInitializationMetricsBuilder(
+                        SystemClock.getInstance().absoluteTimeMillis());
+        try {
+            operatorChain =
+                    getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
+                            ? new FinishedOperatorChain<>(this, recordWriter)
+                            : new RegularOperatorChain<>(this, recordWriter);
+            mainOperator = operatorChain.getMainOperator();
 
-        getEnvironment()
-                .getTaskStateManager()
-                .getRestoreCheckpointId()
-                .ifPresent(restoreId -> latestReportCheckpointId = restoreId);
+            getEnvironment()

Review Comment:
   try/catch was added here



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubTaskInitializationMetricsBuilder.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A builder for {@link SubTaskInitializationMetrics}.
+ *
+ * This class is not thread safe, but parts of it can actually be used from different threads.

Review Comment:
   Is it needed? Just construct the object and optionally call setters.
   
   Note that 
   > This class is not thread safe, but parts of it can actually be used 
from different threads.
   
   was a copy/paste artifact. I removed that comment.






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


rkhachatryan commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441711155


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java:
##
@@ -159,6 +160,11 @@ public ExecutionGraph createAndRestoreExecutionGraph(
                     }
                 };
 
+        int totalNumberOfSubTasks =
+                StreamSupport.stream(jobGraph.getVertices().spliterator(), false)
+                        .mapToInt(jobVertex -> jobVertex.getParallelism())

Review Comment:
   We need to read the DoP (degree of parallelism) from the store to account for 
rescaling; otherwise, on downscaling, we'll get the old (higher) DoP and won't 
report the downscaling: 
   ```suggestion
                           .mapToInt(
                                   jobVertex ->
                                           vertexParallelismStore
                                                   .getParallelismInfo(jobVertex.getID())
                                                   .getParallelism())
   ```
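
The effect described above can be sketched with simplified, hypothetical types. This is only an illustration: `Vertex`, the `Map`-based store, and the final `.sum()` are assumptions standing in for Flink's job vertices, the `vertexParallelismStore`, and the part of the expression not visible in the hunk. After a downscale, the parallelism configured on the graph may still be the old value, while the store holds the value actually in effect.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SubTaskCountDemo {

    // Hypothetical simplified vertex: an id plus the parallelism configured on the graph.
    static final class Vertex {
        final String id;
        final int configuredParallelism;

        Vertex(String id, int configuredParallelism) {
            this.id = id;
            this.configuredParallelism = configuredParallelism;
        }
    }

    // Sums the parallelism as configured on the graph (may be stale after rescaling).
    static int totalFromGraph(List<Vertex> vertices) {
        return vertices.stream().mapToInt(v -> v.configuredParallelism).sum();
    }

    // Sums the parallelism recorded in the store (assumed to reflect rescaling).
    // Assumes every vertex id is present in the store; a missing id would throw.
    static int totalFromStore(List<Vertex> vertices, Map<String, Integer> parallelismStore) {
        return vertices.stream().mapToInt(v -> parallelismStore.get(v.id)).sum();
    }

    public static void main(String[] args) {
        List<Vertex> vertices = Arrays.asList(new Vertex("source", 8), new Vertex("sink", 8));

        // The job was downscaled from 8 to 2 per vertex; only the store knows about it.
        Map<String, Integer> store = new HashMap<>();
        store.put("source", 2);
        store.put("sink", 2);

        System.out.println("total from graph: " + totalFromGraph(vertices));
        System.out.println("total from store: " + totalFromStore(vertices, store));
    }
}
```

If the tracker waited for the graph-based total, it would expect more subtask reports than a downscaled job can ever send, which is why reading from the store matters here.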






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441710885


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -343,6 +376,62 @@ public void reportIncompleteStats(
         }
     }
 
+    public void reportInitializationStartTs(long initializationStartTs) {
+        jobInitializationMetricsBuilder =
+                Optional.of(
+                        new JobInitializationMetricsBuilder(
+                                totalNumberOfSubTasks, initializationStartTs));
+    }
+
+    public void reportInitializationMetrics(SubTaskInitializationMetrics initializationMetrics) {
+        statsReadWriteLock.lock();
+        try {
+            if (!jobInitializationMetricsBuilder.isPresent()) {
+                LOG.warn(
+                        "Attempted to report SubTaskInitializationMetrics [{}] without jobInitializationMetricsBuilder present",
+                        initializationMetrics);
+                return;
+            }
+            jobInitializationMetricsBuilder
+                    .get()
+                    .reportInitializationMetrics(initializationMetrics);
+            if (jobInitializationMetricsBuilder.get().isComplete()) {
+                traceInitializationMetrics(jobInitializationMetricsBuilder.get().build());
+            }
+        } catch (Exception ex) {
+            LOG.warn("Fail to log SubTaskInitializationMetrics[{}]", ex, initializationMetrics);
+        } finally {
+            statsReadWriteLock.unlock();
+        }
+    }
+
+    private void traceInitializationMetrics(JobInitializationMetrics jobInitializationMetrics) {
+        SpanBuilder span =
+                Span.builder(CheckpointStatsTracker.class, "JobInitialization")
+                        .setStartTsMillis(jobInitializationMetrics.getStartTs())
+                        .setEndTsMillis(jobInitializationMetrics.getEndTs())
+                        .setAttribute(
+                                "initializationStatus",
+                                jobInitializationMetrics.getStatus().name());
+        for (JobInitializationMetrics.SumMaxDuration duration :
+                jobInitializationMetrics.getDurationMetrics().values()) {
+            setDurationSpanAttribute(span, duration);
+        }
+        if (jobInitializationMetrics.getCheckpointId() != JobInitializationMetrics.UNSET) {
+            span.setAttribute("checkpointId", jobInitializationMetrics.getCheckpointId());
+        }
+        if (jobInitializationMetrics.getStateSize() != JobInitializationMetrics.UNSET) {
+            span.setAttribute("fullSize", jobInitializationMetrics.getStateSize());
+        }
+        metricGroup.addSpan(span);
+    }
+
+    private void setDurationSpanAttribute(
+            SpanBuilder span, JobInitializationMetrics.SumMaxDuration duration) {
+        span.setAttribute("max" + duration.getName(), duration.getMax());
+        span.setAttribute("sum" + duration.getName(), duration.getSum());

Review Comment:
   🤷 Those seemed to be the most important to me. `avg` can be derived from the 
`sum`. `max` is better when looking for bottlenecks. `sum` is better when 
looking at optimising the amount of resources used during the process as a 
whole.
   
   `sum` can also be helpful on its own, for example "sum of local state 
size vs sum of remote state size" during recovery. 
   
   Reporting unaggregated values would require child spans from subtasks to be 
reported. This was discussed as future work.
   
   I also didn't want to spam the metrics too much. If needed, we can always add 
`avg` or something else later.
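
As a rough illustration of the trade-off described above, the sketch below aggregates per-subtask durations into a sum and a max and derives the average on demand. The `SumMax` class is a hypothetical stand-in, not Flink's `JobInitializationMetrics.SumMaxDuration`, and the `count` field is an assumption: in the PR the divisor would presumably be the number of reporting subtasks.

```java
class SumMaxDemo {

    // Hypothetical aggregate over non-negative per-subtask durations (milliseconds).
    static final class SumMax {
        private long sum;
        private long max;
        private int count;

        void add(long durationMillis) {
            sum += durationMillis;
            max = Math.max(max, durationMillis);
            count++;
        }

        long getSum() {
            return sum;
        }

        long getMax() {
            return max;
        }

        // The average is not stored; it is derived from the sum when needed.
        double average() {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    static SumMax aggregate(long... durationsMillis) {
        SumMax result = new SumMax();
        for (long duration : durationsMillis) {
            result.add(duration);
        }
        return result;
    }

    public static void main(String[] args) {
        SumMax downloadTime = aggregate(100L, 300L, 200L);
        System.out.println("sum=" + downloadTime.getSum());
        System.out.println("max=" + downloadTime.getMax());
        System.out.println("avg=" + downloadTime.average());
    }
}
```

Here `max` points at the slowest subtask (the bottleneck), while `sum` reflects the total work spent across all subtasks.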






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441707237


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -343,6 +376,62 @@ public void reportIncompleteStats(
         }
     }
 
+    public void reportInitializationStartTs(long initializationStartTs) {
+        jobInitializationMetricsBuilder =
+                Optional.of(
+                        new JobInitializationMetricsBuilder(
+                                totalNumberOfSubTasks, initializationStartTs));
+    }
+
+    public void reportInitializationMetrics(SubTaskInitializationMetrics initializationMetrics) {
+        statsReadWriteLock.lock();
+        try {
+            if (!jobInitializationMetricsBuilder.isPresent()) {
+                LOG.warn(
+                        "Attempted to report SubTaskInitializationMetrics [{}] without jobInitializationMetricsBuilder present",
+                        initializationMetrics);
+                return;
+            }
+            jobInitializationMetricsBuilder
+                    .get()
+                    .reportInitializationMetrics(initializationMetrics);
+            if (jobInitializationMetricsBuilder.get().isComplete()) {
+                traceInitializationMetrics(jobInitializationMetricsBuilder.get().build());
+            }
+        } catch (Exception ex) {
+            LOG.warn("Fail to log SubTaskInitializationMetrics[{}]", ex, initializationMetrics);

Review Comment:
   I don't think it's worth optimising this, as that's not an expected 
exception, but a bug in the code. 
   
   Also, there is currently an invariant that `jobInitializationMetricsBuilder` is 
not empty while metrics can be collected. I'm not sure whether the change that you 
propose would work out of the box, or whether some `checkState` or something else 
would fail. So I would prefer to leave it as it is to save a bit of work/time.






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1441676954


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -86,9 +88,10 @@ public class CheckpointStatsTracker {
 
     private final JobID jobID;
     private final MetricGroup metricGroup;
+    private int totalNumberOfSubTasks;
 
-    /** The latest restored checkpoint. */
-    @Nullable private RestoredCheckpointStats latestRestoredCheckpoint;
+    private Optional<JobInitializationMetricsBuilder> jobInitializationMetricsBuilder =

Review Comment:
   A matter of preference. `Optional`'s compile-time checks > the not very usable 
`@Nullable` warnings in our code base.






Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-03 Thread via GitHub


StefanRRichter commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1440576635


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -155,7 +167,12 @@ public CheckpointStatsSnapshot createSnapshot() {
                 counts.createSnapshot(),
                 summary.createSnapshot(),
                 history.createSnapshot(),
-                latestRestoredCheckpoint);
+                jobInitializationMetricsBuilder
+                        .map(
+                                JobInitializationMetricsBuilder
+                                        ::buildRestoredCheckpointStats)
+                        .orElse(Optional.empty())
+                        .orElse(null));

Review Comment:
   Why 2x orElse?



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -86,9 +88,10 @@ public class CheckpointStatsTracker {
 
     private final JobID jobID;
     private final MetricGroup metricGroup;
+    private int totalNumberOfSubTasks;
 
-    /** The latest restored checkpoint. */
-    @Nullable private RestoredCheckpointStats latestRestoredCheckpoint;
+    private Optional<JobInitializationMetricsBuilder> jobInitializationMetricsBuilder =

Review Comment:
   Why this? I think `Optional` isn't even intended to be used for fields.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -730,57 +739,87 @@ void restoreInternal() throws Exception {
 
         getEnvironment().getMetricGroup().getIOMetricGroup().markTaskInitializationStarted();
         LOG.debug("Initializing {}.", getName());
 
-        operatorChain =
-                getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
-                        ? new FinishedOperatorChain<>(this, recordWriter)
-                        : new RegularOperatorChain<>(this, recordWriter);
-        mainOperator = operatorChain.getMainOperator();
+        SubTaskInitializationMetricsBuilder initializationMetrics =
+                new SubTaskInitializationMetricsBuilder(
+                        SystemClock.getInstance().absoluteTimeMillis());
+        try {
+            operatorChain =
+                    getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
+                            ? new FinishedOperatorChain<>(this, recordWriter)
+                            : new RegularOperatorChain<>(this, recordWriter);
+            mainOperator = operatorChain.getMainOperator();
 
-        getEnvironment()
-                .getTaskStateManager()
-                .getRestoreCheckpointId()
-                .ifPresent(restoreId -> latestReportCheckpointId = restoreId);
+            getEnvironment()

Review Comment:
   Revert formatting changes in this file?



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobInitializationMetricsBuilder.java:
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.JobInitializationMetrics.SumMaxDuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.checkpoint.JobInitializationMetrics.UNSET;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+class JobInitializationMetricsBuilder {
+private static final Logger LOG =
+LoggerFactory.getLogger(JobInitializationMetricsBuilder.class);
+
+private final List reportedMetrics = new ArrayList<>();
+private final int totalNumberOfSubTasks;
+private final long startTs;
+private Optional stateSize = Optional.empty();

Review Comment:
   nit: Optional abused for field.



##

Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2023-12-11 Thread via GitHub


flinkbot commented on PR #23908:
URL: https://github.com/apache/flink/pull/23908#issuecomment-1850185232

   
   ## CI report:
   
   * a2f09acdf5ef582f951296d7b5ed5f65a87d049f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-33775] Report JobInitialization traces [flink]

2023-12-11 Thread via GitHub


pnowojski opened a new pull request, #23908:
URL: https://github.com/apache/flink/pull/23908