This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 25cadc34 [FLINK-32890] Correct HA patch check for zookeeper metadata store
25cadc34 is described below

commit 25cadc34bb3b4b2cbbffbffeadaeb9e337b27516
Author: Nicolas Fraison <nicolas.frai...@datadoghq.com>
AuthorDate: Thu Aug 17 17:30:52 2023 +0200

    [FLINK-32890] Correct HA patch check for zookeeper metadata store
---
 .../apache/flink/kubernetes/operator/utils/FlinkUtils.java   | 12 +++++++++---
 .../kubernetes/operator/utils/FlinkUtilsZookeeperHATest.java |  4 ++++
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 3b23fa80..02a587b9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -268,8 +268,13 @@ public class FlinkUtils {

     public static boolean isZookeeperHaMetadataAvailable(Configuration conf) {
         try (var curator = ZooKeeperUtils.startCuratorFramework(conf, exception -> {})) {
-            if (curator.asCuratorFramework().checkExists().forPath("/") != null) {
-                return curator.asCuratorFramework().getChildren().forPath("/").size() != 0;
+            if (curator.asCuratorFramework().checkExists().forPath(ZooKeeperUtils.getJobsPath())
+                    != null) {
+                return curator.asCuratorFramework()
+                        .getChildren()
+                        .forPath(ZooKeeperUtils.getJobsPath())
+                        .size()
+                        != 0;
             }
             return false;
         } catch (Exception e) {
@@ -277,7 +282,8 @@ public class FlinkUtils {
                     "Could not check whether the HA metadata exists at path {} in Zookeeper",
                     ZooKeeperUtils.generateZookeeperPath(
                             conf.get(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT),
-                            conf.get(HighAvailabilityOptions.HA_CLUSTER_ID)),
+                            conf.get(HighAvailabilityOptions.HA_CLUSTER_ID),
+                            ZooKeeperUtils.getJobsPath()),
                     e);
         }

diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsZookeeperHATest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsZookeeperHATest.java
index 2b64a220..4a580c15 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsZookeeperHATest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsZookeeperHATest.java
@@ -85,6 +85,10 @@ public class FlinkUtilsZookeeperHATest {
         jobGraph.setJobID(jobID);
         jobGraphStore.putJobGraph(jobGraph);
         jobGraphStore.stop();
+
+        // Create jobs znode
+        curator.create().forPath(ZooKeeperUtils.getJobsPath());
+        curator.create().forPath(ZooKeeperUtils.getLeaderPathForJob(new JobID()));
     }

     @AfterEach