CAMEL-11331: Lease-based implementation of Kubernetes lock
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e4cab329 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e4cab329 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e4cab329 Branch: refs/heads/master Commit: e4cab32911c2d825568d1de93fbf42ccbc341c92 Parents: 4548126 Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Fri Jul 7 17:05:31 2017 +0200 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Tue Aug 8 16:39:43 2017 +0200 ---------------------------------------------------------------------- .../kubernetes/ha/KubernetesClusterService.java | 117 +++++- .../kubernetes/ha/KubernetesClusterView.java | 6 +- .../kubernetes/ha/lock/ConfigMapLockUtils.java | 106 ++++++ .../ha/lock/KubernetesLeaderMonitor.java | 256 ------------- .../ha/lock/KubernetesLeadershipController.java | 211 ----------- ...ubernetesLeaseBasedLeadershipController.java | 374 +++++++++++++++++++ .../ha/lock/KubernetesLockConfiguration.java | 99 ++++- .../ha/lock/KubernetesMembersMonitor.java | 4 +- .../kubernetes/ha/lock/LeaderInfo.java | 90 +++++ 9 files changed, 767 insertions(+), 496 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java index 6d87d48..a868d16 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java +++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java @@ -19,6 +19,8 @@ package org.apache.camel.component.kubernetes.ha; import java.net.InetAddress; import java.util.Map; +import io.fabric8.kubernetes.client.KubernetesClient; + import org.apache.camel.CamelContext; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.kubernetes.KubernetesConfiguration; @@ -31,8 +33,6 @@ import org.apache.camel.util.ObjectHelper; */ public class KubernetesClusterService extends AbstractCamelClusterService<KubernetesClusterView> { - public static final String DEFAULT_CONFIGMAP_NAME = "leaders"; - private KubernetesConfiguration configuration; private KubernetesLockConfiguration lockConfiguration; @@ -64,10 +64,7 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern config.setGroupName(ObjectHelper.notNull(groupName, "groupName")); - // Check defaults (Namespace and podName can be null) - if (config.getConfigMapName() == null) { - config.setConfigMapName(DEFAULT_CONFIGMAP_NAME); - } + // Determine the pod name if not provided if (config.getPodName() == null) { config.setPodName(System.getenv("HOSTNAME")); if (config.getPodName() == null) { @@ -79,6 +76,33 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern } } + ObjectHelper.notNull(config.getConfigMapName(), "configMapName"); + ObjectHelper.notNull(config.getClusterLabels(), "clusterLabels"); + + if (config.getJitterFactor() < 1) { + throw new IllegalStateException("jitterFactor must be >= 1 (found: " + config.getJitterFactor() + ")"); + } + if (config.getRetryOnErrorIntervalSeconds() <= 0) { + throw new IllegalStateException("retryOnErrorIntervalSeconds must be > 0 (found: " + config.getRetryOnErrorIntervalSeconds() + ")"); + } + if (config.getRetryPeriodSeconds() <= 0) { + throw new IllegalStateException("retryPeriodSeconds must be > 0 (found: " + config.getRetryPeriodSeconds() + 
")"); + } + if (config.getRenewDeadlineSeconds() <= 0) { + throw new IllegalStateException("renewDeadlineSeconds must be > 0 (found: " + config.getRenewDeadlineSeconds() + ")"); + } + if (config.getLeaseDurationSeconds() <= 0) { + throw new IllegalStateException("leaseDurationSeconds must be > 0 (found: " + config.getLeaseDurationSeconds() + ")"); + } + if (config.getLeaseDurationSeconds() <= config.getRenewDeadlineSeconds()) { + throw new IllegalStateException("leaseDurationSeconds must be greater than renewDeadlineSeconds " + + "(" + config.getLeaseDurationSeconds() + " is not greater than " + config.getRenewDeadlineSeconds() + ")"); + } + if (config.getRenewDeadlineSeconds() <= config.getJitterFactor() * config.getRetryPeriodSeconds()) { + throw new IllegalStateException("renewDeadlineSeconds must be greater than jitterFactor*retryPeriodSeconds " + + "(" + config.getRenewDeadlineSeconds() + " is not greater than " + config.getJitterFactor() + "*" + config.getRetryPeriodSeconds() + ")"); + } + return config; } @@ -137,15 +161,88 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern lockConfiguration.setClusterLabels(clusterLabels); } - public Long getWatchRefreshIntervalSeconds() { - return lockConfiguration.getWatchRefreshIntervalSeconds(); + public void addToClusterLabels(String key, String value) { + lockConfiguration.addToClusterLabels(key, value); + } + + public String getKubernetesResourcesNamespace() { + return lockConfiguration.getKubernetesResourcesNamespace(); + } + + /** + * Kubernetes namespace containing the pods and the ConfigMap used for locking. 
+ */ + public void setKubernetesResourcesNamespace(String kubernetesResourcesNamespace) { + lockConfiguration.setKubernetesResourcesNamespace(kubernetesResourcesNamespace); + } + + public long getRetryOnErrorIntervalSeconds() { + return lockConfiguration.getRetryOnErrorIntervalSeconds(); } /** * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated. - * Watch recreation can be disabled by putting a negative value (the default will be used in case of null). + * Watch recreation can be disabled by putting value <= 0. + */ + public void setRetryOnErrorIntervalSeconds(long retryOnErrorIntervalSeconds) { + lockConfiguration.setRetryOnErrorIntervalSeconds(retryOnErrorIntervalSeconds); + } + + public double getJitterFactor() { + return lockConfiguration.getJitterFactor(); + } + + /** + * A jitter factor to apply in order to prevent all pods to try to become leaders in the same instant. */ - public void setWatchRefreshIntervalSeconds(Long watchRefreshIntervalSeconds) { + public void setJitterFactor(double jitterFactor) { + lockConfiguration.setJitterFactor(jitterFactor); + } + + public long getLeaseDurationSeconds() { + return lockConfiguration.getLeaseDurationSeconds(); + } + + /** + * The default duration of the lease for the current leader. + */ + public void setLeaseDurationSeconds(long leaseDurationSeconds) { + lockConfiguration.setLeaseDurationSeconds(leaseDurationSeconds); + } + + public long getRenewDeadlineSeconds() { + return lockConfiguration.getRenewDeadlineSeconds(); + } + + /** + * The deadline after which the leader must stop trying to renew its leadership (and yield it). 
+ */ + public void setRenewDeadlineSeconds(long renewDeadlineSeconds) { + lockConfiguration.setRenewDeadlineSeconds(renewDeadlineSeconds); + } + + public long getRetryPeriodSeconds() { + return lockConfiguration.getRetryPeriodSeconds(); + } + + /** + * The time between two subsequent attempts to acquire/renew the leadership (or after the lease expiration). + * It is randomized using the jitter factor in case of new leader election (not renewal). + */ + public void setRetryPeriodSeconds(long retryPeriodSeconds) { + lockConfiguration.setRetryPeriodSeconds(retryPeriodSeconds); + } + + public long getWatchRefreshIntervalSeconds() { + return lockConfiguration.getWatchRefreshIntervalSeconds(); + } + + /** + * Set this to a positive value in order to recreate watchers after a certain amount of time, + * to avoid having stale watchers. + */ + public void setWatchRefreshIntervalSeconds(long watchRefreshIntervalSeconds) { lockConfiguration.setWatchRefreshIntervalSeconds(watchRefreshIntervalSeconds); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java index 9ac6a86..e324b3f 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java @@ -30,7 +30,7 @@ import io.fabric8.kubernetes.client.KubernetesClient; import org.apache.camel.component.kubernetes.KubernetesConfiguration; import org.apache.camel.component.kubernetes.KubernetesHelper; import 
org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent; -import org.apache.camel.component.kubernetes.ha.lock.KubernetesLeadershipController; +import org.apache.camel.component.kubernetes.ha.lock.KubernetesLeaseBasedLeadershipController; import org.apache.camel.component.kubernetes.ha.lock.KubernetesLockConfiguration; import org.apache.camel.ha.CamelClusterMember; import org.apache.camel.impl.ha.AbstractCamelClusterView; @@ -56,7 +56,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView { private volatile List<CamelClusterMember> currentMembers = Collections.emptyList(); - private KubernetesLeadershipController controller; + private KubernetesLeaseBasedLeadershipController controller; public KubernetesClusterView(KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) { super(cluster, lockConfiguration.getGroupName()); @@ -86,7 +86,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView { if (controller == null) { this.kubernetesClient = KubernetesHelper.getKubernetesClient(configuration); - controller = new KubernetesLeadershipController(kubernetesClient, this.lockConfiguration, event -> { + controller = new KubernetesLeaseBasedLeadershipController(kubernetesClient, this.lockConfiguration, event -> { if (event instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) { // New leader Optional<String> leader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData(); http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java new file mode 100644 index 0000000..84718f3 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.lock; + +import java.text.SimpleDateFormat; +import java.util.Date; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public final class ConfigMapLockUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ConfigMapLockUtils.class); + + private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ssX"; + + private static final String LEADER_PREFIX = "leader.pod."; + + private static final String TIMESTAMP_PREFIX = "leader.timestamp."; + + private ConfigMapLockUtils() { + } + + public static ConfigMap createNewConfigMap(String configMapName, LeaderInfo leaderInfo) { + return new ConfigMapBuilder(). 
+ withNewMetadata() + .withName(configMapName) + .addToLabels("provider", "camel") + .addToLabels("kind", "locks"). + endMetadata() + .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader()) + .addToData(TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getTimestamp())) + .build(); + } + + public static ConfigMap getConfigMapWithNewLeader(ConfigMap configMap, LeaderInfo leaderInfo) { + return new ConfigMapBuilder(configMap) + .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader()) + .addToData(TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getTimestamp())) + .build(); + } + + public static LeaderInfo getLeaderInfo(ConfigMap configMap, String group) { + return new LeaderInfo(group, getLeader(configMap, group), getTimestamp(configMap, group)); + } + + private static String getLeader(ConfigMap configMap, String group) { + return getConfigMapValue(configMap, LEADER_PREFIX + group); + } + + private static String formatDate(Date date) { + if (date == null) { + return null; + } + try { + return new SimpleDateFormat(DATE_TIME_FORMAT).format(date); + } catch (Exception e) { + LOG.warn("Unable to format date '" + date + "' using format " + DATE_TIME_FORMAT, e); + } + + return null; + } + + private static Date getTimestamp(ConfigMap configMap, String group) { + String timestamp = getConfigMapValue(configMap, TIMESTAMP_PREFIX + group); + if (timestamp == null) { + return null; + } + + try { + return new SimpleDateFormat(DATE_TIME_FORMAT).parse(timestamp); + } catch (Exception e) { + LOG.warn("Unable to parse time string '" + timestamp + "' using format " + DATE_TIME_FORMAT, e); + } + + return null; + } + + private static String getConfigMapValue(ConfigMap configMap, String key) { + if (configMap == null || configMap.getData() == null) { + return null; + } + return configMap.getData().get(key); + } + +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java deleted file mode 100644 index 5555fe1..0000000 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java +++ /dev/null @@ -1,256 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.component.kubernetes.ha.lock; - -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watch; -import io.fabric8.kubernetes.client.Watcher; - -import org.apache.camel.Service; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Monitors continuously the configmap to detect changes in leadership. - * It calls the callback eventHandlers only when the leader changes w.r.t. the previous invocation. - */ -class KubernetesLeaderMonitor implements Service { - - private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderMonitor.class); - - private ScheduledExecutorService serializedExecutor; - - private KubernetesClient kubernetesClient; - - private KubernetesLockConfiguration lockConfiguration; - - private List<KubernetesClusterEventHandler> eventHandlers; - - private Watch watch; - - private boolean terminated; - - private boolean refreshing; - - private ConfigMap latestConfigMap; - - public KubernetesLeaderMonitor(ScheduledExecutorService serializedExecutor, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration) { - this.serializedExecutor = serializedExecutor; - this.kubernetesClient = kubernetesClient; - this.lockConfiguration = lockConfiguration; - this.eventHandlers = new LinkedList<>(); - } - - public void addClusterEventHandler(KubernetesClusterEventHandler leaderEventHandler) { - this.eventHandlers.add(leaderEventHandler); - } - - @Override - public void start() throws Exception { - this.terminated = false; - serializedExecutor.execute(this::startWatch); - serializedExecutor.execute(() -> doPoll(true)); - - long recreationDelay = 
lockConfiguration.getWatchRefreshIntervalSecondsOrDefault(); - if (recreationDelay > 0) { - serializedExecutor.scheduleWithFixedDelay(this::refresh, recreationDelay, recreationDelay, TimeUnit.SECONDS); - } - } - - @Override - public void stop() throws Exception { - this.terminated = true; - Watch watch = this.watch; - if (watch != null) { - watch.close(); - } - } - - public void refresh() { - serializedExecutor.execute(() -> { - if (!terminated) { - refreshing = true; - try { - doPoll(false); - - Watch w = this.watch; - if (w != null) { - // It will be recreated - w.close(); - } - } finally { - refreshing = false; - } - } - }); - } - - private void startWatch() { - try { - LOG.debug("Starting ConfigMap watcher for monitoring the current leader"); - this.watch = kubernetesClient.configMaps() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .withName(this.lockConfiguration.getConfigMapName()) - .watch(new Watcher<ConfigMap>() { - - @Override - public void eventReceived(Action action, ConfigMap configMap) { - switch (action) { - case MODIFIED: - case DELETED: - case ADDED: - LOG.debug("Received update from watch on ConfigMap {}", configMap); - serializedExecutor.execute(() -> checkAndNotify(configMap)); - break; - default: - } - } - - @Override - public void onClose(KubernetesClientException e) { - if (!terminated) { - KubernetesLeaderMonitor.this.watch = null; - if (refreshing) { - LOG.info("Refreshing ConfigMap watcher..."); - serializedExecutor.execute(KubernetesLeaderMonitor.this::startWatch); - } else { - LOG.warn("ConfigMap watcher has been closed unexpectedly. Recreating it in 1 second...", e); - serializedExecutor.schedule(KubernetesLeaderMonitor.this::startWatch, 1, TimeUnit.SECONDS); - } - } - } - }); - } catch (Exception ex) { - LOG.warn("Unable to watch for configmap changes. 
Retrying in 5 seconds..."); - LOG.debug("Error while trying to watch the configmap", ex); - - this.serializedExecutor.schedule(this::startWatch, 5, TimeUnit.SECONDS); - } - } - - private void doPoll(boolean retry) { - LOG.debug("Starting poll to get configmap {}", this.lockConfiguration.getConfigMapName()); - ConfigMap configMap; - try { - configMap = pollConfigMap(); - } catch (Exception ex) { - if (retry) { - LOG.warn("ConfigMap poll failed. Retrying in 5 seconds...", ex); - this.serializedExecutor.schedule(() -> doPoll(true), 5, TimeUnit.SECONDS); - } else { - LOG.warn("ConfigMap poll failed", ex); - } - return; - } - - checkAndNotify(configMap); - } - - private void checkAndNotify(ConfigMap candidateConfigMap) { - LOG.debug("Checking configMap {}", candidateConfigMap); - ConfigMap newConfigMap = newest(this.latestConfigMap, candidateConfigMap); - Optional<String> leader = extractLeader(newConfigMap); - Optional<String> oldLeader = extractLeader(this.latestConfigMap); - - this.latestConfigMap = newConfigMap; - - LOG.debug("The new leader is {}. 
Old leader was {}", leader, oldLeader); - if (!leader.equals(oldLeader)) { - LOG.debug("Notifying the new leader to all eventHandlers"); - for (KubernetesClusterEventHandler eventHandler : eventHandlers) { - eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> leader); - } - } else { - LOG.debug("Leader has not changed"); - } - } - - private ConfigMap pollConfigMap() { - return kubernetesClient.configMaps() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .withName(this.lockConfiguration.getConfigMapName()) - .get(); - } - - private Optional<String> extractLeader(ConfigMap configMap) { - Optional<String> leader = Optional.empty(); - if (configMap != null && configMap.getData() != null) { - leader = Optional.ofNullable(configMap.getData().get(this.lockConfiguration.getGroupName())); - } - return leader; - } - - private ConfigMap newest(ConfigMap configMap1, ConfigMap configMap2) { - ConfigMap newest = null; - - if (configMap1 != null && configMap2 == null) { - newest = configMap1; - } else if (configMap1 == null && configMap2 != null) { - newest = configMap2; - } - - if (newest == null) { - String rv1 = extractResourceVersion(configMap1); - String rv2 = extractResourceVersion(configMap2); - newest = newest(configMap1, configMap2, rv1, rv2); - } - - if (newest == null) { - String ct1 = extractCreationTimestamp(configMap1); - String ct2 = extractCreationTimestamp(configMap2); - // timestamps are string-comparable - newest = newest(configMap1, configMap2, ct1, ct2); - } - - return newest; - } - - private <T extends Comparable<T>> ConfigMap newest(ConfigMap configMap1, ConfigMap configMap2, T cmp1, T cmp2) { - if (cmp1 != null && cmp2 != null) { - int comp = cmp1.compareTo(cmp2); - if (comp > 0) { - return configMap1; - } else { - return configMap2; - } - } - return null; - } - - private String extractResourceVersion(ConfigMap configMap) { - if (configMap != null && 
configMap.getMetadata() != null) { - return configMap.getMetadata().getResourceVersion(); - } - return null; - } - - private String extractCreationTimestamp(ConfigMap configMap) { - if (configMap != null && configMap.getMetadata() != null) { - return configMap.getMetadata().getCreationTimestamp(); - } - return null; - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java deleted file mode 100644 index ad2f9bc..0000000 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.component.kubernetes.ha.lock; - -import java.util.Collections; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.api.model.ConfigMapBuilder; -import io.fabric8.kubernetes.client.KubernetesClient; - -import org.apache.camel.Service; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Start the monitors and participate to leader election when no active leaders are present. - * It communicates changes in leadership and cluster members to the given event handler. - */ -public class KubernetesLeadershipController implements Service { - - private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeadershipController.class); - - private KubernetesClient kubernetesClient; - - private KubernetesLockConfiguration lockConfiguration; - - private ScheduledExecutorService executor; - - private KubernetesLeaderMonitor leaderMonitor; - - private KubernetesMembersMonitor membersMonitor; - - private Optional<String> currentLeader; - - private Set<String> currentMembers; - - private KubernetesClusterEventHandler eventHandler; - - public KubernetesLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) { - - this.kubernetesClient = kubernetesClient; - this.lockConfiguration = lockConfiguration; - this.eventHandler = eventHandler; - - this.currentLeader = Optional.empty(); - this.currentMembers = Collections.emptySet(); - } - - @Override - public void start() throws Exception { - - if (executor == null) { - executor = Executors.newSingleThreadScheduledExecutor(); // No concurrency - leaderMonitor = new KubernetesLeaderMonitor(this.executor, this.kubernetesClient, this.lockConfiguration); - 
membersMonitor = new KubernetesMembersMonitor(this.executor, this.kubernetesClient, this.lockConfiguration); - - leaderMonitor.addClusterEventHandler(e -> executor.execute(() -> onLeaderChanged(e))); - if (eventHandler != null) { - leaderMonitor.addClusterEventHandler(eventHandler); - } - - membersMonitor.addClusterEventHandler(e -> executor.execute(() -> onMembersChanged(e))); - if (eventHandler != null) { - membersMonitor.addClusterEventHandler(eventHandler); - } - - // Start all services - leaderMonitor.start(); - membersMonitor.start(); - - // Fire a new election if possible - executor.execute(this::runLeaderElection); - } - - } - - @Override - public void stop() throws Exception { - if (executor != null) { - membersMonitor.stop(); - leaderMonitor.stop(); - executor.shutdown(); - executor.shutdownNow(); - - membersMonitor = null; - leaderMonitor = null; - executor = null; - } - } - - private void onLeaderChanged(KubernetesClusterEvent e) { - Optional<String> newLeader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(e).getData(); - if (!newLeader.isPresent()) { - executor.execute(this::tryLeaderElection); - } - this.currentLeader = newLeader; - } - - private void onMembersChanged(KubernetesClusterEvent e) { - Set<String> newMembers = KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent.class.cast(e).getData(); - if (currentLeader.isPresent()) { - // Check if the current leader is still present in the list - if (!newMembers.contains(currentLeader.get()) && currentMembers.contains(currentLeader.get())) { - executor.execute(this::runLeaderElection); - } - } - this.currentMembers = newMembers; - } - - private void runLeaderElection() { - boolean finished = false; - try { - finished = tryLeaderElection(); - } catch (Exception ex) { - LOG.warn("Exception while trying to acquire the leadership", ex); - } - - if (!finished) { - executor.schedule(this::runLeaderElection, 1, TimeUnit.SECONDS); - } - } - - private boolean 
tryLeaderElection() { - LOG.debug("Starting leader election"); - if (!currentMembers.contains(this.lockConfiguration.getPodName())) { - LOG.debug("The current pod ({}) is not in the list of participating pods {}. Cannot participate to the election", this.lockConfiguration.getPodName(), currentMembers); - return false; - } - - ConfigMap configMap = kubernetesClient.configMaps() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .withName(this.lockConfiguration.getConfigMapName()) - .get(); - - if (configMap == null) { - // No configmap created so far - LOG.info("Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created"); - - ConfigMap newConfigMap = new ConfigMapBuilder(). - withNewMetadata() - .withName(this.lockConfiguration.getConfigMapName()) - .addToLabels("provider", "camel") - .addToLabels("kind", "locks"). - endMetadata() - .addToData(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName()) - .build(); - - try { - kubernetesClient.configMaps() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .create(newConfigMap); - } catch (Exception ex) { - // Suppress exception - LOG.warn("Unable to create the ConfigMap, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has the right " - + "permissions to create it"); - LOG.debug("Exception while trying to create the ConfigMap", ex); - return false; - } - return true; - } else { - LOG.info("Lock configmap already present in the Kubernetes namespace. Checking..."); - Map<String, String> leaders = configMap.getData(); - Optional<String> oldLeader = leaders != null ? 
Optional.ofNullable(leaders.get(this.lockConfiguration.getGroupName())) : Optional.empty(); - - boolean noLeaderPresent = !oldLeader.isPresent() || !currentMembers.contains(oldLeader.get()); - boolean alreadyLeader = oldLeader.isPresent() && oldLeader.get().equals(this.lockConfiguration.getPodName()); - - if (noLeaderPresent && !alreadyLeader) { - LOG.info("Trying to acquire the lock in configmap={}, key={}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName()); - ConfigMap newConfigMap = new ConfigMapBuilder(configMap) - .addToData(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName()) - .build(); - - kubernetesClient.configMaps() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .withName(this.lockConfiguration.getConfigMapName()) - .lockResourceVersion(configMap.getMetadata().getResourceVersion()) - .replace(newConfigMap); - - LOG.info("Lock acquired for configmap={}, key={}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName()); - } else if (!noLeaderPresent) { - LOG.info("A leader is already present for configmap={}, key={}: {}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName(), oldLeader); - } else { - LOG.info("This pod ({}) is already the leader for configmap={}, key={}", this.lockConfiguration.getPodName(), this.lockConfiguration.getConfigMapName(), this.lockConfiguration - .getGroupName()); - } - return true; - } - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java new file mode 100644 index 0000000..b385925 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.lock; + +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.client.KubernetesClient; + +import org.apache.camel.Service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Monitors current status and participate to leader election when no active leaders are present. + * It communicates changes in leadership and cluster members to the given event handler. 
+ */ +public class KubernetesLeaseBasedLeadershipController implements Service { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaseBasedLeadershipController.class); + + private static final long FIXED_ADDITIONAL_DELAY = 100; + + private KubernetesClient kubernetesClient; + + private KubernetesLockConfiguration lockConfiguration; + + private KubernetesClusterEventHandler eventHandler; + + private ScheduledExecutorService serializedExecutor; + private ScheduledExecutorService eventDispatcherExecutor; + + private KubernetesMembersMonitor membersMonitor; + + private Optional<String> currentLeader = Optional.empty(); + + private volatile LeaderInfo latestLeaderInfo; + + public KubernetesLeaseBasedLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) { + this.kubernetesClient = kubernetesClient; + this.lockConfiguration = lockConfiguration; + this.eventHandler = eventHandler; + } + + @Override + public void start() throws Exception { + if (serializedExecutor == null) { + LOG.debug("Starting leadership controller..."); + serializedExecutor = Executors.newSingleThreadScheduledExecutor(); + + eventDispatcherExecutor = Executors.newSingleThreadScheduledExecutor(); + + membersMonitor = new KubernetesMembersMonitor(this.serializedExecutor, this.kubernetesClient, this.lockConfiguration); + if (eventHandler != null) { + membersMonitor.addClusterEventHandler(eventHandler); + } + + membersMonitor.start(); + serializedExecutor.execute(this::initialization); + } + } + + @Override + public void stop() throws Exception { + LOG.debug("Stopping leadership controller..."); + if (serializedExecutor != null) { + serializedExecutor.shutdownNow(); + } + if (eventDispatcherExecutor != null) { + eventDispatcherExecutor.shutdown(); + eventDispatcherExecutor.awaitTermination(2, TimeUnit.SECONDS); + eventDispatcherExecutor.shutdownNow(); + } + if (membersMonitor != null) { + 
membersMonitor.stop(); + } + + membersMonitor = null; + eventDispatcherExecutor = null; + serializedExecutor = null; + } + + /** + * Get the first ConfigMap and setup the initial state. + */ + private void initialization() { + LOG.debug("Reading (with retry) the configmap {} to detect the current leader", this.lockConfiguration.getConfigMapName()); + refreshConfigMapFromCluster(true); + + if (isCurrentPodTheActiveLeader()) { + serializedExecutor.execute(this::onLeadershipAcquired); + } else { + LOG.info("The current pod ({}) is not the leader of the group '{}' in ConfigMap '{}' at this time", this.lockConfiguration.getPodName(), this.lockConfiguration + .getGroupName(), this.lockConfiguration.getConfigMapName()); + serializedExecutor.execute(this::acquireLeadershipCycle); + } + } + + /** + * Signals the acquisition of the leadership and move to the keep-leadership state. + */ + private void onLeadershipAcquired() { + LOG.info("The current pod ({}) is now the leader of the group '{}' in ConfigMap '{}'", this.lockConfiguration.getPodName(), this.lockConfiguration + .getGroupName(), this.lockConfiguration.getConfigMapName()); + + this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader); + + long nextDelay = computeNextRenewWaitTime(this.latestLeaderInfo.getTimestamp(), this.latestLeaderInfo.getTimestamp()); + serializedExecutor.schedule(this::keepLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS); + } + + /** + * While in the keep-leadership state, the controller periodically renews the lease. + * If a renewal deadline is met and it was not possible to renew the lease, the leadership is lost. 
+ */ + private void keepLeadershipCycle() { + // renew lease periodically + refreshConfigMapFromCluster(false); // if possible, update + + if (this.latestLeaderInfo.isTimeElapsedSeconds(lockConfiguration.getRenewDeadlineSeconds()) || !this.latestLeaderInfo.isLeader(this.lockConfiguration.getPodName())) { + // Time over for renewal or leadership lost + LOG.debug("The current pod ({}) has lost the leadership", this.lockConfiguration.getPodName()); + serializedExecutor.execute(this::onLeadershipLost); + return; + } + + boolean success = tryAcquireOrRenewLeadership(); + LOG.debug("Attempted to renew the lease. Success={}", success); + + long nextDelay = computeNextRenewWaitTime(this.latestLeaderInfo.getTimestamp(), new Date()); + serializedExecutor.schedule(this::keepLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS); + } + + /** + * Compute the timestamp of next event while in keep-leadership state. + */ + private long computeNextRenewWaitTime(Date lastRenewal, Date lastRenewalAttempt) { + long timeDeadline = lastRenewal.getTime() + this.lockConfiguration.getRenewDeadlineSeconds() * 1000; + long timeRetry; + long counter = 0; + do { + counter++; + timeRetry = lastRenewal.getTime() + counter * this.lockConfiguration.getRetryPeriodSeconds() * 1000; + } while (timeRetry < lastRenewalAttempt.getTime() && timeRetry < timeDeadline); + + long time = Math.min(timeRetry, timeDeadline); + long delay = Math.max(0, time - System.currentTimeMillis()); + LOG.debug("Next renewal timeout event will be fired in {} seconds", delay / 1000); + return delay; + } + + + /** + * Signals the loss of leadership and move to the acquire-leadership state. 
+ */ + private void onLeadershipLost() { + LOG.info("The local pod ({}) is no longer the leader of the group '{}' in ConfigMap '{}'", this.lockConfiguration.getPodName(), this.lockConfiguration.getGroupName(), + this.lockConfiguration.getConfigMapName()); + + this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader); + serializedExecutor.execute(this::acquireLeadershipCycle); + } + + /** + * While in the acquire-leadership state, the controller waits for the current lease to expire before trying to acquire the leadership. + */ + private void acquireLeadershipCycle() { + // wait for the current lease to finish then fire an election + refreshConfigMapFromCluster(false); // if possible, update + + // Notify about changes in current leader if any + this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader); + + if (!this.latestLeaderInfo.isTimeElapsedSeconds(lockConfiguration.getLeaseDurationSeconds())) { + // Wait for the lease to finish before trying leader election + long nextDelay = computeNextElectionWaitTime(this.latestLeaderInfo.getTimestamp()); + serializedExecutor.schedule(this::acquireLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS); + return; + } + + boolean acquired = tryAcquireOrRenewLeadership(); + if (acquired) { + LOG.debug("Leadership acquired for ConfigMap {}. 
Notification in progress...", this.lockConfiguration.getConfigMapName()); + serializedExecutor.execute(this::onLeadershipAcquired); + return; + } + + // Notify about changes in current leader if any + this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader); + + LOG.debug("Cannot acquire the leadership for ConfigMap {}", this.lockConfiguration.getConfigMapName()); + long nextDelay = computeNextElectionWaitTime(this.latestLeaderInfo.getTimestamp()); + serializedExecutor.schedule(this::acquireLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS); + } + + private long computeNextElectionWaitTime(Date lastRenewal) { + if (lastRenewal == null) { + LOG.debug("Error detected while getting leadership info, next election timeout event will be fired in {} seconds", this.lockConfiguration.getRetryOnErrorIntervalSeconds()); + return this.lockConfiguration.getRetryOnErrorIntervalSeconds(); + } + long time = lastRenewal.getTime() + this.lockConfiguration.getLeaseDurationSeconds() * 1000 + + jitter(this.lockConfiguration.getRetryPeriodSeconds() * 1000, this.lockConfiguration.getJitterFactor()); + + long delay = Math.max(0, time - System.currentTimeMillis()); + LOG.debug("Next election timeout event will be fired in {} seconds", delay / 1000); + return delay; + } + + private long jitter(long num, double factor) { + return (long) (num * (1 + Math.random() * (factor - 1))); + } + + private boolean tryAcquireOrRenewLeadership() { + LOG.debug("Trying to acquire or renew the leadership..."); + + ConfigMap configMap; + try { + configMap = pullConfigMap(); + } catch (Exception e) { + LOG.warn("Unable to retrieve the current ConfigMap " + this.lockConfiguration.getConfigMapName() + " from Kubernetes", e); + return false; + } + + // Info to set in the configmap to become leaders + LeaderInfo newLeaderInfo = new LeaderInfo(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName(), new Date()); + + if (configMap == null) { + // No configmap 
created so far + LOG.debug("Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created"); + ConfigMap newConfigMap = ConfigMapLockUtils.createNewConfigMap(this.lockConfiguration.getConfigMapName(), newLeaderInfo); + + try { + kubernetesClient.configMaps() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .create(newConfigMap); + } catch (Exception ex) { + // Suppress exception + LOG.warn("Unable to create the ConfigMap, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has the right " + + "permissions to create it"); + LOG.debug("Exception while trying to create the ConfigMap", ex); + + // Try to get the configMap and return the current leader + refreshConfigMapFromCluster(false); + return isCurrentPodTheActiveLeader(); + } + + LOG.debug("ConfigMap {} successfully created and local pod is leader", this.lockConfiguration.getConfigMapName()); + updateLatestLeaderInfo(newConfigMap); + return true; + } else { + LOG.debug("Lock configmap already present in the Kubernetes namespace. 
Checking..."); + LeaderInfo leaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, this.lockConfiguration.getGroupName()); + + boolean weWereLeader = leaderInfo.isLeader(this.lockConfiguration.getPodName()); + boolean leaseExpired = leaderInfo.isTimeElapsedSeconds(this.lockConfiguration.getLeaseDurationSeconds()); + + if (weWereLeader || leaseExpired) { + // Renew the lease or set the new leader + try { + ConfigMap updatedConfigMap = ConfigMapLockUtils.getConfigMapWithNewLeader(configMap, newLeaderInfo); + kubernetesClient.configMaps() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .withName(this.lockConfiguration.getConfigMapName()) + .lockResourceVersion(configMap.getMetadata().getResourceVersion()) + .replace(updatedConfigMap); + + LOG.debug("ConfigMap {} successfully updated and local pod is leader", this.lockConfiguration.getConfigMapName()); + updateLatestLeaderInfo(updatedConfigMap); + return true; + } catch (Exception ex) { + LOG.warn("An attempt to become leader has failed. It's possible that the leadership has been taken by another pod"); + LOG.debug("Error received during configmap lock replace", ex); + + // Try to get the configMap and return the current leader + refreshConfigMapFromCluster(false); + return isCurrentPodTheActiveLeader(); + } + } else { + // Another pod is the leader and lease is not expired + LOG.debug("Another pod is the current leader and lease has not expired yet"); + updateLatestLeaderInfo(configMap); + return false; + } + } + } + + + private void refreshConfigMapFromCluster(boolean retry) { + LOG.debug("Retrieving configmap {}", this.lockConfiguration.getConfigMapName()); + try { + updateLatestLeaderInfo(pullConfigMap()); + } catch (Exception ex) { + if (retry) { + LOG.warn("ConfigMap pull failed. 
Retrying in " + this.lockConfiguration.getRetryOnErrorIntervalSeconds() + " seconds...", ex); + try { + Thread.sleep(this.lockConfiguration.getRetryOnErrorIntervalSeconds() * 1000); + refreshConfigMapFromCluster(true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Controller Thread interrupted, shutdown in progress", e); + } + } else { + LOG.warn("Cannot retrieve the ConfigMap: pull failed", ex); + } + } + } + + private boolean isCurrentPodTheActiveLeader() { + return latestLeaderInfo != null + && latestLeaderInfo.isLeader(this.lockConfiguration.getPodName()) + && !latestLeaderInfo.isTimeElapsedSeconds(this.lockConfiguration.getRenewDeadlineSeconds()); + } + + private ConfigMap pullConfigMap() { + return kubernetesClient.configMaps() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .withName(this.lockConfiguration.getConfigMapName()) + .get(); + } + + + private void updateLatestLeaderInfo(ConfigMap configMap) { + LOG.debug("Updating internal status about the current leader"); + this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, this.lockConfiguration.getGroupName()); + } + + private void checkAndNotifyNewLeader() { + LOG.debug("Checking if the current leader has changed to notify the event handler..."); + LeaderInfo newLeaderInfo = this.latestLeaderInfo; + if (newLeaderInfo == null) { + return; + } + + long leaderInfoDurationSeconds = newLeaderInfo.isLeader(this.lockConfiguration.getPodName()) + ? 
this.lockConfiguration.getRenewDeadlineSeconds() + : this.lockConfiguration.getLeaseDurationSeconds(); + + Optional<String> newLeader; + if (newLeaderInfo.getLeader() != null && !newLeaderInfo.isTimeElapsedSeconds(leaderInfoDurationSeconds)) { + newLeader = Optional.of(newLeaderInfo.getLeader()); + } else { + newLeader = Optional.empty(); + } + + // Sending notifications in case of leader change + if (!newLeader.equals(this.currentLeader)) { + LOG.debug("Current leader has changed from {} to {}. Sending notifications...", this.currentLeader, newLeader); + this.currentLeader = newLeader; + eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> newLeader); + } else { + LOG.debug("Current leader unchanged: {}", this.currentLeader); + } + } + + +} http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java index f203c0a..37e0251 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java @@ -26,7 +26,16 @@ import io.fabric8.kubernetes.client.KubernetesClient; */ public class KubernetesLockConfiguration implements Cloneable { - private static final long DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS = 1800; + public static final String DEFAULT_CONFIGMAP_NAME = "leaders"; + + + public static final double DEFAULT_JITTER_FACTOR = 1.2; + public static final long DEFAULT_LEASE_DURATION_SECONDS = 
20; + public static final long DEFAULT_RENEW_DEADLINE_SECONDS = 15; + public static final long DEFAULT_RETRY_PERIOD_SECONDS = 6; + + public static final long DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS = 5; + public static final long DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS = 1800; /** * Kubernetes namespace containing the pods and the ConfigMap used for locking. @@ -36,7 +45,7 @@ public class KubernetesLockConfiguration implements Cloneable { /** * Name of the ConfigMap used for locking. */ - private String configMapName; + private String configMapName = DEFAULT_CONFIGMAP_NAME; /** * Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfgMap. @@ -55,9 +64,36 @@ public class KubernetesLockConfiguration implements Cloneable { /** * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated. - * Watch recreation can be disabled by putting a negative value (the default will be used in case of null). + * Watch recreation can be disabled by putting value <= 0. + */ + private long retryOnErrorIntervalSeconds = DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS; + + /** + * A jitter factor to apply in order to prevent all pods to try to become leaders in the same instant. + */ + private double jitterFactor = DEFAULT_JITTER_FACTOR; + + /** + * The default duration of the lease for the current leader. + */ + private long leaseDurationSeconds = DEFAULT_LEASE_DURATION_SECONDS; + + /** + * The deadline after which the leader must stop trying to renew its leadership (and yield it). + */ + private long renewDeadlineSeconds = DEFAULT_RENEW_DEADLINE_SECONDS; + + /** + * The time between two subsequent attempts to acquire/renew the leadership (or after the lease expiration). + * It is randomized using the jitter factor in case of new leader election (not renewal). 
*/ - private Long watchRefreshIntervalSeconds; + private long retryPeriodSeconds = DEFAULT_RETRY_PERIOD_SECONDS; + + /** + * Set this to a positive value in order to recreate watchers after a certain amount of time + * (to prevent them becoming stale). + */ + private long watchRefreshIntervalSeconds = DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS; public KubernetesLockConfiguration() { } @@ -113,19 +149,51 @@ public class KubernetesLockConfiguration implements Cloneable { this.clusterLabels = clusterLabels; } - public Long getWatchRefreshIntervalSeconds() { - return watchRefreshIntervalSeconds; + public long getRetryOnErrorIntervalSeconds() { + return retryOnErrorIntervalSeconds; } - public long getWatchRefreshIntervalSecondsOrDefault() { - Long interval = watchRefreshIntervalSeconds; - if (interval == null) { - interval = DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS; - } - return interval; + public void setRetryOnErrorIntervalSeconds(long retryOnErrorIntervalSeconds) { + this.retryOnErrorIntervalSeconds = retryOnErrorIntervalSeconds; + } + + public double getJitterFactor() { + return jitterFactor; + } + + public void setJitterFactor(double jitterFactor) { + this.jitterFactor = jitterFactor; + } + + public long getLeaseDurationSeconds() { + return leaseDurationSeconds; + } + + public void setLeaseDurationSeconds(long leaseDurationSeconds) { + this.leaseDurationSeconds = leaseDurationSeconds; + } + + public long getRenewDeadlineSeconds() { + return renewDeadlineSeconds; + } + + public void setRenewDeadlineSeconds(long renewDeadlineSeconds) { + this.renewDeadlineSeconds = renewDeadlineSeconds; + } + + public long getRetryPeriodSeconds() { + return retryPeriodSeconds; + } + + public void setRetryPeriodSeconds(long retryPeriodSeconds) { + this.retryPeriodSeconds = retryPeriodSeconds; + } + + public long getWatchRefreshIntervalSeconds() { + return watchRefreshIntervalSeconds; } - public void setWatchRefreshIntervalSeconds(Long watchRefreshIntervalSeconds) { + public void 
setWatchRefreshIntervalSeconds(long watchRefreshIntervalSeconds) { this.watchRefreshIntervalSeconds = watchRefreshIntervalSeconds; } @@ -146,6 +214,11 @@ public class KubernetesLockConfiguration implements Cloneable { sb.append(", groupName='").append(groupName).append('\''); sb.append(", podName='").append(podName).append('\''); sb.append(", clusterLabels=").append(clusterLabels); + sb.append(", retryOnErrorIntervalSeconds=").append(retryOnErrorIntervalSeconds); + sb.append(", jitterFactor=").append(jitterFactor); + sb.append(", leaseDurationSeconds=").append(leaseDurationSeconds); + sb.append(", renewDeadlineSeconds=").append(renewDeadlineSeconds); + sb.append(", retryPeriodSeconds=").append(retryPeriodSeconds); sb.append(", watchRefreshIntervalSeconds=").append(watchRefreshIntervalSeconds); sb.append('}'); return sb.toString(); http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java index d9173b2..586a11f 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java @@ -41,8 +41,6 @@ import org.slf4j.LoggerFactory; */ class KubernetesMembersMonitor implements Service { - private static final long DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS = 1800; - private static final Logger LOG = LoggerFactory.getLogger(KubernetesMembersMonitor.class); private ScheduledExecutorService serializedExecutor; @@ -81,7 +79,7 @@ class KubernetesMembersMonitor 
implements Service { serializedExecutor.execute(() -> doPoll(true)); serializedExecutor.execute(this::createWatch); - long recreationDelay = lockConfiguration.getWatchRefreshIntervalSecondsOrDefault(); + long recreationDelay = lockConfiguration.getWatchRefreshIntervalSeconds(); if (recreationDelay > 0) { serializedExecutor.scheduleWithFixedDelay(this::refresh, recreationDelay, recreationDelay, TimeUnit.SECONDS); } http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java new file mode 100644 index 0000000..50d1603 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * Overview of a leadership status: the lock group, the pod currently recorded as leader and the
 * timestamp of the last lease acquisition or renewal.
 * <p>
 * The timestamp is defensively copied on the way in and on the way out, so that callers cannot
 * alter the state of this object through a shared {@link Date} instance.
 */
public class LeaderInfo {

    private String groupName;

    private String leader;

    private Date timestamp;

    public LeaderInfo() {
    }

    public LeaderInfo(String groupName, String leader, Date timestamp) {
        this.groupName = groupName;
        this.leader = leader;
        this.timestamp = copyOf(timestamp);
    }

    /**
     * Tells whether at least the given number of seconds has passed since the timestamp.
     *
     * @param timeSeconds the duration to check, in seconds
     * @return {@code true} if the duration has elapsed or if no timestamp is available
     */
    public boolean isTimeElapsedSeconds(long timeSeconds) {
        if (timestamp == null) {
            return true;
        }
        // Compare elapsed time against the threshold instead of adding the threshold to the
        // timestamp: TimeUnit saturates on overflow, so huge durations are never reported as elapsed
        long elapsedMillis = System.currentTimeMillis() - timestamp.getTime();
        return elapsedMillis >= TimeUnit.SECONDS.toMillis(timeSeconds);
    }

    /**
     * Tells whether the given pod is the one recorded as leader.
     *
     * @param pod the pod name, must not be {@code null}
     * @return {@code true} if the pod is the recorded leader
     * @throws IllegalArgumentException if {@code pod} is {@code null}
     */
    public boolean isLeader(String pod) {
        if (pod == null) {
            throw new IllegalArgumentException("pod must be specified");
        }
        return pod.equals(leader);
    }

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public String getLeader() {
        return leader;
    }

    public void setLeader(String leader) {
        this.leader = leader;
    }

    /**
     * @return a copy of the timestamp, or {@code null} if none is set
     */
    public Date getTimestamp() {
        return copyOf(timestamp);
    }

    public void setTimestamp(Date timestamp) {
        this.timestamp = copyOf(timestamp);
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("LeaderInfo{");
        sb.append("groupName='").append(groupName).append('\'');
        sb.append(", leader='").append(leader).append('\'');
        sb.append(", timestamp=").append(timestamp);
        sb.append('}');
        return sb.toString();
    }

    /** Null-safe copy of a mutable {@link Date}. */
    private static Date copyOf(Date date) {
        return date == null ? null : new Date(date.getTime());
    }

}