Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-24 Thread via GitHub


afedulov commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1825920714

   Thanks @mxm and @gyfora !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-24 Thread via GitHub


mxm merged PR #711:
URL: https://github.com/apache/flink-kubernetes-operator/pull/711


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-24 Thread via GitHub


mxm commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1404530659


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -142,6 +142,27 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Expected restart time to be used until the 
operator can determine it reliably from history.");
 
+public static final ConfigOption PREFER_TRACKED_RESTART_TIME =
+autoScalerConfig("restart.time-tracking.enabled")
+.booleanType()
+.defaultValue(false)

Review Comment:
   This should probably be enabled by default. We can still change this default 
though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-24 Thread via GitHub


mxm commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1825907949

   Great work! Thanks @afedulov!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-24 Thread via GitHub


mxm commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1825846863

   The docs need to be regenerated after the last config change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-24 Thread via GitHub


mxm commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1404437571


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-24 Thread via GitHub


mxm commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r140447


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403659023


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   I guess it could be argued that we can always send statistics about previous 
rescalings as metrics, but why do we then keep vertex-based scalingHistory?



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   I guess it could be argued that we can always send statistics about previous 
rescalings as metrics, but why do we then keep the vertex-based scalingHistory?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403659023


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   I guess it could be argued that we can always send previous stats as 
metrics, but why do we then keep vertex-based scalingHistory?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403656624


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   General question - we are missing the job-based data structure that keeps 
track of the past rescaling details. Should be need to add something in the 
future, with the current structure it is as simple as adding data fields to the 
ScalingRecord. I am OK with removing the map, but the question is - are we sure 
we won't require something similar in the future anyways?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


gyfora commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403619658


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   `estimatedEMA = estimatedEMA * x + newMeasurmenet * (1-x)`
   we could start with `x=0.5` which is pretty aggressive smoothing but should 
be fine give we don't have many scalings



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403615188


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   EMA requires to know how many previous records in the window are taken into 
account because this determines the weight coefficient of the new record 
(smoothing factor). The length of the "window" of observation is also supposed 
to be fixed and not span all time from the beginning, so I am not sure we are 
talking about the classic definition of EMA. Maybe you could sketch the 
calculation you have in mind?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403615188


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   EMA requires to know how many previous records in the window are taken into 
account because this determines the weight coefficient of the new record 
(smoothing factor). The length of the "window" of observation is also supposed 
to be fixed and not span all time from the beginning, so I am not sure we are 
talking about the classic definition of EMA. Maybe you could sketch a 
calculation you have in mind?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


gyfora commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403612096


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   I saw this but this doesn't mention anything about history etc and refers to 
an offline discussion :) 
   Combined with the other comment related to the trimming issue (losing the 
restart info after 24h) I think the exponential moving avg is a simpler and 
slightly more robust initial approach



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403603955


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();

Review Comment:
   I guess 
[this](https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1815381974)
 got buried in the notifications:
   ```
   @gyfora me and Max briefly discussed offline and came to the conclusion that 
starting with
evaluating the maximum restart time capped by the RESTART_TIME setting is 
probably 
   good enough for the first step. It has the benefit of giving the most 
"conservative" 
   evaluation and we can add the moving average after some baseline testing. 
What do you think?
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


gyfora commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403543895


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


gyfora commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403540366


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


gyfora commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1403521447


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);

Review Comment:
   Should we maybe log this on debug? so we have an overview if we want to 
debug this?



##
docs/layouts/shortcodes/generated/auto_scaler_configuration.html:
##
@@ -80,6 +80,18 @@
 Duration
 Expected restart time to be used until the operator can 
determine it reliably from history.
 
+
+job.autoscaler.restart.time.tracked.enabled
+false
+Boolean
+Whether to use the actually observed rescaling restart times 
instead of the fixed 'job.autoscaler.restart.time' configuration. If set to 
true, the maximum restart duration over a number of samples will be used. The 
value of 'job.autoscaler.restart.time' will act as an upper bound.
+
+
+job.autoscaler.restart.time.tracked.limit

Review Comment:
   Sorry for the late comment, we could consider changing this to 
`job.autoscaler.restart.time-tracking.enabled` 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-23 Thread via GitHub


mxm commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1824128401

   Thanks!
   
   > On Nov 22, 2023, at 18:21, Alexander Fedulov ***@***.***> wrote:
   > 
   > 
   > @afedulov commented on this pull request.
   > 
   > In 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java 
:
   > 
   > > +.allMatch(
   > +entry -> {
   > +var vertexID = entry.getKey();
   > +var targetParallelism = entry.getValue();
   > +var actualParallelism = 
actualParallelisms.getOrDefault(vertexID, -1);
   > +return 
actualParallelism.equals(targetParallelism);
   > +});
   > +}
   > +
   > +/**
   > + * Retrieves the maximum restart time in seconds based on the 
provided configuration and scaling
   > + * records. Defaults to the RESTART_TIME from configuration if the 
PREFER_TRACKED_RESTART_TIME
   > + * option is set to false, or if there are no tracking records 
available. Otherwise, the maximum
   > + * observed restart time is capped by the MAX_RESTART_TIME.
   > + */
   > +public double getMaxRestartTimeSecondsOrDefault(Configuration conf) {
   > 93ae804 

   > —
   > Reply to this email directly, view it on GitHub 
,
 or unsubscribe 
.
   > You are receiving this because you were mentioned.
   > 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-22 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1402445510


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-22 Thread via GitHub


mxm commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1402359219


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -142,6 +142,27 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Expected restart time to be used until the 
operator can determine it reliably from history.");
 
+public static final ConfigOption PREFER_TRACKED_RESTART_TIME =
+autoScalerConfig("restart.time.tracked.enabled")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Whether to use the actually observed rescaling 
restart times instead of the fixed '"

Review Comment:
   ```suggestion
   "Whether to use the actual observed rescaling 
restart times instead of the fixed '"
   ```



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -142,6 +142,27 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Expected restart time to be used until the 
operator can determine it reliably from history.");
 
+public static final ConfigOption PREFER_TRACKED_RESTART_TIME =
+autoScalerConfig("restart.time.tracked.enabled")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Whether to use the actually observed rescaling 
restart times instead of the fixed '"
++ RESTART_TIME.key()
++ "' configuration. If set to true, the 
maximum restart duration over a number of "
++ "samples will be used. The value of '"
++ RESTART_TIME.key()
++ "' will act as an upper bound.");
+
+public static final ConfigOption TRACKED_RESTART_TIME_LIMIT =
+autoScalerConfig("restart.time.tracked.limit")
+.durationType()
+.defaultValue(Duration.ofMinutes(15))
+.withDescription(
+"Maximum cap for the calculated restart time when 
'"

Review Comment:
   ```suggestion
   "Maximum cap for the observed restart time when 
'"
   ```



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-21 Thread via GitHub


gyfora commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1400811069


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -142,9 +142,11 @@ public double 
getMaxRestartTimeSecondsOrDefault(Configuration conf) {
 }
 }
 }
+long restartTimeFromConfig = 
conf.get(AutoScalerOptions.RESTART_TIME).toSeconds();
+long maxRestartTimeFromConfig = 
conf.get(AutoScalerOptions.MAX_RESTART_TIME).toSeconds();
 return maxRestartTime == -1
 ? restartTimeFromConfig

Review Comment:
   sounds good!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-21 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1400542478


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -142,9 +142,11 @@ public double 
getMaxRestartTimeSecondsOrDefault(Configuration conf) {
 }
 }
 }
+long restartTimeFromConfig = 
conf.get(AutoScalerOptions.RESTART_TIME).toSeconds();
+long maxRestartTimeFromConfig = 
conf.get(AutoScalerOptions.MAX_RESTART_TIME).toSeconds();
 return maxRestartTime == -1
 ? restartTimeFromConfig

Review Comment:
   Synced with Max offline - we decided to rename the option to 
`TRACKED_RESTART_TIME_LIMIT` to make the scope clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-21 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1400399560


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -154,6 +154,15 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 + RESTART_TIME.key()
 + "' will act as an upper bound.");
 
+public static final ConfigOption MAX_RESTART_TIME =
+autoScalerConfig("restart.time.max")
+.durationType()
+.defaultValue(Duration.ofMinutes(30))

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-21 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1400396766


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -142,9 +142,11 @@ public double 
getMaxRestartTimeSecondsOrDefault(Configuration conf) {
 }
 }
 }
+long restartTimeFromConfig = 
conf.get(AutoScalerOptions.RESTART_TIME).toSeconds();
+long maxRestartTimeFromConfig = 
conf.get(AutoScalerOptions.MAX_RESTART_TIME).toSeconds();
 return maxRestartTime == -1
 ? restartTimeFromConfig

Review Comment:
   @gyfora I'd like to hear your thoughts on this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-21 Thread via GitHub


mxm commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1400366983


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -154,6 +154,15 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 + RESTART_TIME.key()
 + "' will act as an upper bound.");
 
+public static final ConfigOption MAX_RESTART_TIME =
+autoScalerConfig("restart.time.max")
+.durationType()
+.defaultValue(Duration.ofMinutes(30))

Review Comment:
   We have internal overrides for all configuration defaults, but I don't think 
this overly pessimistic value is a good default for the general public. I would 
set this to not more than 15 minutes, even that is still conservative.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -142,9 +142,11 @@ public double 
getMaxRestartTimeSecondsOrDefault(Configuration conf) {
 }
 }
 }
+long restartTimeFromConfig = 
conf.get(AutoScalerOptions.RESTART_TIME).toSeconds();
+long maxRestartTimeFromConfig = 
conf.get(AutoScalerOptions.MAX_RESTART_TIME).toSeconds();
 return maxRestartTime == -1
 ? restartTimeFromConfig

Review Comment:
   I think we should always cap the RESTART_TIME by the configured 
MAX_RESTART_TIME (if configured).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1819644904

   @mxm @gyfora I added the proposed `MAX_RESTART_TIME` option with the default 
value of 30minutes:
   
https://github.com/apache/flink-kubernetes-operator/pull/711/commits/ae79219025729d90a74a5cf1eb765f14c5b45f33#diff-c63eb3ce6a3229c2e0e664d8032f966df8e0b90a67b5e1119d8ec1d862599348
   
   To keep things simple for the users I decided to only cap by it only when 
the `PREFER_TRACKED_RESTART_TIME` options is enabled. Otherwise, the restart 
time is completely governed by the current `RESTART_TIME` setting, even it it 
exceeds the `MAX_RESTART_TIME`. 
   
https://github.com/apache/flink-kubernetes-operator/pull/711/commits/ae79219025729d90a74a5cf1eb765f14c5b45f33#diff-2241a4e55db07ff736cd174042179254aea0ca8d6884635b1146e5b5a9c17633R57
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


mxm commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1819233149

   What you describe is already the case. Any deployment which currently takes 
longer than the configured rescale time, will not scale accurately. By adding 
the feature in its current state, we are conservatively addressing those 
deployment which come back up much quicker. In our environment, typical rescale 
time is 1 minute or less.
   
   I think your request requires adding a new configuration `MAX_RESTART_TIME` 
because we want to (1) have a good default when we haven't yet estimated the 
rescale time (e.g. first scaling) via the existing setting, and (2) cap the 
determined rescale time for safety reasons (e.g. unexpected cluster downtime).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


mxm commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399290650


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399219639


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -159,19 +161,24 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 throws Exception {
 
 var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+var jobTopology = collectedMetrics.getJobTopology();
 
 if (collectedMetrics.getMetricHistory().isEmpty()) {
 return;
 }
 LOG.debug("Collected metrics: {}", collectedMetrics);
 
-var evaluatedMetrics = evaluator.evaluate(ctx.getConfiguration(), 
collectedMetrics);
+var now = clock.instant();
+// Scaling tracking data contains previous restart times that are 
taken into account
+var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+var evaluatedMetrics =
+evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, 
scalingTracking);

Review Comment:
   The way the scalingHistory is currently structured, this would currently 
lead to duplicating the restart time unnecessarily per job vertex.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


mxm commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399142219


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


mxm commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399140704


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399100532


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -159,19 +161,24 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 throws Exception {
 
 var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+var jobTopology = collectedMetrics.getJobTopology();
 
 if (collectedMetrics.getMetricHistory().isEmpty()) {
 return;
 }
 LOG.debug("Collected metrics: {}", collectedMetrics);
 
-var evaluatedMetrics = evaluator.evaluate(ctx.getConfiguration(), 
collectedMetrics);
+var now = clock.instant();
+// Scaling tracking data contains previous restart times that are 
taken into account
+var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+var evaluatedMetrics =
+evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, 
scalingTracking);

Review Comment:
   How do you propose to make the association between the rescaling decision 
and the collected metrics record? My understanding is that `updateMetrics` 
writes collected metrics irrespective from the rescalings, so we need to 
somehow decide which history entry to associate the restart time with. To be 
honest this feels like misusing the data structure not for its original purpose 
and adding complexity that will be hard to interpret when someone reads the 
code. It is already hard enough to interpret what is going on in the 
`updateMetrics` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399107259


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399115644


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399107259


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 
getLatestScalingRecordEntry() {
+if (!scalingRecords.isEmpty()) {
+return Optional.of(scalingRecords.lastEntry());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Sets the end time for the latest scaling record if its parallelism 
matches the current job
+ * parallelism.
+ *
+ * @param now The current instant to be set as the end time of the scaling 
record.
+ * @param jobTopology The current job topology containing details of the 
job's parallelism.
+ * @param scalingHistory The scaling history.
+ * @return true if the end time is successfully set, false if the end time 
is already set, the
+ * latest scaling record cannot be found, or the target parallelism 
does not match the
+ * actual parallelism.
+ */
+public boolean setEndTimeIfTrackedAndParallelismMatches(
+Instant now,
+JobTopology jobTopology,
+Map> 
scalingHistory) {
+return getLatestScalingRecordEntry()
+.map(
+entry -> {
+var value = entry.getValue();
+var scalingTimestamp = entry.getKey();
+if (value.getEndTime() == null) {
+var targetParallelism =
+getTargetParallelismOfScaledVertices(
+scalingTimestamp, 
scalingHistory);
+var actualParallelism = 
jobTopology.getParallelisms();
+
+if (targetParallelismMatchesActual(
+targetParallelism, actualParallelism)) 
{
+value.setEndTime(now);
+return true;
+}
+}
+return false;
+})
+.orElse(false);
+}
+
+private static Map 
getTargetParallelismOfScaledVertices(
+Instant scalingTimestamp,
+Map> 
scalingHistory) {
+return scalingHistory.entrySet().stream()
+.filter(entry -> 
entry.getValue().containsKey(scalingTimestamp))
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry ->
+entry.getValue()
+.get(scalingTimestamp)
+.getNewParallelism()));
+}
+
+private static boolean targetParallelismMatchesActual(
+Map 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399100532


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -159,19 +161,24 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 throws Exception {
 
 var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+var jobTopology = collectedMetrics.getJobTopology();
 
 if (collectedMetrics.getMetricHistory().isEmpty()) {
 return;
 }
 LOG.debug("Collected metrics: {}", collectedMetrics);
 
-var evaluatedMetrics = evaluator.evaluate(ctx.getConfiguration(), 
collectedMetrics);
+var now = clock.instant();
+// Scaling tracking data contains previous restart times that are 
taken into account
+var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+var evaluatedMetrics =
+evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, 
scalingTracking);

Review Comment:
   How do you propose to make the association between the rescaling decision 
and the collected metrics record? My understanding is that `updateMetrics` 
writes collected metrics irrespective from the rescalings, so we need to 
somehow decide which history entry to associate the restart time with. To be 
honest this feels like misusing the data structure not for its original purpose 
and adding complexity that will be hard to interpret when reading someone reads 
the code. It is already hard enough to interpret what is going on in the 
`updateMetrics` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399100532


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -159,19 +161,24 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 throws Exception {
 
 var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+var jobTopology = collectedMetrics.getJobTopology();
 
 if (collectedMetrics.getMetricHistory().isEmpty()) {
 return;
 }
 LOG.debug("Collected metrics: {}", collectedMetrics);
 
-var evaluatedMetrics = evaluator.evaluate(ctx.getConfiguration(), 
collectedMetrics);
+var now = clock.instant();
+// Scaling tracking data contains previous restart times that are 
taken into account
+var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+var evaluatedMetrics =
+evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, 
scalingTracking);

Review Comment:
   How do you propose to make the association between the rescaling decision 
and the collected metrics record? My understanding is that `updateMetrics` 
writes collected metrics irrespective from the rescalings, so we need to 
somehow decide which history entry to associate the restart time with. To be 
honest this feels like misusing the data structure not for its original 
purposes and making code harder to interpret.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399055639


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -159,19 +161,24 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 throws Exception {
 
 var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+var jobTopology = collectedMetrics.getJobTopology();
 
 if (collectedMetrics.getMetricHistory().isEmpty()) {
 return;
 }
 LOG.debug("Collected metrics: {}", collectedMetrics);
 
-var evaluatedMetrics = evaluator.evaluate(ctx.getConfiguration(), 
collectedMetrics);
+var now = clock.instant();
+// Scaling tracking data contains previous restart times that are 
taken into account
+var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+var evaluatedMetrics =
+evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, 
scalingTracking);

Review Comment:
   I don't think this works with the current metrics scoping since it would 
lead to duplicating the restart time per vertex and we are striving to 
minimizing the size of the config map. Also, instead of just checking one 
tracking entry, should we instead iterate over all records of this metric 
across all vertices and take the maximum over that? Or just trust one 
observation? If we'll just use one observation, why would we need this data in 
every vertex if we know it is supposed to be the same for all vertices, why 
store it at the vertex level? 
   
   Ultimately if comes to the fact that we need two different views - one from 
the vertex perspective and one from the overall job and trying to put data for 
one into the other causes a lot of issues and does not seem justified, 
especially since we do not win anything in terms of the configmap size but make 
things worse.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399055639


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -159,19 +161,24 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 throws Exception {
 
 var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+var jobTopology = collectedMetrics.getJobTopology();
 
 if (collectedMetrics.getMetricHistory().isEmpty()) {
 return;
 }
 LOG.debug("Collected metrics: {}", collectedMetrics);
 
-var evaluatedMetrics = evaluator.evaluate(ctx.getConfiguration(), 
collectedMetrics);
+var now = clock.instant();
+// Scaling tracking data contains previous restart times that are 
taken into account
+var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+var evaluatedMetrics =
+evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, 
scalingTracking);

Review Comment:
   I don't think this works with the current metrics scoping since it would 
lead to duplicating the restart time per vertex and we are striving to 
minimizing the size of the config map. Also, instead of just checking one 
tracking entry, should we instead iterate over all records of this metric 
across all vertices and take the maximum over that? Or just trust one 
observation? If we'll just use one observation, why would we need this data in 
every vertex if we know it is supposed to be the same for all vertices, why 
store it at the vertex level? 
   
   Ultimately if comes to the fact that we need two different views - one from 
the vertex perspective and one from the overall job and trying to put data for 
one into the other causes a lot of issues and does not seem justified, 
especially since we do not win anything in terms of the configmap size but make 
things worse.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399055639


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -159,19 +161,24 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 throws Exception {
 
 var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+var jobTopology = collectedMetrics.getJobTopology();
 
 if (collectedMetrics.getMetricHistory().isEmpty()) {
 return;
 }
 LOG.debug("Collected metrics: {}", collectedMetrics);
 
-var evaluatedMetrics = evaluator.evaluate(ctx.getConfiguration(), 
collectedMetrics);
+var now = clock.instant();
+// Scaling tracking data contains previous restart times that are 
taken into account
+var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+var evaluatedMetrics =
+evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, 
scalingTracking);

Review Comment:
   I don't think this works with the current metrics scoping since it would 
lead to duplicating the restart time per vertex and we are striving to 
minimizing the size of the config map. Also, instead of just checking one 
tracking entry, should we instead iterate over all records of this metric 
across all vertices and take the maximum over that? Or just trust one 
observation? If we'll just use one observation, why would we need this data in 
every vertex if we know it is supposed to be the same for all vertices, why 
store it at the vertex level? 
   
   Ultimately if comes to the fact that we need two different views - one from 
the vertex perspective and one from the overall job and trying to put data for 
one into the other causes a lot of issues and does not seem justified, 
especially since we do not win anything but make things worse in terms of the 
configmap size.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399049527


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingRecord.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Instant;
+
+/**
+ * Class for tracking scaling details, including time it took for the job to 
transition to the
+ * target parallelism.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ScalingRecord {
+private Instant endTime;
+}

Review Comment:
   My idea was to have structures in place that would be easily extensible. If 
we are sure we do not not ever need to store anything else here we can store 
the instant directly. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399055639


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -159,19 +161,24 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 throws Exception {
 
 var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+var jobTopology = collectedMetrics.getJobTopology();
 
 if (collectedMetrics.getMetricHistory().isEmpty()) {
 return;
 }
 LOG.debug("Collected metrics: {}", collectedMetrics);
 
-var evaluatedMetrics = evaluator.evaluate(ctx.getConfiguration(), 
collectedMetrics);
+var now = clock.instant();
+// Scaling tracking data contains previous restart times that are 
taken into account
+var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+var evaluatedMetrics =
+evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, 
scalingTracking);

Review Comment:
   I don't think this works with the current metrics scoping since it would 
lead to duplicating the restart time per vertex and we are striving to 
minimizing the size of the config map. Also, instead of just checking one 
tracking entry, should we instead iterate over all records of this metric 
across all vertices and take the maximum over that? Or just trust one 
observation? If we'll just use one observation, why would we need this data in 
every vertex if we know it is supposed to be the same for all vertices, why 
store it at the vertex level? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399055639


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -159,19 +161,24 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 throws Exception {
 
 var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+var jobTopology = collectedMetrics.getJobTopology();
 
 if (collectedMetrics.getMetricHistory().isEmpty()) {
 return;
 }
 LOG.debug("Collected metrics: {}", collectedMetrics);
 
-var evaluatedMetrics = evaluator.evaluate(ctx.getConfiguration(), 
collectedMetrics);
+var now = clock.instant();
+// Scaling tracking data contains previous restart times that are 
taken into account
+var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+var evaluatedMetrics =
+evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, 
scalingTracking);

Review Comment:
   I don't think this works with the current metrics scoping since it would 
lead to duplicating the restart time per vertex and we are striving to 
minimizing the size of the config map. Also, instead of just checking one 
tracking entry, should we instead iterate over all records of this metric 
across all vertices and take the maximum over that? Or just trust one 
observation? If we'll just use one observation, why would we need this data in 
every vertex and if we know it is supposed to be the same for all vertices, why 
store it at the vertex level? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


afedulov commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1399049527


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingRecord.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Instant;
+
+/**
+ * Class for tracking scaling details, including time it took for the job to 
transition to the
+ * target parallelism.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ScalingRecord {
+private Instant endTime;
+}

Review Comment:
   My idea was to have structures in place that would be easily extensible. If 
we are sure we do not not ever need to store anything else here we can store 
the instant directly. 
   For example, if we had a wrapper on top of the history's jobvertex-scoped 
maps object, if would have been much easier to add the job-scoped restart time 
on top.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-20 Thread via GitHub


mxm commented on code in PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#discussion_r1398939440


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -159,19 +161,24 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 throws Exception {
 
 var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+var jobTopology = collectedMetrics.getJobTopology();
 
 if (collectedMetrics.getMetricHistory().isEmpty()) {
 return;
 }
 LOG.debug("Collected metrics: {}", collectedMetrics);
 
-var evaluatedMetrics = evaluator.evaluate(ctx.getConfiguration(), 
collectedMetrics);
+var now = clock.instant();
+// Scaling tracking data contains previous restart times that are 
taken into account
+var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
+var evaluatedMetrics =
+evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, 
scalingTracking);

Review Comment:
   Could we treat `scalingTracking` as a metric, just like the other scaling 
metrics? Apart from being consistent with the current code, this also has some 
advantages because the scaling time will be reported as a metric out of the box.
   
   So basically I'm asking to insert the current rescale time into the 
`collectedMetrics` map.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingRecord.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Instant;
+
+/**
+ * Class for tracking scaling details, including time it took for the job to 
transition to the
+ * target parallelism.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ScalingRecord {
+private Instant endTime;
+}

Review Comment:
   Could we just store Instant directly and remove this wrapper class?



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingTracking.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/** Stores rescaling related information for the job. */
+@Experimental
+@Data
+@NoArgsConstructor
+@Builder
+public class ScalingTracking {
+
+/** Details related to recent rescaling operations. */
+private final TreeMap scalingRecords = new 
TreeMap<>();
+
+public void addScalingRecord(Instant startTimestamp, ScalingRecord 
scalingRecord) {
+scalingRecords.put(startTimestamp, scalingRecord);
+}
+
+@JsonIgnore
+public Optional> 

Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-16 Thread via GitHub


afedulov commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1815381974

   @mxm @gyfora the PR is ready for review
   @gyfora me and Max briefly discussed offline and came to the conclusion that 
starting with evaluating the maximum restart time capped by the RESTART_TIME 
setting is probably good enough for the first step. It has the benefit of 
giving the most "conservative" evaluation and we can add the moving average 
after some baseline testing. What do you think? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-15 Thread via GitHub


mxm commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1812285350

   > Sounds reasonable. If we are optimizing state size at this level, how 
about we also remove `RECOMMENDED_PARALLELISM` and `PARALLELISM` from the 
history metrics, since they are duplicated as `ScalingSummary` top level fields?
   
   We can file a JIRA for this. This requires more changes since those metrics 
are also directly reported via Prometheus and removing them would be unexpected 
to users.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-14 Thread via GitHub


afedulov commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1811283881

   @gyfora I'd like to propose to use this format:
```
   scalingTracking: |   
  
--- 
   
   scalingRecords:
 "2023-11-14T20:20:39.250503Z": 
   endTime: "2023-11-14T20:22:04.360972Z"
  "2023-11-14T20:22:04.360972Z": 
   endTime: "2023-11-14T20:23:39.00Z"
   ```
   Some reasoning:
   - It allows to associate tracked restart time with scaling history, which 
could be useful for debugging. Without the start timestamp, this won't be 
possible.
   - It lays out a data structure that is open for extension. This makes it 
trivial to add additional fields if we ever need some other scaling- and not 
vertex-scoped data which we currently do not anticipate in the future (the 
decision to move it into a different config map in such case is a separate 
issue).
   - The overhead is pretty minimal. 
   
   @mxm, I removed the redundant tracking of the target parallelism from the 
`ScalingRecords`. The current implementation completely relies on the 
`scalingHistory` for this. I had change the `scaleResource` signature to avoid 
fetching the tracking and the history twice (we need both in the 
JobAutoScalerImpl now). 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-14 Thread via GitHub


afedulov commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1810934027

   Sounds reasonable. If we are optimizing state size at this level, how about 
we also remove `RECOMMENDED_PARALLELISM` and `PARALLELISM` from the history 
metrics, since they are duplicated as `ScalingSummary` top level fields? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-14 Thread via GitHub


mxm commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1810690806

   > Got it, this is how it actually works currently (modulo additional target 
vertices parallelism tracking that I will attempt to fetch from the history 
instead) - I thought it was more about the configmap vs in-memory discussion.
   
   Yes, I saw that. I think Gyula is right though, that it's enough to use 
in-memory state to start tracking a scaling execution. When the scaling is 
completed, we then persist the duration in the ConfigMap. The reason is that 
keeping track of the start time when we begin tracking would only be useful if 
we could use that state to recover, e.g. in case of downtime. However, that's 
only useful when the job does not complete rescaling during the downtime. If it 
completes before, we don't know how long a scaling took when we come back up. 
For simplicity, I think it makes sense to just track the scaling start time in 
memory and then persist the actual rescale time in the ConfigMap.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-14 Thread via GitHub


afedulov commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1810668734

   >I meant a flag in the ConfigMap. I guess something like startTime: endTime 
would be sufficient? The endTime would initially be null.
   
   Got it, this is how it actually works currently (modulo additional vertices 
parallelism tracking that I will attempt to fetch from the history instead) - I 
thought it was more about the configmap vs in-memory discussion.
   
   > In the past we ran into the 1MB size limit for large pipelines with 
hundreds of vertices. 
   
   I see, let's I'll try to keep the size to the minimum then.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-14 Thread via GitHub


mxm commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1810649406

   >Are we talking about having a field in the ScalingExecutor? Because 
fetching scalingHistory won't be sufficient - we need some indication that a 
new rescaling was applied by the time we see the transition into RUNNING with 
the expected parallelism. This "flag" then needs to be cleaned.
   
   I meant a flag in the ConfigMap. I guess something like `startTime: endTime` 
would be sufficient? The endTime would initially be null.
   
   >General question: it feels like we are very focused on optimizing the size 
of this particular configmap. Can't we create a separate configmap, if this is 
a concern? 
   
   In the past we ran into the 1MB size limit for large pipelines with hundreds 
of vertices. That's why we don't want to increase the state size too much. We 
also added compression because of this. Of course we can add a new configmap. 
We have just not come around to do it and it might not be a good idea to put 
too much load on etcd. While it would be nice to have a separate config map for 
the scaling metrics and the scaling history, but it is much simpler to do it in 
one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-14 Thread via GitHub


afedulov commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1810584902

   @gyfora @mxm thanks for the feedback.
   >We already have the start time of the last scaling in memory via the 
scaling history. We can then keep note of the end time once we detect the 
scaling is over. That leaves a little bit of error in case of downtime of the 
operator which will produce a long rescaling time. I think that should be fine 
though, since we cap at the max configured rescale time.
   
   Are we talking about having a field in the `ScalingExecutor`? Because 
fetching `scalingHistory` won't be sufficient - we need some indication that a 
**new** rescaling was applied by the time we see the transition into RUNNING 
with the expected parallelism. This "flag" then needs to be cleaned. 
   General question: it feels like we are very focused on optimizing the size 
of this particular configmap. Can't we create a separate configmap, if this is 
a concern? We already store so much stuff in the `flink-config-autoscaling-job` 
configmap (see the amount of logging configuration alone) that it feels like 
obsession over whether we store two timestamps or one for 5-10 values we'll 
keep in state at the expense of significantly increased code complexity and 
potentially loosing restart data is not worth it. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Determine restart time on the fly fo Autoscaler [flink-kubernetes-operator]

2023-11-14 Thread via GitHub


mxm commented on PR #711:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/711#issuecomment-1810524690

   Commenting on some of the requests: 
   > 1. Remove the newly introduced configs, this should be automatic and 
always on
   
   I think it is fair to have a ON/OFF switch. It should be on by default but 
we want to keep the ability to roll back to the old behavior.
   
   > 2. Track the start/end times for the restart in memory and only record 
the `observed_restart_time` in the autoscaler state store. This way we add 
minimal extra state that is easy to implement.
   
   We already have the start time of the last scaling in memory via the scaling 
history. We can then keep note of the end time once we detect the scaling is 
over. That leaves a little bit of error in case of downtime of the operator 
which will produce a long rescaling time. I think that should be fine though, 
since we cap at the max configured rescale time. 

   > 3. Instead of computing restart time from a fixed number of samples, 
use a simple moving average: `observed_restart_time = (prev_observed + 
new_observed) / 2`
   
   I think we can do an exponentially weighted average.
   
   > 
   > 4. During autoscaler logic use: `restart_time = min(conf_restart_time, 
observed_restart_time)`
   
   +1
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org