CAMEL-11331: Clock-drift-free version of the protocol
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f0b00ab9 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f0b00ab9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f0b00ab9 Branch: refs/heads/master Commit: f0b00ab927e9e55c238c8ea77e1b08c65335ca01 Parents: a0b2320 Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Mon Jul 31 17:17:34 2017 +0200 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Tue Aug 8 16:39:43 2017 +0200 ---------------------------------------------------------------------- .../kubernetes/ha/KubernetesClusterService.java | 84 ++-- .../kubernetes/ha/KubernetesClusterView.java | 6 +- .../kubernetes/ha/lock/ConfigMapLockUtils.java | 15 +- .../ha/lock/KubernetesLeadershipController.java | 352 +++++++++++++++++ ...ubernetesLeaseBasedLeadershipController.java | 384 ------------------- .../ha/lock/KubernetesLockConfiguration.java | 84 ++-- .../ha/lock/KubernetesMembersMonitor.java | 237 ------------ .../kubernetes/ha/lock/LeaderInfo.java | 46 ++- .../kubernetes/ha/lock/TimedLeaderNotifier.java | 179 +++++++++ .../ha/KubernetesClusterServiceTest.java | 59 +-- .../kubernetes/ha/TimedLeaderNotifierTest.java | 117 ++++++ .../kubernetes/ha/utils/LeaderRecorder.java | 5 + .../kubernetes/ha/utils/LockTestServer.java | 31 +- 13 files changed, 813 insertions(+), 786 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java index a868d16..08ebb70 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java @@ -82,25 +82,22 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern if (config.getJitterFactor() < 1) { throw new IllegalStateException("jitterFactor must be >= 1 (found: " + config.getJitterFactor() + ")"); } - if (config.getRetryOnErrorIntervalSeconds() <= 0) { - throw new IllegalStateException("retryOnErrorIntervalSeconds must be > 0 (found: " + config.getRetryOnErrorIntervalSeconds() + ")"); + if (config.getRetryPeriodMillis() <= 0) { + throw new IllegalStateException("retryPeriodMillis must be > 0 (found: " + config.getRetryPeriodMillis() + ")"); } - if (config.getRetryPeriodSeconds() <= 0) { - throw new IllegalStateException("retryPeriodSeconds must be > 0 (found: " + config.getRetryPeriodSeconds() + ")"); + if (config.getRenewDeadlineMillis() <= 0) { + throw new IllegalStateException("renewDeadlineMillis must be > 0 (found: " + config.getRenewDeadlineMillis() + ")"); } - if (config.getRenewDeadlineSeconds() <= 0) { - throw new IllegalStateException("renewDeadlineSeconds must be > 0 (found: " + config.getRenewDeadlineSeconds() + ")"); + if (config.getLeaseDurationMillis() <= 0) { + throw new IllegalStateException("leaseDurationMillis must be > 0 (found: " + config.getLeaseDurationMillis() + ")"); } - if (config.getLeaseDurationSeconds() <= 0) { - throw new IllegalStateException("leaseDurationSeconds must be > 0 (found: " + config.getLeaseDurationSeconds() + ")"); + if (config.getLeaseDurationMillis() <= config.getRenewDeadlineMillis()) { + throw new IllegalStateException("leaseDurationMillis must be greater than renewDeadlineMillis " + + "(" + config.getLeaseDurationMillis() + " is not greater than " + config.getRenewDeadlineMillis() + ")"); } - if (config.getLeaseDurationSeconds() <= config.getRenewDeadlineSeconds()) { - throw new IllegalStateException("leaseDurationSeconds must be greater than renewDeadlineSeconds " - + "(" + config.getLeaseDurationSeconds() + " is not greater than " + config.getRenewDeadlineSeconds() + ")"); - } - if (config.getRenewDeadlineSeconds() <= config.getJitterFactor() * config.getRetryPeriodSeconds()) { - throw new IllegalStateException("renewDeadlineSeconds must be greater than jitterFactor*retryPeriodSeconds " - + "(" + config.getRenewDeadlineSeconds() + " is not greater than " + config.getJitterFactor() + "*" + config.getRetryPeriodSeconds() + ")"); + if (config.getRenewDeadlineMillis() <= config.getJitterFactor() * config.getRetryPeriodMillis()) { + throw new IllegalStateException("renewDeadlineMillis must be greater than jitterFactor*retryPeriodMillis " + + "(" + config.getRenewDeadlineMillis() + " is not greater than " + config.getJitterFactor() + "*" + config.getRetryPeriodMillis() + ")"); } return config; @@ -176,16 +173,8 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern lockConfiguration.setKubernetesResourcesNamespace(kubernetesResourcesNamespace); } - public long getRetryOnErrorIntervalSeconds() { - return lockConfiguration.getRetryOnErrorIntervalSeconds(); - } - - /** - * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated. - * Watch recreation can be disabled by putting value <= 0. - */ - public void setRetryOnErrorIntervalSeconds(long retryOnErrorIntervalSeconds) { - lockConfiguration.setRetryOnErrorIntervalSeconds(retryOnErrorIntervalSeconds); + public String getKubernetesResourcesNamespaceOrDefault(KubernetesClient kubernetesClient) { + return lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient); } public double getJitterFactor() { @@ -193,56 +182,43 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern } /** - * A jitter factor to apply in order to prevent all pods to try to become leaders in the same instant. + * A jitter factor to apply in order to prevent all pods to call Kubernetes APIs in the same instant. */ public void setJitterFactor(double jitterFactor) { lockConfiguration.setJitterFactor(jitterFactor); } - public long getLeaseDurationSeconds() { - return lockConfiguration.getLeaseDurationSeconds(); + public long getLeaseDurationMillis() { + return lockConfiguration.getLeaseDurationMillis(); } /** * The default duration of the lease for the current leader. */ - public void setLeaseDurationSeconds(long leaseDurationSeconds) { - lockConfiguration.setLeaseDurationSeconds(leaseDurationSeconds); - } - - public long getRenewDeadlineSeconds() { - return lockConfiguration.getRenewDeadlineSeconds(); - } - - /** - * The deadline after which the leader must stop trying to renew its leadership (and yield it). - */ - public void setRenewDeadlineSeconds(long renewDeadlineSeconds) { - lockConfiguration.setRenewDeadlineSeconds(renewDeadlineSeconds); + public void setLeaseDurationMillis(long leaseDurationMillis) { + lockConfiguration.setLeaseDurationMillis(leaseDurationMillis); } - public long getRetryPeriodSeconds() { - return lockConfiguration.getRetryPeriodSeconds(); + public long getRenewDeadlineMillis() { + return lockConfiguration.getRenewDeadlineMillis(); } /** - * The time between two subsequent attempts to acquire/renew the leadership (or after the lease expiration). - * It is randomized using the jitter factor in case of new leader election (not renewal). + * The deadline after which the leader must stop its services because it may have lost the leadership. */ - public void setRetryPeriodSeconds(long retryPeriodSeconds) { - lockConfiguration.setRetryPeriodSeconds(retryPeriodSeconds); + public void setRenewDeadlineMillis(long renewDeadlineMillis) { + lockConfiguration.setRenewDeadlineMillis(renewDeadlineMillis); } - public long getWatchRefreshIntervalSeconds() { - return lockConfiguration.getWatchRefreshIntervalSeconds(); + public long getRetryPeriodMillis() { + return lockConfiguration.getRetryPeriodMillis(); } /** - * Set this to a positive value in order to recreate watchers after a certain amount of time, - * to avoid having stale watchers. + * The time between two subsequent attempts to check and acquire the leadership. + * It is randomized using the jitter factor. */ - public void setWatchRefreshIntervalSeconds(long watchRefreshIntervalSeconds) { - lockConfiguration.setWatchRefreshIntervalSeconds(watchRefreshIntervalSeconds); + public void setRetryPeriodMillis(long retryPeriodMillis) { + lockConfiguration.setRetryPeriodMillis(retryPeriodMillis); } - } http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java index 28f38a5..ddda675 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java @@ -30,7 +30,7 @@ import io.fabric8.kubernetes.client.KubernetesClient; import org.apache.camel.component.kubernetes.KubernetesConfiguration; import org.apache.camel.component.kubernetes.KubernetesHelper; import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent; -import org.apache.camel.component.kubernetes.ha.lock.KubernetesLeaseBasedLeadershipController; +import org.apache.camel.component.kubernetes.ha.lock.KubernetesLeadershipController; import org.apache.camel.component.kubernetes.ha.lock.KubernetesLockConfiguration; import org.apache.camel.ha.CamelClusterMember; import org.apache.camel.impl.ha.AbstractCamelClusterView; @@ -56,7 +56,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView { private volatile List<CamelClusterMember> currentMembers = Collections.emptyList(); - private KubernetesLeaseBasedLeadershipController controller; + private KubernetesLeadershipController controller; public KubernetesClusterView(KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) { super(cluster, lockConfiguration.getGroupName()); @@ -86,7 +86,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView { if (controller == null) { this.kubernetesClient = KubernetesHelper.getKubernetesClient(configuration); - controller = new KubernetesLeaseBasedLeadershipController(kubernetesClient, this.lockConfiguration, event -> { + controller = new KubernetesLeadershipController(kubernetesClient, this.lockConfiguration, event -> { if (event instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) { // New leader Optional<String> leader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData(); http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java index 70fa860..feea1c6 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java @@ -18,6 +18,7 @@ package org.apache.camel.component.kubernetes.ha.lock; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Set; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapBuilder; @@ -36,7 +37,7 @@ public final class ConfigMapLockUtils { private static final String LEADER_PREFIX = "leader.pod."; - private static final String TIMESTAMP_PREFIX = "leader.timestamp."; + private static final String LOCAL_TIMESTAMP_PREFIX = "leader.local.timestamp."; private ConfigMapLockUtils() { } @@ -49,19 +50,19 @@ public final class ConfigMapLockUtils { .addToLabels("kind", "locks"). endMetadata() .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader()) - .addToData(TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getTimestamp())) + .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getLocalTimestamp())) .build(); } public static ConfigMap getConfigMapWithNewLeader(ConfigMap configMap, LeaderInfo leaderInfo) { return new ConfigMapBuilder(configMap) .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader()) - .addToData(TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getTimestamp())) + .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getLocalTimestamp())) .build(); } - public static LeaderInfo getLeaderInfo(ConfigMap configMap, String group) { - return new LeaderInfo(group, getLeader(configMap, group), getTimestamp(configMap, group)); + public static LeaderInfo getLeaderInfo(ConfigMap configMap, Set<String> members, String group) { + return new LeaderInfo(group, getLeader(configMap, group), getLocalTimestamp(configMap, group), members); } private static String getLeader(ConfigMap configMap, String group) { @@ -81,8 +82,8 @@ public final class ConfigMapLockUtils { return null; } - private static Date getTimestamp(ConfigMap configMap, String group) { - String timestamp = getConfigMapValue(configMap, TIMESTAMP_PREFIX + group); + private static Date getLocalTimestamp(ConfigMap configMap, String group) { + String timestamp = getConfigMapValue(configMap, LOCAL_TIMESTAMP_PREFIX + group); if (timestamp == null) { return null; } http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java new file mode 100644 index 0000000..f527779 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java @@ -0,0 +1,352 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.lock; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; + +import org.apache.camel.Service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Monitors current status and participate to leader election when no active leaders are present. + * It communicates changes in leadership and cluster members to the given event handler. + */ +public class KubernetesLeadershipController implements Service { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeadershipController.class); + + private enum State { + NOT_LEADER, + BECOMING_LEADER, + LEADER + } + + private KubernetesClient kubernetesClient; + + private KubernetesLockConfiguration lockConfiguration; + + private KubernetesClusterEventHandler eventHandler; + + private State currentState = State.NOT_LEADER; + + private ScheduledExecutorService serializedExecutor; + + private TimedLeaderNotifier leaderNotifier; + + private volatile LeaderInfo latestLeaderInfo; + private volatile ConfigMap latestConfigMap; + private volatile Set<String> latestMembers; + + public KubernetesLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) { + this.kubernetesClient = kubernetesClient; + this.lockConfiguration = lockConfiguration; + this.eventHandler = eventHandler; + } + + @Override + public void start() throws Exception { + if (serializedExecutor == null) { + LOG.debug("{} Starting leadership controller...", logPrefix()); + serializedExecutor = Executors.newSingleThreadScheduledExecutor(); + leaderNotifier = new TimedLeaderNotifier(this.eventHandler); + + leaderNotifier.start(); + serializedExecutor.execute(this::refreshStatus); + } + } + + @Override + public void stop() throws Exception { + LOG.debug("{} Stopping leadership controller...", logPrefix()); + if (serializedExecutor != null) { + serializedExecutor.shutdownNow(); + } + serializedExecutor = null; + + if (leaderNotifier != null) { + leaderNotifier.stop(); + } + leaderNotifier = null; + } + + private void refreshStatus() { + switch (currentState) { + case NOT_LEADER: + refreshStatusNotLeader(); + break; + case BECOMING_LEADER: + refreshStatusBecomingLeader(); + break; + case LEADER: + refreshStatusLeader(); + break; + default: + throw new RuntimeException("Unsupported state " + currentState); + } + } + + /** + * This pod is currently not leader. It should monitor the leader configuration and try + * to acquire the leadership if possible. + */ + private void refreshStatusNotLeader() { + LOG.debug("{} Pod is not leader, pulling new data from the cluster", logPrefix()); + boolean pulled = lookupNewLeaderInfo(); + if (!pulled) { + rescheduleAfterDelay(); + return; + } + + if (this.latestLeaderInfo.hasEmptyLeader()) { + // There is no previous leader + LOG.info("{} The cluster has no leaders. Trying to acquire the leadership...", logPrefix()); + boolean acquired = tryAcquireLeadership(); + if (acquired) { + LOG.info("{} Leadership acquired by current pod ({}) with immediate effect", logPrefix(), this.lockConfiguration.getPodName()); + this.currentState = State.LEADER; + this.serializedExecutor.execute(this::refreshStatus); + return; + } else { + LOG.info("{} Unable to acquire the leadership, it may have been acquired by another pod", logPrefix()); + } + } else if (!this.latestLeaderInfo.hasValidLeader()) { + // There's a previous leader and it's invalid + LOG.info("{} Leadership has been lost by old owner. Trying to acquire the leadership...", logPrefix()); + boolean acquired = tryAcquireLeadership(); + if (acquired) { + LOG.info("{} Leadership acquired by current pod ({})", logPrefix(), this.lockConfiguration.getPodName()); + this.currentState = State.BECOMING_LEADER; + this.serializedExecutor.execute(this::refreshStatus); + return; + } else { + LOG.info("{} Unable to acquire the leadership, it may have been acquired by another pod", logPrefix()); + } + } else if (this.latestLeaderInfo.isValidLeader(this.lockConfiguration.getPodName())) { + // We are leaders for some reason (e.g. pod restart on failure) + LOG.info("{} Leadership is already owned by current pod ({})", logPrefix(), this.lockConfiguration.getPodName()); + this.currentState = State.BECOMING_LEADER; + this.serializedExecutor.execute(this::refreshStatus); + return; + } + + this.leaderNotifier.refreshLeadership(Optional.ofNullable(this.latestLeaderInfo.getLeader()), + System.currentTimeMillis(), + this.lockConfiguration.getLeaseDurationMillis(), + this.latestLeaderInfo.getMembers()); + rescheduleAfterDelay(); + } + + /** + * This pod has acquired the leadership but it should wait for the old leader + * to tear down resources before starting the local services. + */ + private void refreshStatusBecomingLeader() { + // Wait always the same amount of time before becoming the leader + // Even if the current pod is already leader, we should let a possible old version of the pod to shut down + long delay = this.lockConfiguration.getLeaseDurationMillis(); + LOG.info("{} Current pod ({}) owns the leadership, but it will be effective in {} seconds...", logPrefix(), this.lockConfiguration.getPodName(), new BigDecimal(delay).divide(BigDecimal + .valueOf(1000), 2, BigDecimal.ROUND_HALF_UP)); + + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + LOG.warn("Thread interrupted", e); + } + + LOG.info("{} Current pod ({}) is becoming the new leader now...", logPrefix(), this.lockConfiguration.getPodName()); + this.currentState = State.LEADER; + this.serializedExecutor.execute(this::refreshStatus); + } + + private void refreshStatusLeader() { + LOG.debug("{} Pod should be the leader, pulling new data from the cluster", logPrefix()); + long timeBeforePulling = System.currentTimeMillis(); + boolean pulled = lookupNewLeaderInfo(); + if (!pulled) { + rescheduleAfterDelay(); + return; + } + + if (this.latestLeaderInfo.isValidLeader(this.lockConfiguration.getPodName())) { + LOG.debug("{} Current Pod ({}) is still the leader", logPrefix(), this.lockConfiguration.getPodName()); + this.leaderNotifier.refreshLeadership(Optional.of(this.lockConfiguration.getPodName()), + timeBeforePulling, + this.lockConfiguration.getRenewDeadlineMillis(), + this.latestLeaderInfo.getMembers()); + rescheduleAfterDelay(); + return; + } else { + LOG.debug("{} Current Pod ({}) has lost the leadership", logPrefix(), this.lockConfiguration.getPodName()); + this.currentState = State.NOT_LEADER; + // set a empty leader to signal leadership loss + this.leaderNotifier.refreshLeadership(Optional.empty(), + System.currentTimeMillis(), + lockConfiguration.getLeaseDurationMillis(), + this.latestLeaderInfo.getMembers()); + + // wait a lease time and restart + this.serializedExecutor.schedule(this::refreshStatus, this.lockConfiguration.getLeaseDurationMillis(), TimeUnit.MILLISECONDS); + } + } + + private void rescheduleAfterDelay() { + this.serializedExecutor.schedule(this::refreshStatus, jitter(this.lockConfiguration.getRetryPeriodMillis(), this.lockConfiguration.getJitterFactor()), TimeUnit.MILLISECONDS); + } + + private boolean lookupNewLeaderInfo() { + LOG.debug("{} Looking up leadership information...", logPrefix()); + + ConfigMap configMap; + try { + configMap = pullConfigMap(); + } catch (Throwable e) { + LOG.warn(logPrefix() + " Unable to retrieve the current ConfigMap " + this.lockConfiguration.getConfigMapName() + " from Kubernetes"); + LOG.debug(logPrefix() + " Exception thrown during ConfigMap lookup", e); + return false; + } + + Set<String> members; + try { + members = Objects.requireNonNull(pullClusterMembers(), "Retrieved a null set of members"); + } catch (Throwable e) { + LOG.warn(logPrefix() + " Unable to retrieve the list of cluster members from Kubernetes"); + LOG.debug(logPrefix() + " Exception thrown during Pod list lookup", e); + return false; + } + + updateLatestLeaderInfo(configMap, members); + return true; + } + + private boolean tryAcquireLeadership() { + LOG.debug("{} Trying to acquire the leadership...", logPrefix()); + + ConfigMap configMap = this.latestConfigMap; + Set<String> members = this.latestMembers; + LeaderInfo latestLeaderInfo = this.latestLeaderInfo; + + if (latestLeaderInfo == null || members == null) { + LOG.warn(logPrefix() + " Unexpected condition. Latest leader info or list of members is empty."); + return false; + } else if (!members.contains(this.lockConfiguration.getPodName())) { + LOG.warn(logPrefix() + " The list of cluster members " + latestLeaderInfo.getMembers() + " does not contain the current pod (" + this.lockConfiguration.getPodName() + "). Cannot acquire" + + " leadership."); + return false; + } + + // Info we would set set in the configmap to become leaders + LeaderInfo newLeaderInfo = new LeaderInfo(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName(), new Date(), members); + + if (configMap == null) { + // No ConfigMap created so far + LOG.debug("{} Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created", logPrefix()); + ConfigMap newConfigMap = ConfigMapLockUtils.createNewConfigMap(this.lockConfiguration.getConfigMapName(), newLeaderInfo); + + try { + kubernetesClient.configMaps() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .create(newConfigMap); + + LOG.debug("{} ConfigMap {} successfully created", logPrefix(), this.lockConfiguration.getConfigMapName()); + updateLatestLeaderInfo(newConfigMap, members); + return true; + } catch (Exception ex) { + // Suppress exception + LOG.warn(logPrefix() + " Unable to create the ConfigMap, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has " + + "the right " + + "permissions to create it"); + LOG.debug(logPrefix() + " Exception while trying to create the ConfigMap", ex); + return false; + } + } else { + LOG.debug("{} Lock configmap already present in the Kubernetes namespace. Checking...", logPrefix()); + LeaderInfo leaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, members, this.lockConfiguration.getGroupName()); + + boolean canAcquire = !leaderInfo.hasValidLeader(); + if (canAcquire) { + // Try to be the new leader + try { + ConfigMap updatedConfigMap = ConfigMapLockUtils.getConfigMapWithNewLeader(configMap, newLeaderInfo); + kubernetesClient.configMaps() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .withName(this.lockConfiguration.getConfigMapName()) + .lockResourceVersion(configMap.getMetadata().getResourceVersion()) + .replace(updatedConfigMap); + + LOG.debug("{} ConfigMap {} successfully updated", logPrefix(), this.lockConfiguration.getConfigMapName()); + updateLatestLeaderInfo(updatedConfigMap, members); + return true; + } catch (Exception ex) { + LOG.warn(logPrefix() + " Unable to update the lock ConfigMap to set leadership information"); + LOG.debug(logPrefix() + " Error received during configmap lock replace", ex); + return false; + } + } else { + // Another pod is the leader and it's still active + LOG.debug("{} Another pod ({}) is the current leader and it is still active", logPrefix(), this.latestLeaderInfo.getLeader()); + return false; + } + } + } + + private void updateLatestLeaderInfo(ConfigMap configMap, Set<String> members) { + LOG.debug("{} Updating internal status about the current leader", logPrefix()); + this.latestConfigMap = configMap; + this.latestMembers = members; + this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, members, this.lockConfiguration.getGroupName()); + LOG.debug("{} Current leader info: {}", logPrefix(), this.latestLeaderInfo); + } + + private ConfigMap pullConfigMap() { + return kubernetesClient.configMaps() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .withName(this.lockConfiguration.getConfigMapName()) + .get(); + } + + private Set<String> pullClusterMembers() { + List<Pod> pods = kubernetesClient.pods() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .withLabels(this.lockConfiguration.getClusterLabels()) + .list().getItems(); + + return pods.stream().map(pod -> pod.getMetadata().getName()).collect(Collectors.toSet()); + } + + private long jitter(long num, double factor) { + return (long) (num * (1 + Math.random() * (factor - 1))); + } + + private String logPrefix() { + return "Leadership Controller [" + this.lockConfiguration.getPodName() + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java deleted file mode 100644 index 76e91bf..0000000 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java +++ /dev/null @@ -1,384 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kubernetes.ha.lock; - -import java.util.Date; -import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.client.KubernetesClient; - -import org.apache.camel.Service; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Monitors current status and participate to leader election when no active leaders are present. - * It communicates changes in leadership and cluster members to the given event handler. - */ -public class KubernetesLeaseBasedLeadershipController implements Service { - - private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaseBasedLeadershipController.class); - - private static final long FIXED_ADDITIONAL_DELAY = 100; - - private KubernetesClient kubernetesClient; - - private KubernetesLockConfiguration lockConfiguration; - - private KubernetesClusterEventHandler eventHandler; - - private ScheduledExecutorService serializedExecutor; - private ScheduledExecutorService eventDispatcherExecutor; - - private KubernetesMembersMonitor membersMonitor; - - private Optional<String> currentLeader = Optional.empty(); - - private volatile LeaderInfo latestLeaderInfo; - - public KubernetesLeaseBasedLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) { - this.kubernetesClient = kubernetesClient; - this.lockConfiguration = lockConfiguration; - this.eventHandler = eventHandler; - } - - @Override - public void start() throws Exception { - if (serializedExecutor == null) { - LOG.debug("Starting leadership controller..."); - serializedExecutor = Executors.newSingleThreadScheduledExecutor(); - - eventDispatcherExecutor = Executors.newSingleThreadScheduledExecutor(); - - membersMonitor = new KubernetesMembersMonitor(this.serializedExecutor, this.kubernetesClient, this.lockConfiguration); - if (eventHandler != null) { - membersMonitor.addClusterEventHandler(eventHandler); - } - - membersMonitor.start(); - serializedExecutor.execute(this::initialization); - } - } - - @Override - public void stop() throws Exception { - LOG.debug("Stopping leadership controller..."); - if (serializedExecutor != null) { - serializedExecutor.shutdownNow(); - } - if (eventDispatcherExecutor != null) { - eventDispatcherExecutor.shutdown(); - eventDispatcherExecutor.awaitTermination(2, TimeUnit.SECONDS); - eventDispatcherExecutor.shutdownNow(); - } - if (membersMonitor != null) { - membersMonitor.stop(); - } - - membersMonitor = null; - eventDispatcherExecutor = null; - serializedExecutor = null; - } - - /** - * Get the first ConfigMap and setup the initial state. - */ - private void initialization() { - LOG.debug("Reading (with retry) the configmap {} to detect the current leader", this.lockConfiguration.getConfigMapName()); - refreshConfigMapFromCluster(true); - - if (isCurrentPodTheActiveLeader()) { - serializedExecutor.execute(this::onLeadershipAcquired); - } else { - LOG.info("The current pod ({}) is not the leader of the group '{}' in ConfigMap '{}' at this time", this.lockConfiguration.getPodName(), this.lockConfiguration - .getGroupName(), this.lockConfiguration.getConfigMapName()); - serializedExecutor.execute(this::acquireLeadershipCycle); - } - } - - /** - * Signals the acquisition of the leadership and move to the keep-leadership state. - */ - private void onLeadershipAcquired() { - LOG.info("The current pod ({}) is now the leader of the group '{}' in ConfigMap '{}'", this.lockConfiguration.getPodName(), this.lockConfiguration - .getGroupName(), this.lockConfiguration.getConfigMapName()); - - this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader); - - long nextDelay = computeNextRenewWaitTime(this.latestLeaderInfo.getTimestamp(), this.latestLeaderInfo.getTimestamp()); - serializedExecutor.schedule(this::keepLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS); - } - - /** - * While in the keep-leadership state, the controller periodically renews the lease. - * If a renewal deadline is met and it was not possible to renew the lease, the leadership is lost. - */ - private void keepLeadershipCycle() { - // renew lease periodically - refreshConfigMapFromCluster(false); // if possible, update - - if (this.latestLeaderInfo.isTimeElapsedSeconds(lockConfiguration.getRenewDeadlineSeconds()) || !this.latestLeaderInfo.isLeader(this.lockConfiguration.getPodName())) { - // Time over for renewal or leadership lost - LOG.debug("The current pod ({}) has lost the leadership", this.lockConfiguration.getPodName()); - serializedExecutor.execute(this::onLeadershipLost); - return; - } - - boolean success = tryAcquireOrRenewLeadership(); - LOG.debug("Attempted to renew the lease. Success={}", success); - - long nextDelay = computeNextRenewWaitTime(this.latestLeaderInfo.getTimestamp(), new Date()); - serializedExecutor.schedule(this::keepLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS); - } - - /** - * Compute the timestamp of next event while in keep-leadership state. - */ - private long computeNextRenewWaitTime(Date lastRenewal, Date lastRenewalAttempt) { - long timeDeadline = lastRenewal.getTime() + this.lockConfiguration.getRenewDeadlineSeconds() * 1000; - long timeRetry; - long counter = 0; - do { - counter++; - timeRetry = lastRenewal.getTime() + counter * this.lockConfiguration.getRetryPeriodSeconds() * 1000; - } while (timeRetry < lastRenewalAttempt.getTime() && timeRetry < timeDeadline); - - long time = Math.min(timeRetry, timeDeadline); - long delay = Math.max(0, time - System.currentTimeMillis()); - long delayJittered = jitter(delay, lockConfiguration.getJitterFactor()); - LOG.debug("Next renewal timeout event will be fired in {} seconds", delayJittered / 1000); - return delayJittered; - } - - - /** - * Signals the loss of leadership and move to the acquire-leadership state. - */ - private void onLeadershipLost() { - LOG.info("The local pod ({}) is no longer the leader of the group '{}' in ConfigMap '{}'", this.lockConfiguration.getPodName(), this.lockConfiguration.getGroupName(), - this.lockConfiguration.getConfigMapName()); - - this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader); - serializedExecutor.execute(this::acquireLeadershipCycle); - } - - /** - * While in the acquire-leadership state, the controller waits for the current lease to expire before trying to acquire the leadership. - */ - private void acquireLeadershipCycle() { - // wait for the current lease to finish then fire an election - refreshConfigMapFromCluster(false); // if possible, update - - // Notify about changes in current leader if any - this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader); - - if (!this.latestLeaderInfo.isTimeElapsedSeconds(lockConfiguration.getLeaseDurationSeconds())) { - // Wait for the lease to finish before trying leader election - long nextDelay = computeNextElectionWaitTime(this.latestLeaderInfo.getTimestamp()); - serializedExecutor.schedule(this::acquireLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS); - return; - } - - boolean acquired = tryAcquireOrRenewLeadership(); - if (acquired) { - LOG.debug("Leadership acquired for ConfigMap {}. Notification in progress...", this.lockConfiguration.getConfigMapName()); - serializedExecutor.execute(this::onLeadershipAcquired); - return; - } - - // Notify about changes in current leader if any - this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader); - - LOG.debug("Cannot acquire the leadership for ConfigMap {}", this.lockConfiguration.getConfigMapName()); - long nextDelay = computeNextElectionWaitTime(this.latestLeaderInfo.getTimestamp()); - serializedExecutor.schedule(this::acquireLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS); - } - - private long computeNextElectionWaitTime(Date lastRenewal) { - if (lastRenewal == null) { - LOG.debug("Error detected while getting leadership info, next election timeout event will be fired in {} seconds", this.lockConfiguration.getRetryOnErrorIntervalSeconds()); - return this.lockConfiguration.getRetryOnErrorIntervalSeconds(); - } - long time = lastRenewal.getTime() + this.lockConfiguration.getLeaseDurationSeconds() * 1000 - + jitter(this.lockConfiguration.getRetryPeriodSeconds() * 1000, this.lockConfiguration.getJitterFactor()); - - long delay = Math.max(0, time - System.currentTimeMillis()); - LOG.debug("Next election timeout event will be fired in {} seconds", delay / 1000); - return delay; - } - - private long jitter(long num, double factor) { - return (long) (num * (1 + Math.random() * (factor - 1))); - } - - private boolean tryAcquireOrRenewLeadership() { - LOG.debug("Trying to acquire or renew the leadership..."); - - ConfigMap configMap; - try { - configMap = pullConfigMap(); - } catch (Exception e) { - LOG.warn("Unable to retrieve the current ConfigMap " + this.lockConfiguration.getConfigMapName() + " from Kubernetes", e); - return false; - } - - // Info to set in the configmap to become leaders - LeaderInfo newLeaderInfo = new LeaderInfo(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName(), new Date()); - - if (configMap == null) { - // No configmap created so far - LOG.debug("Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created"); - ConfigMap newConfigMap = ConfigMapLockUtils.createNewConfigMap(this.lockConfiguration.getConfigMapName(), newLeaderInfo); - - try { - kubernetesClient.configMaps() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .create(newConfigMap); - } catch (Exception ex) { - // Suppress exception - LOG.warn("Unable to create the ConfigMap, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has the right " - + "permissions to create it"); - LOG.debug("Exception while trying to create the ConfigMap", ex); - - // Try to get the configMap and return the current leader - refreshConfigMapFromCluster(false); - return isCurrentPodTheActiveLeader(); - } - - LOG.debug("ConfigMap {} successfully created and local pod is leader", this.lockConfiguration.getConfigMapName()); - updateLatestLeaderInfo(newConfigMap); - scheduleCheckForPossibleLeadershipLoss(); - return true; - } else { - LOG.debug("Lock configmap already present in the Kubernetes namespace. Checking..."); - LeaderInfo leaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, this.lockConfiguration.getGroupName()); - - boolean weWereLeader = leaderInfo.isLeader(this.lockConfiguration.getPodName()); - boolean leaseExpired = leaderInfo.isTimeElapsedSeconds(this.lockConfiguration.getLeaseDurationSeconds()); - - if (weWereLeader || leaseExpired) { - // Renew the lease or set the new leader - try { - ConfigMap updatedConfigMap = ConfigMapLockUtils.getConfigMapWithNewLeader(configMap, newLeaderInfo); - kubernetesClient.configMaps() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .withName(this.lockConfiguration.getConfigMapName()) - .lockResourceVersion(configMap.getMetadata().getResourceVersion()) - .replace(updatedConfigMap); - - LOG.debug("ConfigMap {} successfully updated and local pod is leader", this.lockConfiguration.getConfigMapName()); - updateLatestLeaderInfo(updatedConfigMap); - scheduleCheckForPossibleLeadershipLoss(); - return true; - } catch (Exception ex) { - LOG.warn("An attempt to become leader has failed. It's possible that the leadership has been taken by another pod"); - LOG.debug("Error received during configmap lock replace", ex); - - // Try to get the configMap and return the current leader - refreshConfigMapFromCluster(false); - return isCurrentPodTheActiveLeader(); - } - } else { - // Another pod is the leader and lease is not expired - LOG.debug("Another pod is the current leader and lease has not expired yet"); - updateLatestLeaderInfo(configMap); - return false; - } - } - } - - - private void refreshConfigMapFromCluster(boolean retry) { - LOG.debug("Retrieving configmap {}", this.lockConfiguration.getConfigMapName()); - try { - updateLatestLeaderInfo(pullConfigMap()); - } catch (Exception ex) { - if (retry) { - LOG.warn("ConfigMap pull failed. Retrying in " + this.lockConfiguration.getRetryOnErrorIntervalSeconds() + " seconds...", ex); - try { - Thread.sleep(this.lockConfiguration.getRetryOnErrorIntervalSeconds() * 1000); - refreshConfigMapFromCluster(true); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Controller Thread interrupted, shutdown in progress", e); - } - } else { - LOG.warn("Cannot retrieve the ConfigMap: pull failed", ex); - } - } - } - - private boolean isCurrentPodTheActiveLeader() { - return latestLeaderInfo != null - && latestLeaderInfo.isLeader(this.lockConfiguration.getPodName()) - && !latestLeaderInfo.isTimeElapsedSeconds(this.lockConfiguration.getRenewDeadlineSeconds()); - } - - private ConfigMap pullConfigMap() { - return kubernetesClient.configMaps() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .withName(this.lockConfiguration.getConfigMapName()) - .get(); - } - - - private void updateLatestLeaderInfo(ConfigMap configMap) { - LOG.debug("Updating internal status about the current leader"); - this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, this.lockConfiguration.getGroupName()); - } - - private void scheduleCheckForPossibleLeadershipLoss() { - // Adding check for the case of main thread busy on http calls - if (this.latestLeaderInfo.isLeader(this.lockConfiguration.getPodName())) { - this.eventDispatcherExecutor.schedule(this::checkAndNotifyNewLeader, this.lockConfiguration.getRenewDeadlineSeconds() * 1000 + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS); - } - } - - private void checkAndNotifyNewLeader() { - LOG.debug("Checking if the current leader has changed to notify the event handler..."); - LeaderInfo newLeaderInfo = this.latestLeaderInfo; - if (newLeaderInfo == null) { - return; - } - - long leaderInfoDurationSeconds = newLeaderInfo.isLeader(this.lockConfiguration.getPodName()) - ? this.lockConfiguration.getRenewDeadlineSeconds() - : this.lockConfiguration.getLeaseDurationSeconds(); - - Optional<String> newLeader; - if (newLeaderInfo.getLeader() != null && !newLeaderInfo.isTimeElapsedSeconds(leaderInfoDurationSeconds)) { - newLeader = Optional.of(newLeaderInfo.getLeader()); - } else { - newLeader = Optional.empty(); - } - - // Sending notifications in case of leader change - if (!newLeader.equals(this.currentLeader)) { - LOG.info("Current leader has changed from {} to {}. Sending notification...", this.currentLeader, newLeader); - this.currentLeader = newLeader; - eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> newLeader); - } else { - LOG.debug("Current leader unchanged: {}", this.currentLeader); - } - } - - -} http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java index 6461708..be0b424 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java @@ -28,14 +28,10 @@ public class KubernetesLockConfiguration implements Cloneable { public static final String DEFAULT_CONFIGMAP_NAME = "leaders"; - public static final double DEFAULT_JITTER_FACTOR = 1.2; - public static final long DEFAULT_LEASE_DURATION_SECONDS = 60; - public static final long DEFAULT_RENEW_DEADLINE_SECONDS = 45; - public static final long DEFAULT_RETRY_PERIOD_SECONDS = 9; - - public static final long DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS = 5; - public static final long DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS = 1800; + public static final long DEFAULT_LEASE_DURATION_MILLIS = 60000; + public static final long DEFAULT_RENEW_DEADLINE_MILLIS = 45000; + public static final long DEFAULT_RETRY_PERIOD_MILLIS = 9000; /** * Kubernetes namespace containing the pods and the ConfigMap used for locking. @@ -63,37 +59,25 @@ public class KubernetesLockConfiguration implements Cloneable { private Map<String, String> clusterLabels = new HashMap<>(); /** - * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated. - * Watch recreation can be disabled by putting value <= 0. - */ - private long retryOnErrorIntervalSeconds = DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS; - - /** - * A jitter factor to apply in order to prevent all pods to try to become leaders in the same instant. + * A jitter factor to apply in order to prevent all pods to call Kubernetes APIs in the same instant. */ private double jitterFactor = DEFAULT_JITTER_FACTOR; /** * The default duration of the lease for the current leader. */ - private long leaseDurationSeconds = DEFAULT_LEASE_DURATION_SECONDS; - - /** - * The deadline after which the leader must stop trying to renew its leadership (and yield it). - */ - private long renewDeadlineSeconds = DEFAULT_RENEW_DEADLINE_SECONDS; + private long leaseDurationMillis = DEFAULT_LEASE_DURATION_MILLIS; /** - * The time between two subsequent attempts to acquire/renew the leadership (or after the lease expiration). - * It is randomized using the jitter factor in case of new leader election (not renewal). + * The deadline after which the leader must stop its services because it may have lost the leadership. */ - private long retryPeriodSeconds = DEFAULT_RETRY_PERIOD_SECONDS; + private long renewDeadlineMillis = DEFAULT_RENEW_DEADLINE_MILLIS; /** - * Set this to a positive value in order to recreate watchers after a certain amount of time - * (to prevent them becoming stale). + * The time between two subsequent attempts to check and acquire the leadership. + * It is randomized using the jitter factor. */ - private long watchRefreshIntervalSeconds = DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS; + private long retryPeriodMillis = DEFAULT_RETRY_PERIOD_MILLIS; public KubernetesLockConfiguration() { } @@ -149,14 +133,6 @@ public class KubernetesLockConfiguration implements Cloneable { this.clusterLabels = clusterLabels; } - public long getRetryOnErrorIntervalSeconds() { - return retryOnErrorIntervalSeconds; - } - - public void setRetryOnErrorIntervalSeconds(long retryOnErrorIntervalSeconds) { - this.retryOnErrorIntervalSeconds = retryOnErrorIntervalSeconds; - } - public double getJitterFactor() { return jitterFactor; } @@ -165,36 +141,28 @@ public class KubernetesLockConfiguration implements Cloneable { this.jitterFactor = jitterFactor; } - public long getLeaseDurationSeconds() { - return leaseDurationSeconds; - } - - public void setLeaseDurationSeconds(long leaseDurationSeconds) { - this.leaseDurationSeconds = leaseDurationSeconds; - } - - public long getRenewDeadlineSeconds() { - return renewDeadlineSeconds; + public long getLeaseDurationMillis() { + return leaseDurationMillis; } - public void setRenewDeadlineSeconds(long renewDeadlineSeconds) { - this.renewDeadlineSeconds = renewDeadlineSeconds; + public void setLeaseDurationMillis(long leaseDurationMillis) { + this.leaseDurationMillis = leaseDurationMillis; } - public long getRetryPeriodSeconds() { - return retryPeriodSeconds; + public long getRenewDeadlineMillis() { + return renewDeadlineMillis; } - public void setRetryPeriodSeconds(long retryPeriodSeconds) { - this.retryPeriodSeconds = retryPeriodSeconds; + public void setRenewDeadlineMillis(long renewDeadlineMillis) { + this.renewDeadlineMillis = renewDeadlineMillis; } - public long getWatchRefreshIntervalSeconds() { - return watchRefreshIntervalSeconds; + public long getRetryPeriodMillis() { + return retryPeriodMillis; } - public void setWatchRefreshIntervalSeconds(long watchRefreshIntervalSeconds) { - this.watchRefreshIntervalSeconds = watchRefreshIntervalSeconds; + public void setRetryPeriodMillis(long retryPeriodMillis) { + this.retryPeriodMillis = retryPeriodMillis; } public KubernetesLockConfiguration copy() { @@ -214,12 +182,10 @@ public class KubernetesLockConfiguration implements Cloneable { sb.append(", groupName='").append(groupName).append('\''); sb.append(", podName='").append(podName).append('\''); sb.append(", clusterLabels=").append(clusterLabels); - sb.append(", retryOnErrorIntervalSeconds=").append(retryOnErrorIntervalSeconds); sb.append(", jitterFactor=").append(jitterFactor); - sb.append(", leaseDurationSeconds=").append(leaseDurationSeconds); - sb.append(", renewDeadlineSeconds=").append(renewDeadlineSeconds); - sb.append(", retryPeriodSeconds=").append(retryPeriodSeconds); - sb.append(", watchRefreshIntervalSeconds=").append(watchRefreshIntervalSeconds); + sb.append(", leaseDurationMillis=").append(leaseDurationMillis); + sb.append(", renewDeadlineMillis=").append(renewDeadlineMillis); + sb.append(", retryPeriodMillis=").append(retryPeriodMillis); sb.append('}'); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java deleted file mode 100644 index 586a11f..0000000 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kubernetes.ha.lock; - -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watch; -import io.fabric8.kubernetes.client.Watcher; - -import org.apache.camel.Service; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Monitors the list of participants in a leader election and provides the most recently updated list. - * It calls the callback eventHandlers only when the member set changes w.r.t. the previous invocation. - */ -class KubernetesMembersMonitor implements Service { - - private static final Logger LOG = LoggerFactory.getLogger(KubernetesMembersMonitor.class); - - private ScheduledExecutorService serializedExecutor; - - private KubernetesClient kubernetesClient; - - private KubernetesLockConfiguration lockConfiguration; - - private List<KubernetesClusterEventHandler> eventHandlers; - - private Watch watch; - - private boolean terminated; - - private boolean refreshing; - - private Set<String> previousMembers = new HashSet<>(); - - private Set<String> basePoll = new HashSet<>(); - private Set<String> deleted = new HashSet<>(); - private Set<String> added = new HashSet<>(); - - public KubernetesMembersMonitor(ScheduledExecutorService serializedExecutor, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration) { - this.serializedExecutor = serializedExecutor; - this.kubernetesClient = kubernetesClient; - this.lockConfiguration = lockConfiguration; - this.eventHandlers = new LinkedList<>(); - } - - public void addClusterEventHandler(KubernetesClusterEventHandler eventHandler) { - this.eventHandlers.add(eventHandler); - } - - @Override - public void start() throws Exception { - serializedExecutor.execute(() -> doPoll(true)); - serializedExecutor.execute(this::createWatch); - - long recreationDelay = lockConfiguration.getWatchRefreshIntervalSeconds(); - if (recreationDelay > 0) { - serializedExecutor.scheduleWithFixedDelay(this::refresh, recreationDelay, recreationDelay, TimeUnit.SECONDS); - } - } - - private void createWatch() { - try { - LOG.debug("Starting cluster members watcher"); - this.watch = kubernetesClient.pods() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .withLabels(this.lockConfiguration.getClusterLabels()) - .watch(new Watcher<Pod>() { - - @Override - public void eventReceived(Action action, Pod pod) { - switch (action) { - case DELETED: - serializedExecutor.execute(() -> deleteAndNotify(podName(pod))); - break; - case ADDED: - serializedExecutor.execute(() -> addAndNotify(podName(pod))); - break; - default: - } - } - - @Override - public void onClose(KubernetesClientException e) { - if (!terminated) { - KubernetesMembersMonitor.this.watch = null; - if (refreshing) { - LOG.info("Refreshing pod list watcher..."); - serializedExecutor.execute(KubernetesMembersMonitor.this::createWatch); - } else { - LOG.warn("Pod list watcher has been closed unexpectedly. Recreating it in 1 second...", e); - serializedExecutor.schedule(KubernetesMembersMonitor.this::createWatch, 1, TimeUnit.SECONDS); - } - } - } - }); - } catch (Exception ex) { - LOG.warn("Unable to watch for pod list changes. Retrying in 5 seconds..."); - LOG.debug("Error while trying to watch the pod list", ex); - - serializedExecutor.schedule(this::createWatch, 5, TimeUnit.SECONDS); - } - } - - @Override - public void stop() throws Exception { - this.terminated = true; - Watch watch = this.watch; - if (watch != null) { - watch.close(); - } - } - - public void refresh() { - serializedExecutor.execute(() -> { - if (!terminated) { - refreshing = true; - try { - doPoll(false); - - Watch w = this.watch; - if (w != null) { - // It will be recreated - w.close(); - } - } finally { - refreshing = false; - } - } - }); - } - - private void doPoll(boolean retry) { - LOG.debug("Starting poll to get all cluster members"); - List<Pod> pods; - try { - pods = pollPods(); - } catch (Exception ex) { - if (retry) { - LOG.warn("Pods poll failed. Retrying in 5 seconds...", ex); - this.serializedExecutor.schedule(() -> doPoll(true), 5, TimeUnit.SECONDS); - } else { - LOG.warn("Pods poll failed", ex); - } - return; - } - - this.basePoll = pods.stream() - .map(p -> Optional.ofNullable(podName(p))) - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toSet()); - - this.added = new HashSet<>(); - this.deleted = new HashSet<>(); - - LOG.debug("Base list of members is {}", this.basePoll); - - checkAndNotify(); - } - - private List<Pod> pollPods() { - return kubernetesClient.pods() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .withLabels(this.lockConfiguration.getClusterLabels()) - .list().getItems(); - } - - private String podName(Pod pod) { - if (pod != null && pod.getMetadata() != null) { - return pod.getMetadata().getName(); - } - return null; - } - - private void checkAndNotify() { - Set<String> newMembers = new HashSet<>(basePoll); - newMembers.addAll(added); - newMembers.removeAll(deleted); - - LOG.debug("Current list of members is: {}", newMembers); - - if (!newMembers.equals(this.previousMembers)) { - LOG.debug("List of members changed: sending notifications"); - this.previousMembers = newMembers; - - for (KubernetesClusterEventHandler eventHandler : eventHandlers) { - eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) () -> newMembers); - } - } else { - LOG.debug("List of members has not changed"); - } - } - - private void addAndNotify(String member) { - LOG.debug("Adding new member to the list: {}", member); - if (member != null) { - this.added.add(member); - checkAndNotify(); - } - } - - private void deleteAndNotify(String member) { - LOG.debug("Deleting member to the list: {}", member); - if (member != null) { - this.deleted.add(member); - checkAndNotify(); - } - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java index 50d1603..d061945 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java @@ -17,6 +17,7 @@ package org.apache.camel.component.kubernetes.ha.lock; import java.util.Date; +import java.util.Set; import org.apache.camel.util.ObjectHelper; @@ -29,28 +30,31 @@ public class LeaderInfo { private String leader; - private Date timestamp; + private Date localTimestamp; + + private Set<String> members; public LeaderInfo() { } - public LeaderInfo(String groupName, String leader, Date timestamp) { + public LeaderInfo(String groupName, String leader, Date timestamp, Set<String> members) { this.groupName = groupName; this.leader = leader; - this.timestamp = timestamp; + this.localTimestamp = timestamp; + this.members = members; + } + + public boolean hasEmptyLeader() { + return this.leader == null; } - public boolean isTimeElapsedSeconds(long timeSeconds) { - if (timestamp == null) { - return true; - } - long now = System.currentTimeMillis(); - return timestamp.getTime() + timeSeconds * 1000 <= now; + public boolean hasValidLeader() { + return this.leader != null && this.members.contains(this.leader); } - public boolean isLeader(String pod) { + public boolean isValidLeader(String pod) { ObjectHelper.notNull(pod, "pod"); - return pod.equals(leader); + return hasValidLeader() && pod.equals(leader); } public String getGroupName() { @@ -69,12 +73,20 @@ public class LeaderInfo { this.leader = leader; } - public Date getTimestamp() { - return timestamp; + public Date getLocalTimestamp() { + return localTimestamp; + } + + public void setLocalTimestamp(Date localTimestamp) { + this.localTimestamp = localTimestamp; + } + + public Set<String> getMembers() { + return members; } - public void setTimestamp(Date timestamp) { - this.timestamp = timestamp; + public void setMembers(Set<String> members) { + this.members = members; } @Override @@ -82,9 +94,9 @@ public class LeaderInfo { final StringBuilder sb = new StringBuilder("LeaderInfo{"); sb.append("groupName='").append(groupName).append('\''); sb.append(", leader='").append(leader).append('\''); - sb.append(", timestamp=").append(timestamp); + sb.append(", localTimestamp=").append(localTimestamp); + sb.append(", members=").append(members); sb.append('}'); return sb.toString(); } - } http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java new file mode 100644 index 0000000..6ada830 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.lock; + +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.camel.Service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Receives information about the current leader and handles expiration automatically. + */ +public class TimedLeaderNotifier implements Service { + + private static final Logger LOG = LoggerFactory.getLogger(TimedLeaderNotifier.class); + + private static final long FIXED_DELAY = 10; + + private KubernetesClusterEventHandler handler; + + private Lock lock = new ReentrantLock(); + + private ScheduledExecutorService executor; + + private Optional<String> lastCommunicatedLeader = Optional.empty(); + private Set<String> lastCommunicatedMembers = Collections.emptySet(); + + private Optional<String> currentLeader = Optional.empty(); + + private Set<String> currentMembers; + + private Long timestamp; + + private Long lease; + + private long changeCounter; + + public TimedLeaderNotifier(KubernetesClusterEventHandler handler) { + this.handler = Objects.requireNonNull(handler, "Handler must be present"); + } + + public void refreshLeadership(Optional<String> leader, Long timestamp, Long lease, Set<String> members) { + Objects.requireNonNull(leader, "leader must be non null (use Optional.empty)"); + Objects.requireNonNull(members, "members must be non null (use empty set)"); + long version; + try { + lock.lock(); + + this.currentLeader = leader; + this.currentMembers = members; + this.timestamp = timestamp; + this.lease = lease; + version = ++changeCounter; + } finally { + lock.unlock(); + } + + LOG.debug("Updated leader to {} at version version {}", leader, version); + this.executor.execute(() -> checkAndNotify(version)); + if (leader.isPresent()) { + long time = System.currentTimeMillis(); + long delay = Math.max(timestamp + lease + FIXED_DELAY - time, FIXED_DELAY); + LOG.debug("Setting expiration in {} millis for version {}", delay, version); + this.executor.schedule(() -> expiration(version), delay, TimeUnit.MILLISECONDS); + } + } + + @Override + public void start() throws Exception { + if (this.executor == null) { + this.executor = Executors.newSingleThreadScheduledExecutor(); + } + } + + @Override + public void stop() throws Exception { + if (this.executor != null) { + ScheduledExecutorService executor = this.executor; + this.executor = null; + + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.SECONDS); + } + } + + private void expiration(long version) { + try { + lock.lock(); + + if (version != this.changeCounter) { + return; + } + + long time = System.currentTimeMillis(); + if (time < this.timestamp + this.lease) { + long delay = this.timestamp + this.lease - time; + LOG.debug("Delaying expiration by {} millis at version version {}", delay + FIXED_DELAY, version); + this.executor.schedule(() -> expiration(version), delay + FIXED_DELAY, TimeUnit.MILLISECONDS); + return; + } + } finally { + lock.unlock(); + } + + checkAndNotify(version); + } + + private void checkAndNotify(long version) { + Optional<String> leader; + Set<String> members; + try { + lock.lock(); + + if (version != this.changeCounter) { + return; + } + + leader = this.currentLeader; + members = this.currentMembers; + + if (leader.isPresent()) { + long time = System.currentTimeMillis(); + if (time >= this.timestamp + this.lease) { + leader = Optional.empty(); + } + } + + } finally { + lock.unlock(); + } + + final Optional<String> newLeader = leader; + if (!newLeader.equals(lastCommunicatedLeader)) { + lastCommunicatedLeader = newLeader; + handler.onKubernetesClusterEvent(new KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent() { + @Override + public Optional<String> getData() { + return newLeader; + } + }); + } + + final Set<String> newMembers = members; + if (!newMembers.equals(lastCommunicatedMembers)) { + lastCommunicatedMembers = newMembers; + handler.onKubernetesClusterEvent(new KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent() { + @Override + public Set<String> getData() { + return newMembers; + } + }); + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java index 3bdffbd..4a2a11e 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java @@ -42,7 +42,10 @@ import org.junit.Test; */ public class KubernetesClusterServiceTest extends CamelTestSupport { - private static final int LEASE_TIME_SECONDS = 5; + private static final int LEASE_TIME_MILLIS = 2000; + private static final int RENEW_DEADLINE_MILLIS = 1000; + private static final int RETRY_PERIOD_MILLIS = 200; + private static final double JITTER_FACTOR = 1.1; private ConfigMapLockSimulator lockSimulator; @@ -75,6 +78,7 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { mypod2.waitForAnyLeader(2, TimeUnit.SECONDS); String leader = mypod1.getCurrentLeader(); + assertNotNull(leader); assertTrue(leader.startsWith("mypod")); assertEquals("Leaders should be equals", mypod2.getCurrentLeader(), leader); } @@ -129,6 +133,7 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { LeaderRecorder formerLoserRecorder = firstLeader.equals("mypod1") ? mypod2 : mypod1; refuseRequestsFromPod(firstLeader); + disconnectPod(firstLeader); formerLeaderRecorder.waitForALeaderChange(7, TimeUnit.SECONDS); formerLoserRecorder.waitForANewLeader(firstLeader, 7, TimeUnit.SECONDS); @@ -139,12 +144,12 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { Long lossTimestamp = formerLeaderRecorder.getLastTimeOf(l -> l == null); Long gainTimestamp = formerLoserRecorder.getLastTimeOf(secondLeader::equals); - assertTrue("At least 2 seconds must elapse from leadership loss and regain (see renewDeadlineSeconds)", gainTimestamp >= lossTimestamp + 2000); - checkLeadershipChangeDistance(LEASE_TIME_SECONDS, TimeUnit.SECONDS, mypod1, mypod2); + assertTrue("At least half distance must elapse from leadership loss and regain (see renewDeadlineSeconds)", gainTimestamp >= lossTimestamp + (LEASE_TIME_MILLIS - RENEW_DEADLINE_MILLIS) / 2); + checkLeadershipChangeDistance((LEASE_TIME_MILLIS - RENEW_DEADLINE_MILLIS) / 2, TimeUnit.MILLISECONDS, mypod1, mypod2); } @Test - public void testSlowLeaderLosingLeadership() throws Exception { + public void testSlowLeaderLosingLeadershipOnlyInternally() throws Exception { LeaderRecorder mypod1 = addMember("mypod1"); LeaderRecorder mypod2 = addMember("mypod2"); context.start(); @@ -159,17 +164,9 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { delayRequestsFromPod(firstLeader, 10, TimeUnit.SECONDS); - formerLeaderRecorder.waitForALeaderChange(7, TimeUnit.SECONDS); - formerLoserRecorder.waitForANewLeader(firstLeader, 7, TimeUnit.SECONDS); - - String secondLeader = formerLoserRecorder.getCurrentLeader(); - assertNotEquals("The firstLeader should be different from the new one", firstLeader, secondLeader); - - Long lossTimestamp = formerLeaderRecorder.getLastTimeOf(l -> l == null); - Long gainTimestamp = formerLoserRecorder.getLastTimeOf(secondLeader::equals); - - assertTrue("At least 2 seconds must elapse from leadership loss and regain (see renewDeadlineSeconds)", gainTimestamp >= lossTimestamp + 2000); - checkLeadershipChangeDistance(LEASE_TIME_SECONDS, TimeUnit.SECONDS, mypod1, mypod2); + Thread.sleep(LEASE_TIME_MILLIS); + assertNull(formerLeaderRecorder.getCurrentLeader()); + assertEquals(firstLeader, formerLoserRecorder.getCurrentLeader()); } @Test @@ -185,9 +182,9 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { for (int i = 0; i < 3; i++) { refuseRequestsFromPod(firstLeader); - Thread.sleep(1000); + Thread.sleep(RENEW_DEADLINE_MILLIS); allowRequestsFromPod(firstLeader); - Thread.sleep(2000); + Thread.sleep(LEASE_TIME_MILLIS); } assertEquals(firstLeader, mypod1.getCurrentLeader()); @@ -229,6 +226,18 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { this.lockServers.get(pod).setRefuseRequests(false); } + private void disconnectPod(String pod) { + for (LockTestServer server : this.lockServers.values()) { + server.removePod(pod); + } + } + + private void connectPod(String pod) { + for (LockTestServer server : this.lockServers.values()) { + server.addPod(pod); + } + } + private void checkLeadershipChangeDistance(long minimum, TimeUnit unit, LeaderRecorder... recorders) { List<LeaderRecorder.LeadershipInfo> infos = Arrays.stream(recorders) .flatMap(lr -> lr.getLeadershipInfo().stream()) @@ -245,7 +254,8 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { } else if (info.getLeader() != null && !info.getLeader().equals(currentLeaderLastSeen.getLeader())) { // switch long delay = info.getChangeTimestamp() - currentLeaderLastSeen.getChangeTimestamp(); - assertTrue("Lease time not elapsed between switch", delay >= TimeUnit.MILLISECONDS.convert(minimum, unit)); + assertTrue("Lease time not elapsed between switch, minimum=" + TimeUnit.MILLISECONDS.convert(minimum, unit) + ", found=" + delay, delay >= TimeUnit.MILLISECONDS.convert(minimum, + unit)); currentLeaderLastSeen = info; } } @@ -268,11 +278,10 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { KubernetesClusterService member = new KubernetesClusterService(configuration); member.setKubernetesNamespace("test"); member.setPodName(name); - member.setLeaseDurationSeconds(LEASE_TIME_SECONDS); - member.setRenewDeadlineSeconds(3); // 5-3 = at least 2 seconds for switching on leadership loss - member.setRetryPeriodSeconds(1); - member.setRetryOnErrorIntervalSeconds(1); - member.setJitterFactor(1.2); + member.setLeaseDurationMillis(LEASE_TIME_MILLIS); + member.setRenewDeadlineMillis(RENEW_DEADLINE_MILLIS); + member.setRetryPeriodMillis(RETRY_PERIOD_MILLIS); + member.setJitterFactor(JITTER_FACTOR); LeaderRecorder recorder = new LeaderRecorder(); try { @@ -281,6 +290,10 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { } catch (Exception ex) { throw new RuntimeException(ex); } + + for (String pod : this.lockServers.keySet()) { + connectPod(pod); + } return recorder; }