[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

2023-10-01 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1342079785


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The config map store, it's responsible for store/get/remove the state with 
String type. */
+public class ConfigMapStore {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);
+
+private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
+
+private final KubernetesClient kubernetesClient;
+
+/**
+ * The cache for each resourceId may be in three situations: 1. The 
resourceId isn't exist :
+ * ConfigMap isn't loaded from kubernetes, or it's removed. 2. The 
resourceId is exist, and
+ * value is the Optional.empty() : We have loaded the ConfigMap from 
kubernetes, but the
+ * ConfigMap isn't created at kubernetes side. 3. The resourceId is exist, 
and the Optional
+ * isn't empty : We have loaded the ConfigMap from kubernetes, it may be 
not same with
+ * kubernetes side due to it's not flushed after updating.
+ */
+private final ConcurrentHashMap> cache =
+new ConcurrentHashMap<>();
+
+public ConfigMapStore(KubernetesClient kubernetesClient) {
+this.kubernetesClient = kubernetesClient;
+}
+
+protected void putSerializedState(
+KubernetesJobAutoScalerContext jobContext, String key, String 
value) {
+getOrCreateState(jobContext).put(key, value);
+}
+
+protected Optional getSerializedState(
+KubernetesJobAutoScalerContext jobContext, String key) {
+return getConfigMap(jobContext).map(configMap -> 
configMap.getData().get(key));
+}
+
+protected void removeSerializedState(KubernetesJobAutoScalerContext 
jobContext, String key) {
+getConfigMap(jobContext)
+.ifPresentOrElse(
+configMap -> configMap.getData().remove(key),
+() -> {
+throw new IllegalStateException(
+"The configMap isn't created, so the 
remove is unavailable.");
+});
+}
+
+public void flush(KubernetesJobAutoScalerContext jobContext) {
+Optional configMapOpt = cache.get(jobContext.getJobKey());
+Preconditions.checkState(

Review Comment:
   Some callers call `flush` at the end even if the state hasn't been updated, so your suggestion makes things easier for callers.
   
   I've updated the production code and will add more tests for it later, since I'm on vacation. Thanks!
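
The behaviour discussed in this comment can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `SketchStore`, its in-memory `backend` map, and the `writes` counter are hypothetical stand-ins for the ConfigMap-backed store and the Kubernetes side. The point is only that `flush` returns quietly when nothing is cached for the job, rather than failing a precondition check.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for a cache-backed state store. */
class SketchStore {
    // Per-job cached state; an absent key means nothing was loaded or updated.
    private final Map<String, Map<String, String>> cache = new ConcurrentHashMap<>();
    // Stand-in for the remote side (Kubernetes in the real store).
    final Map<String, Map<String, String>> backend = new HashMap<>();
    // Counts remote writes so the no-op case is observable.
    int writes = 0;

    void put(String jobKey, String key, String value) {
        cache.computeIfAbsent(jobKey, k -> new HashMap<>()).put(key, value);
    }

    /** Tolerates being called when no state was ever created for the job. */
    void flush(String jobKey) {
        Map<String, String> state = cache.get(jobKey);
        if (state == null) {
            return; // nothing cached, so there is nothing to write
        }
        backend.put(jobKey, new HashMap<>(state));
        writes++;
    }
}
```

With this shape, a caller can always call `flush` at the end of a reconcile loop without first checking whether it changed any state.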



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

2023-10-01 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1342082710


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext ctx) throws 
Exception {
 }
 }
 
+private void scaling(FlinkResourceContext ctx) throws Exception {
+KubernetesJobAutoScalerContext autoScalerContext = 
ctx.getJobAutoScalerContext();
+
+if (autoscalerDisabled(ctx)) {
+autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, 
false);
+resourceScaler.scale(autoScalerContext);
+return;
+}
+if (waitingForRunning(ctx.getResource().getStatus())) {
+LOG.info("Autoscaler is waiting for  stable, running state");
+resourceScaler.cleanup(autoScalerContext.getJobKey());
+return;

Review Comment:
   Thanks for pointing it out.
   
   If `applyParallelismOverrides` is expected here, we can call the related code at this point. `applyParallelismOverrides` just calls `stateStore.getParallelismOverrides(ctx)` and `scalingRealizer.realize(ctx, userOverrides);`, and both `stateStore` and `scalingRealizer` are reachable here.
   
   I plan to extract `org.apache.flink.autoscaler.JobAutoScalerImpl#applyParallelismOverrides` into a static method. WDYT?
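
One way the proposed extraction could look, as a sketch under assumptions: `OverridesStore`, `Realizer`, and `ParallelismOverrideUtil` are hypothetical names, and the `Map<String, String>` override type is a simplification, since the real `stateStore` and `scalingRealizer` signatures are not shown in this thread. Only the two calls named in the comment are taken from the discussion.

```java
import java.util.Map;

/** Hypothetical stand-in for the state store's override lookup. */
interface OverridesStore<C> {
    Map<String, String> getParallelismOverrides(C ctx);
}

/** Hypothetical stand-in for the scaling realizer. */
interface Realizer<C> {
    void realize(C ctx, Map<String, String> overrides);
}

final class ParallelismOverrideUtil {
    private ParallelismOverrideUtil() {}

    /**
     * Static helper: it depends only on its arguments, so any caller that
     * holds both collaborators can apply the overrides.
     */
    static <C> void applyParallelismOverrides(
            C ctx, OverridesStore<C> stateStore, Realizer<C> scalingRealizer) {
        Map<String, String> userOverrides = stateStore.getParallelismOverrides(ctx);
        scalingRealizer.realize(ctx, userOverrides);
    }
}
```

Because the helper holds no state of its own, both the autoscaler implementation and the reconciler could call it without sharing an instance.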




[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

2023-09-26 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1336584237


##
flink-autoscaler/pom.xml:
##
@@ -45,6 +45,32 @@ under the License.
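             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>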
+
+

[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

2023-09-21 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1333849523


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+/**
+ * The state store is responsible for store all states during scaling.
+ *
+ * @param  The job key.
+ * @param  Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerStateStore> {
+
+void storeScalingHistory(Context jobContext, String scalingHistory);

Review Comment:
   Hi @Samrat002 , thanks for your feedback!
   
   I have updated the method parameters of `AutoScalerStateStore` to use specific classes instead of String, such as `Map> scalingHistory`.
   
   ```
   public interface AutoScalerStateStore> {
   
   void storeScalingHistory(
   Context jobContext,
   Map> 
scalingHistory);
   
   Optional>> 
getScalingHistory(
   Context jobContext);
   
   void removeScalingHistory(Context jobContext);
   }
   ```
   
   The PR has been updated as well.
   
   Do you think this is OK? It means the state store is responsible for how to serialize and deserialize the state. For example:
   
   - The default `KubernetesAutoScalerStateStore` serializes all states to String inside `KubernetesAutoScalerStateStore`.
   - As you mentioned before, if there is any complex type in the future, each state store determines how to serialize it.
   
   Also, let me add the reason for updating these parameters:
   
   Currently, all states are stored in a ConfigMap, which has a size limit. The size limit should only apply to `KubernetesAutoScalerStateStore`, and handling it is part of serialization. So we should move serialization and deserialization into the `AutoScalerStateStore`.
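
To illustrate the design argued for above, here is a minimal sketch in which the interface exposes only typed state and one implementation owns both the String encoding and the size check. Everything in it is an assumption for illustration: `TypedStateStore`, `StringBackedStateStore`, the `Map<String, Integer>` state type, the `key=value` encoding, and the `maxChars` limit are hypothetical and are not the PR's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical typed store interface: callers never see the serialized form. */
interface TypedStateStore<C> {
    void storeOverrides(C ctx, Map<String, Integer> overrides);

    Optional<Map<String, Integer>> getOverrides(C ctx);

    void removeOverrides(C ctx);
}

/** Hypothetical String-backed implementation; encoding and size limit live here. */
class StringBackedStateStore implements TypedStateStore<String> {
    private final Map<String, String> data = new HashMap<>();
    private final int maxChars; // assumed limit, standing in for a backend size limit

    StringBackedStateStore(int maxChars) {
        this.maxChars = maxChars;
    }

    @Override
    public void storeOverrides(String ctx, Map<String, Integer> overrides) {
        // Encode as "k1=v1,k2=v2"; keys are assumed not to contain ',' or '='.
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : new TreeMap<>(overrides).entrySet()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        if (sb.length() > maxChars) {
            throw new IllegalStateException("Serialized state exceeds the size limit");
        }
        data.put(ctx, sb.toString());
    }

    @Override
    public Optional<Map<String, Integer>> getOverrides(String ctx) {
        String serialized = data.get(ctx);
        if (serialized == null) {
            return Optional.empty();
        }
        Map<String, Integer> result = new HashMap<>();
        if (!serialized.isEmpty()) {
            for (String pair : serialized.split(",")) {
                String[] kv = pair.split("=", 2);
                result.put(kv[0], Integer.parseInt(kv[1]));
            }
        }
        return Optional.of(result);
    }

    @Override
    public void removeOverrides(String ctx) {
        data.remove(ctx);
    }
}
```

A different backend could implement the same interface with another encoding and no size check, which is the flexibility the comment is after.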






[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #677: [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces

2023-09-18 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1328450023


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+/**
+ * The state store is responsible for store all states during scaling.
+ *
+ * @param  The job key.
+ * @param  Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerStateStore> {
+
+void storeScalingHistory(Context jobContext, String scalingHistory);

Review Comment:
   > improvement1
   
   Improvement 1 makes sense to me, and I need to improve `SerializableState`. The current `deserialize` method needs to create an object first and then call `object.deserialize(serializedResult)`. In general, the serializer is a separate object.
   
   I'm concerned that it may be too complex if we introduce two classes for each state.
   
   > improvement2
   
   For improvement 2, my concern is that the serialized type changes, so existing jobs would not be directly compatible.
   
   `byte[]` is certainly more general than String, but the benefits it brings are uncertain (because there may never be classes that can only be serialized into `byte[]`).
   
   The negative impact is certain: it will bring additional migration costs to existing users.
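
The "serializer as a separate object" variant mentioned above could look roughly like this. It is a sketch only: `StateSerializer`, `ParallelismHistory`, and `ParallelismHistorySerializer` are hypothetical names, and the semicolon-separated encoding is an arbitrary choice for the example. It also shows the cost raised in the comment, namely two classes per state.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical serializer kept separate from the state class it handles. */
interface StateSerializer<T> {
    String serialize(T state);

    T deserialize(String serialized);
}

/** Hypothetical state: a list of parallelism values. */
final class ParallelismHistory {
    final List<Integer> values;

    ParallelismHistory(List<Integer> values) {
        this.values = new ArrayList<>(values);
    }
}

/** Second class for the same state: no state instance is needed to deserialize. */
final class ParallelismHistorySerializer implements StateSerializer<ParallelismHistory> {
    @Override
    public String serialize(ParallelismHistory state) {
        StringBuilder sb = new StringBuilder();
        for (int v : state.values) {
            if (sb.length() > 0) {
                sb.append(';');
            }
            sb.append(v);
        }
        return sb.toString();
    }

    @Override
    public ParallelismHistory deserialize(String serialized) {
        List<Integer> values = new ArrayList<>();
        if (!serialized.isEmpty()) {
            for (String part : serialized.split(";")) {
                values.add(Integer.parseInt(part));
            }
        }
        return new ParallelismHistory(values);
    }
}
```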



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+/**
+ * The state store is responsible for store all states during scaling.
+ *
+ * @param  The job key.
+ * @param  Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerStateStore> {
+
+void storeScalingHistory(Context jobContext, String scalingHistory);

Review Comment:
   Hi @gyfora  @mxm , because of some offline comments and for ease of review, I split the decoupling into FLINK-33097 and FLINK-33098. Of course, if you think they should be merged into one PR, I will continue in this PR with multiple commits.
   
   @Samrat002 proposed two improvements to the `StateStore`.
   
   ## 1. Using a structured class instead of `String`
   
   A structured class is clearer than String.
   
   First, we define the `SerializableState` interface to abstract `serialize` and `deserialize`.
   
   ```
   interface SerializableState {
   
   String serialize();
   
   State deserialize(String serializedResult);
   }
   ```
   
   Then we define a `ScalingHistory` class that implements `SerializableState`.
   
   
   ## 2. Using `byte[]` instead of `String` as the serialized result
   
   Reason: in the future there may be some complex state objects that cannot be serialized to String.
   
   
   Hi @Samrat002 , please correct me if my description is wrong, thanks~
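
As a rough illustration of improvement 1 as described in this comment, a `ScalingHistory` implementing `SerializableState` might look like the sketch below. The type parameter on `SerializableState`, the fields of `ScalingHistory` (epoch-millisecond timestamp mapped to parallelism), and the `timestamp:parallelism` encoding are all assumptions made for the example; the thread does not show the real class.

```java
import java.util.Map;
import java.util.TreeMap;

/** The interface from the comment, with an assumed type parameter. */
interface SerializableState<State> {
    String serialize();

    State deserialize(String serializedResult);
}

/** Hypothetical ScalingHistory: epoch-millis timestamp -> parallelism. */
class ScalingHistory implements SerializableState<ScalingHistory> {
    final TreeMap<Long, Integer> entries = new TreeMap<>();

    @Override
    public String serialize() {
        // Encode as "ts1:p1,ts2:p2" in ascending timestamp order.
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Long, Integer> e : entries.entrySet()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(e.getKey()).append(':').append(e.getValue());
        }
        return sb.toString();
    }

    @Override
    public ScalingHistory deserialize(String serializedResult) {
        // Called on an existing (possibly empty) instance; returns a new one.
        ScalingHistory restored = new ScalingHistory();
        if (!serializedResult.isEmpty()) {
            for (String pair : serializedResult.split(",")) {
                String[] kv = pair.split(":", 2);
                restored.entries.put(Long.parseLong(kv[0]), Integer.parseInt(kv[1]));
            }
        }
        return restored;
    }
}
```

Note that deserializing requires an instance first, for example `new ScalingHistory().deserialize(serialized)`.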


