yifan-c commented on code in PR #189: URL: https://github.com/apache/cassandra-sidecar/pull/189#discussion_r1966204577
########## server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraClientTokenRingProvider.java: ########## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.sidecar.coordination; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Range; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.Token; +import com.google.inject.Inject; +import 
com.google.inject.Singleton; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.cluster.locator.LocalTokenRangesProvider; +import org.apache.cassandra.sidecar.common.client.SidecarInstance; +import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner; +import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioners; +import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; +import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + + +/** + * Class for getting token range related information using cassandra client's session. + * Token ranges are cached to avoid making dns calls when cluster topology has not changed. + */ +@Singleton +public class CassandraClientTokenRingProvider extends TokenRingProvider implements LocalTokenRangesProvider +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClientTokenRingProvider.class); + @GuardedBy("this") + private volatile Map<String, Map<String, List<Range<BigInteger>>>> assignedRangesOfAllInstancesByDcCache = null; + @GuardedBy("this") + private volatile Map<Host, Integer> localHostsCache = null; + @GuardedBy("this") + private volatile Set<Host> allInstancesCache = null; + + + @Inject + public CassandraClientTokenRingProvider(InstancesMetadata instancesMetadata, InstanceMetadataFetcher instanceMetadataFetcher, DnsResolver dnsResolver) + { + super(instancesMetadata, instanceMetadataFetcher, dnsResolver); + } + + @Override + @Nullable + public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace, boolean forceRefresh) + { + checkAndReloadReloadCaches(); + Metadata metadata = instancesMetadata.instances().get(0).delegate().metadata(); + if (keyspace == null || metadata.getKeyspace(keyspace) == null) + { + throw new 
NoSuchElementException("Keyspace does not exist. keyspace: " + keyspace); + } + return perKeySpaceTokenRangesOfAllInstances(metadata).get(keyspace) + .entrySet() + .stream() + .filter(entry -> localHostsCache.containsKey(entry.getKey())) + .collect(Collectors.toMap(entry -> localHostsCache.get(entry.getKey()), Map.Entry::getValue)); + } + + public Set<Host> localInstances() + { + checkAndReloadReloadCaches(); + return localHostsCache.keySet(); + } + + public Set<Host> allInstances() + { + checkAndReloadReloadCaches(); + return allInstancesCache; + } + + @Override + protected Map<String, List<Range<BigInteger>>> getAllTokenRanges(Partitioner partitioner, + String dc) + { + checkAndReloadReloadCaches(); + return assignedRangesOfAllInstancesByDcCache.entrySet() + .stream() + .filter(entry -> dc == null || dc.equalsIgnoreCase(entry.getKey())) + .flatMap(entry -> entry.getValue().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public Map<String, List<Range<BigInteger>>> getPrimaryRanges(SidecarInstance instance, + String dc) + { + checkAndReloadReloadCaches(); + return assignedRangesOfAllInstancesByDcCache.entrySet() + .stream() + .filter(entry -> dc == null || dc.equalsIgnoreCase(entry.getKey())) + .flatMap(entry -> entry.getValue().entrySet().stream()) + .filter(entry -> matchesSidecar(entry.getKey(), instance)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public Set<String> dcs() + { + List<InstanceMetadata> localInstances = instancesMetadata.instances(); + Metadata metadata = validatedMetadata(localInstances); + return metadata.getAllHosts().stream().map(Host::getDatacenter).collect(Collectors.toSet()); + } + + @Override + public Partitioner partitioner() + { + return extractPartitioner(instancesMetadata.instances().get(0).delegate().metadata()); + } + + public static Partitioner extractPartitioner(Metadata metadata) + { + String[] tokens = 
metadata.getPartitioner().split("\\."); + return Partitioners.from(tokens[tokens.length - 1]); + } + + private void checkAndReloadReloadCaches() + { + List<InstanceMetadata> localInstances = instancesMetadata.instances(); + Metadata metadata = validatedMetadata(localInstances); + + Set<Integer> localInstanceIds = localInstances.stream().map(InstanceMetadata::id).collect(Collectors.toSet()); + if (localHostsCache == null + || !new HashSet<>(localHostsCache.values()).equals(localInstanceIds) + || allInstancesCache == null + || !allInstancesCache.equals(metadata.getAllHosts())) + { + synchronized (this) + { + // If cluster configuration changes. + localHostsCache = localInstanceIds(); + allInstancesCache = metadata.getAllHosts(); + assignedRangesOfAllInstancesByDcCache = assignedRangesOfAllInstancesByDc(metadata); + } + } + } + + private boolean matchesSidecar(String hostIp, SidecarInstance sidecarInstance) + { + return getIp(sidecarInstance.hostname()).equals(hostIp); + } + + public Map<String, Map<String, List<Range<BigInteger>>>> assignedRangesOfAllInstancesByDc(Metadata metadata) + { + return assignedRangesOfAllInstancesByDc(dnsResolver, metadata); + } + + @VisibleForTesting + public static Map<String, Map<String, List<Range<BigInteger>>>> assignedRangesOfAllInstancesByDc(DnsResolver dnsResolver, Metadata metadata) + { + Partitioner partitioner = extractPartitioner(metadata); + Map<String, List<CassandraInstance>> perDcHosts = new HashMap<>(4); + for (Host host : metadata.getAllHosts()) + { + Token minToken = host.getTokens().stream().min(Comparable::compareTo).orElseThrow(() -> new RuntimeException("No token found for host: " + host)); + perDcHosts.computeIfAbsent(host.getDatacenter(), (dc) -> new ArrayList<>()) + .add(new CassandraInstance(tokenToString(minToken), getIpFromHost(dnsResolver, host))); + } + perDcHosts.forEach((dc, hosts) -> hosts.sort(Comparator.comparing(o -> new BigInteger(o.token)))); + + return perDcHosts.entrySet().stream() + 
.collect(Collectors.toMap(Map.Entry::getKey, e -> calculateTokenRanges(partitioner, e.getValue()))); + } + + protected static String tokenToString(Token token) + { + if (token.getType() == DataType.bigint()) + { + return Long.toString((long) token.getValue()); + } + else if (token.getType() == DataType.varint()) + { + return token.getValue().toString(); + } + throw new UnsupportedOperationException("Unsupported token type: " + token.getType()); + } + + protected static Map<String, List<Range<BigInteger>>> calculateTokenRanges(Partitioner partitioner, List<CassandraInstance> sortedPerDcHosts) + { + // RingTopologyRefresher.calculate... + return calculateTokenRanges(sortedPerDcHosts, 1, partitioner) + .entries().stream() + .collect(Collectors.groupingBy(e -> e.getKey().node, Collectors.mapping(Map.Entry::getValue, Collectors.toList()))); + } + + private Map<Host, Integer> localInstanceIds() + { + List<InstanceMetadata> localInstances = instancesMetadata.instances(); + Set<Host> hosts = localInstances.get(0).delegate().metadata().getAllHosts(); + Map<String, Integer> localIps = localInstances.stream() + .collect(Collectors.toMap(instanceMetadata -> getIp(instanceMetadata.host()), InstanceMetadata::id)); + Map<String, Host> ipToHost = hosts.stream() + .collect(Collectors.toMap(this::getIpFromHost, Function.identity())); + + return localIps.entrySet() + .stream() + .collect(Collectors.toMap(entry -> ipToHost.get(entry.getKey()), entry -> localIps.get(entry.getKey()))); + } + + private Metadata validatedMetadata(List<InstanceMetadata> localInstances) + { + if (localInstances.isEmpty()) + { + LOGGER.warn("No local instances found"); + throw new RuntimeException("No local instances found"); + } + return fetcher.callOnFirstAvailableInstance(instanceMetadata -> instanceMetadata.delegate().metadata()); + } + + private static Map<String, Map<Host, Set<TokenRange>>> perKeySpaceTokenRangesOfAllInstances(final Metadata metadata) + { + Map<String, Map<Host, Set<TokenRange>>> 
perKeyspaceTokenRanges = new HashMap<>(); + for (KeyspaceMetadata ks : metadata.getKeyspaces()) + { + Map<Host, Set<TokenRange>> perHostTokenRanges = new HashMap<>(); + for (Host host : metadata.getAllHosts()) + { + Set<TokenRange> tokenRanges = metadata.getTokenRanges(ks.getName(), host) + .stream() + .flatMap(range -> TokenRange.from(range).stream()) + .collect(Collectors.toSet()); + perHostTokenRanges.put(host, tokenRanges); + } + perKeyspaceTokenRanges.put(ks.getName(), perHostTokenRanges); + } + return perKeyspaceTokenRanges; + } + + public static Multimap<CassandraInstance, Range<BigInteger>> calculateTokenRanges(List<CassandraInstance> instances, + int replicationFactor, + Partitioner partitioner) + { + Preconditions.checkArgument(replicationFactor != 0, "Calculation token ranges wouldn't work with RF 0"); + Preconditions.checkArgument(instances.isEmpty() || replicationFactor <= instances.size(), + "Calculation token ranges wouldn't work when RF (" + + replicationFactor + ") is greater than number of Cassandra instances " + + instances.size()); + Multimap<CassandraInstance, Range<BigInteger>> tokenRanges = ArrayListMultimap.create(); + + for (int index = 0; index < instances.size(); ++index) + { + CassandraInstance instance = instances.get(index); + int disjointReplica = (instances.size() + index - replicationFactor) % instances.size(); + BigInteger rangeStart = new BigInteger((instances.get(disjointReplica)).token); + BigInteger rangeEnd = new BigInteger(instance.token); + if (rangeStart.compareTo(rangeEnd) >= 0) + { + tokenRanges.put(instance, Range.openClosed(rangeStart, partitioner.maximumToken().toBigInteger())); + if (!rangeEnd.equals(partitioner.minimumToken().toBigInteger())) + tokenRanges.put(instance, Range.openClosed(partitioner.minimumToken().toBigInteger(), rangeEnd)); + } + else + { + tokenRanges.put(instance, Range.openClosed(rangeStart, rangeEnd)); + } + } + return tokenRanges; + } + + /** + * Class to encapsule Cassandra instance data + */ + 
private static class CassandraInstance Review Comment: It is used by `protected` methods. To allow those methods to remain usable, you need to open the access of this class to at least `protected` ########## server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.coordination; + +import java.util.concurrent.CompletableFuture; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Future; +import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.client.retry.BasicRetryPolicy; +import org.apache.cassandra.sidecar.common.client.SidecarInstance; +import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl; +import org.apache.cassandra.sidecar.common.response.HealthResponse; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration; +import org.apache.cassandra.sidecar.utils.SidecarClientProvider; + + +/** + * Provides the health of a Sidecar instance over HTTP API, retrying to + * confirm Sidecar is DOWN for extended period of time. + */ +@Singleton +public class SidecarHttpHealthProvider implements SidecarPeerHealthProvider +{ + private final SidecarPeerHealthConfiguration config; + private final SidecarClientProvider clientProvider; + + @Inject + public SidecarHttpHealthProvider(SidecarConfiguration sidecarConfiguration, SidecarClientProvider clientProvider) + { + this.config = sidecarConfiguration.sidecarPeerHealthConfiguration(); + this.clientProvider = clientProvider; + } + + @Override + public Future<Health> health(SidecarInstance instance) + { + try + { + SidecarClient client = clientProvider.get(); + CompletableFuture<HealthResponse> healthRequest = client.executeRequestAsync(client.requestBuilder() + .singleInstanceSelectionPolicy( + new SidecarInstanceImpl(instance.hostname(), + instance.port())) + .retryPolicy(retryPolicy()) + .sidecarHealthRequest() Review Comment: I think this block belongs to `SidecarClient.java` ########## server/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java: ########## @@ -88,6 +88,25 @@ public enum SidecarServerEvents */ ON_SIDECAR_SCHEMA_INITIALIZED, + 
/** + * The {@link io.vertx.core.eventbus.EventBus} address where events will be published when the health of a Sidecar + * peer instance is marked as DOWN. + */ + ON_SIDECAR_PEER_DOWN, + + /** + * The {@link io.vertx.core.eventbus.EventBus} address where events will be published when the health of a Sidecar + * peer instance is marked as UP. + */ + ON_SIDECAR_PEER_UP, + + // LeadershipEvents + ON_TOKEN_RANGE_GAINED, + ON_TOKEN_RANGE_LOST, + + // RangeManagerEvents + ON_TOKEN_RANGE_CHANGED, Review Comment: They are not used. Are they needed by this patch? If not, please do not include the unrelated changes. ########## server/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java: ########## @@ -88,6 +88,25 @@ public enum SidecarServerEvents */ ON_SIDECAR_SCHEMA_INITIALIZED, + /** + * The {@link io.vertx.core.eventbus.EventBus} address where events will be published when the health of a Sidecar + * peer instance is marked as DOWN. + */ + ON_SIDECAR_PEER_DOWN, + + /** + * The {@link io.vertx.core.eventbus.EventBus} address where events will be published when the health of a Sidecar + * peer instance is marked as UP. + */ + ON_SIDECAR_PEER_UP, Review Comment: Enrich the javadoc. What is the message payload format of those 2 events? ########## server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.coordination; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.codecs.SidecarInstanceCodecs; +import org.apache.cassandra.sidecar.common.client.SidecarInstance; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_PEER_DOWN; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_PEER_UP; + +/** + * Pings other 'peer' Sidecar(s) that are relevant to this Sidecar over HTTP and notifies + * listeners when other Sidecar(s) goes DOWN or OK. 
+ */ +@Singleton +public class SidecarPeerHealthMonitorTask implements PeriodicTask +{ + private static final Logger LOGGER = LoggerFactory.getLogger(SidecarPeerHealthMonitorTask.class); + + private final Vertx vertx; + private final SidecarPeerHealthConfiguration config; + private final SidecarPeerProvider sidecarPeerProvider; + private final SidecarPeerHealthProvider healthProvider; + + private final Map<SidecarInstance, SidecarPeerHealthProvider.Health> status = new ConcurrentHashMap<>(); + + @Inject + public SidecarPeerHealthMonitorTask(Vertx vertx, + SidecarConfiguration sidecarConfiguration, + SidecarPeerProvider sidecarPeerProvider, + SidecarPeerHealthProvider healthProvider, + SidecarInstanceCodecs sidecarInstanceCodecs) + { + this.vertx = vertx; + this.config = sidecarConfiguration.sidecarPeerHealthConfiguration(); + this.sidecarPeerProvider = sidecarPeerProvider; + this.healthProvider = healthProvider; + vertx.eventBus().registerDefaultCodec(SidecarInstance.class, sidecarInstanceCodecs); + } + + @NotNull + public SidecarPeerHealthProvider.Health status(InstanceMetadata instance) + { + return status.getOrDefault(instance, SidecarPeerHealthProvider.Health.UNKNOWN); + } + + @Override + public ScheduleDecision scheduleDecision() + { + return config.enabled() ? ScheduleDecision.EXECUTE : ScheduleDecision.SKIP; + } + + @Override + public DurationSpec delay() + { + return config.executeInterval(); + } + + @Override + public void execute(Promise<Void> promise) + { + try + { + run().onComplete(v -> promise.tryComplete()).wait(10000); Review Comment: 1. `.wait(10000)` is not wanted. Can you remove it? You do not want to block the thread. 2. `onComplete(v -> promise.tryComplete())` is missing the failure from `run()`. 
```suggestion run() .onSuccess(v -> promise.tryComplete()) .onFailure(promise::tryFail); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.coordination; + +import java.util.concurrent.CompletableFuture; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Future; +import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.client.retry.BasicRetryPolicy; +import org.apache.cassandra.sidecar.common.client.SidecarInstance; +import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl; +import org.apache.cassandra.sidecar.common.response.HealthResponse; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration; +import org.apache.cassandra.sidecar.utils.SidecarClientProvider; + + +/** + * Provides the health of a Sidecar instance over HTTP API, retrying to + * confirm Sidecar is DOWN for extended period of time. 
+ */ +@Singleton +public class SidecarHttpHealthProvider implements SidecarPeerHealthProvider +{ + private final SidecarPeerHealthConfiguration config; + private final SidecarClientProvider clientProvider; + + @Inject + public SidecarHttpHealthProvider(SidecarConfiguration sidecarConfiguration, SidecarClientProvider clientProvider) + { + this.config = sidecarConfiguration.sidecarPeerHealthConfiguration(); + this.clientProvider = clientProvider; + } + + @Override + public Future<Health> health(SidecarInstance instance) + { + try + { + SidecarClient client = clientProvider.get(); + CompletableFuture<HealthResponse> healthRequest = client.executeRequestAsync(client.requestBuilder() + .singleInstanceSelectionPolicy( + new SidecarInstanceImpl(instance.hostname(), + instance.port())) + .retryPolicy(retryPolicy()) + .sidecarHealthRequest() + .build()); + return Future.fromCompletionStage(healthRequest) + .map(healthResponse -> healthResponse.isOk() + ? Health.OK + : Health.DOWN); + } + catch (Exception e) + { + return Future.succeededFuture(Health.DOWN); Review Comment: It looks like `UNKNOWN` is not really used anywhere. Can we remove it? If it makes sense to have `UNKNOWN`, can you use it? ########## server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.coordination; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.codecs.SidecarInstanceCodecs; +import org.apache.cassandra.sidecar.common.client.SidecarInstance; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_PEER_DOWN; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_PEER_UP; + +/** + * Pings other 'peer' Sidecar(s) that are relevant to this Sidecar over HTTP and notifies + * listeners when other Sidecar(s) goes DOWN or OK. 
+ */ +@Singleton +public class SidecarPeerHealthMonitorTask implements PeriodicTask +{ + private static final Logger LOGGER = LoggerFactory.getLogger(SidecarPeerHealthMonitorTask.class); + + private final Vertx vertx; + private final SidecarPeerHealthConfiguration config; + private final SidecarPeerProvider sidecarPeerProvider; + private final SidecarPeerHealthProvider healthProvider; + + private final Map<SidecarInstance, SidecarPeerHealthProvider.Health> status = new ConcurrentHashMap<>(); + + @Inject + public SidecarPeerHealthMonitorTask(Vertx vertx, + SidecarConfiguration sidecarConfiguration, + SidecarPeerProvider sidecarPeerProvider, + SidecarPeerHealthProvider healthProvider, + SidecarInstanceCodecs sidecarInstanceCodecs) + { + this.vertx = vertx; + this.config = sidecarConfiguration.sidecarPeerHealthConfiguration(); + this.sidecarPeerProvider = sidecarPeerProvider; + this.healthProvider = healthProvider; + vertx.eventBus().registerDefaultCodec(SidecarInstance.class, sidecarInstanceCodecs); + } + + @NotNull + public SidecarPeerHealthProvider.Health status(InstanceMetadata instance) + { + return status.getOrDefault(instance, SidecarPeerHealthProvider.Health.UNKNOWN); + } + + @Override + public ScheduleDecision scheduleDecision() + { + return config.enabled() ? 
ScheduleDecision.EXECUTE : ScheduleDecision.SKIP; + } + + @Override + public DurationSpec delay() + { + return config.executeInterval(); + } + + @Override + public void execute(Promise<Void> promise) + { + try + { + run().onComplete(v -> promise.tryComplete()).wait(10000); + } + catch (Throwable t) + { + LOGGER.error("Unexpected error running down detector", t); + promise.fail(t); + } + } + + // internal methods + protected Future<CompositeFuture> run() + { + Set<SidecarInstance> sidecarPeers = sidecarPeerProvider.get(); + if (sidecarPeers.isEmpty()) + { + LOGGER.warn("No Sidecar sidecarPeers detected"); + return Future.succeededFuture(); + } + + List<Future<SidecarPeerHealthProvider.Health>> futures = + sidecarPeers.stream() + .map(instance -> healthProvider.health(instance) + .onSuccess(health -> { + updateHealth(instance, health); + }) + .onFailure(throwable -> { + LOGGER.error("Failed to run health check, marking instance as DOWN host={} port={}", + instance.hostname(), instance.port(), throwable); + markDown(instance); + })) + .collect(Collectors.toList()); Review Comment: Both `onSuccess` and `onFailure` are not guaranteed to be executed when `Future.all(...)` completes. There could be a race condition between the block from line#135 to line#144 and the lambda at line#125. To ensure the completion handling is invoked before the completion of per-peer future, update the code to the following. (Use `andThen`) ```suggestion sidecarPeers.stream() .map(instance -> healthProvider.health(instance) .andThen(ar -> { if (ar.succeeded()) { updateHealth(instance, ar.result()); } else { LOGGER.error("Failed to run health check, marking instance as DOWN host={} port={}", instance.hostname(), instance.port(), ar.cause()); markDown(instance); } })) .collect(Collectors.toList()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org