Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
1996fanrui merged PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1464522145 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** The utils of {@link AutoScalerEventHandler}. */ +@Experimental +public class AutoscalerEventUtils { + +private static final Pattern SCALING_REPORT_SEPARATOR = Pattern.compile("\\{(.+?)\\}"); +private static final Pattern VERTEX_SCALING_REPORT_PATTERN = +Pattern.compile( +"Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\| Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)"); + +/** Parse the scaling report from original scaling report event. */ +public static List parseVertexScalingReports(String scalingReport) { Review Comment: Yeah, it needs a push model instead of a pull model.

> So no objections from my side moving ahead with the proposed changes.

The production code and test code are already ready.
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
mxm commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1463522323 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** The utils of {@link AutoScalerEventHandler}. */ +@Experimental +public class AutoscalerEventUtils { + +private static final Pattern SCALING_REPORT_SEPARATOR = Pattern.compile("\\{(.+?)\\}"); +private static final Pattern VERTEX_SCALING_REPORT_PATTERN = +Pattern.compile( +"Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\| Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)"); + +/** Parse the scaling report from original scaling report event. */ +public static List parseVertexScalingReports(String scalingReport) { Review Comment: Makes sense. I was thinking of exposing the overrides in the status field of the custom resource, but that would require pull instead of push. 
So no objections from my side moving ahead with the proposed changes.
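The push model discussed here can be reduced to a minimal sketch: the producer invokes a callback for every scaling report as it is produced, so a consumer never has to decide how often to look. `ScalingReportListener`, `ScalingReportPublisher`, and `RecordingListener` are hypothetical names for illustration; they are not the operator's `AutoScalerEventHandler` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical callback that receives each scaling report as soon as it is produced. */
interface ScalingReportListener {
    void onScalingReport(String jobKey, String report);
}

/** Hypothetical publisher: the autoscaler pushes every report to all registered listeners. */
class ScalingReportPublisher {
    // CopyOnWriteArrayList tolerates listeners being registered while a report is being delivered.
    private final List<ScalingReportListener> listeners = new CopyOnWriteArrayList<>();

    void register(ScalingReportListener listener) {
        listeners.add(listener);
    }

    /** Called whenever a new scaling decision is computed; nothing is buffered for later polling. */
    void publish(String jobKey, String report) {
        for (ScalingReportListener listener : listeners) {
            listener.onScalingReport(jobKey, report);
        }
    }
}

/** Example consumer that records every report pushed to it, in arrival order. */
class RecordingListener implements ScalingReportListener {
    final List<String> received = new ArrayList<>();

    @Override
    public void onScalingReport(String jobKey, String report) {
        received.add(jobKey + ": " + report);
    }
}
```

With a status field, by contrast, a consumer only sees whatever value happens to be stored at the moment it reads.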
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1463020446 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** The utils of {@link AutoScalerEventHandler}. */ +@Experimental +public class AutoscalerEventUtils { + +private static final Pattern SCALING_REPORT_SEPARATOR = Pattern.compile("\\{(.+?)\\}"); +private static final Pattern VERTEX_SCALING_REPORT_PATTERN = +Pattern.compile( +"Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\| Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)"); + +/** Parse the scaling report from original scaling report event. 
*/ +public static List parseVertexScalingReports(String scalingReport) { Review Comment:

> Users have the spec, metrics, and the reconciliation status to programmatically extract the deployment state

`Spec` and `Reconciliation status` are related to Kubernetes, right?

After analysis, I think the metric report isn't suitable here. In general, metrics are reported or fetched periodically:
- When it's too frequent, the reported recommended parallelism is always the same, so the repeated reports aren't needed.
- When it's too infrequent, we might miss some recommended parallelism.

So I think an event-driven approach suits this requirement, and the event handler is event-driven. Also, the core change of this PR only adds `{` and `}` between multiple vertices. If you think we don't need to provide the parsing utils class, I can move it to the test namespace. What's your opinion?
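The sampling argument above can be checked with a toy simulation. The recommendation sequence is made up, and `PollVsPush` is a hypothetical helper: `polled` models a consumer that reads the latest recommended parallelism once every `pollEvery` autoscaler evaluations, while `pushed` models an event handler that fires once per change.

```java
import java.util.ArrayList;
import java.util.List;

class PollVsPush {
    /** Values a poller observes if it reads the latest recommendation every {@code pollEvery} evaluations. */
    static List<Integer> polled(int[] recommendations, int pollEvery) {
        List<Integer> seen = new ArrayList<>();
        for (int i = pollEvery - 1; i < recommendations.length; i += pollEvery) {
            seen.add(recommendations[i]);
        }
        return seen;
    }

    /** Values an event-driven consumer observes: exactly one event per change in the recommendation. */
    static List<Integer> pushed(int[] recommendations) {
        List<Integer> seen = new ArrayList<>();
        Integer last = null;
        for (int value : recommendations) {
            if (last == null || value != last) {
                seen.add(value);
                last = value;
            }
        }
        return seen;
    }
}
```

For the made-up sequence `{4, 4, 8, 4, 4, 4}`, polling every evaluation returns six mostly repeated values, polling every second evaluation returns `[4, 4, 4]` and never sees the short-lived 8, and the event-driven consumer sees exactly `[4, 8, 4]`.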
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
mxm commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1458941912 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** The utils of {@link AutoScalerEventHandler}. */ +@Experimental +public class AutoscalerEventUtils { + +private static final Pattern SCALING_REPORT_SEPARATOR = Pattern.compile("\\{(.+?)\\}"); +private static final Pattern VERTEX_SCALING_REPORT_PATTERN = +Pattern.compile( +"Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\| Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)"); + +/** Parse the scaling report from original scaling report event. */ +public static List parseVertexScalingReports(String scalingReport) { Review Comment: Thank you. If it is common practice to parse deployment events and there is no other good way to get this data, I'm also open to pursuing the current approach. 
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1458859337 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** The utils of {@link AutoScalerEventHandler}. */ +@Experimental +public class AutoscalerEventUtils { + +private static final Pattern SCALING_REPORT_SEPARATOR = Pattern.compile("\\{(.+?)\\}"); +private static final Pattern VERTEX_SCALING_REPORT_PATTERN = +Pattern.compile( +"Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\| Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)"); + +/** Parse the scaling report from original scaling report event. */ +public static List parseVertexScalingReports(String scalingReport) { Review Comment: Thank you @mxm, I will do more research next week to check how other services can properly get the recommended parallelism. 
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
mxm commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1458798621 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** The utils of {@link AutoScalerEventHandler}. */ +@Experimental +public class AutoscalerEventUtils { + +private static final Pattern SCALING_REPORT_SEPARATOR = Pattern.compile("\\{(.+?)\\}"); +private static final Pattern VERTEX_SCALING_REPORT_PATTERN = +Pattern.compile( +"Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\| Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)"); + +/** Parse the scaling report from original scaling report event. */ +public static List parseVertexScalingReports(String scalingReport) { Review Comment: I'm not 100% sure whether we should provide utils for parsing the events. 
Users have the spec, metrics, and the reconciliation status to programmatically extract the deployment state. Deployment events are not meant to be machine-readable. Maybe we are missing important information in the status field? I would rather add scaling information there instead of parsing the events.
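A sketch of the alternative suggested above: exposing the latest scaling decision as structured data in the resource status instead of a free-text event. `VertexScalingStatus` and `AutoscalerScalingStatus` are hypothetical types; the operator's actual status classes may look quite different.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical per-vertex entry a status field could carry. */
class VertexScalingStatus {
    final int currentParallelism;
    final int newParallelism;

    VertexScalingStatus(int currentParallelism, int newParallelism) {
        this.currentParallelism = currentParallelism;
        this.newParallelism = newParallelism;
    }
}

/** Hypothetical status section: the latest recommendation per vertex, readable without parsing text. */
class AutoscalerScalingStatus {
    // LinkedHashMap keeps vertices in the order they were first recorded.
    private final Map<String, VertexScalingStatus> vertices = new LinkedHashMap<>();

    /** Overwrites any previous entry for the vertex: a status holds only the latest state. */
    void record(String vertexId, int currentParallelism, int newParallelism) {
        vertices.put(vertexId, new VertexScalingStatus(currentParallelism, newParallelism));
    }

    Map<String, VertexScalingStatus> vertices() {
        return Collections.unmodifiableMap(vertices);
    }
}
```

Because `record` overwrites earlier entries, a reader of this status only ever sees the most recent decision, which is the pull-versus-push trade-off raised in this thread.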
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1458204048 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** The utils of {@link AutoScalerEventHandler}. */ +@Experimental +public class AutoscalerEventUtils { + +private static final Pattern SCALING_REPORT_SEPARATOR = Pattern.compile("\\{(.+?)\\}"); +private static final Pattern VERTEX_SCALING_REPORT_PATTERN = +Pattern.compile( +"Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\| Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)"); + +/** Parse the scaling report from original scaling report event. */ +public static List parseVertexScalingReports(String scalingReport) { Review Comment: I'd keep it here for two reasons:

1. Testing: it ensures the ScalingReport can be parsed correctly.
   - In the future, we need to be careful when modifying the contents of `ScalingReport`, because some users parse its format.
2. The public utils can be used by all autoscaler users.
   - That's why I added `@Experimental` to the `AutoscalerEventUtils` class.

For reason 1, keeping it here or in the test namespace are both fine. For reason 2, keeping it here makes it easier for users. I can move it to the test namespace if you think it's not necessary, thank you.
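The parsing utility quoted in this hunk can be sketched as follows. The two regular expressions are copied from the quoted `AutoscalerEventUtils` diff; the `VertexReport` holder, its field types, and the `ScalingReportParser` class name are assumptions (the element type of the returned `List` is not visible in the quoted diff), so this illustrates the technique rather than reproducing the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical holder for one parsed vertex entry; each field maps to one regex group. */
class VertexReport {
    final String vertexId;
    final int currentParallelism;
    final int newParallelism;
    final double currentProcessingCapacity;
    final double expectedProcessingCapacity;
    final double targetDataRate;

    VertexReport(String vertexId, int currentParallelism, int newParallelism,
            double currentProcessingCapacity, double expectedProcessingCapacity, double targetDataRate) {
        this.vertexId = vertexId;
        this.currentParallelism = currentParallelism;
        this.newParallelism = newParallelism;
        this.currentProcessingCapacity = currentProcessingCapacity;
        this.expectedProcessingCapacity = expectedProcessingCapacity;
        this.targetDataRate = targetDataRate;
    }
}

class ScalingReportParser {
    // Both patterns are copied from the quoted AutoscalerEventUtils diff.
    private static final Pattern SCALING_REPORT_SEPARATOR = Pattern.compile("\\{(.+?)\\}");
    private static final Pattern VERTEX_SCALING_REPORT_PATTERN =
            Pattern.compile(
                    "Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\| Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)");

    static List<VertexReport> parseVertexScalingReports(String scalingReport) {
        List<VertexReport> reports = new ArrayList<>();
        // Every vertex entry is wrapped in '{' and '}', the separator this PR introduces.
        Matcher entries = SCALING_REPORT_SEPARATOR.matcher(scalingReport);
        while (entries.find()) {
            Matcher fields = VERTEX_SCALING_REPORT_PATTERN.matcher(entries.group(1).trim());
            if (!fields.matches()) {
                continue; // Skip entries that do not follow the expected layout.
            }
            reports.add(
                    new VertexReport(
                            fields.group(1).trim(),
                            Integer.parseInt(fields.group(2).trim()),
                            Integer.parseInt(fields.group(3).trim()),
                            // Double.parseDouble accepts "Infinity", which %.2f emits for infinite values.
                            Double.parseDouble(fields.group(4).trim()),
                            Double.parseDouble(fields.group(5).trim()),
                            Double.parseDouble(fields.group(6).trim())));
        }
        return reports;
    }
}
```

Because the separator pattern is non-greedy, each `{...}` pair yields exactly one entry as long as vertex IDs and numbers never contain braces; that assumption is what makes the format stable enough to test against.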
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
mxm commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1457237051 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** The utils of {@link AutoScalerEventHandler}. */ +@Experimental +public class AutoscalerEventUtils { + +private static final Pattern SCALING_REPORT_SEPARATOR = Pattern.compile("\\{(.+?)\\}"); +private static final Pattern VERTEX_SCALING_REPORT_PATTERN = +Pattern.compile( +"Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\| Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)"); + +/** Parse the scaling report from original scaling report event. */ +public static List parseVertexScalingReports(String scalingReport) { Review Comment: Why are we parsing the output here? Just for the tests? I would move this class to the test namespace.
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1454656984 ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link AutoScalerEventHandler}. 
*/ +class AutoScalerEventHandlerTest { + +@Test +void testScalingReport() throws JsonProcessingException { +var expectedJson = +"{\"scalingMessage\":\"Scaling execution enabled, begin scaling vertices:\",\"scalingResults\":[" ++ "{\"vertexId\":\"0a448493b4782967b150582570326227\",\"currentParallelism\":5,\"newParallelism\":8," ++ "\"currentProcessCapacity\":404.727,\"expectedProcessCapacity\":645.0,\"targetDataRate\":404.268}," ++ "{\"vertexId\":\"bc764cd8ddf7a0cff126f51c16239658\",\"currentParallelism\":4,\"newParallelism\":2," ++ "\"currentProcessCapacity\":\"Infinity\",\"expectedProcessCapacity\":\"Infinity\",\"targetDataRate\":812.583}," ++ "{\"vertexId\":\"ea632d67b7d595e5b851708ae9ad79d6\",\"currentParallelism\":3,\"newParallelism\":1," ++ "\"currentProcessCapacity\":424.678,\"expectedProcessCapacity\":123.4,\"targetDataRate\":403.673}]}"; Review Comment: Thanks for the feedback! I only added `{` and `}` as separators between multiple vertices, which makes the report more readable and parsable. I added `AutoscalerEventUtils` and `AutoScalerEventHandlerTest` to test the deserialization; they ensure that serialization and deserialization stay compatible. `AutoscalerEventUtils` can also be used by users.
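The formatting side of the change, wrapping every vertex entry in `{` and `}`, can be sketched like this. The entry template is inferred from the `VERTEX_SCALING_REPORT_PATTERN` regex in the quoted `AutoscalerEventUtils` diff; the exact spacing, number formatting, header text, and the `ScalingReportFormatter`/`Entry` names are assumptions, not the operator's code.

```java
import java.util.List;
import java.util.Locale;

class ScalingReportFormatter {
    // Hypothetical entry template, inferred from the VERTEX_SCALING_REPORT_PATTERN regex in the review.
    private static final String ENTRY_TEMPLATE =
            "{Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f}";

    /** Minimal input type for this sketch; field names are hypothetical. */
    static final class Entry {
        final String vertexId;
        final int currentParallelism;
        final int newParallelism;
        final double currentCapacity;
        final double expectedCapacity;
        final double targetDataRate;

        Entry(String vertexId, int currentParallelism, int newParallelism,
                double currentCapacity, double expectedCapacity, double targetDataRate) {
            this.vertexId = vertexId;
            this.currentParallelism = currentParallelism;
            this.newParallelism = newParallelism;
            this.currentCapacity = currentCapacity;
            this.expectedCapacity = expectedCapacity;
            this.targetDataRate = targetDataRate;
        }
    }

    /** Appends one brace-delimited entry per vertex after the header message. */
    static String format(String header, List<Entry> entries) {
        StringBuilder report = new StringBuilder(header);
        for (Entry e : entries) {
            // Locale.ROOT keeps '.' as the decimal separator; infinite values print as "Infinity".
            report.append(String.format(Locale.ROOT, ENTRY_TEMPLATE,
                    e.vertexId, e.currentParallelism, e.newParallelism,
                    e.currentCapacity, e.expectedCapacity, e.targetDataRate));
        }
        return report.toString();
    }
}
```

Pinning the locale matters here: with a default locale such as German, `%.2f` would print `404,73`, which a parser expecting `.` would reject.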
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
mxm commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1453532627 ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link AutoScalerEventHandler}. 
*/ +class AutoScalerEventHandlerTest { + +@Test +void testScalingReport() throws JsonProcessingException { +var expectedJson = +"{\"scalingMessage\":\"Scaling execution enabled, begin scaling vertices:\",\"scalingResults\":[" ++ "{\"vertexId\":\"0a448493b4782967b150582570326227\",\"currentParallelism\":5,\"newParallelism\":8," ++ "\"currentProcessCapacity\":404.727,\"expectedProcessCapacity\":645.0,\"targetDataRate\":404.268}," ++ "{\"vertexId\":\"bc764cd8ddf7a0cff126f51c16239658\",\"currentParallelism\":4,\"newParallelism\":2," ++ "\"currentProcessCapacity\":\"Infinity\",\"expectedProcessCapacity\":\"Infinity\",\"targetDataRate\":812.583}," ++ "{\"vertexId\":\"ea632d67b7d595e5b851708ae9ad79d6\",\"currentParallelism\":3,\"newParallelism\":1," ++ "\"currentProcessCapacity\":424.678,\"expectedProcessCapacity\":123.4,\"targetDataRate\":403.673}]}"; Review Comment: I'm ok with adding separators for the current format, if that helps to make it more parsable.
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
gyfora commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1453407844 ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link AutoScalerEventHandler}. 
*/ +class AutoScalerEventHandlerTest { + +@Test +void testScalingReport() throws JsonProcessingException { +var expectedJson = +"{\"scalingMessage\":\"Scaling execution enabled, begin scaling vertices:\",\"scalingResults\":[" ++ "{\"vertexId\":\"0a448493b4782967b150582570326227\",\"currentParallelism\":5,\"newParallelism\":8," ++ "\"currentProcessCapacity\":404.727,\"expectedProcessCapacity\":645.0,\"targetDataRate\":404.268}," ++ "{\"vertexId\":\"bc764cd8ddf7a0cff126f51c16239658\",\"currentParallelism\":4,\"newParallelism\":2," ++ "\"currentProcessCapacity\":\"Infinity\",\"expectedProcessCapacity\":\"Infinity\",\"targetDataRate\":812.583}," ++ "{\"vertexId\":\"ea632d67b7d595e5b851708ae9ad79d6\",\"currentParallelism\":3,\"newParallelism\":1," ++ "\"currentProcessCapacity\":424.678,\"expectedProcessCapacity\":123.4,\"targetDataRate\":403.673}]}"; Review Comment: I think the JSON format is notoriously hard to read (for humans), so I think it's not really a good fit for event messages :/
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1453296427 ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link AutoScalerEventHandler}. 
*/ +class AutoScalerEventHandlerTest { + +@Test +void testScalingReport() throws JsonProcessingException { +var expectedJson = +"{\"scalingMessage\":\"Scaling execution enabled, begin scaling vertices:\",\"scalingResults\":[" ++ "{\"vertexId\":\"0a448493b4782967b150582570326227\",\"currentParallelism\":5,\"newParallelism\":8," ++ "\"currentProcessCapacity\":404.727,\"expectedProcessCapacity\":645.0,\"targetDataRate\":404.268}," ++ "{\"vertexId\":\"bc764cd8ddf7a0cff126f51c16239658\",\"currentParallelism\":4,\"newParallelism\":2," ++ "\"currentProcessCapacity\":\"Infinity\",\"expectedProcessCapacity\":\"Infinity\",\"targetDataRate\":812.583}," ++ "{\"vertexId\":\"ea632d67b7d595e5b851708ae9ad79d6\",\"currentParallelism\":3,\"newParallelism\":1," ++ "\"currentProcessCapacity\":424.678,\"expectedProcessCapacity\":123.4,\"targetDataRate\":403.673}]}"; Review Comment: I find the JSON format easy to read as well. I added the `JsonPropertyOrder` annotation to control the property order in the JSON output. > It's also a breaking change. Our users expect to see the current format. Could we add an option to choose which format is used? Also, if the current format is considered easier to read, how about adding a separator between two vertices?
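The separator suggestion can be sketched as follows. This is a hypothetical Java illustration, not code from this PR: each vertex report keeps the existing layout but is wrapped in `{ ... }`, and a non-greedy `\{(.+?)\}` regex (the same shape as the `SCALING_REPORT_SEPARATOR` pattern in the `AutoscalerEventUtils` excerpt quoted in this thread) splits the message back into per-vertex strings. The class and method names, and the exact spacing inside the braces, are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of "add a separator between vertices"; names are illustrative only. */
final class DelimitedScalingReport {

    // Same per-vertex layout as the legacy report, but each vertex is wrapped in "{ ... }".
    private static final String VERTEX_FORMAT =
            "{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f }";

    // Non-greedy: captures everything between one '{' and the next '}'.
    private static final Pattern SEPARATOR = Pattern.compile("\\{(.+?)\\}");

    static String formatVertex(
            String vertexId, int cur, int next, double curCap, double expCap, double targetRate) {
        // Locale.ROOT keeps '.' as the decimal separator regardless of the JVM default locale.
        return String.format(
                Locale.ROOT, VERTEX_FORMAT, vertexId, cur, next, curCap, expCap, targetRate);
    }

    static List<String> splitVertices(String message) {
        List<String> vertices = new ArrayList<>();
        Matcher m = SEPARATOR.matcher(message);
        while (m.find()) {
            // Drop the padding spaces just inside the braces.
            vertices.add(m.group(1).trim());
        }
        return vertices;
    }

    public static void main(String[] args) {
        String message =
                "Scaling execution enabled, begin scaling vertices:"
                        + formatVertex("ea632d67b7d595e5b851708ae9ad79d6", 2, 1, 800466.67, 320186.67, 715.10)
                        + formatVertex("0a448493b4782967b150582570326227", 8, 16, 716.05, 1141.00, 715.54);
        splitVertices(message).forEach(System.out::println);
    }
}
```

Compared with JSON, this keeps the event human-readable; the price is that splitting assumes no vertex report ever contains a `}` character.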
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
mxm commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1453248767 ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link AutoScalerEventHandler}. 
*/ +class AutoScalerEventHandlerTest { + +@Test +void testScalingReport() throws JsonProcessingException { +var expectedJson = +"{\"scalingMessage\":\"Scaling execution enabled, begin scaling vertices:\",\"scalingResults\":[" ++ "{\"vertexId\":\"0a448493b4782967b150582570326227\",\"currentParallelism\":5,\"newParallelism\":8," ++ "\"currentProcessCapacity\":404.727,\"expectedProcessCapacity\":645.0,\"targetDataRate\":404.268}," ++ "{\"vertexId\":\"bc764cd8ddf7a0cff126f51c16239658\",\"currentParallelism\":4,\"newParallelism\":2," ++ "\"currentProcessCapacity\":\"Infinity\",\"expectedProcessCapacity\":\"Infinity\",\"targetDataRate\":812.583}," ++ "{\"vertexId\":\"ea632d67b7d595e5b851708ae9ad79d6\",\"currentParallelism\":3,\"newParallelism\":1," ++ "\"currentProcessCapacity\":424.678,\"expectedProcessCapacity\":123.4,\"targetDataRate\":403.673}]}"; Review Comment: CC @gyfora
Re: [PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
mxm commented on code in PR #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757#discussion_r1453248297 ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link AutoScalerEventHandler}. 
*/ +class AutoScalerEventHandlerTest { + +@Test +void testScalingReport() throws JsonProcessingException { +var expectedJson = +"{\"scalingMessage\":\"Scaling execution enabled, begin scaling vertices:\",\"scalingResults\":[" ++ "{\"vertexId\":\"0a448493b4782967b150582570326227\",\"currentParallelism\":5,\"newParallelism\":8," ++ "\"currentProcessCapacity\":404.727,\"expectedProcessCapacity\":645.0,\"targetDataRate\":404.268}," ++ "{\"vertexId\":\"bc764cd8ddf7a0cff126f51c16239658\",\"currentParallelism\":4,\"newParallelism\":2," ++ "\"currentProcessCapacity\":\"Infinity\",\"expectedProcessCapacity\":\"Infinity\",\"targetDataRate\":812.583}," ++ "{\"vertexId\":\"ea632d67b7d595e5b851708ae9ad79d6\",\"currentParallelism\":3,\"newParallelism\":1," ++ "\"currentProcessCapacity\":424.678,\"expectedProcessCapacity\":123.4,\"targetDataRate\":403.673}]}"; Review Comment: I'm not sure about emitting JSON events. Kubernetes events are normally not meant to be directly machine-readable, and it's already possible to read the overrides programmatically. It's also a breaking change. Our users expect to see the current format.
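As a hedged illustration of reading the overrides instead of parsing event text: assuming the overrides are available as a string in the `vertexId:parallelism,vertexId:parallelism` layout that Flink uses for map-typed options such as `pipeline.jobvertex-parallelism-overrides`, a consumer could parse them roughly like this. The class name is hypothetical, and quoted or escaped keys and values are not handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: reading per-vertex parallelism overrides instead of event text. */
final class ParallelismOverrides {

    // Assumes a plain "vertexId:parallelism,vertexId:parallelism" string; Flink's quoting of
    // keys or values that contain ',' or ':' is deliberately not handled in this sketch.
    static Map<String, Integer> parse(String raw) {
        Map<String, Integer> overrides = new LinkedHashMap<>();
        if (raw == null || raw.isBlank()) {
            return overrides;
        }
        for (String entry : raw.split(",")) {
            String[] kv = entry.trim().split(":", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Malformed override entry: " + entry);
            }
            overrides.put(kv[0].trim(), Integer.parseInt(kv[1].trim()));
        }
        return overrides;
    }

    public static void main(String[] args) {
        System.out.println(
                parse("0a448493b4782967b150582570326227:16,ea632d67b7d595e5b851708ae9ad79d6:1"));
    }
}
```

A structured source like this already maps each vertex to its target parallelism, which is the argument for keeping the event text aimed at humans.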
[PR] [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling [flink-kubernetes-operator]
1996fanrui opened a new pull request, #757: URL: https://github.com/apache/flink-kubernetes-operator/pull/757

## What is the purpose of the change

Currently, the scaling report format is `Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f`. It has two disadvantages:

1. When a job has multiple vertices, the reports for all vertices are concatenated without any separator. Here is an example:
   - `Scaling execution enabled, begin scaling vertices: Vertex ID ea632d67b7d595e5b851708ae9ad79d6 | Parallelism 2 -> 1 | Processing capacity 800466.67 -> 320186.67 | Target data rate 715.10 Vertex ID bc764cd8ddf7a0cff126f51c16239658 | Parallelism 2 -> 1 | Processing capacity 79252.08 -> 31700.83 | Target data rate 895.93 Vertex ID 0a448493b4782967b150582570326227 | Parallelism 8 -> 16 | Processing capacity 716.05 -> 1141.00 | Target data rate 715.54`
   - Each vertex report begins with `Vertex ID`, and nothing separates it from the previous vertex's report.
2. The format is non-standard, so it is hard to deserialize.
   - Consider a job that enables the autoscaler but disables scaling execution.
   - A Flink platform maintainer may want to show the scaling report in a web UI, so that Flink users can make use of the report results.
   - A format that is easy to deserialize is therefore useful for such Flink platforms.

## Brief change log

Serialize the scaling report in JSON format, which is easy to read and deserialize. Here is the scaling report message after this PR: ``

## Verifying this change

- Added `AutoScalerEventHandlerTest` to test the ScalingReport format.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no
- Core observer or reconciler logic that is regularly executed: no

## Documentation

- Does this pull request introduce a new feature? no
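To make disadvantage 1 concrete, here is a minimal Java sketch (not code from this PR) of what a consumer has to do with the legacy format. Because nothing separates vertices, it relies on `Matcher.find()` resuming at the next `Vertex ID` prefix and on every field being a single whitespace-free token. The class, the Java 16+ record, and its field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical consumer of the legacy, separator-free scaling report (not part of the operator). */
final class LegacyScalingReportParser {

    /** One vertex entry; the record and its field names are illustrative only. */
    record VertexReport(
            String vertexId,
            int currentParallelism,
            int newParallelism,
            double currentCapacity,
            double expectedCapacity,
            double targetDataRate) {}

    // One legacy vertex report. Each field must be a single whitespace-free token, because the
    // only boundary between two vertices is the next "Vertex ID" prefix.
    private static final Pattern VERTEX =
            Pattern.compile(
                    "Vertex ID (\\S+) \\| Parallelism (\\d+) -> (\\d+) \\| "
                            + "Processing capacity (\\S+) -> (\\S+) \\| Target data rate (\\S+)");

    static List<VertexReport> parse(String message) {
        List<VertexReport> reports = new ArrayList<>();
        Matcher m = VERTEX.matcher(message);
        // find() resumes after the previous match, so it picks up the vertices one by one.
        while (m.find()) {
            reports.add(
                    new VertexReport(
                            m.group(1),
                            Integer.parseInt(m.group(2)),
                            Integer.parseInt(m.group(3)),
                            // Double.parseDouble also accepts the literal "Infinity".
                            Double.parseDouble(m.group(4)),
                            Double.parseDouble(m.group(5)),
                            Double.parseDouble(m.group(6))));
        }
        return reports;
    }

    public static void main(String[] args) {
        String message =
                "Scaling execution enabled, begin scaling vertices: "
                        + "Vertex ID ea632d67b7d595e5b851708ae9ad79d6 | Parallelism 2 -> 1 | Processing capacity 800466.67 -> 320186.67 | Target data rate 715.10 "
                        + "Vertex ID 0a448493b4782967b150582570326227 | Parallelism 8 -> 16 | Processing capacity 716.05 -> 1141.00 | Target data rate 715.54";
        parse(message).forEach(System.out::println);
    }
}
```

This works for the example message, but it silently depends on the exact text layout; for instance, if the report were ever formatted with a locale that uses a decimal comma, `Double.parseDouble` would reject the numbers.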