Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
Myasuka merged PR #23764: URL: https://github.com/apache/flink/pull/23764 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
Myasuka commented on code in PR #23764:
URL: https://github.com/apache/flink/pull/23764#discussion_r1403259017

## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionHaServices.java:
@@ -46,7 +46,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;

-import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;

Review Comment:
Thanks for the suggestion. I prefer the second option: keep the constants and mark them as deprecated.
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
XComp commented on code in PR #23764:
URL: https://github.com/apache/flink/pull/23764#discussion_r1403255647

## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionHaServices.java:
@@ -46,7 +46,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;

-import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;

Review Comment:
Hm, I see what you mean. We have treated ConfigMap content as non-public in the past, though (e.g., FLINK-24038, which changed the way the HA data is organized). Mentioning this in the release notes of FLINK-33598 would be good enough in that case. Another option would be to mark the constants as deprecated and document why they are still lingering around even though they are not really used. WDYT?
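For illustration, the deprecation option could look roughly like the sketch below. It is a minimal, hypothetical stand-in, not Flink's actual `Constants` class: the holder class name is invented, and the label key and value strings (`configmap-type`, `high-availability`) are assumptions chosen to match the HA ConfigMap type discussed in this thread.

```java
import java.util.Map;

/** Hypothetical holder class; only the deprecation pattern is meant to be illustrative. */
final class HaLabelConstantsSketch {

    /** Label key marking the type of a ConfigMap (assumed value). */
    public static final String LABEL_CONFIGMAP_TYPE_KEY = "configmap-type";

    /**
     * Label value attached to HA ConfigMaps (assumed value).
     *
     * @deprecated HA ConfigMaps are now watched by name rather than by label selector. The
     *     constant is kept because external systems may still rely on the label.
     */
    @Deprecated
    public static final String LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY = "high-availability";

    private HaLabelConstantsSketch() {}

    public static void main(String[] args) {
        // Build the label pair that an external tool might still select on.
        Map<String, String> haLabel =
                Map.of(LABEL_CONFIGMAP_TYPE_KEY, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
        System.out.println(haLabel);
    }
}
```

The Javadoc `@deprecated` tag carries the explanation XComp asks for, so anyone who still references the constant sees why it is lingering.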
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
Myasuka commented on code in PR #23764:
URL: https://github.com/apache/flink/pull/23764#discussion_r1403244878

## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java:
@@ -72,56 +71,41 @@ void setUp() throws Exception {
     @AfterEach
     void tearDown() throws Exception {
         ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, watchCallbackExecutorService);
-        client.deleteConfigMapsByLabels(labels).get();
+        client.deleteConfigMap(configMapName).get();
     }

     @Test
     @Timeout(12)
     public void testWatch() throws Exception {
         try (KubernetesConfigMapSharedWatcher watcher =
-                client.createConfigMapSharedWatcher(labels)) {
+                client.createConfigMapSharedWatcher(configMapName)) {
             for (int i = 0; i < 10; i++) {
-                List callbackHandlers = new ArrayList<>();
-                List watchers = new ArrayList<>();
-
-                watchInRange(watcher, callbackHandlers, watchers, 0, 20);
-                createConfigMapsInRange(0, 5);
-                watchInRange(watcher, callbackHandlers, watchers, 20, 40);
-                createConfigMapsInRange(5, 10);
-                updateConfigMapInRange(0, 10, ImmutableMap.of("foo", "bar"));
-                for (TestingCallbackHandler handler : callbackHandlers) {
-                    handler.addFuture.get();
-                    handler.addOrUpdateFuture.get();
-                    assertThat(handler.assertFuture).isNotCompletedExceptionally();
+                Tuple2 results = watch(configMapName, watcher);
+                TestingCallbackHandler handler = results.f0;
+                createConfigMap(configMapName);
+                updateConfigMap(configMapName, ImmutableMap.of("foo", "bar"));
+
+                handler.addFuture.get();
+                handler.addOrUpdateFuture.get();
+                assertThat(handler.assertFuture).isNotCompletedExceptionally();
+
+                client.deleteConfigMap(configMapName).get();
+                handler.deleteFuture.get();
+                if (handler.assertFuture.isCompletedExceptionally()) {
+                    handler.assertFuture.get();

Review Comment:
Thanks for the suggestion, I will refactor this test.
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
Myasuka commented on code in PR #23764:
URL: https://github.com/apache/flink/pull/23764#discussion_r1403242664

## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionHaServices.java:
@@ -46,7 +46,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;

-import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;

Review Comment:
I don't know whether some external systems rely on these labels, so I prefer not to delete them.
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
Myasuka commented on code in PR #23764:
URL: https://github.com/apache/flink/pull/23764#discussion_r1403010154

## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesConfigMapSharedInformer.java:
@@ -38,10 +39,21 @@ public KubernetesConfigMapSharedInformer(
         super(client, getInformableConfigMaps(client, labels), KubernetesConfigMap::new);

Review Comment:
I agree, I will remove this constructor.
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
Myasuka commented on code in PR #23764:
URL: https://github.com/apache/flink/pull/23764#discussion_r1403008922

## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java:
@@ -72,56 +71,41 @@ void setUp() throws Exception {
     @AfterEach
     void tearDown() throws Exception {
         ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, watchCallbackExecutorService);
-        client.deleteConfigMapsByLabels(labels).get();
+        client.deleteConfigMap(configMapName).get();
     }

     @Test
     @Timeout(12)
     public void testWatch() throws Exception {
         try (KubernetesConfigMapSharedWatcher watcher =
-                client.createConfigMapSharedWatcher(labels)) {
+                client.createConfigMapSharedWatcher(configMapName)) {
             for (int i = 0; i < 10; i++) {

Review Comment:
I agree, I think we can drop the loop here.
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
XComp commented on code in PR #23764:
URL: https://github.com/apache/flink/pull/23764#discussion_r1401885433

## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java:
@@ -72,56 +71,41 @@ void setUp() throws Exception {
     @AfterEach
     void tearDown() throws Exception {
         ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, watchCallbackExecutorService);
-        client.deleteConfigMapsByLabels(labels).get();
+        client.deleteConfigMap(configMapName).get();
     }

     @Test
     @Timeout(12)
     public void testWatch() throws Exception {
         try (KubernetesConfigMapSharedWatcher watcher =
-                client.createConfigMapSharedWatcher(labels)) {
+                client.createConfigMapSharedWatcher(configMapName)) {
             for (int i = 0; i < 10; i++) {
-                List callbackHandlers = new ArrayList<>();
-                List watchers = new ArrayList<>();
-
-                watchInRange(watcher, callbackHandlers, watchers, 0, 20);
-                createConfigMapsInRange(0, 5);
-                watchInRange(watcher, callbackHandlers, watchers, 20, 40);
-                createConfigMapsInRange(5, 10);
-                updateConfigMapInRange(0, 10, ImmutableMap.of("foo", "bar"));
-                for (TestingCallbackHandler handler : callbackHandlers) {
-                    handler.addFuture.get();
-                    handler.addOrUpdateFuture.get();
-                    assertThat(handler.assertFuture).isNotCompletedExceptionally();
+                Tuple2 results = watch(configMapName, watcher);
+                TestingCallbackHandler handler = results.f0;
+                createConfigMap(configMapName);
+                updateConfigMap(configMapName, ImmutableMap.of("foo", "bar"));
+
+                handler.addFuture.get();
+                handler.addOrUpdateFuture.get();
+                assertThat(handler.assertFuture).isNotCompletedExceptionally();
+
+                client.deleteConfigMap(configMapName).get();
+                handler.deleteFuture.get();
+                if (handler.assertFuture.isCompletedExceptionally()) {
+                    handler.assertFuture.get();

Review Comment:
```suggestion
                createConfigMap(configMapName);
                FlinkAssertions.assertThatFuture(handler.addFuture)
                        .as("The creation of the ConfigMap should have been processed, eventually.")
                        .eventuallySucceeds();

                updateConfigMap(configMapName, ImmutableMap.of("foo", "bar"));
                FlinkAssertions.assertThatFuture(handler.addOrUpdateFuture)
                        .as("The update of the ConfigMap should have been processed, eventually.")
                        .eventuallySucceeds();
                assertThat(handler.assertFuture).isNotCompletedExceptionally();

                client.deleteConfigMap(configMapName).get();
                FlinkAssertions.assertThatFuture(handler.deleteFuture)
                        .as("The deletion of the ConfigMap should have been processed, eventually.")
                        .eventuallySucceeds();

                FlinkAssertions.assertThatFuture(handler.assertFuture)
                        .as("No error should have appeared while processing the data.")
                        .eventuallySucceeds();
```
I know that this is a bit out of scope for this PR, but I'm wondering whether we could make this test a bit more readable. It's hard to grasp why certain futures are expected to complete.
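To make explicit which watch event is expected to complete which future, a test handler can dedicate one future to each event type. The following is a hypothetical, stdlib-only sketch, not Flink's `TestingCallbackHandler`: the class, its method and field names, the ConfigMap name, and the 10-second wait are all assumptions, and `awaitSuccess` only approximates what an "eventually succeeds" assertion expresses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical test handler: each future is completed by exactly one kind of watch event. */
final class EventFuturesHandler {
    final CompletableFuture<String> addedFuture = new CompletableFuture<>();
    final CompletableFuture<String> modifiedFuture = new CompletableFuture<>();
    final CompletableFuture<String> deletedFuture = new CompletableFuture<>();
    // Completed exceptionally as soon as an event for an unexpected ConfigMap arrives.
    final CompletableFuture<Void> errorFuture = new CompletableFuture<>();

    private final String expectedName;

    EventFuturesHandler(String expectedName) {
        this.expectedName = expectedName;
    }

    void onAdded(String name) {
        if (isExpected(name)) {
            addedFuture.complete(name);
        }
    }

    void onModified(String name) {
        if (isExpected(name)) {
            modifiedFuture.complete(name);
        }
    }

    void onDeleted(String name) {
        if (isExpected(name)) {
            deletedFuture.complete(name);
        }
    }

    private boolean isExpected(String name) {
        if (expectedName.equals(name)) {
            return true;
        }
        errorFuture.completeExceptionally(new AssertionError("Unexpected ConfigMap: " + name));
        return false;
    }

    /** Waits up to 10 s for the future; any failure becomes an AssertionError with a description. */
    static <T> T awaitSuccess(CompletableFuture<T> future, String description) {
        try {
            return future.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError(description, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new AssertionError(description, e);
        }
    }

    public static void main(String[] args) {
        EventFuturesHandler handler = new EventFuturesHandler("example-configmap");
        // Simulated watch events in the order the test triggers them: create, update, delete.
        handler.onAdded("example-configmap");
        awaitSuccess(handler.addedFuture, "The creation of the ConfigMap should have been processed.");
        handler.onModified("example-configmap");
        awaitSuccess(handler.modifiedFuture, "The update of the ConfigMap should have been processed.");
        handler.onDeleted("example-configmap");
        awaitSuccess(handler.deletedFuture, "The deletion of the ConfigMap should have been processed.");
        System.out.println("unexpected events seen: " + handler.errorFuture.isCompletedExceptionally());
    }
}
```

Pairing each triggering action with a described wait on exactly one future is what makes the expected event readable at the call site.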
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
XComp commented on code in PR #23764:
URL: https://github.com/apache/flink/pull/23764#discussion_r1401826621

## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesTestFixture.java:
@@ -76,10 +74,8 @@ class KubernetesTestFixture {
         configuration.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);

         flinkKubeClient = createFlinkKubeClient();
-        configMapSharedWatcher =
-                flinkKubeClient.createConfigMapSharedWatcher(
-                        KubernetesUtils.getConfigMapLabels(
-                                clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
+        // use the configmap name as the informer

Review Comment:
```suggestion
```
nit: the comment doesn't seem to add any value.

## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java:
@@ -456,6 +450,8 @@ public static class TestingKubernetesConfigMapSharedWatcher
     public TestingKubernetesConfigMapSharedWatcher(Map labels) {}

Review Comment:
```suggestion
    public TestingKubernetesConfigMapSharedWatcher(Map labels) {}
```
This one is not used anymore and can be deleted.

## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesConfigMapSharedInformer.java:
@@ -38,10 +39,21 @@ public KubernetesConfigMapSharedInformer(
         super(client, getInformableConfigMaps(client, labels), KubernetesConfigMap::new);

Review Comment:
What about removing this constructor and the then-unused method `getInformableConfigMaps(NamespacedKubernetesClient, Map)` as part of this change? This code won't be covered by any tests anymore.
## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java:
@@ -72,56 +71,41 @@ void setUp() throws Exception {
     @AfterEach
     void tearDown() throws Exception {
         ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, watchCallbackExecutorService);
-        client.deleteConfigMapsByLabels(labels).get();
+        client.deleteConfigMap(configMapName).get();
     }

     @Test
     @Timeout(12)
     public void testWatch() throws Exception {
         try (KubernetesConfigMapSharedWatcher watcher =
-                client.createConfigMapSharedWatcher(labels)) {
+                client.createConfigMapSharedWatcher(configMapName)) {
             for (int i = 0; i < 10; i++) {

Review Comment:
```suggestion
```
Can't we get rid of the for loop as well here? Or is there some value in running this test multiple times? :thinking:

## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionHaServices.java:
@@ -46,7 +46,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;

-import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;

Review Comment:
I guess `Constants.LABEL_CONFIGMAP_TYPE_KEY` would also be a candidate for deletion.
## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesTestFixture.java:
@@ -199,12 +195,11 @@ TestingFlinkKubeClient.Builder createFlinkKubeClientBuilder() {
                             leaderConfig, callbackHandler);
                 })
         .setCreateConfigMapSharedWatcherFunction(
-                (labels) -> {
+                (name) -> {

Review Comment:
```suggestion
                name -> {
```
nit

## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java:
@@ -72,56 +71,41 @@ void setUp() throws Exception {
     @AfterEach
     void tearDown() throws Exception {
         ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, watchCallbackExecutorService);
-        client.deleteConfigMapsByLabels(labels).get();
+        client.deleteConfigMap(configMapName).get();
     }

     @Test
     @Timeout(12)
     public void testWatch() throws Exception {
         try (KubernetesConfigMapSharedWatcher watcher =
-                client.createConfigMapSharedWatcher(labels)) {
+                client.createConfigMapSharedWatcher(configMapName)) {
             for (int i = 0; i < 10; i++) {
-                List callbackHandlers = new ArrayList<>();
-                List watchers = new ArrayList<>();
-
-                watchInRange(watcher, callbackHandlers, watchers, 0, 20);
-                createConfigMapsInRange(0, 5);
-                watchInRange(watcher, callbackHandlers, watchers, 20, 40);
-                createConfigMapsInRange(5, 10);
-                updateConfigMapInRange(0, 10, ImmutableMap.of("foo", "bar"));
-                for (TestingCallbackHandler handler : callbackHandlers) {
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
Myasuka commented on code in PR #23764:
URL: https://github.com/apache/flink/pull/23764#discussion_r1400535656

## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionHaServices.java:
@@ -202,11 +199,7 @@ public void internalCleanup() throws Exception {
             exception = e;
         }

-        kubeClient
-                .deleteConfigMapsByLabels(
-                        KubernetesUtils.getConfigMapLabels(
-                                clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
-                .get();

Review Comment:
As far as the code itself is concerned, yes, this PR changes the logic. However, Flink's internal behavior does not change, and I think the current change is the correct one. After each job finishes, Flink cleans up the job-related data (such as the checkpoint ID counter) in [Dispatcher#removeJob](https://github.com/apache/flink/blob/278504a2787a154faf6f6401028d4bbadafbba0a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1258), and once the cluster shuts down, it cleans up the cluster-related ConfigMap (such as the HA leader information) in [ClusterEntrypoint#stopClusterServices](https://github.com/apache/flink/blob/278504a2787a154faf6f6401028d4bbadafbba0a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L502). Therefore, `KubernetesLeaderElectionHaServices` should only clean up the cluster-related ConfigMap in `#internalCleanup`, instead of deleting ConfigMaps by label.
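The behavioral difference being discussed can be sketched with an in-memory stand-in for the ConfigMaps of one namespace. Everything here is hypothetical (class name, ConfigMap names, label keys and values); the sketch only contrasts deleting every ConfigMap that matches a label selector with deleting a single ConfigMap by name.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the ConfigMaps of one namespace: name -> labels. */
final class ConfigMapCleanupSketch {
    private final Map<String, Map<String, String>> configMaps = new HashMap<>();

    void create(String name, Map<String, String> labels) {
        configMaps.put(name, labels);
    }

    /** Deletes every ConfigMap whose labels contain all selector entries; returns the count. */
    int deleteByLabels(Map<String, String> selector) {
        int before = configMaps.size();
        configMaps.values().removeIf(labels -> labels.entrySet().containsAll(selector.entrySet()));
        return before - configMaps.size();
    }

    /** Deletes at most one ConfigMap, identified by its name. */
    boolean deleteByName(String name) {
        return configMaps.remove(name) != null;
    }

    boolean exists(String name) {
        return configMaps.containsKey(name);
    }

    public static void main(String[] args) {
        // Hypothetical names and labels: the cluster and the job ConfigMap share the same labels.
        Map<String, String> haLabels =
                Map.of("app", "example-cluster", "configmap-type", "high-availability");

        ConfigMapCleanupSketch byLabels = new ConfigMapCleanupSketch();
        byLabels.create("example-cluster-config", haLabels);
        byLabels.create("example-job-config", haLabels);
        System.out.println("deleted by labels: " + byLabels.deleteByLabels(haLabels)); // 2

        ConfigMapCleanupSketch byName = new ConfigMapCleanupSketch();
        byName.create("example-cluster-config", haLabels);
        byName.create("example-job-config", haLabels);
        byName.deleteByName("example-cluster-config");
        System.out.println("job ConfigMap still present: " + byName.exists("example-job-config")); // true
    }
}
```

With label-based deletion both ConfigMaps disappear at once; with name-based deletion the job ConfigMap survives, so it has to be removed by the job-level cleanup that the comment above attributes to `Dispatcher#removeJob`.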
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
gyfora commented on code in PR #23764:
URL: https://github.com/apache/flink/pull/23764#discussion_r1400201251

## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionHaServices.java:
@@ -202,11 +199,7 @@ public void internalCleanup() throws Exception {
             exception = e;
         }

-        kubeClient
-                .deleteConfigMapsByLabels(
-                        KubernetesUtils.getConfigMapLabels(
-                                clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
-                .get();

Review Comment:
Based on your comment, the behaviour of this call has now changed. Previously, it would delete both ConfigMaps (cluster and job ID), as they have the same labels. Now, only the cluster info ConfigMap will be deleted. Is that correct?
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
Myasuka commented on PR #23764:
URL: https://github.com/apache/flink/pull/23764#issuecomment-1820449210

@gyfora Sorry for not describing the problem clearly. I am focusing on the `-cluster-config-map`, which holds the leader election information for all components (e.g. REST server, dispatcher, resource manager, job manager), the JobGraph store, and the running job registry. Even after FLINK-24038, it was still watched via labels. The other `--config-map`, which records the checkpoint ID, is not watched by the TaskManagers and is only used by the JobManager. BTW, I have updated the descriptions in the ticket and in this PR.
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
gyfora commented on PR #23764:
URL: https://github.com/apache/flink/pull/23764#issuecomment-1820384018

@Myasuka, I am a bit confused: you mention https://issues.apache.org/jira/browse/FLINK-24038, but I still see two `high-availability` ConfigMaps for Flink 1.18 jobs.
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
flinkbot commented on PR #23764:
URL: https://github.com/apache/flink/pull/23764#issuecomment-1820212100

## CI report:

* 1b5c0d16c60d1470ef2874238035f5cc2c39e1d6 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
Myasuka commented on PR #23764:
URL: https://github.com/apache/flink/pull/23764#issuecomment-1820210514

@wangyang0918 @dmvk @gyfora could you please take a look?
[PR] [FLINK-33598][k8s] Watch HA configmap via name instead of lables to reduce pressure on APIserver [flink]
Myasuka opened a new pull request, #23764:
URL: https://github.com/apache/flink/pull/23764

## What is the purpose of the change

As described in [FLINK-24819](https://issues.apache.org/jira/browse/FLINK-24819), the Kubernetes API server comes under more pressure when HA is enabled, because the ConfigMaps are watched through a label filter instead of simply being queried by ConfigMap name. Watching by name became possible after [FLINK-24038](https://issues.apache.org/jira/browse/FLINK-24038), which reduced the number of ConfigMaps to only one.

## Brief change log

- Introduce a new API, `FlinkKubeClient#createConfigMapSharedWatcher(String)`, and use it to create the watcher.
- Delete the no-longer-needed `FlinkKubeClient#createConfigMapSharedWatcher(Map labels)` and `FlinkKubeClient#deleteConfigMapsByLabels` APIs.

## Verifying this change

This change added tests and can be verified as follows:

- Modified test `KubernetesLeaderElectionAndRetrievalITCase`
- Modified test `KubernetesTestFixture`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
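At the REST level, the two watch strategies differ roughly as sketched below. This assumes the standard Kubernetes API query parameters `watch`, `fieldSelector` (with `metadata.name`) and `labelSelector`; the helper class, namespace, ConfigMap name and labels are hypothetical, and Flink issues its requests through the fabric8 Kubernetes client rather than building paths by hand.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical helper that builds raw Kubernetes API watch paths for ConfigMaps. */
final class ConfigMapWatchPaths {
    private ConfigMapWatchPaths() {}

    /** Watches exactly one ConfigMap, selected by its name via a field selector. */
    static String watchByName(String namespace, String name) {
        return base(namespace) + "&fieldSelector=" + encode("metadata.name=" + name);
    }

    /** Watches every ConfigMap in the namespace that carries all of the given labels. */
    static String watchByLabels(String namespace, Map<String, String> labels) {
        // Sort the labels so the selector string is deterministic.
        String selector =
                new TreeMap<>(labels)
                        .entrySet().stream()
                                .map(e -> e.getKey() + "=" + e.getValue())
                                .collect(Collectors.joining(","));
        return base(namespace) + "&labelSelector=" + encode(selector);
    }

    private static String base(String namespace) {
        return "/api/v1/namespaces/" + namespace + "/configmaps?watch=true";
    }

    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(watchByName("default", "example-cluster-config"));
        System.out.println(
                watchByLabels(
                        "default",
                        Map.of("app", "example-cluster", "configmap-type", "high-availability")));
    }
}
```

Because names are unique within a namespace, the name-based path can match at most one ConfigMap, whereas the label-based path matches every ConfigMap that carries those labels.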