[GitHub] [flink] rkhachatryan commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric

2023-07-18 Thread via GitHub


rkhachatryan commented on code in PR #22772:
URL: https://github.com/apache/flink/pull/22772#discussion_r1266570337


##
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java:
##
@@ -128,6 +134,29 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {
 this.mailboxLatency =
 histogram(MetricNames.MAILBOX_LATENCY, new DescriptiveStatisticsHistogram(60));
 this.mailboxSize = gauge(MetricNames.MAILBOX_SIZE, new SizeGauge());
+this.initializationDuration =
+counter(
+CHECKPOINT_RESTORE_TIME,
+new Counter() {
+@Override
+public void inc() {}
+
+@Override
+public void inc(long n) {}
+
+@Override
+public void dec() {}
+
+@Override
+public void dec(long n) {}
+
+@Override
+public long getCount() {
+return getTaskInitializationDuration();

Review Comment:
   Nice! :+1: 



##
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java:
##
@@ -196,6 +225,28 @@ public void markTaskStart() {
 this.taskStartTime = System.currentTimeMillis();
 }
 
+public void markTaskInitializationStarted() {
+if (taskInitializeTime == INVALID_TIMESTAMP) {
+this.taskInitializeTime = System.currentTimeMillis();
+}
+}
+
+/**
+ * Returns the duration of time required for a task's restoring/initialization, which reaches
+ * its maximum when the task begins running and remains constant throughout the task's running.
+ * Return 0 when the task is not in initialization/running status.
+ */
+@VisibleForTesting
+public long getTaskInitializationDuration() {
+if (taskInitializeTime != INVALID_TIMESTAMP) {
+return taskStartTime == INVALID_TIMESTAMP
+? System.currentTimeMillis() - taskInitializeTime
+: taskStartTime - taskInitializeTime;
+} else {
+return 0L;
+}

Review Comment:
   NIT: 
   ```
   if (taskInitializeTime == INVALID_TIMESTAMP) {
       return 0L;
   } else if (taskStartTime == INVALID_TIMESTAMP) {
       return System.currentTimeMillis() - taskInitializeTime;
   } else {
       return taskStartTime - taskInitializeTime;
   }
   ```
   Without the nested ternary, it seems easier to read to me.
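
   For illustration, the suggested if/else chain can be pulled into a small pure function, which makes each of the three states (not initializing, initializing, running) easy to unit test. This is a sketch, not the PR's code: the class name `InitializationDurations`, passing `now` in as a parameter, and the sentinel value `-1L` for `INVALID_TIMESTAMP` are all assumptions.

   ```java
   // Hypothetical helper; the sentinel value -1L is an assumption.
   final class InitializationDurations {
       static final long INVALID_TIMESTAMP = -1L;

       private InitializationDurations() {}

       static long initializationDuration(long taskInitializeTime, long taskStartTime, long now) {
           if (taskInitializeTime == INVALID_TIMESTAMP) {
               // Initialization has not started yet.
               return 0L;
           } else if (taskStartTime == INVALID_TIMESTAMP) {
               // Still initializing: the duration keeps growing with the clock.
               return now - taskInitializeTime;
           } else {
               // Task is running: the duration is frozen at its final value.
               return taskStartTime - taskInitializeTime;
           }
       }
   }
   ```

   Passing `now` explicitly instead of calling `System.currentTimeMillis()` inside is only a testing convenience in this sketch.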



##
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java:
##
@@ -128,6 +134,29 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {
 this.mailboxLatency =
 histogram(MetricNames.MAILBOX_LATENCY, new DescriptiveStatisticsHistogram(60));
 this.mailboxSize = gauge(MetricNames.MAILBOX_SIZE, new SizeGauge());
+this.initializationDuration =
+counter(
+CHECKPOINT_RESTORE_TIME,
+new Counter() {
+@Override
+public void inc() {}
+
+@Override
+public void inc(long n) {}
+
+@Override
+public void dec() {}
+
+@Override
+public void dec(long n) {}
+
+@Override
+public long getCount() {
+return getTaskInitializationDuration();
+}
+});
+this.taskStartTime = INVALID_TIMESTAMP;

Review Comment:
   This slightly changes the result of `getAccumulatedBusyTime` until the task is started.
   But the underlying issue, which already exists in master, is that `getAccumulatedBusyTime` doesn't check whether `taskStartTime > 0`.
   
   Do you agree?
   If so, would you mind adding a hotfix commit to your PR that checks `taskStartTime` in `getAccumulatedBusyTime`?
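
   A minimal sketch of the suggested guard, assuming a simplified busy-time formula. The body of `getAccumulatedBusyTime` is not shown in this thread, so the formula below (elapsed time minus idle and back-pressured time, clamped at zero), the class name `BusyTimeSketch`, and its parameters are hypothetical; only the `taskStartTime > 0` check comes from the comment above.

   ```java
   // Hypothetical, simplified model; not the actual TaskIOMetricGroup code.
   final class BusyTimeSketch {
       private BusyTimeSketch() {}

       static long accumulatedBusyTime(
               long taskStartTime, long now, long idleMillis, long backPressuredMillis) {
           // Guard suggested in the review: if the task has not started yet
           // (start time unset, e.g. 0 or an INVALID_TIMESTAMP sentinel),
           // subtracting it from "now" would report a huge bogus busy time.
           if (taskStartTime <= 0) {
               return 0L;
           }
           // Assumed formula: time since start that was neither idle nor back-pressured.
           return Math.max(now - taskStartTime - idleMillis - backPressuredMillis, 0L);
       }
   }
   ```

   Checking `> 0` rather than comparing against one specific sentinel covers both the old default of `0` and a negative `INVALID_TIMESTAMP`.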



##
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java:
##
@@ -95,6 +95,15 @@ void testTaskIOMetricGroup() throws InterruptedException {
 .isGreaterThanOrEqualTo(softSleepTime);
 assertThat(taskIO.getHardBackPressuredTimePerSecond().getCount())
 .isGreaterThanOrEqualTo(hardSleepTime);
+
+// test initializing time
+assertThat(taskIO.getTaskInitializationDuration()).isEqualTo(0L);
+taskIO.markTaskInitializationStarted();
+Thread.sleep(1000);
+assertThat(taskIO.getTaskInitializationDuration()).isGreaterThan(0L);

Review Comment:
   We could avoid relying 

[GitHub] [flink] rkhachatryan commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric

2023-07-12 Thread via GitHub


rkhachatryan commented on code in PR #22772:
URL: https://github.com/apache/flink/pull/22772#discussion_r1261375807


##
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java:
##
@@ -67,10 +70,12 @@ public class TaskIOMetricGroup extends ProxyMetricGroup {
 private final Meter mailboxThroughput;
 private final Histogram mailboxLatency;
 private final SizeGauge mailboxSize;
+private final Gauge initializingTime;

Review Comment:
   I see. One way to still make it a counter would be to implement `View` + `Counter`, monotonically increasing the value in `update()`. That would keep the semantics clearer.
   WDYT?
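
   A minimal sketch of the `View` + `Counter` idea. To stay self-contained it declares local `Counter` and `View` interfaces that only mirror the shape of Flink's `org.apache.flink.metrics.Counter` and `View` (in Flink, `update()` of a registered metric implementing `View` is called periodically). The class name `InitializationDurationCounter`, the `LongSupplier` duration source, and the threading assumption behind `volatile` are hypothetical.

   ```java
   import java.util.function.LongSupplier;

   // Local stand-in that only mirrors the shape of Flink's Counter interface.
   interface Counter {
       void inc();

       void inc(long n);

       void dec();

       void dec(long n);

       long getCount();
   }

   // Local stand-in that only mirrors the shape of Flink's View interface.
   interface View {
       void update();
   }

   // Hypothetical counter whose value never decreases: update() pulls the
   // current initialization duration and keeps the largest value seen so far.
   class InitializationDurationCounter implements Counter, View {
       private final LongSupplier durationSupplier;
       // Assumes one updating thread and concurrent readers, hence volatile.
       private volatile long count;

       InitializationDurationCounter(LongSupplier durationSupplier) {
           this.durationSupplier = durationSupplier;
       }

       @Override
       public void update() {
           count = Math.max(count, durationSupplier.getAsLong());
       }

       // The value is driven only by update(); manual changes are ignored.
       @Override
       public void inc() {}

       @Override
       public void inc(long n) {}

       @Override
       public void dec() {}

       @Override
       public void dec(long n) {}

       @Override
       public long getCount() {
           return count;
       }
   }
   ```

   Taking the maximum in `update()` is what guarantees counter semantics here: even if the underlying supplier briefly reported a smaller value, the exposed count would not go backwards.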



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric

2023-07-11 Thread via GitHub


rkhachatryan commented on code in PR #22772:
URL: https://github.com/apache/flink/pull/22772#discussion_r1260137154


##
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java:
##
@@ -67,10 +70,12 @@ public class TaskIOMetricGroup extends ProxyMetricGroup {
 private final Meter mailboxThroughput;
 private final Histogram mailboxLatency;
 private final SizeGauge mailboxSize;
+private final Gauge initializingTime;

Review Comment:
   Thanks! 
   I see in the latest update that the type of the field is still `Gauge` and not `Counter`. Is that intentional?






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22772: [FLINK-19010][metric] Introduce subtask level restore metric

2023-06-20 Thread via GitHub


rkhachatryan commented on code in PR #22772:
URL: https://github.com/apache/flink/pull/22772#discussion_r1234865147


##
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java:
##
@@ -67,10 +70,12 @@ public class TaskIOMetricGroup extends ProxyMetricGroup {
 private final Meter mailboxThroughput;
 private final Histogram mailboxLatency;
 private final SizeGauge mailboxSize;
+private final Gauge initializingTime;

Review Comment:
   1. Why is it a `Gauge` and not a `Counter`? As I understand it, the initialization duration either grows or stays the same, and we can start reporting a non-zero value as soon as `taskInitializeTime` is set.
   
   2. Should it be called `initializationDuration` or something similar, instead of `initializingTime`?



##
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java:
##
@@ -196,6 +204,23 @@ public void markTaskStart() {
 this.taskStartTime = System.currentTimeMillis();
 }
 
+public void markTaskInitialize() {
+this.taskInitializeTime = System.currentTimeMillis();

Review Comment:
   It's not trivial to follow all the code paths inside `StreamTask` that could lead to this call (`restoreInternal` can be called from `invoke`).
   Should we make this update conditional, i.e. only set the field if it hasn't been set already?
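
   The conditional update can be sketched as an idempotent "mark" method: only the first call records a timestamp, so reaching it again through another `StreamTask` code path cannot move the start of initialization forward. This is a sketch under assumptions: the class name `InitializationStartMarker`, the injected `LongSupplier` clock, and the sentinel `-1L` are hypothetical, and the plain check-then-set assumes all calls happen on the task thread.

   ```java
   import java.util.function.LongSupplier;

   // Hypothetical sketch; assumes calls come from a single (task) thread.
   class InitializationStartMarker {
       static final long INVALID_TIMESTAMP = -1L; // assumed sentinel value

       private final LongSupplier clock;
       private long taskInitializeTime = INVALID_TIMESTAMP;

       InitializationStartMarker(LongSupplier clock) {
           this.clock = clock;
       }

       void markTaskInitializationStarted() {
           // Only the first call wins; later calls are no-ops.
           if (taskInitializeTime == INVALID_TIMESTAMP) {
               taskInitializeTime = clock.getAsLong();
           }
       }

       long getTaskInitializeTime() {
           return taskInitializeTime;
       }
   }
   ```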



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -695,6 +695,7 @@ void restoreInternal() throws Exception {
 }
 isRestoring = true;
 closedOperators = false;
+getEnvironment().getMetricGroup().getIOMetricGroup().markTaskInitialize();

Review Comment:
   This marks the **start** of the initialization, right?
   Should we call it something like `markTaskInitializationStarted`? (I was confused at first.)


