Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-02-01 Thread via GitHub


XComp merged PR #24132:
URL: https://github.com/apache/flink/pull/24132


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-30 Thread via GitHub


XComp commented on PR #24132:
URL: https://github.com/apache/flink/pull/24132#issuecomment-1916845790

   Ok, it looks like the timeout was caused by some changes to the MockServer (which was moved into the `kubernetes-client` repository).
   
   The mocked GET request contains some parameters, and the order of the parameters matters when mocking the requests (...which is annoying; I didn't find a way to make this order-agnostic). Anyway, updating the GET path makes the test succeed again.
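   The order sensitivity comes down to the mock matching on the literal request path, query string included. A minimal, self-contained sketch (the paths below are made up for illustration; they are not the actual requests from the test):
   ```java
   public class QueryParamOrder {
       public static void main(String[] args) {
           // Hypothetical request paths: semantically equivalent query strings
           // in a different order do not match as literal strings.
           String mockedPath = "/api/v1/namespaces/ns/configmaps?labelSelector=app%3Dflink&watch=true";
           String actualPath = "/api/v1/namespaces/ns/configmaps?watch=true&labelSelector=app%3Dflink";

           // A mock server that compares raw paths treats these as different
           // requests, so the expectation never matches and the test times out.
           System.out.println(mockedPath.equals(actualPath)); // false
       }
   }
   ```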
   
   For 1.19, I'm gonna go ahead and upgrade `kubernetes-client` to v6.9.2 to include all `LeaderElector`-related bug fixes. I don't see a reason to upgrade to v6.10.0 ([release notes](https://github.com/fabric8io/kubernetes-client/releases/tag/v6.10.0)): there are no compelling changes, and I'd rather keep the version bump small, since we would have to apply more code changes on our side (due to https://github.com/fabric8io/kubernetes-client/commit/fd50fa96#diff-375ebadb4285b8214fe6209c8e1758b3cd21f17f9637fb1206173026c0c033d3R65).
   
   Any objections from your side, @gyfora ?





Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-29 Thread via GitHub


XComp commented on PR #24132:
URL: https://github.com/apache/flink/pull/24132#issuecomment-1915730926

   @flinkbot run azure





Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-29 Thread via GitHub


XComp commented on PR #24132:
URL: https://github.com/apache/flink/pull/24132#issuecomment-1915730800

   hm, the [CI timeout](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57072=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=27728) should be related to the k8s client version update. I'm not able to reproduce it locally, though (after 2500 repetitions). I will check the release notes tomorrow.





Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-29 Thread via GitHub


XComp commented on PR #24132:
URL: https://github.com/apache/flink/pull/24132#issuecomment-1914754366

   @flinkbot run azure





Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-29 Thread via GitHub


XComp commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1469397883


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
                                 newLeader,
                                 leaderConfig.getConfigMapName())))
                 .build();
+        this.executorService = executorService;
+
+        LOG.info(
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        stopLeaderElectionCycle();
+
         internalLeaderElector =
                 new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
+        currentLeaderElectionSession = internalLeaderElector.start();
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Triggered leader election on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void stopLeaderElectionCycle() {
+        if (internalLeaderElector != null) {
+            Preconditions.checkNotNull(currentLeaderElectionSession);
+
+            // the current leader election cycle needs to be cancelled before releasing the lock to
+            // avoid retrying
+            currentLeaderElectionSession.cancel(true);
+            currentLeaderElectionSession = null;
+
+            internalLeaderElector.release();

Review Comment:
   Reconsidering what I said before - upgrading even for 1.18 might be necessary. We don't want to keep another (known) bug lingering around in Flink 1.18.
   
   I upgraded the dependency to 6.9.0 and will do a pass through the diff between v6.6.2 and v6.9.0 to check whether there's something else we have to keep in mind. :thinking: 






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-29 Thread via GitHub


XComp commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467590709


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
                                 newLeader,
                                 leaderConfig.getConfigMapName())))
                 .build();
+        this.executorService = executorService;
+
+        LOG.info(
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        stopLeaderElectionCycle();
+
         internalLeaderElector =
                 new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
+        currentLeaderElectionSession = internalLeaderElector.start();
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Triggered leader election on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void stopLeaderElectionCycle() {
+        if (internalLeaderElector != null) {
+            Preconditions.checkNotNull(currentLeaderElectionSession);
+
+            // the current leader election cycle needs to be cancelled before releasing the lock to
+            // avoid retrying
+            currentLeaderElectionSession.cancel(true);
+            currentLeaderElectionSession = null;
+
+            internalLeaderElector.release();

Review Comment:
   You're right. The release call would trigger the `notLeader` call when enabling `ReleaseOnCancel`. That was the wrong conclusion on my side.
   
   But considering that there's this bug in https://github.com/fabric8io/kubernetes-client/issues/5463; should we consider upgrading the client for Flink 1.18? I tend to lean towards not upgrading the kubernetes-client. The diff between v6.6.2 and ~v6.8.0~ v6.9.0 appears to be quite big. :thinking: 






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-29 Thread via GitHub


XComp commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1469394774


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +106,33 @@ public KubernetesLeaderElector(
                                 newLeader,
                                 leaderConfig.getConfigMapName())))
                 .build();
-        internalLeaderElector =
-                new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
+        this.executorService = executorService;
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        cancelCurrentLeaderElectionSession();
+
+        currentLeaderElectionSession =
+                currentLeaderElectionSession.thenCompose(

Review Comment:
   True, I was wrong in my assumption that `cancel` would also cancel future chains that are created by `thenCompose`. But tbh, the `thenCompose` is not really necessary because we're cancelling the previous leadership session before re-initiating another session, all of which is guarded by a lock. I fixed it and added another test case to cover this behavior. :+1: 
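   The behavior in question can be reproduced with plain `CompletableFuture` (stdlib only; the incomplete `inner` future stands in for the one returned by `LeaderElector#start()`):
   ```java
   import java.util.concurrent.CompletableFuture;

   public class CancelDoesNotPropagate {
       public static void main(String[] args) {
           // Stand-in for the future returned by LeaderElector#start().
           CompletableFuture<Void> inner = new CompletableFuture<>();

           // The composed future returned by thenCompose.
           CompletableFuture<Void> outer =
                   CompletableFuture.completedFuture((Void) null).thenCompose(ignored -> inner);

           // Cancelling the composed future does not propagate to the inner future.
           outer.cancel(true);

           System.out.println(outer.isCancelled()); // true
           System.out.println(inner.isCancelled()); // false
       }
   }
   ```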






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-29 Thread via GitHub


XComp commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1469392396


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -55,17 +59,32 @@ public class KubernetesLeaderElector {
 
     private final Object lock = new Object();
 
-    private final ExecutorService executorService =
-            Executors.newFixedThreadPool(
-                    3, new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"));
+    private final NamespacedKubernetesClient kubernetesClient;
+    private final LeaderElectionConfig leaderElectionConfig;
+    private final ExecutorService executorService;
 
-    private final LeaderElector internalLeaderElector;
+    private CompletableFuture<Void> currentLeaderElectionSession = FutureUtils.completedVoidFuture();
 
     public KubernetesLeaderElector(
             NamespacedKubernetesClient kubernetesClient,
             KubernetesLeaderElectionConfiguration leaderConfig,
             LeaderCallbackHandler leaderCallbackHandler) {
-        final LeaderElectionConfig leaderElectionConfig =
+        this(
+                kubernetesClient,
+                leaderConfig,
+                leaderCallbackHandler,
+                Executors.newFixedThreadPool(
+                        3, new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService")));

Review Comment:
   Argh, that slipped through again. Fixed :+1: 






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-28 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1469124707


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +106,33 @@ public KubernetesLeaderElector(
                                 newLeader,
                                 leaderConfig.getConfigMapName())))
                 .build();
-        internalLeaderElector =
-                new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
+        this.executorService = executorService;
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        cancelCurrentLeaderElectionSession();
+
+        currentLeaderElectionSession =
+                currentLeaderElectionSession.thenCompose(

Review Comment:
   It seems that `currentLeaderElectionSession.cancel(true)` does not cancel the future of `LeaderElector#start()`. Then `KubernetesLeaderElector#stop()` will not trigger revoking the leadership.






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-28 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1469121295


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -55,17 +59,32 @@ public class KubernetesLeaderElector {
 
     private final Object lock = new Object();
 
-    private final ExecutorService executorService =
-            Executors.newFixedThreadPool(
-                    3, new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"));
+    private final NamespacedKubernetesClient kubernetesClient;
+    private final LeaderElectionConfig leaderElectionConfig;
+    private final ExecutorService executorService;
 
-    private final LeaderElector internalLeaderElector;
+    private CompletableFuture<Void> currentLeaderElectionSession = FutureUtils.completedVoidFuture();
 
     public KubernetesLeaderElector(
             NamespacedKubernetesClient kubernetesClient,
             KubernetesLeaderElectionConfiguration leaderConfig,
             LeaderCallbackHandler leaderCallbackHandler) {
-        final LeaderElectionConfig leaderElectionConfig =
+        this(
+                kubernetesClient,
+                leaderConfig,
+                leaderCallbackHandler,
+                Executors.newFixedThreadPool(
+                        3, new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService")));

Review Comment:
   Single thread pool is enough for now.
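   A single-threaded variant could look like this (sketch only; Flink's actual code uses its own `ExecutorThreadFactory`, which is replaced here by a stdlib thread factory lambda):
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   public class SingleThreadPoolSketch {
       public static void main(String[] args) throws Exception {
           // A single worker thread suffices when only one leader election
           // cycle runs at a time (guarded by a lock in the caller).
           ExecutorService executorService =
                   Executors.newSingleThreadExecutor(
                           r -> new Thread(r, "KubernetesLeaderElector-ExecutorService"));

           executorService.submit(() -> System.out.println("leader election task")).get();
           executorService.shutdown();
       }
   }
   ```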



##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +106,33 @@ public KubernetesLeaderElector(
                                 newLeader,
                                 leaderConfig.getConfigMapName())))
                 .build();
-        internalLeaderElector =
-                new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
+        this.executorService = executorService;
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        cancelCurrentLeaderElectionSession();
+
+        currentLeaderElectionSession =
+                currentLeaderElectionSession.thenCompose(

Review Comment:
   It seems that `currentLeaderElectionSession.cancel(true)` will not cancel the future of `LeaderElector#start()`.






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-28 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1469047743


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
                                 newLeader,
                                 leaderConfig.getConfigMapName())))
                 .build();
+        this.executorService = executorService;
+
+        LOG.info(
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        stopLeaderElectionCycle();
+
         internalLeaderElector =
                 new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
+        currentLeaderElectionSession = internalLeaderElector.start();
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Triggered leader election on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void stopLeaderElectionCycle() {
+        if (internalLeaderElector != null) {
+            Preconditions.checkNotNull(currentLeaderElectionSession);
+
+            // the current leader election cycle needs to be cancelled before releasing the lock to
+            // avoid retrying
+            currentLeaderElectionSession.cancel(true);
+            currentLeaderElectionSession = null;
+
+            internalLeaderElector.release();

Review Comment:
   If we enable `isReleaseOnCancel` and do not upgrade the fabric8 k8s client from v6.6.2 to v6.9.0, which includes the fix for https://github.com/fabric8io/kubernetes-client/issues/5463, then we run the risk that `onStopLeading()` is never executed even though the leadership is lost. From Flink's perspective, `revokeLeadership` is never called while a new `grantLeadership` happens.






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-28 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1469043488


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
                                 newLeader,
                                 leaderConfig.getConfigMapName())))
                 .build();
+        this.executorService = executorService;
+
+        LOG.info(
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        stopLeaderElectionCycle();
+
         internalLeaderElector =
                 new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
+        currentLeaderElectionSession = internalLeaderElector.start();
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Triggered leader election on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void stopLeaderElectionCycle() {
+        if (internalLeaderElector != null) {
+            Preconditions.checkNotNull(currentLeaderElectionSession);
+
+            // the current leader election cycle needs to be cancelled before releasing the lock to
+            // avoid retrying
+            currentLeaderElectionSession.cancel(true);
+            currentLeaderElectionSession = null;
+
+            internalLeaderElector.release();

Review Comment:
   I agree with you that we do not need to add `leadershipCallbackHandler.waitForRevokeLeader();` to the ITCase.






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-26 Thread via GitHub


XComp commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467610608


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
                                 newLeader,
                                 leaderConfig.getConfigMapName())))
                 .build();
+        this.executorService = executorService;
+
+        LOG.info(
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        stopLeaderElectionCycle();
+
         internalLeaderElector =
                 new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
+        currentLeaderElectionSession = internalLeaderElector.start();
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Triggered leader election on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void stopLeaderElectionCycle() {
+        if (internalLeaderElector != null) {
+            Preconditions.checkNotNull(currentLeaderElectionSession);
+
+            // the current leader election cycle needs to be cancelled before releasing the lock to
+            // avoid retrying
+            currentLeaderElectionSession.cancel(true);
+            currentLeaderElectionSession = null;
+
+            internalLeaderElector.release();

Review Comment:
   I verified with the changed test that your finding is correct (the ITCase succeeds with the change below and my previous version of the PR, but fails with the version where the explicit release call is removed):
   ```diff
   diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java
   index 51631dc54f6..0bca6c4d641 100644
   --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java
   +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java
   @@ -182,6 +182,9 @@ class KubernetesLeaderElectorITCase {
            // revoking the leadership initiates another leadership lifecycle
            testInstance.run();
   
   +        // leadership should be lost eventually due to the renewal loop being stopped
   +        leadershipCallbackHandler.waitForRevokeLeader();
   +
            // triggers acquiring the leadership again
            executorService.trigger(waitForNextTaskForever);
   ```
   
   I didn't address this in a test because it's hard to come up with a condition that checks the non-existence of an event (either we would have to add a timeout, which might make the test unstable, or the test would take longer than necessary). WDYT?
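   The tradeoff of asserting the non-occurrence of an event can be sketched with a plain future that never completes (names here are illustrative, not from the actual ITCase):
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class NonOccurrenceCheck {
       public static void main(String[] args) throws Exception {
           // Stand-in for "the revoke-leadership event was triggered".
           CompletableFuture<Void> revokeEvent = new CompletableFuture<>();

           // Asserting that the event does NOT happen requires waiting out a timeout:
           // too short and the test becomes flaky, too long and the test gets slow.
           try {
               revokeEvent.get(100, TimeUnit.MILLISECONDS);
               throw new AssertionError("revoke event should not have occurred");
           } catch (TimeoutException expected) {
               System.out.println("no revoke event observed within the timeout");
           }
       }
   }
   ```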






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-26 Thread via GitHub


XComp commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467590709


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
                                 newLeader,
                                 leaderConfig.getConfigMapName())))
                 .build();
+        this.executorService = executorService;
+
+        LOG.info(
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        stopLeaderElectionCycle();
+
         internalLeaderElector =
                 new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
+        currentLeaderElectionSession = internalLeaderElector.start();
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Triggered leader election on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void stopLeaderElectionCycle() {
+        if (internalLeaderElector != null) {
+            Preconditions.checkNotNull(currentLeaderElectionSession);
+
+            // the current leader election cycle needs to be cancelled before releasing the lock to
+            // avoid retrying
+            currentLeaderElectionSession.cancel(true);
+            currentLeaderElectionSession = null;
+
+            internalLeaderElector.release();

Review Comment:
   You're right. The release call would trigger the `notLeader` call when enabling `ReleaseOnCancel`. That was the wrong conclusion on my side.
   
   But considering that there's this bug in https://github.com/fabric8io/kubernetes-client/issues/5463; should we consider upgrading the client for Flink 1.18? I tend to lean towards not upgrading the kubernetes-client. The diff between v6.6.2 and v6.8.0 appears to be quite big. :thinking: 






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-26 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467583895


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
                                 newLeader,
                                 leaderConfig.getConfigMapName())))
                 .build();
+        this.executorService = executorService;
+
+        LOG.info(
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        stopLeaderElectionCycle();
+
         internalLeaderElector =
                 new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
+        currentLeaderElectionSession = internalLeaderElector.start();
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Triggered leader election on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void stopLeaderElectionCycle() {
+        if (internalLeaderElector != null) {
+            Preconditions.checkNotNull(currentLeaderElectionSession);
+
+            // the current leader election cycle needs to be cancelled before releasing the lock to
+            // avoid retrying
+            currentLeaderElectionSession.cancel(true);
+            currentLeaderElectionSession = null;
+
+            internalLeaderElector.release();

Review Comment:
   BTW, the ITCase could also pass by using v6.6.2 with `.withReleaseOnCancel(true)` and removing the explicit release call.






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-26 Thread via GitHub


XComp commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467582561


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
                                 newLeader,
                                 leaderConfig.getConfigMapName())))
                 .build();
+        this.executorService = executorService;
+
+        LOG.info(
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        stopLeaderElectionCycle();
+
         internalLeaderElector =
                 new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
+        currentLeaderElectionSession = internalLeaderElector.start();
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Triggered leader election on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void stopLeaderElectionCycle() {
+        if (internalLeaderElector != null) {
+            Preconditions.checkNotNull(currentLeaderElectionSession);
+
+            // the current leader election cycle needs to be cancelled before releasing the lock to
+            // avoid retrying
+            currentLeaderElectionSession.cancel(true);
+            currentLeaderElectionSession = null;
+
+            internalLeaderElector.release();

Review Comment:
   Oh true, I should have looked for other calls of the `notLeader` method. 
Thanks for your clarification. I'm gonna check. :+1: 






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-26 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467574472


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
 newLeader,
 
leaderConfig.getConfigMapName(
 .build();
+this.executorService = executorService;
+
+LOG.info(
+"Create KubernetesLeaderElector on lock {}.",
+leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void resetInternalLeaderElector() {
+stopLeaderElectionCycle();
+
 internalLeaderElector =
 new LeaderElector(kubernetesClient, leaderElectionConfig, 
executorService);
+currentLeaderElectionSession = internalLeaderElector.start();
+
 LOG.info(
-"Create KubernetesLeaderElector {} with lock identity {}.",
-leaderConfig.getConfigMapName(),
-leaderConfig.getLockIdentity());
+"Triggered leader election on lock {}.", 
leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void stopLeaderElectionCycle() {
+if (internalLeaderElector != null) {
+Preconditions.checkNotNull(currentLeaderElectionSession);
+
+// the current leader election cycle needs to be cancelled before 
releasing the lock to
+// avoid retrying
+currentLeaderElectionSession.cancel(true);
+currentLeaderElectionSession = null;
+
+internalLeaderElector.release();

Review Comment:
   What I mean is `release()` -> L149 `updateObserved(newLeaderElectionRecord)` 
-> L238 `leaderElectionConfig.getLeaderCallbacks().onStopLeading();` will also 
trigger the `notLeader()`. Right?






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-26 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467574472


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
 newLeader,
 
leaderConfig.getConfigMapName(
 .build();
+this.executorService = executorService;
+
+LOG.info(
+"Create KubernetesLeaderElector on lock {}.",
+leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void resetInternalLeaderElector() {
+stopLeaderElectionCycle();
+
 internalLeaderElector =
 new LeaderElector(kubernetesClient, leaderElectionConfig, 
executorService);
+currentLeaderElectionSession = internalLeaderElector.start();
+
 LOG.info(
-"Create KubernetesLeaderElector {} with lock identity {}.",
-leaderConfig.getConfigMapName(),
-leaderConfig.getLockIdentity());
+"Triggered leader election on lock {}.", 
leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void stopLeaderElectionCycle() {
+if (internalLeaderElector != null) {
+Preconditions.checkNotNull(currentLeaderElectionSession);
+
+// the current leader election cycle needs to be cancelled before 
releasing the lock to
+// avoid retrying
+currentLeaderElectionSession.cancel(true);
+currentLeaderElectionSession = null;
+
+internalLeaderElector.release();

Review Comment:
   What I mean is `release()` -> `updateObserved(newLeaderElectionRecord)` -> 
`leaderElectionConfig.getLeaderCallbacks().onStopLeading();` will also trigger 
the `notLeader()`. Right?






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-26 Thread via GitHub


XComp commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467558102


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
 newLeader,
 
leaderConfig.getConfigMapName(
 .build();
+this.executorService = executorService;
+
+LOG.info(
+"Create KubernetesLeaderElector on lock {}.",
+leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void resetInternalLeaderElector() {
+stopLeaderElectionCycle();
+
 internalLeaderElector =
 new LeaderElector(kubernetesClient, leaderElectionConfig, 
executorService);
+currentLeaderElectionSession = internalLeaderElector.start();
+
 LOG.info(
-"Create KubernetesLeaderElector {} with lock identity {}.",
-leaderConfig.getConfigMapName(),
-leaderConfig.getLockIdentity());
+"Triggered leader election on lock {}.", 
leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void stopLeaderElectionCycle() {
+if (internalLeaderElector != null) {
+Preconditions.checkNotNull(currentLeaderElectionSession);
+
+// the current leader election cycle needs to be cancelled before 
releasing the lock to
+// avoid retrying
+currentLeaderElectionSession.cancel(true);
+currentLeaderElectionSession = null;
+
+internalLeaderElector.release();

Review Comment:
   How will the `LeaderElector#release()` call trigger another `notLeader()` 
callback?
   
   `KubernetesLeaderElector#stopLeaderElectionCycle` is called in two places:
   * `KubernetesLeaderElector#stop`
 * State `leadership acquired`: The renew cycle of the 
`internalLeaderElector` will be cancelled which triggers the 
`LeaderCallbackHandler#notLeader` callback in 
[LeaderElector:96](https://github.com/fabric8io/kubernetes-client/blob/0f6c696509935a6a86fdb4620caea023d8e680f1/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L96).
 The v6.6.2 implementation of `LeaderElector#stopLeading` either calls release 
(if we enable `releaseOnCancel`) or calls the `notLeader` callback. The bugfix 
for this is https://github.com/fabric8io/kubernetes-client/commit/0f6c6965 and 
landed in v6.9.0. Without upgrading the k8s client dependency, we have to call 
release explicitly to trigger the cleanup.
 * State `leadership lost` (assuming that a `KubernetesLeaderElector#run` 
happened and the elector is initialized): The `#release` call won't have any 
effect because of the if condition in 
[LeaderElector:136](https://github.com/fabric8io/kubernetes-client/blob/v6.6.2/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L136).
   * `KubernetesLeaderElector#resetInternalLeaderElector` which is called as 
part of the `KubernetesLeaderElector#run` method
 * State `leadership acquired`: That case shouldn't happen because we only 
call `#run` after the leadership is lost. If we did it anyway, it would 
trigger the cancellation of leadership analogously to what happens for the 
`KubernetesLeaderElector#stop` call with leadership acquired described above. 
A new leader election lifecycle will be initiated afterwards by the 
`KubernetesLeaderElector#resetInternalLeaderElector` call.
 * State `leadership lost`: The 
`KubernetesLeaderElector#currentLeaderElectionSession` is already completed. 
Therefore, cancelling this future doesn't have any effect. The subsequent 
`LeaderElector#release` call won't have any effect either, because of the if 
condition in 
[LeaderElector:136](https://github.com/fabric8io/kubernetes-client/blob/v6.6.2/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L136)
 again.
   
   I created FLINK-34243 and will add a comment to cover this. I squashed 
everything already. See [the 
diff](https://github.com/apache/flink/compare/f1f440cd43c40d4cd3b02c435ab128a00ad41c4e..4519e46514d3632d895e470ac0bf7896176eed77)
 for the actual changes.






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-26 Thread via GitHub


XComp commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467558102


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
 newLeader,
 
leaderConfig.getConfigMapName(
 .build();
+this.executorService = executorService;
+
+LOG.info(
+"Create KubernetesLeaderElector on lock {}.",
+leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void resetInternalLeaderElector() {
+stopLeaderElectionCycle();
+
 internalLeaderElector =
 new LeaderElector(kubernetesClient, leaderElectionConfig, 
executorService);
+currentLeaderElectionSession = internalLeaderElector.start();
+
 LOG.info(
-"Create KubernetesLeaderElector {} with lock identity {}.",
-leaderConfig.getConfigMapName(),
-leaderConfig.getLockIdentity());
+"Triggered leader election on lock {}.", 
leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void stopLeaderElectionCycle() {
+if (internalLeaderElector != null) {
+Preconditions.checkNotNull(currentLeaderElectionSession);
+
+// the current leader election cycle needs to be cancelled before 
releasing the lock to
+// avoid retrying
+currentLeaderElectionSession.cancel(true);
+currentLeaderElectionSession = null;
+
+internalLeaderElector.release();

Review Comment:
   How will the `LeaderElector#release()` call trigger another `notLeader()` 
callback?
   
   `KubernetesLeaderElector#stopLeaderElectionCycle` is called in two places:
   * `KubernetesLeaderElector#stop`
 * State `leadership acquired`: The renew cycle of the 
`internalLeaderElector` will be cancelled which triggers the 
`LeaderCallbackHandler#notLeader` callback in 
[LeaderElector:96](https://github.com/fabric8io/kubernetes-client/blob/0f6c696509935a6a86fdb4620caea023d8e680f1/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L96).
 The v6.6.2 implementation of `LeaderElector#stopLeading` either calls release 
(if we enable `releaseOnCancel`) or calls the `notLeader` callback. The bugfix 
for this is https://github.com/fabric8io/kubernetes-client/commit/0f6c6965 and 
landed in v6.9.0. Without upgrading the k8s client dependency, we have to call 
release explicitly to trigger the cleanup.
 * State `leadership lost` (assuming that a `KubernetesLeaderElector#run` 
happened and the elector is initialized): The `#release` call won't have any 
effect because of the if condition in 
[LeaderElector:136](https://github.com/fabric8io/kubernetes-client/blob/v6.6.2/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L136).
   * `KubernetesLeaderElector#resetInternalLeaderElector` which is called as 
part of the `KubernetesLeaderElector#run` method
 * State `leadership acquired`: That case shouldn't happen because we only 
call `#run` after the leadership is lost. If we did it anyway, it would 
trigger the cancellation of leadership analogously to what happens for the 
`KubernetesLeaderElector#stop` call with leadership acquired described above. 
A new leader election lifecycle will be initiated afterwards by the 
`KubernetesLeaderElector#resetInternalLeaderElector` call.
 * State `leadership lost`: The 
`KubernetesLeaderElector#currentLeaderElectionSession` is already completed. 
Therefore, cancelling this future doesn't have any effect. The subsequent 
`LeaderElector#release` call won't have any effect either, because of the if 
condition in 
[LeaderElector:136](https://github.com/fabric8io/kubernetes-client/blob/v6.6.2/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L136)
 again.
   
   I created FLINK-34243 and will add a comment to cover this
   






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-26 Thread via GitHub


XComp commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467504153


##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java:
##
@@ -110,4 +114,67 @@ void testMultipleKubernetesLeaderElectors() throws 
Exception {
 
kubernetesExtension.getFlinkKubeClient().deleteConfigMap(leaderConfigMapName).get();
 }
 }
+
+/**
+ * This test verifies that the {@link KubernetesLeaderElector} is able to 
handle the scenario where
+ * the lease cannot be renewed.
+ *
+ * See FLINK-34007 for further details.
+ */
+@Test
+void testLeaderElectorLifecycleManagement() throws Exception {

Review Comment:
   Good point. I moved the ConfigMap lifecycle management into separate 
per-test methods.






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-25 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467160375


##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java:
##
@@ -110,4 +114,67 @@ void testMultipleKubernetesLeaderElectors() throws 
Exception {
 
kubernetesExtension.getFlinkKubeClient().deleteConfigMap(leaderConfigMapName).get();
 }
 }
+
+/**
+ * This test verifies that the {@link KubernetesLeaderElector} is able to 
handle the scenario where
+ * the lease cannot be renewed.
+ *
+ * See FLINK-34007 for further details.
+ */
+@Test
+void testLeaderElectorLifecycleManagement() throws Exception {

Review Comment:
   Great test. We might need to delete the leader elector ConfigMap because 
fabric8 `LeaderElector` does not clean it up.






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-25 Thread via GitHub


wangyang0918 commented on code in PR #24132:
URL: https://github.com/apache/flink/pull/24132#discussion_r1467160375


##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java:
##
@@ -110,4 +114,67 @@ void testMultipleKubernetesLeaderElectors() throws 
Exception {
 
kubernetesExtension.getFlinkKubeClient().deleteConfigMap(leaderConfigMapName).get();
 }
 }
+
+/**
+ * This test verifies that the {@link KubernetesLeaderElector} is able to 
handle the scenario where
+ * the lease cannot be renewed.
+ *
+ * See FLINK-34007 for further details.
+ */
+@Test
+void testLeaderElectorLifecycleManagement() throws Exception {

Review Comment:
   Great test. We might need to delete the leader elector ConfigMap because 
fabric8 `LeaderElector` does not clean it up.



##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java:
##
@@ -86,12 +117,38 @@ public KubernetesLeaderElector(
 newLeader,
 
leaderConfig.getConfigMapName(
 .build();
+this.executorService = executorService;
+
+LOG.info(
+"Create KubernetesLeaderElector on lock {}.",
+leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void resetInternalLeaderElector() {
+stopLeaderElectionCycle();
+
 internalLeaderElector =
 new LeaderElector(kubernetesClient, leaderElectionConfig, 
executorService);
+currentLeaderElectionSession = internalLeaderElector.start();
+
 LOG.info(
-"Create KubernetesLeaderElector {} with lock identity {}.",
-leaderConfig.getConfigMapName(),
-leaderConfig.getLockIdentity());
+"Triggered leader election on lock {}.", 
leaderElectionConfig.getLock().describe());
+}
+
+@GuardedBy("lock")
+private void stopLeaderElectionCycle() {
+if (internalLeaderElector != null) {
+Preconditions.checkNotNull(currentLeaderElectionSession);
+
+// the current leader election cycle needs to be cancelled before 
releasing the lock to
+// avoid retrying
+currentLeaderElectionSession.cancel(true);
+currentLeaderElectionSession = null;
+
+internalLeaderElector.release();

Review Comment:
   Executing the `LeaderElector#release()` will trigger another `notLeader()` 
callback, which revokes the leader contender's leadership again. It might be 
harmless because the leader contender has already lost its leadership.
   
   Could you please share why we do not simply add 
`.withReleaseOnCancel(true)` when building the `leaderElectionConfig`?






Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-23 Thread via GitHub


XComp commented on PR #24132:
URL: https://github.com/apache/flink/pull/24132#issuecomment-1907572028

   [ci 
failure](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=56790)
 is caused by some infrastructure instability in the compile step of the 
connect stage:
   ```
   Jan 23 18:59:09 18:59:09.769 [WARNING] locking 
FileBasedConfig[/home/agent01_azpcontainer/.config/jgit/config] failed after 5 
retries
   ```





Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-23 Thread via GitHub


XComp commented on PR #24132:
URL: https://github.com/apache/flink/pull/24132#issuecomment-1907572233

   @flinkbot run azure





Re: [PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-18 Thread via GitHub


flinkbot commented on PR #24132:
URL: https://github.com/apache/flink/pull/24132#issuecomment-1898519756

   
   ## CI report:
   
   * 1193ea4f3deff5b3d7ba7510910fd77059ff46f2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-34007][k8s] Adds workaround that fixes the deadlock when renewing the leadership lease fails [flink]

2024-01-18 Thread via GitHub


XComp opened a new pull request, #24132:
URL: https://github.com/apache/flink/pull/24132

   tbd
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   

