Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
gyfora merged PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361898608

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java: ##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext

https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350028811
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361907227

## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/EventCollector.java: ##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import lombok.Getter;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The event handler for collecting events. */
+public class EventCollector>

Review Comment:
Thanks for these suggestions. These 3 classes were renamed in the last commit.
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
gyfora commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361902603 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java: ## @@ -47,6 +55,48 @@ public abstract class FlinkResourceContext
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
gyfora commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361881666

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java: ##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext

+package org.apache.flink.autoscaler.realizer;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import java.util.LinkedList;
+import java.util.Map;
+
+/** The event handler for collecting scaling events. */
+public class ScalingRealizerCollector>

Review Comment:
Could we call this `TestingScalingRealizer`? That way the intention is clear.

## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/FlushCountableAutoscalerStateStore.java: ##
@@ -15,19 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler.state;
 
-import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
-import lombok.SneakyThrows;
+/** The state store counts the flush. */
+public class FlushCountableAutoscalerStateStore>

Review Comment:
Could we call this `TestingAutoscalerStateStore`? That way the intention is clear.

## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/EventCollector.java: ##
@@ -0,0 +1,110 @@
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import lombok.Getter;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The event handler for collecting events. */
+public class EventCollector>

Review Comment:
Could we call this `TestingEventCollector`? That way the intention is clear.
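To illustrate the naming suggestion above, here is a minimal sketch of a test double that counts flush calls, which is what the `FlushCountableAutoscalerStateStore` Javadoc describes. The interface `SimpleStateStore` and the class `TestingStateStore` are hypothetical, simplified stand-ins written for this note; they are not the PR's `AutoScalerStateStore` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a state store interface (an assumption, not the PR's API).
interface SimpleStateStore<KEY> {
    void put(KEY jobKey, String value);

    void flush(KEY jobKey);
}

// Test double: the "Testing" prefix signals that the class exists only to support tests.
class TestingStateStore<KEY> implements SimpleStateStore<KEY> {
    private final Map<KEY, String> state = new HashMap<>();
    private int flushCount;

    @Override
    public void put(KEY jobKey, String value) {
        state.put(jobKey, value);
    }

    @Override
    public void flush(KEY jobKey) {
        // Nothing is persisted; the call is only counted so a test can assert on it.
        flushCount++;
    }

    int getFlushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        TestingStateStore<String> store = new TestingStateStore<>();
        store.put("job-1", "some-state");
        store.flush("job-1");
        System.out.println("flush calls: " + store.getFlushCount());
    }
}
```

A test can then assert on `getFlushCount()` after driving the code under test, and the class name makes it clear that it is not meant for production use.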
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1766110914

> I've also manually tested this change on Flink 1.18 (both reactive and non-reactive), and worked flawlessly. Thank you for the effort you've put into this, I really like the direction the autoscaler is heading.

Thanks @mateczagany for the test!
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mateczagany commented on PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1766010488

I've also manually tested this change on Flink 1.18 (both reactive and non-reactive), and it worked flawlessly. Thank you for the effort you've put into this. I really like the direction the autoscaler is heading.
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
gyfora commented on PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1765929234

Thank you @1996fanrui for the persistence and all the nice work. I will do a final pass on this later today, let's try to merge this as soon as possible to unblock current work on the autoscaler :)
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1765865647

Hi @mxm @gyfora, thanks for your thorough review!

I have addressed all comments on this PR so far. Some interfaces, the code design, and the code style were improved, and there were no big changes during the review. I have updated the FLIP doc with the latest interfaces.

About manual testing: I just rebuilt the image to check that it works, and ran the `AutoscalingExample` with `autoscaling.yaml`. In general, it works well. There are some warning logs because the demo jobs still use deprecated configuration keys; I have updated them in the last fix commit.

Please help review again in your free time, big thanks~ ❤️

I can squash the last commit after your check. Also, please let me know if you think more manual testing is needed, thanks.

```
2023-10-17 15:33:56,930 o.a.f.c.Configuration [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.enabled' instead of proper key 'job.autoscaler.enabled'
2023-10-17 15:33:56,931 o.a.f.c.Configuration [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.enabled' instead of proper key 'job.autoscaler.enabled'
2023-10-17 15:33:57,205 o.a.f.a.ScalingMetricCollector [INFO ] [default.autoscaling-example] Job updated at 2023-10-17T07:32:56.133Z. Clearing metrics.
2023-10-17 15:33:57,331 o.a.f.c.Configuration [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.metrics.window' instead of proper key 'job.autoscaler.metrics.window'
2023-10-17 15:33:57,332 o.a.f.c.Configuration [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.stabilization.interval' instead of proper key 'job.autoscaler.stabilization.interval'
2023-10-17 15:33:57,397 o.a.f.a.ScalingMetricCollector [WARN ] [default.autoscaling-example] pendingRecords metric for cbc357ccb763df2852fee8c4fc7d55f2 could not be found. Either a legacy source or an idle source. Assuming no pending records.
2023-10-17 15:33:57,620 o.a.f.a.ScalingMetricCollector [INFO ] [default.autoscaling-example] Metric window not full until 2023-10-17T07:36:56.931915Z
2023-10-17 15:33:57,623 o.a.f.c.Configuration [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.target.utilization.boundary' instead of proper key 'job.autoscaler.target.utilization.boundary'
2023-10-17 15:33:57,624 o.a.f.c.Configuration [WARN ] [default.autoscaling-example] Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.target.utilization.boundary' instead of proper key 'job.autoscaler.target.utilization.boundary'
```
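The warnings in the log above come from options that are still set under the old `kubernetes.operator.` prefix. As a rough illustration of that fallback behaviour, here is a plain-Java sketch; it imitates the idea only and is not Flink's `Configuration` implementation. The class `DeprecatedKeyLookup`, its `resolve` method, and the use of a plain `Map` are assumptions made for this note.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not part of Flink): prefer the new key, fall back to the old prefixed key.
class DeprecatedKeyLookup {
    // Old prefix as it appears in the log lines quoted above.
    static final String DEPRECATED_PREFIX = "kubernetes.operator.";

    static Optional<String> resolve(Map<String, String> conf, String key) {
        String value = conf.get(key);
        if (value != null) {
            return Optional.of(value);
        }
        String deprecatedKey = DEPRECATED_PREFIX + key;
        String deprecatedValue = conf.get(deprecatedKey);
        if (deprecatedValue != null) {
            // Mirrors the wording of the warning in the log; printed to stdout in this sketch.
            System.out.printf(
                    "Config uses deprecated configuration key '%s' instead of proper key '%s'%n",
                    deprecatedKey, key);
        }
        return Optional.ofNullable(deprecatedValue);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("kubernetes.operator.job.autoscaler.enabled", "true");
        System.out.println(resolve(conf, "job.autoscaler.enabled"));
    }
}
```

In the PR itself the fallback is declared on the option with `withDeprecatedKeys(...)`, so updating the demo job's configuration to the new `job.autoscaler.*` keys is what makes the warnings disappear.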
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mateczagany commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1360856610

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ##
@@ -441,15 +442,21 @@ protected Collection queryAggregatedMetricNames(
     protected abstract Map> queryAllAggregatedMetrics(
-            AbstractFlinkResource cr,
-            FlinkService flinkService,
-            Configuration conf,
+            Context ctx,
             Map> filteredVertexMetricNames);
-    public void cleanup(AbstractFlinkResource cr) {
-        var resourceId = ResourceID.fromResource(cr);
-        histories.remove(resourceId);
-        availableVertexMetricNames.remove(resourceId);
+    public JobDetailsInfo getJobDetailsInfo(
+            JobAutoScalerContext context, Duration clientTimeout) throws Exception {

Review Comment:
That makes sense, I think you've explained it very well, thank you! I still think that the naming and the abstraction can be a bit confusing for a fresh pair of eyes, but as you've said and as I've said in my original comment, this is not related to your PR, so I think it's perfectly fine to leave it as is.
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1360079090

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ##
@@ -441,15 +442,21 @@ protected Collection queryAggregatedMetricNames(
     protected abstract Map> queryAllAggregatedMetrics(
-            AbstractFlinkResource cr,
-            FlinkService flinkService,
-            Configuration conf,
+            Context ctx,
             Map> filteredVertexMetricNames);
-    public void cleanup(AbstractFlinkResource cr) {
-        var resourceId = ResourceID.fromResource(cr);
-        histories.remove(resourceId);
-        availableVertexMetricNames.remove(resourceId);
+    public JobDetailsInfo getJobDetailsInfo(
+            JobAutoScalerContext context, Duration clientTimeout) throws Exception {

Review Comment:
Thanks @mateczagany for this comment. IIUC, you mean that `ScalingMetricCollector` uses the `RestClusterClient`, and `RestApiMetricsCollector` is entirely based on `RestClusterClient`, so these 2 classes can be merged into one class, right?

If so, let me try to explain the difference between `RestApiMetricsCollector` and `ScalingMetricCollector`:

- `RestApiMetricsCollector` calls `RestClusterClient` to fetch specific metrics.
- `ScalingMetricCollector` also calls `RestClusterClient`, but not to fetch specific metrics.
- `RestClusterClient` is used in `ScalingMetricCollector` to get job metadata, for example in `getJobDetailsInfo` (to generate the `JobTopology`), `queryFilteredMetricNames`, and `updateKafkaSourceMaxParallelisms`.
- The `JobTopology` is job metadata, and it cannot be fetched from metrics. That means the `RestClusterClient` is needed even if we query specific metrics from another system.

Based on this, it may be better to keep `ScalingMetricCollector` as an abstract class and not remove `RestApiMetricsCollector`. That keeps it easy to fetch specific metrics from another system in the future.

Also, `ScalingMetricCollector` already uses the `RestClusterClient` on the current master branch [1]. WDYT? And please correct me if my understanding is wrong, thanks~

[1] https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java#L368C45-L368C45
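The split described in this comment can be sketched roughly as follows: the abstract base class owns the job-metadata step, and only the metric query is left to subclasses. All names here (`MetricCollectorSketch`, `fetchTopology`, `queryMetrics`, `FixedValueCollector`) are hypothetical and heavily simplified; the stubbed topology stands in for what the real class obtains through the REST client.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simplification of the "abstract collector" idea discussed above.
abstract class MetricCollectorSketch {

    // Shared step: job metadata (the topology). Stubbed here; in the discussion it comes from
    // the Flink REST client regardless of where the metrics themselves are read from.
    protected List<String> fetchTopology(String jobId) {
        return List.of("source", "sink");
    }

    // Pluggable step: where the per-vertex metrics are read from.
    protected abstract Map<String, Double> queryMetrics(String jobId, List<String> vertices);

    public Map<String, Double> collect(String jobId) {
        return queryMetrics(jobId, fetchTopology(jobId));
    }
}

// One possible subclass; a REST-based or external-metrics-system variant would differ only here.
class FixedValueCollector extends MetricCollectorSketch {
    @Override
    protected Map<String, Double> queryMetrics(String jobId, List<String> vertices) {
        Map<String, Double> result = new TreeMap<>();
        for (String vertex : vertices) {
            result.put(vertex, 1.0);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(new FixedValueCollector().collect("job-1"));
    }
}
```

Under this reading, swapping the metric source means adding another subclass, while the metadata lookup in the base class stays untouched.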
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1360032639

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java: ##
@@ -0,0 +1,67 @@
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import javax.annotation.Nullable;
+
+/** An implementation of JobAutoscalerContext for Kubernetes. */
+public class KubernetesJobAutoScalerContext extends JobAutoScalerContext {
+
+    private final AbstractFlinkResource resource;
+
+    private final KubernetesClient kubernetesClient;
+
+    public KubernetesJobAutoScalerContext(
+            JobID jobID,

Review Comment:
Thanks for pointing it out, updated.
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mateczagany commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1359305741

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java: ##
@@ -0,0 +1,67 @@
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import javax.annotation.Nullable;
+
+/** An implementation of JobAutoscalerContext for Kubernetes. */
+public class KubernetesJobAutoScalerContext extends JobAutoScalerContext {
+
+    private final AbstractFlinkResource resource;
+
+    private final KubernetesClient kubernetesClient;
+
+    public KubernetesJobAutoScalerContext(
+            JobID jobID,

Review Comment:
jobID should also be `@Nullable`

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ##
@@ -441,15 +442,21 @@ protected Collection queryAggregatedMetricNames(
     protected abstract Map> queryAllAggregatedMetrics(
-            AbstractFlinkResource cr,
-            FlinkService flinkService,
-            Configuration conf,
+            Context ctx,
             Map> filteredVertexMetricNames);
-    public void cleanup(AbstractFlinkResource cr) {
-        var resourceId = ResourceID.fromResource(cr);
-        histories.remove(resourceId);
-        availableVertexMetricNames.remove(resourceId);
+    public JobDetailsInfo getJobDetailsInfo(
+            JobAutoScalerContext context, Duration clientTimeout) throws Exception {

Review Comment:
I don't expect it to be fixed in this PR, just want to note that this is an abstract class with `RestApiMetricsCollector` extending it. This makes it seem like it would be possible to extend this class to be able to collect metrics via methods other than REST API. This would already be a challenge as there are several REST API calls already in this class, but with this PR, we will also tie this class to REST API with `JobAutoScalerContext`. Perhaps it makes sense to remove `RestApiMetricsCollector` at this point?
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355991932

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ##
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .stringType()
                     .asList()
                     .defaultValues()
+                    .withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+    public static final ConfigOption FLINK_CLIENT_TIMEOUT =
+            autoScalerConfig("flink.client.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The timeout for waiting the flink rest client to return.");

Review Comment:
Do you mean moving `KubernetesOperatorConfigOptions#OPERATOR_FLINK_CLIENT_TIMEOUT` here? It's a generic option for the Kubernetes operator: it is used not only by the autoscaler, but also by many `AbstractFlinkService` methods. That's why I didn't move it here. WDYT?
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356620090

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java: ##
@@ -0,0 +1,55 @@
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * Handler all loggable events during scaling.
+ *
+ * @param The job key.
+ * @param Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler> {
+
+    /**
+     * Handle the event.
+     *
+     * @param interval When interval is great than 0, events that repeat within the interval will be
+     * ignored.
+     */
+    void handleEvent(
+            Context context,
+            Type type,
+            String reason,
+            String message,
+            @Nullable String messageKey,
+            @Nullable Duration interval);
+
+    /** The type of the events. */
+    enum Type {
+        Normal,
+        Error

Review Comment:
Thanks for the feedback. I have changed `Error` back to `Warning` with a force push.
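The `interval` parameter in the Javadoc above says that events repeating within the interval are ignored. A minimal sketch of that rule, under stated assumptions, could look like this. `DedupingEventRecorder` is a hypothetical class written for this note, the explicit `now` argument is added only to keep the example deterministic, and the sketch keys events by `messageKey` alone, which may differ from what the real handlers do.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical recorder illustrating interval-based suppression of repeated events.
class DedupingEventRecorder {
    private final Map<String, Instant> lastEmitted = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    /** Returns true if the event was recorded, false if it was suppressed as a repeat. */
    boolean handleEvent(String messageKey, String message, Duration interval, Instant now) {
        // Suppression only applies when a positive interval is given.
        boolean dedupEnabled = interval != null && !interval.isZero() && !interval.isNegative();
        Instant last = lastEmitted.get(messageKey);
        if (dedupEnabled && last != null && now.isBefore(last.plus(interval))) {
            return false;
        }
        lastEmitted.put(messageKey, now);
        emitted.add(message);
        return true;
    }

    List<String> getEmitted() {
        return emitted;
    }

    public static void main(String[] args) {
        DedupingEventRecorder recorder = new DedupingEventRecorder();
        Instant start = Instant.parse("2023-10-17T07:00:00Z");
        Duration fiveMinutes = Duration.ofMinutes(5);
        System.out.println(recorder.handleEvent("k", "scaling", fiveMinutes, start));
        System.out.println(recorder.handleEvent("k", "scaling", fiveMinutes, start.plusSeconds(60)));
    }
}
```

Passing a `null` or zero interval disables suppression in this sketch, so every call is recorded.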
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356604652

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java: ##
@@ -0,0 +1,210 @@
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl>
+        implements JobAutoScaler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";
+
+    private final ScalingMetricCollector metricsCollector;
+    private final ScalingMetricEvaluator evaluator;
+    private final ScalingExecutor scalingExecutor;
+    private final AutoScalerEventHandler eventHandler;
+    private final ScalingRealizer scalingRealizer;
+    private final AutoScalerStateStore stateStore;
+
+    @VisibleForTesting
+    final Map>>
+            lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    final Map flinkMetrics = new ConcurrentHashMap<>();
+
+    public JobAutoScalerImpl(
+            ScalingMetricCollector metricsCollector,
+            ScalingMetricEvaluator evaluator,
+            ScalingExecutor scalingExecutor,
+            AutoScalerEventHandler eventHandler,
+            ScalingRealizer scalingRealizer,
+            AutoScalerStateStore stateStore) {
+        this.metricsCollector = metricsCollector;
+        this.evaluator = evaluator;
+        this.scalingExecutor = scalingExecutor;
+        this.eventHandler = eventHandler;
+        this.scalingRealizer = scalingRealizer;
+        this.stateStore = stateStore;
+    }
+
+    @Override
+    public void scale(Context ctx) throws Exception {
+        var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx);
+
+        try {
+            if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) {
+                LOG.debug("Autoscaler is disabled");
+                clearParallelismOverrides(ctx);
+                return;
+            }
+
+            if (ctx.getJobStatus() != JobStatus.RUNNING) {
+                lastEvaluatedMetrics.remove(ctx.getJobKey());
+                return;
+            }
+
+            runScalingLogic(ctx, autoscalerMetrics);
+            stateStore.flush(ctx);

Review Comment:
Looks good to me. Just wanted to double-check! Thank you.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356605280 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java: ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.autoscaler.JobAutoScalerContext; + +import javax.annotation.Nullable; + +import java.time.Duration; + +/** + * Handler all loggable events during scaling. + * + * @param The job key. + * @param Instance of JobAutoScalerContext. + */ +@Experimental +public interface AutoScalerEventHandler> { + +/** + * Handle the event. + * + * @param interval When interval is great than 0, events that repeat within the interval will be + * ignored. + */ +void handleEvent( +Context context, +Type type, +String reason, +String message, +@Nullable String messageKey, +@Nullable Duration interval); + +/** The type of the events. */ +enum Type { +Normal, +Error Review Comment: Fine with me. Thanks for the explanation! -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
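The `interval` parameter documented on `handleEvent` above (repeats within the interval are ignored) can be illustrated with a small plain-Java sketch. This is not the PR's implementation: `EventThrottle`, `shouldEmit` and the explicit `now` argument are hypothetical names chosen for the example, and it assumes a null or non-positive interval means "never suppress".

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: suppresses events whose key repeats within the given interval. */
final class EventThrottle {
    // Timestamp of the last emitted event per message key.
    private final Map<String, Instant> lastEmitted = new HashMap<>();

    /**
     * Returns true if the event should be emitted. Assumption: a null, zero or negative
     * interval disables de-duplication, so every event passes.
     */
    boolean shouldEmit(String messageKey, Duration interval, Instant now) {
        if (interval == null || interval.isZero() || interval.isNegative()) {
            return true;
        }
        Instant last = lastEmitted.get(messageKey);
        if (last != null && Duration.between(last, now).compareTo(interval) < 0) {
            return false; // same key seen less than one interval ago: ignore
        }
        lastEmitted.put(messageKey, now);
        return true;
    }
}

class EventThrottleDemo {
    public static void main(String[] args) {
        EventThrottle throttle = new EventThrottle();
        Instant t0 = Instant.EPOCH;
        Duration interval = Duration.ofMinutes(30);
        System.out.println(throttle.shouldEmit("ScalingReport", interval, t0));                   // true
        System.out.println(throttle.shouldEmit("ScalingReport", interval, t0.plusSeconds(60)));   // false
        System.out.println(throttle.shouldEmit("ScalingReport", interval, t0.plusSeconds(1800))); // true
    }
}
```

Passing the clock value in as an argument keeps the sketch deterministic in tests; a real handler would more likely read the current time itself.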
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
XComp commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356512684 ## flink-autoscaler/pom.xml: ## @@ -45,6 +45,32 @@ under the License. provided + +org.projectlombok +lombok +${lombok.version} +provided + + + +org.junit.jupiter +junit-jupiter-params +test + + +
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356017440 ## flink-autoscaler/pom.xml: ## @@ -45,6 +45,32 @@ under the License. provided + +org.projectlombok +lombok +${lombok.version} +provided + + + +org.junit.jupiter +junit-jupiter-params +test + + +
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355987715 ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Tests for {@link RestApiMetricsCollector}. 
*/ +public class RestApiMetricsCollectorTest { + +@Test +public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception { +RestApiMetricsCollector> collector = +new RestApiMetricsCollector<>(); + +JobVertexID jobVertexID = new JobVertexID(); +Map flinkMetrics = +Map.of( +"a.pendingRecords", FlinkMetric.PENDING_RECORDS, +"b.pendingRecords", FlinkMetric.PENDING_RECORDS); +Map> metrics = Map.of(jobVertexID, flinkMetrics); + +List aggregatedMetricsResponse = +List.of( +new AggregatedMetric( +"a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.), +new AggregatedMetric( +"b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.), +new AggregatedMetric( +"c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.)); + +Configuration conf = new Configuration(); +RestClusterClient restClusterClient = +new RestClusterClient<>( +conf, +"test-cluster", +(c, e) -> new StandaloneClientHAServices("localhost")) { +@Override +public < +M extends MessageHeaders, +U extends MessageParameters, +R extends RequestBody, +P extends ResponseBody> +CompletableFuture sendRequest( +M messageHeaders, U messageParameters, R request) { +if (messageHeaders instanceof AggregatedSubtaskMetricsHeaders) { +return (CompletableFuture) +CompletableFuture.completedFuture( +new AggregatedMetricsResponseBody( + aggregatedMetricsResponse)); +} +return (CompletableFuture) + CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); +} +}; + +JobID jobID = new JobID(); +JobAutoScalerContext context = +new JobAutoScalerContext<>( +jobID, +jobID, +
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355181704 ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Tests for {@link RestApiMetricsCollector}. 
*/ +public class RestApiMetricsCollectorTest { + +@Test +public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception { +RestApiMetricsCollector> collector = +new RestApiMetricsCollector<>(); + +JobVertexID jobVertexID = new JobVertexID(); +Map flinkMetrics = +Map.of( +"a.pendingRecords", FlinkMetric.PENDING_RECORDS, +"b.pendingRecords", FlinkMetric.PENDING_RECORDS); +Map> metrics = Map.of(jobVertexID, flinkMetrics); + +List aggregatedMetricsResponse = +List.of( +new AggregatedMetric( +"a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.), +new AggregatedMetric( +"b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.), +new AggregatedMetric( +"c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.)); + +Configuration conf = new Configuration(); +RestClusterClient restClusterClient = +new RestClusterClient<>( +conf, +"test-cluster", +(c, e) -> new StandaloneClientHAServices("localhost")) { +@Override +public < +M extends MessageHeaders, +U extends MessageParameters, +R extends RequestBody, +P extends ResponseBody> +CompletableFuture sendRequest( +M messageHeaders, U messageParameters, R request) { +if (messageHeaders instanceof AggregatedSubtaskMetricsHeaders) { +return (CompletableFuture) +CompletableFuture.completedFuture( +new AggregatedMetricsResponseBody( + aggregatedMetricsResponse)); +} +return (CompletableFuture) + CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); +} +}; + +JobID jobID = new JobID(); +JobAutoScalerContext context = +new JobAutoScalerContext<>( +jobID, +jobID, +
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355175979 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java: ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.autoscaler.JobAutoScalerContext; + +import javax.annotation.Nullable; + +import java.time.Duration; + +/** + * Handler all loggable events during scaling. + * + * @param The job key. + * @param Instance of JobAutoScalerContext. + */ +@Experimental +public interface AutoScalerEventHandler> { + +/** + * Handle the event. + * + * @param interval When interval is great than 0, events that repeat within the interval will be + * ignored. + */ +void handleEvent( +Context context, +Type type, +String reason, +String message, +@Nullable String messageKey, +@Nullable Duration interval); + +/** The type of the events. */ +enum Type { +Normal, +Error Review Comment: Why not Normal, _Warning_, Error? 
## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.realizer.ScalingRealizer; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism; +import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism; + +/** The default implementation of {@link JobAutoScaler}. */ +public class JobAutoScalerImpl> +implements JobAutoScaler { + +private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class); + +@VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError"; + +private final ScalingMetricCollector metricsCollector; +private final ScalingMetricEvaluator evaluator; +private final ScalingExecutor scalingExecutor; +private final AutoScalerEventHandler eventHandler; +private final ScalingRealizer scalingRealizer; +private final AutoScalerStateStore stateStore; + +@VisibleForTesting +final Map>> +lastEvaluatedMetrics = new ConcurrentHashMap<>(); + +@VisibleForTesting +final Map flinkMetrics = new
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355174254

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ##
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .stringType()
                     .asList()
                     .defaultValues()
+                    .withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+    public static final ConfigOption FLINK_CLIENT_TIMEOUT =
+            autoScalerConfig("flink.client.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The timeout for waiting the flink rest client to return.");

Review Comment:
Let's keep the existing option. We can move it here like you did but it should continue to support `flink.client.timeout`. We can add it as a deprecated key.
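The idea in this comment (keep an old key working by registering it as a deprecated alias of the new one) can be sketched without any Flink dependency. The following is a minimal illustration only, not Flink's `ConfigOption` machinery: `AliasedOption`, `resolve` and both key strings are hypothetical.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical option with one current key and deprecated aliases (not Flink's ConfigOption). */
final class AliasedOption {
    final String key;
    final List<String> deprecatedKeys;
    final String defaultValue;

    AliasedOption(String key, List<String> deprecatedKeys, String defaultValue) {
        this.key = key;
        this.deprecatedKeys = deprecatedKeys;
        this.defaultValue = defaultValue;
    }

    /** Looks up the current key first, then each deprecated key in order, then the default. */
    String resolve(Map<String, String> conf) {
        if (conf.containsKey(key)) {
            return conf.get(key);
        }
        for (String old : deprecatedKeys) {
            if (conf.containsKey(old)) {
                return conf.get(old);
            }
        }
        return defaultValue;
    }
}

class AliasedOptionDemo {
    public static void main(String[] args) {
        // Both key names are made up for this example.
        AliasedOption timeout =
                new AliasedOption(
                        "new.prefix.flink.client.timeout",
                        List.of("old.prefix.flink.client.timeout"),
                        "10 s");
        // A configuration that still uses the old key keeps working.
        System.out.println(timeout.resolve(Map.of("old.prefix.flink.client.timeout", "30 s")));
    }
}
```

The current key takes precedence when both are set, so users can migrate at their own pace without a behavior change.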
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355160155

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java: ##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext status = getResource().getStatus();
+        String jobId = status.getJobStatus().getJobId();
+
+        JobStatus jobStatus = generateJobStatusEnum(status);
+
+        return new KubernetesJobAutoScalerContext(
+                jobId == null ? null : JobID.fromHexString(jobId),
+                jobStatus,
+                conf,
+                getResourceMetricGroup(),
+                () -> getFlinkService().getClusterClient(conf),
+                resource,
+                getKubernetesClient());
+    }
+
+    @Nullable
+    private JobStatus generateJobStatusEnum(CommonStatus status) {
+        if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+            return null;
+        }
+
+        String state = status.getJobStatus().getState();
+        if (state == null) {
+            return null;
+        }
+        return JobStatus.valueOf(state);

Review Comment:
(Didn't see your comments until I hit refresh). Fine to defer this.
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354624451

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java: ##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext status = getResource().getStatus();
+        String jobId = status.getJobStatus().getJobId();
+
+        JobStatus jobStatus = generateJobStatusEnum(status);
+
+        return new KubernetesJobAutoScalerContext(
+                jobId == null ? null : JobID.fromHexString(jobId),
+                jobStatus,
+                conf,
+                getResourceMetricGroup(),
+                () -> getFlinkService().getClusterClient(conf),
+                resource,
+                getKubernetesClient());
+    }
+
+    @Nullable
+    private JobStatus generateJobStatusEnum(CommonStatus status) {
+        if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+            return null;
+        }
+
+        String state = status.getJobStatus().getState();
+        if (state == null) {
+            return null;
+        }
+        return JobStatus.valueOf(state);

Review Comment:
Sure, if that does not cause any problems with the CRD state, that is fine with me.
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
XComp commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354589163 ## flink-autoscaler/pom.xml: ## @@ -45,6 +45,32 @@ under the License. provided + +org.projectlombok +lombok +${lombok.version} +provided + + + +org.junit.jupiter +junit-jupiter-params +test + + +
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
gyfora commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354562864

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java: ##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * The job autoscaler context, it includes all details related to the current job.
+ *
+ * @param The job key.
+ */
+@Experimental
+public interface JobAutoScalerContext {
+
+    /** The identifier of each flink job. */
+    KEY getJobKey();
+
+    JobID getJobID();
+
+    Configuration getConfiguration();
+
+    MetricGroup getMetricGroup();
+
+    RestClusterClient getRestClusterClient() throws Exception;

Review Comment:
We intentionally avoided too many wrappers, generated clients etc. whenever possible. Testing is not too bad and we would like to avoid the interface/factory hell that is currently present in core Flink :)
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
XComp commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354557198

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java: ##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * The job autoscaler context, it includes all details related to the current job.
+ *
+ * @param The job key.
+ */
+@Experimental
+public interface JobAutoScalerContext {
+
+    /** The identifier of each flink job. */
+    KEY getJobKey();
+
+    JobID getJobID();
+
+    Configuration getConfiguration();
+
+    MetricGroup getMetricGroup();
+
+    RestClusterClient getRestClusterClient() throws Exception;

Review Comment:
Sorry for responding with a delay due to vacation and other things. I guess you're right. The `ClusterClient` interface doesn't cover all the methods that are provided by the `RestClusterClient`. It feels like we would have to revisit the `ClusterClient` to align things again. One workaround would be to provide a wrapper interface that only reveals the methods that are needed by the Kubernetes Operator. WDYT? My concern is that using a concrete implementation might make testing things harder. Btw. have we thought of relying on the OpenAPI definition to generate a REST client?
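The wrapper-interface workaround suggested in this comment could look roughly like the sketch below. It only illustrates the testing argument; the reply in this thread states that the project deliberately avoids such wrappers. All names (`MetricsClient`, `ConcreteRestClient`, `RestMetricsClient`, `LagChecker`) are hypothetical, and `ConcreteRestClient` is a stand-in, not Flink's `RestClusterClient`.

```java
import java.util.Map;

/** Hypothetical narrow view of a REST client: only the call the caller actually needs. */
interface MetricsClient {
    Map<String, Double> queryAggregatedMetrics(String vertexId);
}

/** Stand-in for a concrete client with a wide API surface (not Flink's RestClusterClient). */
final class ConcreteRestClient {
    Map<String, Double> fetchMetrics(String vertexId) {
        return Map.of("pendingRecords", 200.0);
    }

    void cancelJob() {}

    void triggerSavepoint() {}
}

/** Adapter that exposes the concrete client through the narrow interface. */
final class RestMetricsClient implements MetricsClient {
    private final ConcreteRestClient delegate;

    RestMetricsClient(ConcreteRestClient delegate) {
        this.delegate = delegate;
    }

    @Override
    public Map<String, Double> queryAggregatedMetrics(String vertexId) {
        return delegate.fetchMetrics(vertexId);
    }
}

/** Example consumer that depends only on the narrow interface. */
final class LagChecker {
    static boolean hasBacklog(MetricsClient client, String vertexId) {
        return client.queryAggregatedMetrics(vertexId).getOrDefault("pendingRecords", 0.0) > 0;
    }
}
```

Because `MetricsClient` has a single method, a test can pass a lambda such as `v -> Map.of("pendingRecords", 0.0)` instead of subclassing a concrete client. The cost is one more interface and adapter to maintain for every call that gets exposed.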
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354382276

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java: ##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext status = getResource().getStatus();
+        String jobId = status.getJobStatus().getJobId();
+
+        JobStatus jobStatus = generateJobStatusEnum(status);
+
+        return new KubernetesJobAutoScalerContext(
+                jobId == null ? null : JobID.fromHexString(jobId),
+                jobStatus,
+                conf,
+                getResourceMetricGroup(),
+                () -> getFlinkService().getClusterClient(conf),
+                resource,
+                getKubernetesClient());
+    }
+
+    @Nullable
+    private JobStatus generateJobStatusEnum(CommonStatus status) {
+        if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+            return null;
+        }
+
+        String state = status.getJobStatus().getState();
+        if (state == null) {
+            return null;
+        }
+        return JobStatus.valueOf(state);

Review Comment:
Sure, I didn't change it in this PR, and I created [FLINK-33237](https://issues.apache.org/jira/browse/FLINK-33237) to optimize this after this PR.
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
gyfora commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354365364

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java: ##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext status = getResource().getStatus();
+        String jobId = status.getJobStatus().getJobId();
+
+        JobStatus jobStatus = generateJobStatusEnum(status);
+
+        return new KubernetesJobAutoScalerContext(
+                jobId == null ? null : JobID.fromHexString(jobId),
+                jobStatus,
+                conf,
+                getResourceMetricGroup(),
+                () -> getFlinkService().getClusterClient(conf),
+                resource,
+                getKubernetesClient());
+    }
+
+    @Nullable
+    private JobStatus generateJobStatusEnum(CommonStatus status) {
+        if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+            return null;
+        }
+
+        String state = status.getJobStatus().getState();
+        if (state == null) {
+            return null;
+        }
+        return JobStatus.valueOf(state);

Review Comment:
Let's not change this as part of this PR but feel free to open a JIRA for this
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354340358 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java: ## @@ -47,6 +55,48 @@ public abstract class FlinkResourceContext status = getResource().getStatus(); +String jobId = status.getJobStatus().getJobId(); + +JobStatus jobStatus = generateJobStatusEnum(status); + +return new KubernetesJobAutoScalerContext( +jobId == null ? null : JobID.fromHexString(jobId), +jobStatus, +conf, +getResourceMetricGroup(), +() -> getFlinkService().getClusterClient(conf), +resource, +getKubernetesClient()); +} + +@Nullable +private JobStatus generateJobStatusEnum(CommonStatus status) { +if (status.getLifecycleState() != ResourceLifecycleState.STABLE) { +return null; +} + +String state = status.getJobStatus().getState(); +if (state == null) { +return null; +} +return JobStatus.valueOf(state); Review Comment: Hi @mxm @gyfora , could the type of `org.apache.flink.kubernetes.operator.api.status.JobStatus#state` be changed from `String` to `org.apache.flink.api.common.JobStatus`? I see all setters respect it. If it can be changed, the `JobStatus.valueOf(state)` isn't necessary.
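To illustrate the trade-off raised in this comment, here is a minimal sketch of what every reader of a `String`-typed state field has to do before it can work with an enum. The `JobState` enum below is a hypothetical stand-in with only a few constants, not the real `org.apache.flink.api.common.JobStatus`, and the class name is made up for the example.

```java
import java.util.Optional;

public class JobStateParsing {

    // Hypothetical stand-in for the Flink job status enum; only a few states are listed.
    enum JobState { CREATED, RUNNING, FAILED, FINISHED }

    // With a String-typed field, each caller has to guard against null and unknown names.
    static Optional<JobState> parse(String state) {
        if (state == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(JobState.valueOf(state));
        } catch (IllegalArgumentException e) {
            // Enum.valueOf throws for names that are not declared constants.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("RUNNING"));
        System.out.println(parse(null));
        System.out.println(parse("running"));
    }
}
```

If the field were typed as the enum itself, this parsing and the two failure paths would presumably move to the single place where the value is written, which is the simplification the comment asks about.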
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354332455 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } } +private void scaling(FlinkResourceContext ctx) throws Exception { +KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext(); + +if (autoscalerDisabled(ctx)) { +autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false); +resourceScaler.scale(autoScalerContext); +return; +} +if (waitingForRunning(ctx.getResource().getStatus())) { +LOG.info("Autoscaler is waiting for stable, running state"); +resourceScaler.cleanup(autoScalerContext.getJobKey()); +return; Review Comment: Thanks to @mxm for the suggestion! I have moved the `waitingForRunning` logic back to the autoscaler module, and the related tests have been restored. The changes from these 2 comments [1][2] have been restored as well. [1] https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350082356 [2] https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350079573
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352333625 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -72,7 +79,7 @@ public abstract class AbstractFlinkResourceReconciler< protected final EventRecorder eventRecorder; protected final StatusRecorder statusRecorder; -protected final JobAutoScaler resourceScaler; +protected final JobAutoScaler resourceScaler; Review Comment: done
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352290023 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } } +private void scaling(FlinkResourceContext ctx) throws Exception { +KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext(); + +if (autoscalerDisabled(ctx)) { +autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false); +resourceScaler.scale(autoScalerContext); +return; +} +if (waitingForRunning(ctx.getResource().getStatus())) { +LOG.info("Autoscaler is waiting for stable, running state"); +resourceScaler.cleanup(autoScalerContext.getJobKey()); +return; Review Comment: Preferably, I would like any logic related to applying parallelism to live inside the autoscaler implementation. This shouldn't change when the autoscaler is waiting for the running state. In fact, the job state checks should also be performed by the autoscaler, not by the reconciler. The current code mixes control over the parallelism overrides between the reconciler and the autoscaler.
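The separation of concerns requested above can be sketched roughly as follows: the caller always delegates, and the autoscaler itself decides whether the job is in a state where scaling makes sense. All types here (`Context`, `Autoscaler`, `JobState`) are simplified, hypothetical stand-ins and not the classes from the PR.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoscalerStateCheckSketch {

    // Hypothetical stand-in for the job states the autoscaler would observe.
    enum JobState { CREATED, RUNNING, FAILED }

    // Hypothetical, heavily trimmed context; the real one carries far more information.
    static final class Context {
        final String jobKey;
        final JobState state; // may be null when the state is unknown

        Context(String jobKey, JobState state) {
            this.jobKey = jobKey;
            this.state = state;
        }
    }

    static final class Autoscaler {
        final List<String> actions = new ArrayList<>();

        // The state check lives here, so callers do not need their own waiting logic.
        void scale(Context ctx) {
            if (ctx.state != JobState.RUNNING) {
                actions.add("skip:" + ctx.jobKey);
                return;
            }
            actions.add("scale:" + ctx.jobKey);
        }
    }

    // Convenience entry point: feeds one context per state through a single autoscaler.
    static List<String> run(JobState... states) {
        Autoscaler autoscaler = new Autoscaler();
        for (int i = 0; i < states.length; i++) {
            autoscaler.scale(new Context("job-" + i, states[i]));
        }
        return autoscaler.actions;
    }

    public static void main(String[] args) {
        System.out.println(run(JobState.RUNNING, JobState.CREATED, JobState.FAILED));
    }
}
```

In this shape a reconciler-like caller would only build the context and call `scale`, which is one possible reading of keeping control over parallelism in a single place.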
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352282612 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -72,7 +79,7 @@ public abstract class AbstractFlinkResourceReconciler< protected final EventRecorder eventRecorder; protected final StatusRecorder statusRecorder; -protected final JobAutoScaler resourceScaler; +protected final JobAutoScaler resourceScaler; Review Comment: Can we rename this? ```suggestion protected final JobAutoScaler autoscaler; ```
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1351975279 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -15,149 +15,180 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.config; +package org.apache.flink.autoscaler.config; +import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator; import java.time.Duration; import java.util.List; -import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig; - /** Config options related to the autoscaler module. */ public class AutoScalerOptions { +public static final String DEPRECATED_K8S_OP_CONF_PREFIX = "kubernetes.operator."; +public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler."; + +private static String deprecatedOperatorConfigKey(String key) { +return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key; +} + +private static String autoScalerConfigKey(String key) { +return AUTOSCALER_CONF_PREFIX + key; +} + private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { -return operatorConfig("job.autoscaler." + key); +return ConfigOptions.key(autoScalerConfigKey(key)); } public static final ConfigOption AUTOSCALER_ENABLED = autoScalerConfig("enabled") .booleanType() .defaultValue(false) +.withDeprecatedKeys(deprecatedOperatorConfigKey("enabled")) Review Comment: Added: > Note: The option prefix `kubernetes.operator.` was removed in FLIP-334, because the autoscaler module was decoupled from flink-kubernetes-operator. 
## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java: ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.realizer.ScalingRealizer; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism; +import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism; + +/** The default implementation of {@link JobAutoScaler}. */ +public class JobAutoScalerImpl> +implements JobAutoScaler { + +private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class); + +@VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError"; Review Comment: It's used in `BacklogBasedScalingTest`. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with +
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350028811 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() + .withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); + +public static final ConfigOption FLINK_CLIENT_TIMEOUT = +autoScalerConfig("flink.client.timeout") +.durationType() +.defaultValue(Duration.ofSeconds(10)) +.withDescription("The timeout for waiting the flink rest client to return."); Review Comment: Do we need to expose this as a configuration or can we just use the default? I think it is better to let the user configure standard Flink configs as listed here: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#client-timeout ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -15,149 +15,180 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.config; +package org.apache.flink.autoscaler.config; +import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator; import java.time.Duration; import java.util.List; -import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig; - /** Config options related to the autoscaler module. 
*/ public class AutoScalerOptions { +public static final String DEPRECATED_K8S_OP_CONF_PREFIX = "kubernetes.operator."; +public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler."; + +private static String deprecatedOperatorConfigKey(String key) { +return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key; +} + +private static String autoScalerConfigKey(String key) { +return AUTOSCALER_CONF_PREFIX + key; +} + private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { -return operatorConfig("job.autoscaler." + key); +return ConfigOptions.key(autoScalerConfigKey(key)); } public static final ConfigOption AUTOSCALER_ENABLED = autoScalerConfig("enabled") .booleanType() .defaultValue(false) +.withDeprecatedKeys(deprecatedOperatorConfigKey("enabled")) Review Comment: This might confuse some existing users because the deprecated keys will not appear on the configuration page. Can we add a note on the configuration page that we renamed the configuration prefix? 
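For readers wondering what the renamed prefix means in practice, here is a minimal sketch of the fallback behaviour that deprecated keys provide: the new key wins, and the old `kubernetes.operator.`-prefixed key is consulted only when the new one is not set. This uses a plain `Map` purely for illustration and is not how Flink's `Configuration` is implemented; the `resolve` helper and class name are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DeprecatedKeyLookup {

    static final String DEPRECATED_K8S_OP_CONF_PREFIX = "kubernetes.operator.";
    static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";

    // Look up the new key first, then fall back to the deprecated, operator-prefixed key.
    static Optional<String> resolve(Map<String, String> conf, String key) {
        String newKey = AUTOSCALER_CONF_PREFIX + key;
        String oldKey = DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
        if (conf.containsKey(newKey)) {
            return Optional.ofNullable(conf.get(newKey));
        }
        return Optional.ofNullable(conf.get(oldKey));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // An existing user who still has the old key configured.
        conf.put("kubernetes.operator.job.autoscaler.enabled", "true");
        System.out.println(resolve(conf, "enabled"));

        // Once the new key is set, it takes precedence over the old one.
        conf.put("job.autoscaler.enabled", "false");
        System.out.println(resolve(conf, "enabled"));
    }
}
```

Existing configurations keep working under this scheme, but only the new key would show up in generated docs, which is why a note about the rename on the configuration page is useful.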
## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java: ## @@ -38,42 +36,42 @@ import java.util.Map; import java.util.SortedMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD; +import static
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1752727711 > If it's still needed, I can restore the `flink-kubernetes-operator-autoscaler` module and move all kubernetes-autoscaler related classes to this module. WDYT? Given that the autoscaler module is now decoupled from the operator version, it would make more sense than ever to have it pluggable. But given that the backends are not pluggable, the pluggability wouldn't be as useful anymore. I'm ok with removing the support for now if it proves difficult to maintain.
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
gyfora commented on PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1752501151 @1996fanrui Sorry, I merged a PR related to event triggering from the ScalingExecutor. I will try to avoid merging further autoscaler related PRs until this is finalised. Please take care when rebasing.
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1752467497 > The refactor also completely changes how the autoscaler module is loaded in the operator. Previously we intentionally loaded this as a service so it would be possible to potentially upgrade the autoscaler without the rest of the operator. > > Now @1996fanrui you moved the operator related autoscaler classes in the main operator module resulting in a tight coupling. > > @mxm how do you feel about this? Originally you were the one who introduced the service loader approach :) Sorry for breaking this; I didn't know the background. Thanks to @gyfora for pointing it out. If it's still needed, I can restore the `flink-kubernetes-operator-autoscaler` module and move all kubernetes-autoscaler related classes to this module. WDYT? cc @mxm
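As background on the service loader approach mentioned in the quoted text, the general pattern with `java.util.ServiceLoader` looks roughly like the sketch below. The `ScalerFactory` interface and the no-op fallback are hypothetical names chosen for the example and are not the operator's actual types.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

public class AutoscalerLoadingSketch {

    // Hypothetical service interface; a separate jar could ship an implementation.
    public interface ScalerFactory {
        String name();
    }

    // Fallback used when no provider is registered on the classpath.
    static final class NoopScalerFactory implements ScalerFactory {
        @Override
        public String name() {
            return "noop";
        }
    }

    static ScalerFactory load() {
        // Providers are discovered through META-INF/services entries named after the interface.
        Iterator<ScalerFactory> providers = ServiceLoader.load(ScalerFactory.class).iterator();
        return providers.hasNext() ? providers.next() : new NoopScalerFactory();
    }

    public static void main(String[] args) {
        // Without a registered provider this falls back to the no-op factory.
        System.out.println(load().name());
    }
}
```

The benefit described in the thread comes from the indirection: the main module depends only on the interface, so the jar that provides the implementation can be swapped without rebuilding the rest.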
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
gyfora commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1343674332 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DefaultJobAutoScalerContext.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.function.SupplierWithException; + +/** The default job autoscaler context, the jobKey is JobID. */ +public class DefaultJobAutoScalerContext extends AbstractJobAutoScalerContext { Review Comment: Let's simply delete the class and simply have a non-abstract `JobAutoscalerContext` ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AbstractJobAutoScalerContext.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.function.SupplierWithException; + +/** + * The abstract job autoscaler context. + * + * @param The job key. + */ +public abstract class AbstractJobAutoScalerContext implements JobAutoScalerContext { Review Comment: I would simply rename this to `JobAutoScalerContext`, delete the constructor, make all fields final and put `@Value` on it to generate all the getters, toString etc. We don't need interfaces for everything; that is too much boilerplate. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; + +/** + * The job autoscaler context, it includes all details related to the current job. + * + * @param The job key. + */ +@Experimental +public interface JobAutoScalerContext { Review Comment: Let's delete the interface for now (see my previous comment). We can re-add it later if necessary, but I doubt it. ##
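As a rough picture of the suggestion in these comments, a single concrete context class with final fields can replace the interface, abstract base and default implementation. The sketch below writes out by hand roughly what Lombok's `@Value` would generate (constructor, getters, `equals`, `hashCode`, `toString`) so that it runs without Lombok. The class and field names are hypothetical and far smaller than the real context.

```java
import java.util.Objects;

public class JobContextSketch {

    // Hypothetical, trimmed-down context with a generic job key and two final fields.
    static final class JobContext<KEY> {
        private final KEY jobKey;
        private final String jobState;

        JobContext(KEY jobKey, String jobState) {
            this.jobKey = jobKey;
            this.jobState = jobState;
        }

        KEY getJobKey() {
            return jobKey;
        }

        String getJobState() {
            return jobState;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof JobContext)) {
                return false;
            }
            JobContext<?> other = (JobContext<?>) o;
            return Objects.equals(jobKey, other.jobKey)
                    && Objects.equals(jobState, other.jobState);
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobKey, jobState);
        }

        @Override
        public String toString() {
            return "JobContext(jobKey=" + jobKey + ", jobState=" + jobState + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(new JobContext<>("job-a", "RUNNING"));
    }
}
```

A Kubernetes-specific context could still extend a non-final variant of such a class if it needs extra fields, at the cost of giving up the `final` class modifier.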
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
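gyfora commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1343663466 ## flink-autoscaler/pom.xml: ## @@ -45,6 +45,32 @@ under the License. <scope>provided</scope> </dependency> +<dependency> +<groupId>org.projectlombok</groupId> +<artifactId>lombok</artifactId> +<version>${lombok.version}</version> +<scope>provided</scope> +</dependency> +<dependency> +<groupId>org.junit.jupiter</groupId> +<artifactId>junit-jupiter-params</artifactId> +<scope>test</scope> +</dependency> +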