CAMEL-11331: Clock-drift-free version of the protocol

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f0b00ab9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f0b00ab9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f0b00ab9

Branch: refs/heads/master
Commit: f0b00ab927e9e55c238c8ea77e1b08c65335ca01
Parents: a0b2320
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Authored: Mon Jul 31 17:17:34 2017 +0200
Committer: Nicola Ferraro <ni.ferr...@gmail.com>
Committed: Tue Aug 8 16:39:43 2017 +0200

----------------------------------------------------------------------
 .../kubernetes/ha/KubernetesClusterService.java |  84 ++--
 .../kubernetes/ha/KubernetesClusterView.java    |   6 +-
 .../kubernetes/ha/lock/ConfigMapLockUtils.java  |  15 +-
 .../ha/lock/KubernetesLeadershipController.java | 352 +++++++++++++++++
 ...ubernetesLeaseBasedLeadershipController.java | 384 -------------------
 .../ha/lock/KubernetesLockConfiguration.java    |  84 ++--
 .../ha/lock/KubernetesMembersMonitor.java       | 237 ------------
 .../kubernetes/ha/lock/LeaderInfo.java          |  46 ++-
 .../kubernetes/ha/lock/TimedLeaderNotifier.java | 179 +++++++++
 .../ha/KubernetesClusterServiceTest.java        |  59 +--
 .../kubernetes/ha/TimedLeaderNotifierTest.java  | 117 ++++++
 .../kubernetes/ha/utils/LeaderRecorder.java     |   5 +
 .../kubernetes/ha/utils/LockTestServer.java     |  31 +-
 13 files changed, 813 insertions(+), 786 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
index a868d16..08ebb70 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
@@ -82,25 +82,22 @@ public class KubernetesClusterService extends 
AbstractCamelClusterService<Kubern
         if (config.getJitterFactor() < 1) {
             throw new IllegalStateException("jitterFactor must be >= 1 (found: 
" + config.getJitterFactor() + ")");
         }
-        if (config.getRetryOnErrorIntervalSeconds() <= 0) {
-            throw new IllegalStateException("retryOnErrorIntervalSeconds must 
be > 0 (found: " + config.getRetryOnErrorIntervalSeconds() + ")");
+        if (config.getRetryPeriodMillis() <= 0) {
+            throw new IllegalStateException("retryPeriodMillis must be > 0 
(found: " + config.getRetryPeriodMillis() + ")");
         }
-        if (config.getRetryPeriodSeconds() <= 0) {
-            throw new IllegalStateException("retryPeriodSeconds must be > 0 
(found: " + config.getRetryPeriodSeconds() + ")");
+        if (config.getRenewDeadlineMillis() <= 0) {
+            throw new IllegalStateException("renewDeadlineMillis must be > 0 
(found: " + config.getRenewDeadlineMillis() + ")");
         }
-        if (config.getRenewDeadlineSeconds() <= 0) {
-            throw new IllegalStateException("renewDeadlineSeconds must be > 0 
(found: " + config.getRenewDeadlineSeconds() + ")");
+        if (config.getLeaseDurationMillis() <= 0) {
+            throw new IllegalStateException("leaseDurationMillis must be > 0 
(found: " + config.getLeaseDurationMillis() + ")");
         }
-        if (config.getLeaseDurationSeconds() <= 0) {
-            throw new IllegalStateException("leaseDurationSeconds must be > 0 
(found: " + config.getLeaseDurationSeconds() + ")");
+        if (config.getLeaseDurationMillis() <= 
config.getRenewDeadlineMillis()) {
+            throw new IllegalStateException("leaseDurationMillis must be 
greater than renewDeadlineMillis "
+                    + "(" + config.getLeaseDurationMillis() + " is not greater 
than " + config.getRenewDeadlineMillis() + ")");
         }
-        if (config.getLeaseDurationSeconds() <= 
config.getRenewDeadlineSeconds()) {
-            throw new IllegalStateException("leaseDurationSeconds must be 
greater than renewDeadlineSeconds "
-                    + "(" + config.getLeaseDurationSeconds() + " is not 
greater than " + config.getRenewDeadlineSeconds() + ")");
-        }
-        if (config.getRenewDeadlineSeconds() <= config.getJitterFactor() * 
config.getRetryPeriodSeconds()) {
-            throw new IllegalStateException("renewDeadlineSeconds must be 
greater than jitterFactor*retryPeriodSeconds "
-                    + "(" + config.getRenewDeadlineSeconds() + " is not 
greater than " + config.getJitterFactor() + "*" + 
config.getRetryPeriodSeconds() + ")");
+        if (config.getRenewDeadlineMillis() <= config.getJitterFactor() * 
config.getRetryPeriodMillis()) {
+            throw new IllegalStateException("renewDeadlineMillis must be 
greater than jitterFactor*retryPeriodMillis "
+                    + "(" + config.getRenewDeadlineMillis() + " is not greater 
than " + config.getJitterFactor() + "*" + config.getRetryPeriodMillis() + ")");
         }
 
         return config;
@@ -176,16 +173,8 @@ public class KubernetesClusterService extends 
AbstractCamelClusterService<Kubern
         
lockConfiguration.setKubernetesResourcesNamespace(kubernetesResourcesNamespace);
     }
 
-    public long getRetryOnErrorIntervalSeconds() {
-        return lockConfiguration.getRetryOnErrorIntervalSeconds();
-    }
-
-    /**
-     * Indicates the maximum amount of time a Kubernetes watch should be kept 
active, before being recreated.
-     * Watch recreation can be disabled by putting value <= 0.
-     */
-    public void setRetryOnErrorIntervalSeconds(long 
retryOnErrorIntervalSeconds) {
-        
lockConfiguration.setRetryOnErrorIntervalSeconds(retryOnErrorIntervalSeconds);
+    public String getKubernetesResourcesNamespaceOrDefault(KubernetesClient 
kubernetesClient) {
+        return 
lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient);
     }
 
     public double getJitterFactor() {
@@ -193,56 +182,43 @@ public class KubernetesClusterService extends 
AbstractCamelClusterService<Kubern
     }
 
     /**
-     * A jitter factor to apply in order to prevent all pods to try to become 
leaders in the same instant.
+     * A jitter factor to apply in order to prevent all pods from calling 
Kubernetes APIs at the same instant.
      */
     public void setJitterFactor(double jitterFactor) {
         lockConfiguration.setJitterFactor(jitterFactor);
     }
 
-    public long getLeaseDurationSeconds() {
-        return lockConfiguration.getLeaseDurationSeconds();
+    public long getLeaseDurationMillis() {
+        return lockConfiguration.getLeaseDurationMillis();
     }
 
     /**
      * The default duration of the lease for the current leader.
      */
-    public void setLeaseDurationSeconds(long leaseDurationSeconds) {
-        lockConfiguration.setLeaseDurationSeconds(leaseDurationSeconds);
-    }
-
-    public long getRenewDeadlineSeconds() {
-        return lockConfiguration.getRenewDeadlineSeconds();
-    }
-
-    /**
-     * The deadline after which the leader must stop trying to renew its 
leadership (and yield it).
-     */
-    public void setRenewDeadlineSeconds(long renewDeadlineSeconds) {
-        lockConfiguration.setRenewDeadlineSeconds(renewDeadlineSeconds);
+    public void setLeaseDurationMillis(long leaseDurationMillis) {
+        lockConfiguration.setLeaseDurationMillis(leaseDurationMillis);
     }
 
-    public long getRetryPeriodSeconds() {
-        return lockConfiguration.getRetryPeriodSeconds();
+    public long getRenewDeadlineMillis() {
+        return lockConfiguration.getRenewDeadlineMillis();
     }
 
     /**
-     * The time between two subsequent attempts to acquire/renew the 
leadership (or after the lease expiration).
-     * It is randomized using the jitter factor in case of new leader election 
(not renewal).
+     * The deadline after which the leader must stop its services because it 
may have lost the leadership.
      */
-    public void setRetryPeriodSeconds(long retryPeriodSeconds) {
-        lockConfiguration.setRetryPeriodSeconds(retryPeriodSeconds);
+    public void setRenewDeadlineMillis(long renewDeadlineMillis) {
+        lockConfiguration.setRenewDeadlineMillis(renewDeadlineMillis);
     }
 
-    public long getWatchRefreshIntervalSeconds() {
-        return lockConfiguration.getWatchRefreshIntervalSeconds();
+    public long getRetryPeriodMillis() {
+        return lockConfiguration.getRetryPeriodMillis();
     }
 
     /**
-     * Set this to a positive value in order to recreate watchers after a 
certain amount of time,
-     * to avoid having stale watchers.
+     * The time between two subsequent attempts to check and acquire the 
leadership.
+     * It is randomized using the jitter factor.
      */
-    public void setWatchRefreshIntervalSeconds(long 
watchRefreshIntervalSeconds) {
-        
lockConfiguration.setWatchRefreshIntervalSeconds(watchRefreshIntervalSeconds);
+    public void setRetryPeriodMillis(long retryPeriodMillis) {
+        lockConfiguration.setRetryPeriodMillis(retryPeriodMillis);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
index 28f38a5..ddda675 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
@@ -30,7 +30,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.camel.component.kubernetes.KubernetesConfiguration;
 import org.apache.camel.component.kubernetes.KubernetesHelper;
 import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent;
-import 
org.apache.camel.component.kubernetes.ha.lock.KubernetesLeaseBasedLeadershipController;
+import 
org.apache.camel.component.kubernetes.ha.lock.KubernetesLeadershipController;
 import 
org.apache.camel.component.kubernetes.ha.lock.KubernetesLockConfiguration;
 import org.apache.camel.ha.CamelClusterMember;
 import org.apache.camel.impl.ha.AbstractCamelClusterView;
@@ -56,7 +56,7 @@ public class KubernetesClusterView extends 
AbstractCamelClusterView {
 
     private volatile List<CamelClusterMember> currentMembers = 
Collections.emptyList();
 
-    private KubernetesLeaseBasedLeadershipController controller;
+    private KubernetesLeadershipController controller;
 
     public KubernetesClusterView(KubernetesClusterService cluster, 
KubernetesConfiguration configuration, KubernetesLockConfiguration 
lockConfiguration) {
         super(cluster, lockConfiguration.getGroupName());
@@ -86,7 +86,7 @@ public class KubernetesClusterView extends 
AbstractCamelClusterView {
         if (controller == null) {
             this.kubernetesClient = 
KubernetesHelper.getKubernetesClient(configuration);
 
-            controller = new 
KubernetesLeaseBasedLeadershipController(kubernetesClient, 
this.lockConfiguration, event -> {
+            controller = new KubernetesLeadershipController(kubernetesClient, 
this.lockConfiguration, event -> {
                 if (event instanceof 
KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) {
                     // New leader
                     Optional<String> leader = 
KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData();

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
index 70fa860..feea1c6 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.kubernetes.ha.lock;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.Set;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
@@ -36,7 +37,7 @@ public final class ConfigMapLockUtils {
 
     private static final String LEADER_PREFIX = "leader.pod.";
 
-    private static final String TIMESTAMP_PREFIX = "leader.timestamp.";
+    private static final String LOCAL_TIMESTAMP_PREFIX = 
"leader.local.timestamp.";
 
     private ConfigMapLockUtils() {
     }
@@ -49,19 +50,19 @@ public final class ConfigMapLockUtils {
                     .addToLabels("kind", "locks").
                 endMetadata()
                 .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), 
leaderInfo.getLeader())
-                .addToData(TIMESTAMP_PREFIX + leaderInfo.getGroupName(), 
formatDate(leaderInfo.getTimestamp()))
+                .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(), 
formatDate(leaderInfo.getLocalTimestamp()))
                 .build();
     }
 
     public static ConfigMap getConfigMapWithNewLeader(ConfigMap configMap, 
LeaderInfo leaderInfo) {
         return new ConfigMapBuilder(configMap)
                 .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), 
leaderInfo.getLeader())
-                .addToData(TIMESTAMP_PREFIX + leaderInfo.getGroupName(), 
formatDate(leaderInfo.getTimestamp()))
+                .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(), 
formatDate(leaderInfo.getLocalTimestamp()))
                 .build();
     }
 
-    public static LeaderInfo getLeaderInfo(ConfigMap configMap, String group) {
-        return new LeaderInfo(group, getLeader(configMap, group), 
getTimestamp(configMap, group));
+    public static LeaderInfo getLeaderInfo(ConfigMap configMap, Set<String> 
members, String group) {
+        return new LeaderInfo(group, getLeader(configMap, group), 
getLocalTimestamp(configMap, group), members);
     }
 
     private static String getLeader(ConfigMap configMap, String group) {
@@ -81,8 +82,8 @@ public final class ConfigMapLockUtils {
         return null;
     }
 
-    private static Date getTimestamp(ConfigMap configMap, String group) {
-        String timestamp = getConfigMapValue(configMap, TIMESTAMP_PREFIX + 
group);
+    private static Date getLocalTimestamp(ConfigMap configMap, String group) {
+        String timestamp = getConfigMapValue(configMap, LOCAL_TIMESTAMP_PREFIX 
+ group);
         if (timestamp == null) {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
new file mode 100644
index 0000000..f527779
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.apache.camel.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitors the current status and participates in leader election when no active 
leader is present.
+ * It communicates changes in leadership and cluster members to the given 
event handler.
+ */
+public class KubernetesLeadershipController implements Service {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesLeadershipController.class);
+
+    private enum State {
+        NOT_LEADER,
+        BECOMING_LEADER,
+        LEADER
+    }
+
+    private KubernetesClient kubernetesClient;
+
+    private KubernetesLockConfiguration lockConfiguration;
+
+    private KubernetesClusterEventHandler eventHandler;
+
+    private State currentState = State.NOT_LEADER;
+
+    private ScheduledExecutorService serializedExecutor;
+
+    private TimedLeaderNotifier leaderNotifier;
+
+    private volatile LeaderInfo latestLeaderInfo;
+    private volatile ConfigMap latestConfigMap;
+    private volatile Set<String> latestMembers;
+
+    public KubernetesLeadershipController(KubernetesClient kubernetesClient, 
KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler 
eventHandler) {
+        this.kubernetesClient = kubernetesClient;
+        this.lockConfiguration = lockConfiguration;
+        this.eventHandler = eventHandler;
+    }
+
+    @Override
+    public void start() throws Exception {
+        if (serializedExecutor == null) {
+            LOG.debug("{} Starting leadership controller...", logPrefix());
+            serializedExecutor = Executors.newSingleThreadScheduledExecutor();
+            leaderNotifier = new TimedLeaderNotifier(this.eventHandler);
+
+            leaderNotifier.start();
+            serializedExecutor.execute(this::refreshStatus);
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        LOG.debug("{} Stopping leadership controller...", logPrefix());
+        if (serializedExecutor != null) {
+            serializedExecutor.shutdownNow();
+        }
+        serializedExecutor = null;
+
+        if (leaderNotifier != null) {
+            leaderNotifier.stop();
+        }
+        leaderNotifier = null;
+    }
+
+    private void refreshStatus() {
+        switch (currentState) {
+        case NOT_LEADER:
+            refreshStatusNotLeader();
+            break;
+        case BECOMING_LEADER:
+            refreshStatusBecomingLeader();
+            break;
+        case LEADER:
+            refreshStatusLeader();
+            break;
+        default:
+            throw new RuntimeException("Unsupported state " + currentState);
+        }
+    }
+
+    /**
+     * This pod is currently not leader. It should monitor the leader 
configuration and try
+     * to acquire the leadership if possible.
+     */
+    private void refreshStatusNotLeader() {
+        LOG.debug("{} Pod is not leader, pulling new data from the cluster", 
logPrefix());
+        boolean pulled = lookupNewLeaderInfo();
+        if (!pulled) {
+            rescheduleAfterDelay();
+            return;
+        }
+
+        if (this.latestLeaderInfo.hasEmptyLeader()) {
+            // There is no previous leader
+            LOG.info("{} The cluster has no leaders. Trying to acquire the 
leadership...", logPrefix());
+            boolean acquired = tryAcquireLeadership();
+            if (acquired) {
+                LOG.info("{} Leadership acquired by current pod ({}) with 
immediate effect", logPrefix(), this.lockConfiguration.getPodName());
+                this.currentState = State.LEADER;
+                this.serializedExecutor.execute(this::refreshStatus);
+                return;
+            } else {
+                LOG.info("{} Unable to acquire the leadership, it may have 
been acquired by another pod", logPrefix());
+            }
+        } else if (!this.latestLeaderInfo.hasValidLeader()) {
+            // There's a previous leader and it's invalid
+            LOG.info("{} Leadership has been lost by old owner. Trying to 
acquire the leadership...", logPrefix());
+            boolean acquired = tryAcquireLeadership();
+            if (acquired) {
+                LOG.info("{} Leadership acquired by current pod ({})", 
logPrefix(), this.lockConfiguration.getPodName());
+                this.currentState = State.BECOMING_LEADER;
+                this.serializedExecutor.execute(this::refreshStatus);
+                return;
+            } else {
+                LOG.info("{} Unable to acquire the leadership, it may have 
been acquired by another pod", logPrefix());
+            }
+        } else if 
(this.latestLeaderInfo.isValidLeader(this.lockConfiguration.getPodName())) {
+            // We are leaders for some reason (e.g. pod restart on failure)
+            LOG.info("{} Leadership is already owned by current pod ({})", 
logPrefix(), this.lockConfiguration.getPodName());
+            this.currentState = State.BECOMING_LEADER;
+            this.serializedExecutor.execute(this::refreshStatus);
+            return;
+        }
+
+        
this.leaderNotifier.refreshLeadership(Optional.ofNullable(this.latestLeaderInfo.getLeader()),
+                System.currentTimeMillis(),
+                this.lockConfiguration.getLeaseDurationMillis(),
+                this.latestLeaderInfo.getMembers());
+        rescheduleAfterDelay();
+    }
+
+    /**
+     * This pod has acquired the leadership but it should wait for the old leader
+     * to tear down resources before starting the local services.
+     * <p>
+     * The wait blocks the executor thread for a full lease duration. If the thread is
+     * interrupted while waiting (e.g. because {@link #stop()} shut down the executor),
+     * the pod does not promote itself: the interrupt flag is restored and the method
+     * returns without changing state or rescheduling.
+     */
+    private void refreshStatusBecomingLeader() {
+        // Always wait the same amount of time before becoming the leader.
+        // Even if the current pod is already the leader, a possible old version of the pod must be allowed to shut down.
+        long delay = this.lockConfiguration.getLeaseDurationMillis();
+        LOG.info("{} Current pod ({}) owns the leadership, but it will be effective in {} seconds...", logPrefix(), this.lockConfiguration.getPodName(),
+                new BigDecimal(delay).divide(BigDecimal.valueOf(1000), 2, BigDecimal.ROUND_HALF_UP));
+
+        try {
+            Thread.sleep(delay);
+        } catch (InterruptedException e) {
+            LOG.warn("Thread interrupted", e);
+            // The lease wait was cut short, so the old leader may still be running its services:
+            // promoting now would be unsafe. An interrupt on this thread also means the executor
+            // is being shut down, so submitting another task would fail anyway.
+            Thread.currentThread().interrupt();
+            return;
+        }
+
+        LOG.info("{} Current pod ({}) is becoming the new leader now...", logPrefix(), this.lockConfiguration.getPodName());
+        this.currentState = State.LEADER;
+        this.serializedExecutor.execute(this::refreshStatus);
+    }
+
+    /**
+     * This pod considers itself the leader. It pulls fresh data from the cluster and either
+     * refreshes the leadership notification or steps down to {@code NOT_LEADER}.
+     * If the lookup fails, nothing is notified and the check is retried after the retry period.
+     */
+    private void refreshStatusLeader() {
+        LOG.debug("{} Pod should be the leader, pulling new data from the 
cluster", logPrefix());
+        // Timestamp taken before the lookup; presumably so that the renew deadline is
+        // counted from before the API round trip (confirm against TimedLeaderNotifier).
+        long timeBeforePulling = System.currentTimeMillis();
+        boolean pulled = lookupNewLeaderInfo();
+        if (!pulled) {
+            rescheduleAfterDelay();
+            return;
+        }
+
+        if 
(this.latestLeaderInfo.isValidLeader(this.lockConfiguration.getPodName())) {
+            LOG.debug("{} Current Pod ({}) is still the leader", logPrefix(), 
this.lockConfiguration.getPodName());
+            
this.leaderNotifier.refreshLeadership(Optional.of(this.lockConfiguration.getPodName()),
+                    timeBeforePulling,
+                    this.lockConfiguration.getRenewDeadlineMillis(),
+                    this.latestLeaderInfo.getMembers());
+            rescheduleAfterDelay();
+            return;
+        } else {
+            LOG.debug("{} Current Pod ({}) has lost the leadership", 
logPrefix(), this.lockConfiguration.getPodName());
+            this.currentState = State.NOT_LEADER;
+            // set an empty leader to signal leadership loss
+            this.leaderNotifier.refreshLeadership(Optional.empty(),
+                    System.currentTimeMillis(),
+                    lockConfiguration.getLeaseDurationMillis(),
+                    this.latestLeaderInfo.getMembers());
+
+            // wait a lease time and restart (fixed delay, no jitter applied here)
+            this.serializedExecutor.schedule(this::refreshStatus, 
this.lockConfiguration.getLeaseDurationMillis(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Schedules the next {@code refreshStatus} run on the serialized executor, after the
+     * configured retry period randomized through {@code jitter} with the configured jitter factor.
+     */
+    private void rescheduleAfterDelay() {
+        this.serializedExecutor.schedule(this::refreshStatus, 
jitter(this.lockConfiguration.getRetryPeriodMillis(), 
this.lockConfiguration.getJitterFactor()), TimeUnit.MILLISECONDS);
+    }
+
+    private boolean lookupNewLeaderInfo() {
+        LOG.debug("{} Looking up leadership information...", logPrefix());
+
+        ConfigMap configMap;
+        try {
+            configMap = pullConfigMap();
+        } catch (Throwable e) {
+            LOG.warn(logPrefix() + " Unable to retrieve the current ConfigMap 
" + this.lockConfiguration.getConfigMapName() + " from Kubernetes");
+            LOG.debug(logPrefix() + " Exception thrown during ConfigMap 
lookup", e);
+            return false;
+        }
+
+        Set<String> members;
+        try {
+            members = Objects.requireNonNull(pullClusterMembers(), "Retrieved 
a null set of members");
+        } catch (Throwable e) {
+            LOG.warn(logPrefix() + " Unable to retrieve the list of cluster 
members from Kubernetes");
+            LOG.debug(logPrefix() + " Exception thrown during Pod list 
lookup", e);
+            return false;
+        }
+
+        updateLatestLeaderInfo(configMap, members);
+        return true;
+    }
+
+    private boolean tryAcquireLeadership() {
+        LOG.debug("{} Trying to acquire the leadership...", logPrefix());
+
+        ConfigMap configMap = this.latestConfigMap;
+        Set<String> members = this.latestMembers;
+        LeaderInfo latestLeaderInfo = this.latestLeaderInfo;
+
+        if (latestLeaderInfo == null || members == null) {
+            LOG.warn(logPrefix() + " Unexpected condition. Latest leader info 
or list of members is empty.");
+            return false;
+        } else if (!members.contains(this.lockConfiguration.getPodName())) {
+            LOG.warn(logPrefix() + " The list of cluster members " + 
latestLeaderInfo.getMembers() + " does not contain the current pod (" + 
this.lockConfiguration.getPodName() + "). Cannot acquire"
+                    + " leadership.");
+            return false;
+        }
+
+        // Info we would set in the configmap to become leaders
+        LeaderInfo newLeaderInfo = new 
LeaderInfo(this.lockConfiguration.getGroupName(), 
this.lockConfiguration.getPodName(), new Date(), members);
+
+        if (configMap == null) {
+            // No ConfigMap created so far
+            LOG.debug("{} Lock configmap is not present in the Kubernetes 
namespace. A new ConfigMap will be created", logPrefix());
+            ConfigMap newConfigMap = 
ConfigMapLockUtils.createNewConfigMap(this.lockConfiguration.getConfigMapName(),
 newLeaderInfo);
+
+            try {
+                kubernetesClient.configMaps()
+                        
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                        .create(newConfigMap);
+
+                LOG.debug("{} ConfigMap {} successfully created", logPrefix(), 
this.lockConfiguration.getConfigMapName());
+                updateLatestLeaderInfo(newConfigMap, members);
+                return true;
+            } catch (Exception ex) {
+                // Suppress exception
+                LOG.warn(logPrefix() + " Unable to create the ConfigMap, it 
may have been created by other cluster members concurrently. If the problem 
persists, check if the service account has "
+                        + "the right "
+                        + "permissions to create it");
+                LOG.debug(logPrefix() + " Exception while trying to create the 
ConfigMap", ex);
+                return false;
+            }
+        } else {
+            LOG.debug("{} Lock configmap already present in the Kubernetes 
namespace. Checking...", logPrefix());
+            LeaderInfo leaderInfo = 
ConfigMapLockUtils.getLeaderInfo(configMap, members, 
this.lockConfiguration.getGroupName());
+
+            boolean canAcquire = !leaderInfo.hasValidLeader();
+            if (canAcquire) {
+                // Try to be the new leader
+                try {
+                    ConfigMap updatedConfigMap = 
ConfigMapLockUtils.getConfigMapWithNewLeader(configMap, newLeaderInfo);
+                    kubernetesClient.configMaps()
+                            
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                            
.withName(this.lockConfiguration.getConfigMapName())
+                            
.lockResourceVersion(configMap.getMetadata().getResourceVersion())
+                            .replace(updatedConfigMap);
+
+                    LOG.debug("{} ConfigMap {} successfully updated", 
logPrefix(), this.lockConfiguration.getConfigMapName());
+                    updateLatestLeaderInfo(updatedConfigMap, members);
+                    return true;
+                } catch (Exception ex) {
+                    LOG.warn(logPrefix() + " Unable to update the lock 
ConfigMap to set leadership information");
+                    LOG.debug(logPrefix() + " Error received during configmap 
lock replace", ex);
+                    return false;
+                }
+            } else {
+                // Another pod is the leader and it's still active
+                LOG.debug("{} Another pod ({}) is the current leader and it is 
still active", logPrefix(), this.latestLeaderInfo.getLeader());
+                return false;
+            }
+        }
+    }
+
+    private void updateLatestLeaderInfo(ConfigMap configMap, Set<String> 
members) {
+        LOG.debug("{} Updating internal status about the current leader", 
logPrefix());
+        this.latestConfigMap = configMap;
+        this.latestMembers = members;
+        this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, 
members, this.lockConfiguration.getGroupName());
+        LOG.debug("{} Current leader info: {}", logPrefix(), 
this.latestLeaderInfo);
+    }
+
+    private ConfigMap pullConfigMap() {
+        return kubernetesClient.configMaps()
+                
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                .withName(this.lockConfiguration.getConfigMapName())
+                .get();
+    }
+
+    private Set<String> pullClusterMembers() {
+        List<Pod> pods = kubernetesClient.pods()
+                
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                .withLabels(this.lockConfiguration.getClusterLabels())
+                .list().getItems();
+
+        return pods.stream().map(pod -> 
pod.getMetadata().getName()).collect(Collectors.toSet());
+    }
+
+    private long jitter(long num, double factor) {
+        return (long) (num * (1 + Math.random() * (factor - 1)));
+    }
+
+    private String logPrefix() {
+        return "Leadership Controller [" + this.lockConfiguration.getPodName() 
+ "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
deleted file mode 100644
index 76e91bf..0000000
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.ha.lock;
-
-import java.util.Date;
-import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.fabric8.kubernetes.client.KubernetesClient;
-
-import org.apache.camel.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Monitors current status and participate to leader election when no active 
leaders are present.
- * It communicates changes in leadership and cluster members to the given 
event handler.
- */
-public class KubernetesLeaseBasedLeadershipController implements Service {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesLeaseBasedLeadershipController.class);
-
-    private static final long FIXED_ADDITIONAL_DELAY = 100;
-
-    private KubernetesClient kubernetesClient;
-
-    private KubernetesLockConfiguration lockConfiguration;
-
-    private KubernetesClusterEventHandler eventHandler;
-
-    private ScheduledExecutorService serializedExecutor;
-    private ScheduledExecutorService eventDispatcherExecutor;
-
-    private KubernetesMembersMonitor membersMonitor;
-
-    private Optional<String> currentLeader = Optional.empty();
-
-    private volatile LeaderInfo latestLeaderInfo;
-
-    public KubernetesLeaseBasedLeadershipController(KubernetesClient 
kubernetesClient, KubernetesLockConfiguration lockConfiguration, 
KubernetesClusterEventHandler eventHandler) {
-        this.kubernetesClient = kubernetesClient;
-        this.lockConfiguration = lockConfiguration;
-        this.eventHandler = eventHandler;
-    }
-
-    @Override
-    public void start() throws Exception {
-        if (serializedExecutor == null) {
-            LOG.debug("Starting leadership controller...");
-            serializedExecutor = Executors.newSingleThreadScheduledExecutor();
-
-            eventDispatcherExecutor = 
Executors.newSingleThreadScheduledExecutor();
-
-            membersMonitor = new 
KubernetesMembersMonitor(this.serializedExecutor, this.kubernetesClient, 
this.lockConfiguration);
-            if (eventHandler != null) {
-                membersMonitor.addClusterEventHandler(eventHandler);
-            }
-
-            membersMonitor.start();
-            serializedExecutor.execute(this::initialization);
-        }
-    }
-
-    @Override
-    public void stop() throws Exception {
-        LOG.debug("Stopping leadership controller...");
-        if (serializedExecutor != null) {
-            serializedExecutor.shutdownNow();
-        }
-        if (eventDispatcherExecutor != null) {
-            eventDispatcherExecutor.shutdown();
-            eventDispatcherExecutor.awaitTermination(2, TimeUnit.SECONDS);
-            eventDispatcherExecutor.shutdownNow();
-        }
-        if (membersMonitor != null) {
-            membersMonitor.stop();
-        }
-
-        membersMonitor = null;
-        eventDispatcherExecutor = null;
-        serializedExecutor = null;
-    }
-
-    /**
-     * Get the first ConfigMap and setup the initial state.
-     */
-    private void initialization() {
-        LOG.debug("Reading (with retry) the configmap {} to detect the current 
leader", this.lockConfiguration.getConfigMapName());
-        refreshConfigMapFromCluster(true);
-
-        if (isCurrentPodTheActiveLeader()) {
-            serializedExecutor.execute(this::onLeadershipAcquired);
-        } else {
-            LOG.info("The current pod ({}) is not the leader of the group '{}' 
in ConfigMap '{}' at this time", this.lockConfiguration.getPodName(), 
this.lockConfiguration
-                    .getGroupName(), 
this.lockConfiguration.getConfigMapName());
-            serializedExecutor.execute(this::acquireLeadershipCycle);
-        }
-    }
-
-    /**
-     * Signals the acquisition of the leadership and move to the 
keep-leadership state.
-     */
-    private void onLeadershipAcquired() {
-        LOG.info("The current pod ({}) is now the leader of the group '{}' in 
ConfigMap '{}'", this.lockConfiguration.getPodName(), this.lockConfiguration
-                .getGroupName(), this.lockConfiguration.getConfigMapName());
-
-        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
-
-        long nextDelay = 
computeNextRenewWaitTime(this.latestLeaderInfo.getTimestamp(), 
this.latestLeaderInfo.getTimestamp());
-        serializedExecutor.schedule(this::keepLeadershipCycle, nextDelay + 
FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * While in the keep-leadership state, the controller periodically renews 
the lease.
-     * If a renewal deadline is met and it was not possible to renew the 
lease, the leadership is lost.
-     */
-    private void keepLeadershipCycle() {
-        // renew lease periodically
-        refreshConfigMapFromCluster(false); // if possible, update
-
-        if 
(this.latestLeaderInfo.isTimeElapsedSeconds(lockConfiguration.getRenewDeadlineSeconds())
 || !this.latestLeaderInfo.isLeader(this.lockConfiguration.getPodName())) {
-            // Time over for renewal or leadership lost
-            LOG.debug("The current pod ({}) has lost the leadership", 
this.lockConfiguration.getPodName());
-            serializedExecutor.execute(this::onLeadershipLost);
-            return;
-        }
-
-        boolean success = tryAcquireOrRenewLeadership();
-        LOG.debug("Attempted to renew the lease. Success={}", success);
-
-        long nextDelay = 
computeNextRenewWaitTime(this.latestLeaderInfo.getTimestamp(), new Date());
-        serializedExecutor.schedule(this::keepLeadershipCycle, nextDelay + 
FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Compute the timestamp of next event while in keep-leadership state.
-     */
-    private long computeNextRenewWaitTime(Date lastRenewal, Date 
lastRenewalAttempt) {
-        long timeDeadline = lastRenewal.getTime() + 
this.lockConfiguration.getRenewDeadlineSeconds() * 1000;
-        long timeRetry;
-        long counter = 0;
-        do {
-            counter++;
-            timeRetry = lastRenewal.getTime() + counter * 
this.lockConfiguration.getRetryPeriodSeconds() * 1000;
-        } while (timeRetry < lastRenewalAttempt.getTime() && timeRetry < 
timeDeadline);
-
-        long time = Math.min(timeRetry, timeDeadline);
-        long delay = Math.max(0, time - System.currentTimeMillis());
-        long delayJittered = jitter(delay, 
lockConfiguration.getJitterFactor());
-        LOG.debug("Next renewal timeout event will be fired in {} seconds", 
delayJittered / 1000);
-        return delayJittered;
-    }
-
-
-    /**
-     * Signals the loss of leadership and move to the acquire-leadership state.
-     */
-    private void onLeadershipLost() {
-        LOG.info("The local pod ({}) is no longer the leader of the group '{}' 
in ConfigMap '{}'", this.lockConfiguration.getPodName(), 
this.lockConfiguration.getGroupName(),
-                this.lockConfiguration.getConfigMapName());
-
-        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
-        serializedExecutor.execute(this::acquireLeadershipCycle);
-    }
-
-    /**
-     * While in the acquire-leadership state, the controller waits for the 
current lease to expire before trying to acquire the leadership.
-     */
-    private void acquireLeadershipCycle() {
-        // wait for the current lease to finish then fire an election
-        refreshConfigMapFromCluster(false); // if possible, update
-
-        // Notify about changes in current leader if any
-        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
-
-        if 
(!this.latestLeaderInfo.isTimeElapsedSeconds(lockConfiguration.getLeaseDurationSeconds()))
 {
-            // Wait for the lease to finish before trying leader election
-            long nextDelay = 
computeNextElectionWaitTime(this.latestLeaderInfo.getTimestamp());
-            serializedExecutor.schedule(this::acquireLeadershipCycle, 
nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
-            return;
-        }
-
-        boolean acquired = tryAcquireOrRenewLeadership();
-        if (acquired) {
-            LOG.debug("Leadership acquired for ConfigMap {}. Notification in 
progress...", this.lockConfiguration.getConfigMapName());
-            serializedExecutor.execute(this::onLeadershipAcquired);
-            return;
-        }
-
-        // Notify about changes in current leader if any
-        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
-
-        LOG.debug("Cannot acquire the leadership for ConfigMap {}", 
this.lockConfiguration.getConfigMapName());
-        long nextDelay = 
computeNextElectionWaitTime(this.latestLeaderInfo.getTimestamp());
-        serializedExecutor.schedule(this::acquireLeadershipCycle, nextDelay + 
FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
-    }
-
-    private long computeNextElectionWaitTime(Date lastRenewal) {
-        if (lastRenewal == null) {
-            LOG.debug("Error detected while getting leadership info, next 
election timeout event will be fired in {} seconds", 
this.lockConfiguration.getRetryOnErrorIntervalSeconds());
-            return this.lockConfiguration.getRetryOnErrorIntervalSeconds();
-        }
-        long time = lastRenewal.getTime() + 
this.lockConfiguration.getLeaseDurationSeconds() * 1000
-                + jitter(this.lockConfiguration.getRetryPeriodSeconds() * 
1000, this.lockConfiguration.getJitterFactor());
-
-        long delay = Math.max(0, time - System.currentTimeMillis());
-        LOG.debug("Next election timeout event will be fired in {} seconds", 
delay / 1000);
-        return delay;
-    }
-
-    private long jitter(long num, double factor) {
-        return (long) (num * (1 + Math.random() * (factor - 1)));
-    }
-
-    private boolean tryAcquireOrRenewLeadership() {
-        LOG.debug("Trying to acquire or renew the leadership...");
-
-        ConfigMap configMap;
-        try {
-            configMap = pullConfigMap();
-        } catch (Exception e) {
-            LOG.warn("Unable to retrieve the current ConfigMap " + 
this.lockConfiguration.getConfigMapName() + " from Kubernetes", e);
-            return false;
-        }
-
-        // Info to set in the configmap to become leaders
-        LeaderInfo newLeaderInfo = new 
LeaderInfo(this.lockConfiguration.getGroupName(), 
this.lockConfiguration.getPodName(), new Date());
-
-        if (configMap == null) {
-            // No configmap created so far
-            LOG.debug("Lock configmap is not present in the Kubernetes 
namespace. A new ConfigMap will be created");
-            ConfigMap newConfigMap = 
ConfigMapLockUtils.createNewConfigMap(this.lockConfiguration.getConfigMapName(),
 newLeaderInfo);
-
-            try {
-                kubernetesClient.configMaps()
-                        
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                        .create(newConfigMap);
-            } catch (Exception ex) {
-                // Suppress exception
-                LOG.warn("Unable to create the ConfigMap, it may have been 
created by other cluster members concurrently. If the problem persists, check 
if the service account has the right "
-                        + "permissions to create it");
-                LOG.debug("Exception while trying to create the ConfigMap", 
ex);
-
-                // Try to get the configMap and return the current leader
-                refreshConfigMapFromCluster(false);
-                return isCurrentPodTheActiveLeader();
-            }
-
-            LOG.debug("ConfigMap {} successfully created and local pod is 
leader", this.lockConfiguration.getConfigMapName());
-            updateLatestLeaderInfo(newConfigMap);
-            scheduleCheckForPossibleLeadershipLoss();
-            return true;
-        } else {
-            LOG.debug("Lock configmap already present in the Kubernetes 
namespace. Checking...");
-            LeaderInfo leaderInfo = 
ConfigMapLockUtils.getLeaderInfo(configMap, 
this.lockConfiguration.getGroupName());
-
-            boolean weWereLeader = 
leaderInfo.isLeader(this.lockConfiguration.getPodName());
-            boolean leaseExpired = 
leaderInfo.isTimeElapsedSeconds(this.lockConfiguration.getLeaseDurationSeconds());
-
-            if (weWereLeader || leaseExpired) {
-                // Renew the lease or set the new leader
-                try {
-                    ConfigMap updatedConfigMap = 
ConfigMapLockUtils.getConfigMapWithNewLeader(configMap, newLeaderInfo);
-                    kubernetesClient.configMaps()
-                            
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                            
.withName(this.lockConfiguration.getConfigMapName())
-                            
.lockResourceVersion(configMap.getMetadata().getResourceVersion())
-                            .replace(updatedConfigMap);
-
-                    LOG.debug("ConfigMap {} successfully updated and local pod 
is leader", this.lockConfiguration.getConfigMapName());
-                    updateLatestLeaderInfo(updatedConfigMap);
-                    scheduleCheckForPossibleLeadershipLoss();
-                    return true;
-                } catch (Exception ex) {
-                    LOG.warn("An attempt to become leader has failed. It's 
possible that the leadership has been taken by another pod");
-                    LOG.debug("Error received during configmap lock replace", 
ex);
-
-                    // Try to get the configMap and return the current leader
-                    refreshConfigMapFromCluster(false);
-                    return isCurrentPodTheActiveLeader();
-                }
-            } else {
-                // Another pod is the leader and lease is not expired
-                LOG.debug("Another pod is the current leader and lease has not 
expired yet");
-                updateLatestLeaderInfo(configMap);
-                return false;
-            }
-        }
-    }
-
-
-    private void refreshConfigMapFromCluster(boolean retry) {
-        LOG.debug("Retrieving configmap {}", 
this.lockConfiguration.getConfigMapName());
-        try {
-            updateLatestLeaderInfo(pullConfigMap());
-        } catch (Exception ex) {
-            if (retry) {
-                LOG.warn("ConfigMap pull failed. Retrying in " + 
this.lockConfiguration.getRetryOnErrorIntervalSeconds() + " seconds...", ex);
-                try {
-                    
Thread.sleep(this.lockConfiguration.getRetryOnErrorIntervalSeconds() * 1000);
-                    refreshConfigMapFromCluster(true);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("Controller Thread interrupted, 
shutdown in progress", e);
-                }
-            } else {
-                LOG.warn("Cannot retrieve the ConfigMap: pull failed", ex);
-            }
-        }
-    }
-
-    private boolean isCurrentPodTheActiveLeader() {
-        return latestLeaderInfo != null
-                && 
latestLeaderInfo.isLeader(this.lockConfiguration.getPodName())
-                && 
!latestLeaderInfo.isTimeElapsedSeconds(this.lockConfiguration.getRenewDeadlineSeconds());
-    }
-
-    private ConfigMap pullConfigMap() {
-        return kubernetesClient.configMaps()
-                
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                .withName(this.lockConfiguration.getConfigMapName())
-                .get();
-    }
-
-
-    private void updateLatestLeaderInfo(ConfigMap configMap) {
-        LOG.debug("Updating internal status about the current leader");
-        this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, 
this.lockConfiguration.getGroupName());
-    }
-
-    private void scheduleCheckForPossibleLeadershipLoss() {
-        // Adding check for the case of main thread busy on http calls
-        if 
(this.latestLeaderInfo.isLeader(this.lockConfiguration.getPodName())) {
-            
this.eventDispatcherExecutor.schedule(this::checkAndNotifyNewLeader, 
this.lockConfiguration.getRenewDeadlineSeconds() * 1000 + 
FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
-        }
-    }
-
-    private void checkAndNotifyNewLeader() {
-        LOG.debug("Checking if the current leader has changed to notify the 
event handler...");
-        LeaderInfo newLeaderInfo = this.latestLeaderInfo;
-        if (newLeaderInfo == null) {
-            return;
-        }
-
-        long leaderInfoDurationSeconds = 
newLeaderInfo.isLeader(this.lockConfiguration.getPodName())
-                ? this.lockConfiguration.getRenewDeadlineSeconds()
-                : this.lockConfiguration.getLeaseDurationSeconds();
-
-        Optional<String> newLeader;
-        if (newLeaderInfo.getLeader() != null && 
!newLeaderInfo.isTimeElapsedSeconds(leaderInfoDurationSeconds)) {
-            newLeader = Optional.of(newLeaderInfo.getLeader());
-        } else {
-            newLeader = Optional.empty();
-        }
-
-        // Sending notifications in case of leader change
-        if (!newLeader.equals(this.currentLeader)) {
-            LOG.info("Current leader has changed from {} to {}. Sending 
notification...", this.currentLeader, newLeader);
-            this.currentLeader = newLeader;
-            
eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent)
 () -> newLeader);
-        } else {
-            LOG.debug("Current leader unchanged: {}", this.currentLeader);
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
index 6461708..be0b424 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
@@ -28,14 +28,10 @@ public class KubernetesLockConfiguration implements 
Cloneable {
 
     public static final String DEFAULT_CONFIGMAP_NAME = "leaders";
 
-
     public static final double DEFAULT_JITTER_FACTOR = 1.2;
-    public static final long DEFAULT_LEASE_DURATION_SECONDS = 60;
-    public static final long DEFAULT_RENEW_DEADLINE_SECONDS = 45;
-    public static final long DEFAULT_RETRY_PERIOD_SECONDS = 9;
-
-    public static final long DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS = 5;
-    public static final long DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS = 1800;
+    public static final long DEFAULT_LEASE_DURATION_MILLIS = 60000;
+    public static final long DEFAULT_RENEW_DEADLINE_MILLIS = 45000;
+    public static final long DEFAULT_RETRY_PERIOD_MILLIS = 9000;
 
     /**
      * Kubernetes namespace containing the pods and the ConfigMap used for 
locking.
@@ -63,37 +59,25 @@ public class KubernetesLockConfiguration implements 
Cloneable {
     private Map<String, String> clusterLabels = new HashMap<>();
 
     /**
-     * Indicates the maximum amount of time a Kubernetes watch should be kept 
active, before being recreated.
-     * Watch recreation can be disabled by putting value <= 0.
-     */
-    private long retryOnErrorIntervalSeconds = 
DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS;
-
-    /**
-     * A jitter factor to apply in order to prevent all pods to try to become 
leaders in the same instant.
+     * A jitter factor to apply in order to prevent all pods from calling 
Kubernetes APIs at the same instant.
      */
     private double jitterFactor = DEFAULT_JITTER_FACTOR;
 
     /**
      * The default duration of the lease for the current leader.
      */
-    private long leaseDurationSeconds = DEFAULT_LEASE_DURATION_SECONDS;
-
-    /**
-     * The deadline after which the leader must stop trying to renew its 
leadership (and yield it).
-     */
-    private long renewDeadlineSeconds = DEFAULT_RENEW_DEADLINE_SECONDS;
+    private long leaseDurationMillis = DEFAULT_LEASE_DURATION_MILLIS;
 
     /**
-     * The time between two subsequent attempts to acquire/renew the 
leadership (or after the lease expiration).
-     * It is randomized using the jitter factor in case of new leader election 
(not renewal).
+     * The deadline after which the leader must stop its services because it 
may have lost the leadership.
      */
-    private long retryPeriodSeconds = DEFAULT_RETRY_PERIOD_SECONDS;
+    private long renewDeadlineMillis = DEFAULT_RENEW_DEADLINE_MILLIS;
 
     /**
-     * Set this to a positive value in order to recreate watchers after a 
certain amount of time
-     * (to prevent them becoming stale).
+     * The time between two subsequent attempts to check and acquire the 
leadership.
+     * It is randomized using the jitter factor.
      */
-    private long watchRefreshIntervalSeconds = 
DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS;
+    private long retryPeriodMillis = DEFAULT_RETRY_PERIOD_MILLIS;
 
     public KubernetesLockConfiguration() {
     }
@@ -149,14 +133,6 @@ public class KubernetesLockConfiguration implements 
Cloneable {
         this.clusterLabels = clusterLabels;
     }
 
-    public long getRetryOnErrorIntervalSeconds() {
-        return retryOnErrorIntervalSeconds;
-    }
-
-    public void setRetryOnErrorIntervalSeconds(long 
retryOnErrorIntervalSeconds) {
-        this.retryOnErrorIntervalSeconds = retryOnErrorIntervalSeconds;
-    }
-
     public double getJitterFactor() {
         return jitterFactor;
     }
@@ -165,36 +141,28 @@ public class KubernetesLockConfiguration implements 
Cloneable {
         this.jitterFactor = jitterFactor;
     }
 
-    public long getLeaseDurationSeconds() {
-        return leaseDurationSeconds;
-    }
-
-    public void setLeaseDurationSeconds(long leaseDurationSeconds) {
-        this.leaseDurationSeconds = leaseDurationSeconds;
-    }
-
-    public long getRenewDeadlineSeconds() {
-        return renewDeadlineSeconds;
+    public long getLeaseDurationMillis() {
+        return leaseDurationMillis;
     }
 
-    public void setRenewDeadlineSeconds(long renewDeadlineSeconds) {
-        this.renewDeadlineSeconds = renewDeadlineSeconds;
+    public void setLeaseDurationMillis(long leaseDurationMillis) {
+        this.leaseDurationMillis = leaseDurationMillis;
     }
 
-    public long getRetryPeriodSeconds() {
-        return retryPeriodSeconds;
+    public long getRenewDeadlineMillis() {
+        return renewDeadlineMillis;
     }
 
-    public void setRetryPeriodSeconds(long retryPeriodSeconds) {
-        this.retryPeriodSeconds = retryPeriodSeconds;
+    public void setRenewDeadlineMillis(long renewDeadlineMillis) {
+        this.renewDeadlineMillis = renewDeadlineMillis;
     }
 
-    public long getWatchRefreshIntervalSeconds() {
-        return watchRefreshIntervalSeconds;
+    public long getRetryPeriodMillis() {
+        return retryPeriodMillis;
     }
 
-    public void setWatchRefreshIntervalSeconds(long 
watchRefreshIntervalSeconds) {
-        this.watchRefreshIntervalSeconds = watchRefreshIntervalSeconds;
+    public void setRetryPeriodMillis(long retryPeriodMillis) {
+        this.retryPeriodMillis = retryPeriodMillis;
     }
 
     public KubernetesLockConfiguration copy() {
@@ -214,12 +182,10 @@ public class KubernetesLockConfiguration implements 
Cloneable {
         sb.append(", groupName='").append(groupName).append('\'');
         sb.append(", podName='").append(podName).append('\'');
         sb.append(", clusterLabels=").append(clusterLabels);
-        sb.append(", 
retryOnErrorIntervalSeconds=").append(retryOnErrorIntervalSeconds);
         sb.append(", jitterFactor=").append(jitterFactor);
-        sb.append(", leaseDurationSeconds=").append(leaseDurationSeconds);
-        sb.append(", renewDeadlineSeconds=").append(renewDeadlineSeconds);
-        sb.append(", retryPeriodSeconds=").append(retryPeriodSeconds);
-        sb.append(", 
watchRefreshIntervalSeconds=").append(watchRefreshIntervalSeconds);
+        sb.append(", leaseDurationMillis=").append(leaseDurationMillis);
+        sb.append(", renewDeadlineMillis=").append(renewDeadlineMillis);
+        sb.append(", retryPeriodMillis=").append(retryPeriodMillis);
         sb.append('}');
         return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
deleted file mode 100644
index 586a11f..0000000
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.ha.lock;
-
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientException;
-import io.fabric8.kubernetes.client.Watch;
-import io.fabric8.kubernetes.client.Watcher;
-
-import org.apache.camel.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Monitors the list of participants in a leader election and provides the 
most recently updated list.
- * It calls the callback eventHandlers only when the member set changes w.r.t. 
the previous invocation.
- */
-class KubernetesMembersMonitor implements Service {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesMembersMonitor.class);
-
-    private ScheduledExecutorService serializedExecutor;
-
-    private KubernetesClient kubernetesClient;
-
-    private KubernetesLockConfiguration lockConfiguration;
-
-    private List<KubernetesClusterEventHandler> eventHandlers;
-
-    private Watch watch;
-
-    private boolean terminated;
-
-    private boolean refreshing;
-
-    private Set<String> previousMembers = new HashSet<>();
-
-    private Set<String> basePoll = new HashSet<>();
-    private Set<String> deleted = new HashSet<>();
-    private Set<String> added = new HashSet<>();
-
-    public KubernetesMembersMonitor(ScheduledExecutorService 
serializedExecutor, KubernetesClient kubernetesClient, 
KubernetesLockConfiguration lockConfiguration) {
-        this.serializedExecutor = serializedExecutor;
-        this.kubernetesClient = kubernetesClient;
-        this.lockConfiguration = lockConfiguration;
-        this.eventHandlers = new LinkedList<>();
-    }
-
-    public void addClusterEventHandler(KubernetesClusterEventHandler 
eventHandler) {
-        this.eventHandlers.add(eventHandler);
-    }
-
-    @Override
-    public void start() throws Exception {
-        serializedExecutor.execute(() -> doPoll(true));
-        serializedExecutor.execute(this::createWatch);
-
-        long recreationDelay = 
lockConfiguration.getWatchRefreshIntervalSeconds();
-        if (recreationDelay > 0) {
-            serializedExecutor.scheduleWithFixedDelay(this::refresh, 
recreationDelay, recreationDelay, TimeUnit.SECONDS);
-        }
-    }
-
-    private void createWatch() {
-        try {
-            LOG.debug("Starting cluster members watcher");
-            this.watch = kubernetesClient.pods()
-                    
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                    .withLabels(this.lockConfiguration.getClusterLabels())
-                    .watch(new Watcher<Pod>() {
-
-                        @Override
-                        public void eventReceived(Action action, Pod pod) {
-                            switch (action) {
-                            case DELETED:
-                                serializedExecutor.execute(() -> 
deleteAndNotify(podName(pod)));
-                                break;
-                            case ADDED:
-                                serializedExecutor.execute(() -> 
addAndNotify(podName(pod)));
-                                break;
-                            default:
-                            }
-                        }
-
-                        @Override
-                        public void onClose(KubernetesClientException e) {
-                            if (!terminated) {
-                                KubernetesMembersMonitor.this.watch = null;
-                                if (refreshing) {
-                                    LOG.info("Refreshing pod list watcher...");
-                                    
serializedExecutor.execute(KubernetesMembersMonitor.this::createWatch);
-                                } else {
-                                    LOG.warn("Pod list watcher has been closed 
unexpectedly. Recreating it in 1 second...", e);
-                                    
serializedExecutor.schedule(KubernetesMembersMonitor.this::createWatch, 1, 
TimeUnit.SECONDS);
-                                }
-                            }
-                        }
-                    });
-        } catch (Exception ex) {
-            LOG.warn("Unable to watch for pod list changes. Retrying in 5 
seconds...");
-            LOG.debug("Error while trying to watch the pod list", ex);
-
-            serializedExecutor.schedule(this::createWatch, 5, 
TimeUnit.SECONDS);
-        }
-    }
-
-    @Override
-    public void stop() throws Exception {
-        this.terminated = true;
-        Watch watch = this.watch;
-        if (watch != null) {
-            watch.close();
-        }
-    }
-
-    public void refresh() {
-        serializedExecutor.execute(() -> {
-            if (!terminated) {
-                refreshing = true;
-                try {
-                    doPoll(false);
-
-                    Watch w = this.watch;
-                    if (w != null) {
-                        // It will be recreated
-                        w.close();
-                    }
-                } finally {
-                    refreshing = false;
-                }
-            }
-        });
-    }
-
-    private void doPoll(boolean retry) {
-        LOG.debug("Starting poll to get all cluster members");
-        List<Pod> pods;
-        try {
-            pods = pollPods();
-        } catch (Exception ex) {
-            if (retry) {
-                LOG.warn("Pods poll failed. Retrying in 5 seconds...", ex);
-                this.serializedExecutor.schedule(() -> doPoll(true), 5, 
TimeUnit.SECONDS);
-            } else {
-                LOG.warn("Pods poll failed", ex);
-            }
-            return;
-        }
-
-        this.basePoll = pods.stream()
-                .map(p -> Optional.ofNullable(podName(p)))
-                .filter(Optional::isPresent)
-                .map(Optional::get)
-                .collect(Collectors.toSet());
-
-        this.added = new HashSet<>();
-        this.deleted = new HashSet<>();
-
-        LOG.debug("Base list of members is {}", this.basePoll);
-
-        checkAndNotify();
-    }
-
-    private List<Pod> pollPods() {
-        return kubernetesClient.pods()
-                
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                .withLabels(this.lockConfiguration.getClusterLabels())
-                .list().getItems();
-    }
-
-    private String podName(Pod pod) {
-        if (pod != null && pod.getMetadata() != null) {
-            return pod.getMetadata().getName();
-        }
-        return null;
-    }
-
-    private void checkAndNotify() {
-        Set<String> newMembers = new HashSet<>(basePoll);
-        newMembers.addAll(added);
-        newMembers.removeAll(deleted);
-
-        LOG.debug("Current list of members is: {}", newMembers);
-
-        if (!newMembers.equals(this.previousMembers)) {
-            LOG.debug("List of members changed: sending notifications");
-            this.previousMembers = newMembers;
-
-            for (KubernetesClusterEventHandler eventHandler : eventHandlers) {
-                
eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent)
 () -> newMembers);
-            }
-        } else {
-            LOG.debug("List of members has not changed");
-        }
-    }
-
-    private void addAndNotify(String member) {
-        LOG.debug("Adding new member to the list: {}", member);
-        if (member != null) {
-            this.added.add(member);
-            checkAndNotify();
-        }
-    }
-
-    private void deleteAndNotify(String member) {
-        LOG.debug("Deleting member to the list: {}", member);
-        if (member != null) {
-            this.deleted.add(member);
-            checkAndNotify();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java
index 50d1603..d061945 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kubernetes.ha.lock;
 
 import java.util.Date;
+import java.util.Set;
 
 import org.apache.camel.util.ObjectHelper;
 
@@ -29,28 +30,31 @@ public class LeaderInfo {
 
     private String leader;
 
-    private Date timestamp;
+    private Date localTimestamp;
+
+    private Set<String> members;
 
     public LeaderInfo() {
     }
 
-    public LeaderInfo(String groupName, String leader, Date timestamp) {
+    public LeaderInfo(String groupName, String leader, Date timestamp, 
Set<String> members) {
         this.groupName = groupName;
         this.leader = leader;
-        this.timestamp = timestamp;
+        this.localTimestamp = timestamp;
+        this.members = members;
+    }
+
+    public boolean hasEmptyLeader() {
+        return this.leader == null;
     }
 
-    public boolean isTimeElapsedSeconds(long timeSeconds) {
-        if (timestamp == null) {
-            return true;
-        }
-        long now = System.currentTimeMillis();
-        return timestamp.getTime() + timeSeconds * 1000 <= now;
+    /**
+     * Tells whether a leader is set and is part of the known members.
+     *
+     * @return {@code true} if the leader is non null and contained in the members set;
+     *         {@code false} if there is no leader or the members set is missing
+     */
+    public boolean hasValidLeader() {
+        // members stays null after the no-arg constructor or setMembers(null): treat that as "no valid leader"
+        return this.leader != null && this.members != null && this.members.contains(this.leader);
     }
 
-    public boolean isLeader(String pod) {
+    public boolean isValidLeader(String pod) {
         ObjectHelper.notNull(pod, "pod");
-        return pod.equals(leader);
+        return hasValidLeader() && pod.equals(leader);
     }
 
     public String getGroupName() {
@@ -69,12 +73,20 @@ public class LeaderInfo {
         this.leader = leader;
     }
 
-    public Date getTimestamp() {
-        return timestamp;
+    public Date getLocalTimestamp() {
+        return localTimestamp;
+    }
+
+    public void setLocalTimestamp(Date localTimestamp) {
+        this.localTimestamp = localTimestamp;
+    }
+
+    public Set<String> getMembers() {
+        return members;
     }
 
-    public void setTimestamp(Date timestamp) {
-        this.timestamp = timestamp;
+    public void setMembers(Set<String> members) {
+        this.members = members;
     }
 
     @Override
@@ -82,9 +94,9 @@ public class LeaderInfo {
         final StringBuilder sb = new StringBuilder("LeaderInfo{");
         sb.append("groupName='").append(groupName).append('\'');
         sb.append(", leader='").append(leader).append('\'');
-        sb.append(", timestamp=").append(timestamp);
+        sb.append(", localTimestamp=").append(localTimestamp);
+        sb.append(", members=").append(members);
         sb.append('}');
         return sb.toString();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java
new file mode 100644
index 0000000..6ada830
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.camel.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Receives information about the current leader and handles expiration automatically.
+ * <p>
+ * The latest leadership snapshot is pushed through
+ * {@link #refreshLeadership(Optional, Long, Long, Set)}. Leader and member-list changes are
+ * forwarded to the configured handler; when the lease of the current leader elapses without
+ * a newer snapshot, an empty leader is reported.
+ * <p>
+ * Notifications are dispatched from the single-threaded executor created by {@link #start()}.
+ */
+public class TimedLeaderNotifier implements Service {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TimedLeaderNotifier.class);
+
+    /** Margin (in milliseconds) added to scheduled expirations; also the minimum scheduling delay. */
+    private static final long FIXED_DELAY = 10;
+
+    private final KubernetesClusterEventHandler handler;
+
+    /** Guards currentLeader, currentMembers, timestamp, lease and changeCounter. */
+    private final Lock lock = new ReentrantLock();
+
+    /** Written by start()/stop() and read by tasks and callers, hence volatile. */
+    private volatile ScheduledExecutorService executor;
+
+    // Only touched by checkAndNotify, which always runs on the executor
+    private Optional<String> lastCommunicatedLeader = Optional.empty();
+    private Set<String> lastCommunicatedMembers = Collections.emptySet();
+
+    private Optional<String> currentLeader = Optional.empty();
+
+    private Set<String> currentMembers;
+
+    private Long timestamp;
+
+    private Long lease;
+
+    /** Incremented on every refresh; tasks carrying an older value are ignored. */
+    private long changeCounter;
+
+    public TimedLeaderNotifier(KubernetesClusterEventHandler handler) {
+        this.handler = Objects.requireNonNull(handler, "Handler must be present");
+    }
+
+    /**
+     * Records a new leadership snapshot, schedules the related notification and,
+     * if a leader is present, the expiration of its lease.
+     *
+     * @param leader the current leader, or {@code Optional.empty()} if there is none
+     * @param timestamp start of the lease in milliseconds, on the same scale as
+     *                  {@link System#currentTimeMillis()}; required when a leader is present
+     * @param lease duration of the lease in milliseconds; required when a leader is present
+     * @param members the current members (use an empty set if none)
+     * @throws NullPointerException if {@code leader} or {@code members} is null, if
+     *             {@code timestamp} or {@code lease} is null while a leader is present,
+     *             or if the notifier has not been started
+     */
+    public void refreshLeadership(Optional<String> leader, Long timestamp, Long lease, Set<String> members) {
+        Objects.requireNonNull(leader, "leader must be non null (use Optional.empty)");
+        Objects.requireNonNull(members, "members must be non null (use empty set)");
+        if (leader.isPresent()) {
+            // Both values are unboxed below to compute the expiration: reject nulls before touching any state
+            Objects.requireNonNull(timestamp, "timestamp must be non null when a leader is present");
+            Objects.requireNonNull(lease, "lease must be non null when a leader is present");
+        }
+        // Read the field once, since stop() may clear it concurrently; fail before changing state if not started
+        final ScheduledExecutorService scheduler =
+            Objects.requireNonNull(this.executor, "notifier must be started before refreshing the leadership");
+
+        final long version;
+        // Acquire outside the try block so that unlock() only runs when the lock is actually held
+        lock.lock();
+        try {
+            this.currentLeader = leader;
+            this.currentMembers = members;
+            this.timestamp = timestamp;
+            this.lease = lease;
+            version = ++changeCounter;
+        } finally {
+            lock.unlock();
+        }
+
+        LOG.debug("Updated leader to {} at version {}", leader, version);
+        scheduler.execute(() -> checkAndNotify(version));
+        if (leader.isPresent()) {
+            long time = System.currentTimeMillis();
+            long delay = Math.max(timestamp + lease + FIXED_DELAY - time, FIXED_DELAY);
+            LOG.debug("Setting expiration in {} millis for version {}", delay, version);
+            scheduler.schedule(() -> expiration(version), delay, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public void start() throws Exception {
+        if (this.executor == null) {
+            this.executor = Executors.newSingleThreadScheduledExecutor();
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        ScheduledExecutorService toShutdown = this.executor;
+        if (toShutdown != null) {
+            this.executor = null;
+
+            toShutdown.shutdownNow();
+            toShutdown.awaitTermination(1, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Scheduled task: if the given version is still the current one and the lease has elapsed,
+     * triggers a notification; if the lease has not elapsed yet, reschedules itself.
+     */
+    private void expiration(long version) {
+        lock.lock();
+        try {
+            if (version != this.changeCounter) {
+                // A newer snapshot has been received: it carries its own expiration task
+                return;
+            }
+
+            long time = System.currentTimeMillis();
+            if (time < this.timestamp + this.lease) {
+                long delay = this.timestamp + this.lease - time;
+                LOG.debug("Delaying expiration by {} millis at version {}", delay + FIXED_DELAY, version);
+                // stop() clears the field: once stopped there is nothing to reschedule on
+                ScheduledExecutorService scheduler = this.executor;
+                if (scheduler != null) {
+                    scheduler.schedule(() -> expiration(version), delay + FIXED_DELAY, TimeUnit.MILLISECONDS);
+                }
+                return;
+            }
+        } finally {
+            lock.unlock();
+        }
+
+        checkAndNotify(version);
+    }
+
+    /**
+     * Sends leader and member-list events to the handler when they differ from the last
+     * communicated values. Does nothing if the given version is no longer the current one.
+     */
+    private void checkAndNotify(long version) {
+        Optional<String> leader;
+        Set<String> members;
+        lock.lock();
+        try {
+            if (version != this.changeCounter) {
+                return;
+            }
+
+            leader = this.currentLeader;
+            members = this.currentMembers;
+
+            if (leader.isPresent()) {
+                long time = System.currentTimeMillis();
+                if (time >= this.timestamp + this.lease) {
+                    // Lease elapsed: the recorded leader is communicated as absent
+                    leader = Optional.empty();
+                }
+            }
+
+        } finally {
+            lock.unlock();
+        }
+
+        final Optional<String> newLeader = leader;
+        if (!newLeader.equals(lastCommunicatedLeader)) {
+            lastCommunicatedLeader = newLeader;
+            handler.onKubernetesClusterEvent(new KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent() {
+                @Override
+                public Optional<String> getData() {
+                    return newLeader;
+                }
+            });
+        }
+
+        final Set<String> newMembers = members;
+        if (!newMembers.equals(lastCommunicatedMembers)) {
+            lastCommunicatedMembers = newMembers;
+            handler.onKubernetesClusterEvent(new KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent() {
+                @Override
+                public Set<String> getData() {
+                    return newMembers;
+                }
+            });
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
index 3bdffbd..4a2a11e 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
@@ -42,7 +42,10 @@ import org.junit.Test;
  */
 public class KubernetesClusterServiceTest extends CamelTestSupport {
 
-    private static final int LEASE_TIME_SECONDS = 5;
+    private static final int LEASE_TIME_MILLIS = 2000;
+    private static final int RENEW_DEADLINE_MILLIS = 1000;
+    private static final int RETRY_PERIOD_MILLIS = 200;
+    private static final double JITTER_FACTOR = 1.1;
 
     private ConfigMapLockSimulator lockSimulator;
 
@@ -75,6 +78,7 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
         mypod2.waitForAnyLeader(2, TimeUnit.SECONDS);
 
         String leader = mypod1.getCurrentLeader();
+        assertNotNull(leader);
         assertTrue(leader.startsWith("mypod"));
         assertEquals("Leaders should be equals", mypod2.getCurrentLeader(), 
leader);
     }
@@ -129,6 +133,7 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
         LeaderRecorder formerLoserRecorder = firstLeader.equals("mypod1") ? 
mypod2 : mypod1;
 
         refuseRequestsFromPod(firstLeader);
+        disconnectPod(firstLeader);
 
         formerLeaderRecorder.waitForALeaderChange(7, TimeUnit.SECONDS);
         formerLoserRecorder.waitForANewLeader(firstLeader, 7, 
TimeUnit.SECONDS);
@@ -139,12 +144,12 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
         Long lossTimestamp = formerLeaderRecorder.getLastTimeOf(l -> l == 
null);
         Long gainTimestamp = 
formerLoserRecorder.getLastTimeOf(secondLeader::equals);
 
-        assertTrue("At least 2 seconds must elapse from leadership loss and 
regain (see renewDeadlineSeconds)", gainTimestamp >= lossTimestamp + 2000);
-        checkLeadershipChangeDistance(LEASE_TIME_SECONDS, TimeUnit.SECONDS, 
mypod1, mypod2);
+        assertTrue("At least half distance must elapse from leadership loss 
and regain (see renewDeadlineSeconds)", gainTimestamp >= lossTimestamp + 
(LEASE_TIME_MILLIS - RENEW_DEADLINE_MILLIS) / 2);
+        checkLeadershipChangeDistance((LEASE_TIME_MILLIS - 
RENEW_DEADLINE_MILLIS) / 2, TimeUnit.MILLISECONDS, mypod1, mypod2);
     }
 
     @Test
-    public void testSlowLeaderLosingLeadership() throws Exception {
+    public void testSlowLeaderLosingLeadershipOnlyInternally() throws 
Exception {
         LeaderRecorder mypod1 = addMember("mypod1");
         LeaderRecorder mypod2 = addMember("mypod2");
         context.start();
@@ -159,17 +164,9 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
 
         delayRequestsFromPod(firstLeader, 10, TimeUnit.SECONDS);
 
-        formerLeaderRecorder.waitForALeaderChange(7, TimeUnit.SECONDS);
-        formerLoserRecorder.waitForANewLeader(firstLeader, 7, 
TimeUnit.SECONDS);
-
-        String secondLeader = formerLoserRecorder.getCurrentLeader();
-        assertNotEquals("The firstLeader should be different from the new 
one", firstLeader, secondLeader);
-
-        Long lossTimestamp = formerLeaderRecorder.getLastTimeOf(l -> l == 
null);
-        Long gainTimestamp = 
formerLoserRecorder.getLastTimeOf(secondLeader::equals);
-
-        assertTrue("At least 2 seconds must elapse from leadership loss and 
regain (see renewDeadlineSeconds)", gainTimestamp >= lossTimestamp + 2000);
-        checkLeadershipChangeDistance(LEASE_TIME_SECONDS, TimeUnit.SECONDS, 
mypod1, mypod2);
+        Thread.sleep(LEASE_TIME_MILLIS);
+        assertNull(formerLeaderRecorder.getCurrentLeader());
+        assertEquals(firstLeader, formerLoserRecorder.getCurrentLeader());
     }
 
     @Test
@@ -185,9 +182,9 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
 
         for (int i = 0; i < 3; i++) {
             refuseRequestsFromPod(firstLeader);
-            Thread.sleep(1000);
+            Thread.sleep(RENEW_DEADLINE_MILLIS);
             allowRequestsFromPod(firstLeader);
-            Thread.sleep(2000);
+            Thread.sleep(LEASE_TIME_MILLIS);
         }
 
         assertEquals(firstLeader, mypod1.getCurrentLeader());
@@ -229,6 +226,18 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
         this.lockServers.get(pod).setRefuseRequests(false);
     }
 
+    // Calls removePod(pod) on every lock test server registered in this test
+    private void disconnectPod(String pod) {
+        this.lockServers.values().forEach(lockServer -> lockServer.removePod(pod));
+    }
+
+    // Calls addPod(pod) on every lock test server registered in this test
+    private void connectPod(String pod) {
+        this.lockServers.values().forEach(lockServer -> lockServer.addPod(pod));
+    }
+
     private void checkLeadershipChangeDistance(long minimum, TimeUnit unit, 
LeaderRecorder... recorders) {
         List<LeaderRecorder.LeadershipInfo> infos = Arrays.stream(recorders)
                 .flatMap(lr -> lr.getLeadershipInfo().stream())
@@ -245,7 +254,8 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
                 } else if (info.getLeader() != null && 
!info.getLeader().equals(currentLeaderLastSeen.getLeader())) {
                     // switch
                     long delay = info.getChangeTimestamp() - 
currentLeaderLastSeen.getChangeTimestamp();
-                    assertTrue("Lease time not elapsed between switch", delay 
>= TimeUnit.MILLISECONDS.convert(minimum, unit));
+                    assertTrue("Lease time not elapsed between switch, 
minimum=" + TimeUnit.MILLISECONDS.convert(minimum, unit) + ", found=" + delay, 
delay >= TimeUnit.MILLISECONDS.convert(minimum,
+                            unit));
                     currentLeaderLastSeen = info;
                 }
             }
@@ -268,11 +278,10 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
         KubernetesClusterService member = new 
KubernetesClusterService(configuration);
         member.setKubernetesNamespace("test");
         member.setPodName(name);
-        member.setLeaseDurationSeconds(LEASE_TIME_SECONDS);
-        member.setRenewDeadlineSeconds(3); // 5-3 = at least 2 seconds for 
switching on leadership loss
-        member.setRetryPeriodSeconds(1);
-        member.setRetryOnErrorIntervalSeconds(1);
-        member.setJitterFactor(1.2);
+        member.setLeaseDurationMillis(LEASE_TIME_MILLIS);
+        member.setRenewDeadlineMillis(RENEW_DEADLINE_MILLIS);
+        member.setRetryPeriodMillis(RETRY_PERIOD_MILLIS);
+        member.setJitterFactor(JITTER_FACTOR);
 
         LeaderRecorder recorder = new LeaderRecorder();
         try {
@@ -281,6 +290,10 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
         } catch (Exception ex) {
             throw new RuntimeException(ex);
         }
+
+        for (String pod : this.lockServers.keySet()) {
+            connectPod(pod);
+        }
         return recorder;
     }
 

Reply via email to