yifan-c commented on code in PR #189:
URL: https://github.com/apache/cassandra-sidecar/pull/189#discussion_r1966204577


##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraClientTokenRingProvider.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.coordination;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Range;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Token;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.cluster.locator.LocalTokenRangesProvider;
+import org.apache.cassandra.sidecar.common.client.SidecarInstance;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioners;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
+import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+
+/**
+ * Class for getting token range related information using cassandra client's 
session.
+ * Token ranges are cached to avoid making dns calls when cluster topology has 
not changed.
+ */
+@Singleton
+public class CassandraClientTokenRingProvider extends TokenRingProvider 
implements LocalTokenRangesProvider
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClientTokenRingProvider.class);
+    @GuardedBy("this")
+    private volatile Map<String, Map<String, List<Range<BigInteger>>>> 
assignedRangesOfAllInstancesByDcCache = null;
+    @GuardedBy("this")
+    private volatile Map<Host, Integer> localHostsCache = null;
+    @GuardedBy("this")
+    private volatile Set<Host> allInstancesCache = null;
+
+
+    @Inject
+    public CassandraClientTokenRingProvider(InstancesMetadata 
instancesMetadata, InstanceMetadataFetcher instanceMetadataFetcher, DnsResolver 
dnsResolver)
+    {
+        super(instancesMetadata, instanceMetadataFetcher, dnsResolver);
+    }
+
+    @Override
+    @Nullable
+    public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace, 
boolean forceRefresh)
+    {
+        checkAndReloadReloadCaches();
+        Metadata metadata = 
instancesMetadata.instances().get(0).delegate().metadata();
+        if (keyspace == null || metadata.getKeyspace(keyspace) == null)
+        {
+            throw new NoSuchElementException("Keyspace does not exist. 
keyspace: " + keyspace);
+        }
+        return perKeySpaceTokenRangesOfAllInstances(metadata).get(keyspace)
+                                                             .entrySet()
+                                                             .stream()
+                                                             .filter(entry -> 
localHostsCache.containsKey(entry.getKey()))
+                                                             
.collect(Collectors.toMap(entry -> localHostsCache.get(entry.getKey()), 
Map.Entry::getValue));
+    }
+
+    public Set<Host> localInstances()
+    {
+        checkAndReloadReloadCaches();
+        return localHostsCache.keySet();
+    }
+
+    public Set<Host> allInstances()
+    {
+        checkAndReloadReloadCaches();
+        return allInstancesCache;
+    }
+
+    @Override
+    protected Map<String, List<Range<BigInteger>>> 
getAllTokenRanges(Partitioner partitioner,
+                                                                     String dc)
+    {
+        checkAndReloadReloadCaches();
+        return assignedRangesOfAllInstancesByDcCache.entrySet()
+                                                    .stream()
+                                                    .filter(entry -> dc == 
null || dc.equalsIgnoreCase(entry.getKey()))
+                                                    .flatMap(entry -> 
entry.getValue().entrySet().stream())
+                                                    
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    @Override
+    public Map<String, List<Range<BigInteger>>> 
getPrimaryRanges(SidecarInstance instance,
+                                                                 String dc)
+    {
+        checkAndReloadReloadCaches();
+        return assignedRangesOfAllInstancesByDcCache.entrySet()
+                                                    .stream()
+                                                    .filter(entry -> dc == 
null || dc.equalsIgnoreCase(entry.getKey()))
+                                                    .flatMap(entry -> 
entry.getValue().entrySet().stream())
+                                                    .filter(entry -> 
matchesSidecar(entry.getKey(), instance))
+                                                    
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    @Override
+    public Set<String> dcs()
+    {
+        List<InstanceMetadata> localInstances = instancesMetadata.instances();
+        Metadata metadata = validatedMetadata(localInstances);
+        return 
metadata.getAllHosts().stream().map(Host::getDatacenter).collect(Collectors.toSet());
+    }
+
+    @Override
+    public Partitioner partitioner()
+    {
+        return 
extractPartitioner(instancesMetadata.instances().get(0).delegate().metadata());
+    }
+
+    public static Partitioner extractPartitioner(Metadata metadata)
+    {
+        String[] tokens = metadata.getPartitioner().split("\\.");
+        return Partitioners.from(tokens[tokens.length - 1]);
+    }
+
+    private void checkAndReloadReloadCaches()
+    {
+        List<InstanceMetadata> localInstances = instancesMetadata.instances();
+        Metadata metadata = validatedMetadata(localInstances);
+
+        Set<Integer> localInstanceIds = 
localInstances.stream().map(InstanceMetadata::id).collect(Collectors.toSet());
+        if (localHostsCache == null
+            || !new 
HashSet<>(localHostsCache.values()).equals(localInstanceIds)
+            || allInstancesCache == null
+            || !allInstancesCache.equals(metadata.getAllHosts()))
+        {
+            synchronized (this)
+            {
+                // If cluster configuration changes.
+                localHostsCache = localInstanceIds();
+                allInstancesCache = metadata.getAllHosts();
+                assignedRangesOfAllInstancesByDcCache = 
assignedRangesOfAllInstancesByDc(metadata);
+            }
+        }
+    }
+
+    private boolean matchesSidecar(String hostIp, SidecarInstance 
sidecarInstance)
+    {
+        return getIp(sidecarInstance.hostname()).equals(hostIp);
+    }
+
+    public Map<String, Map<String, List<Range<BigInteger>>>> 
assignedRangesOfAllInstancesByDc(Metadata metadata)
+    {
+        return assignedRangesOfAllInstancesByDc(dnsResolver, metadata);
+    }
+
+    @VisibleForTesting
+    public static Map<String, Map<String, List<Range<BigInteger>>>> 
assignedRangesOfAllInstancesByDc(DnsResolver dnsResolver, Metadata metadata)
+    {
+        Partitioner partitioner = extractPartitioner(metadata);
+        Map<String, List<CassandraInstance>> perDcHosts = new HashMap<>(4);
+        for (Host host : metadata.getAllHosts())
+        {
+            Token minToken = 
host.getTokens().stream().min(Comparable::compareTo).orElseThrow(() -> new 
RuntimeException("No token found for host: " + host));
+            perDcHosts.computeIfAbsent(host.getDatacenter(), (dc) -> new 
ArrayList<>())
+                      .add(new CassandraInstance(tokenToString(minToken), 
getIpFromHost(dnsResolver, host)));
+        }
+        perDcHosts.forEach((dc, hosts) -> hosts.sort(Comparator.comparing(o -> 
new BigInteger(o.token))));
+
+        return perDcHosts.entrySet().stream()
+                         .collect(Collectors.toMap(Map.Entry::getKey, e -> 
calculateTokenRanges(partitioner, e.getValue())));
+    }
+
+    protected static String tokenToString(Token token)
+    {
+        if (token.getType() == DataType.bigint())
+        {
+            return Long.toString((long) token.getValue());
+        }
+        else if (token.getType() == DataType.varint())
+        {
+            return token.getValue().toString();
+        }
+        throw new UnsupportedOperationException("Unsupported token type: " + 
token.getType());
+    }
+
+    protected static Map<String, List<Range<BigInteger>>> 
calculateTokenRanges(Partitioner partitioner, List<CassandraInstance> 
sortedPerDcHosts)
+    {
+        // RingTopologyRefresher.calculate...
+        return calculateTokenRanges(sortedPerDcHosts, 1, partitioner)
+               .entries().stream()
+               .collect(Collectors.groupingBy(e -> e.getKey().node, 
Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
+    }
+
+    private Map<Host, Integer> localInstanceIds()
+    {
+        List<InstanceMetadata> localInstances = instancesMetadata.instances();
+        Set<Host> hosts = 
localInstances.get(0).delegate().metadata().getAllHosts();
+        Map<String, Integer> localIps = localInstances.stream()
+                                                      
.collect(Collectors.toMap(instanceMetadata -> getIp(instanceMetadata.host()), 
InstanceMetadata::id));
+        Map<String, Host> ipToHost = hosts.stream()
+                                          
.collect(Collectors.toMap(this::getIpFromHost, Function.identity()));
+
+        return localIps.entrySet()
+                       .stream()
+                       .collect(Collectors.toMap(entry -> 
ipToHost.get(entry.getKey()), entry -> localIps.get(entry.getKey())));
+    }
+
+    private Metadata validatedMetadata(List<InstanceMetadata> localInstances)
+    {
+        if (localInstances.isEmpty())
+        {
+            LOGGER.warn("No local instances found");
+            throw new RuntimeException("No local instances found");
+        }
+        return fetcher.callOnFirstAvailableInstance(instanceMetadata -> 
instanceMetadata.delegate().metadata());
+    }
+
+    private static Map<String, Map<Host, Set<TokenRange>>> 
perKeySpaceTokenRangesOfAllInstances(final Metadata metadata)
+    {
+        Map<String, Map<Host, Set<TokenRange>>> perKeyspaceTokenRanges = new 
HashMap<>();
+        for (KeyspaceMetadata ks : metadata.getKeyspaces())
+        {
+            Map<Host, Set<TokenRange>> perHostTokenRanges = new HashMap<>();
+            for (Host host : metadata.getAllHosts())
+            {
+                Set<TokenRange> tokenRanges = 
metadata.getTokenRanges(ks.getName(), host)
+                                                      .stream()
+                                                      .flatMap(range -> 
TokenRange.from(range).stream())
+                                                      
.collect(Collectors.toSet());
+                perHostTokenRanges.put(host, tokenRanges);
+            }
+            perKeyspaceTokenRanges.put(ks.getName(), perHostTokenRanges);
+        }
+        return perKeyspaceTokenRanges;
+    }
+
+    public static Multimap<CassandraInstance, Range<BigInteger>> 
calculateTokenRanges(List<CassandraInstance> instances,
+                                                                               
       int replicationFactor,
+                                                                               
       Partitioner partitioner)
+    {
+        Preconditions.checkArgument(replicationFactor != 0, "Calculation token 
ranges wouldn't work with RF 0");
+        Preconditions.checkArgument(instances.isEmpty() || replicationFactor 
<= instances.size(),
+                                    "Calculation token ranges wouldn't work 
when RF ("
+                                    + replicationFactor + ") is greater than 
number of Cassandra instances "
+                                    + instances.size());
+        Multimap<CassandraInstance, Range<BigInteger>> tokenRanges = 
ArrayListMultimap.create();
+
+        for (int index = 0; index < instances.size(); ++index)
+        {
+            CassandraInstance instance = instances.get(index);
+            int disjointReplica = (instances.size() + index - 
replicationFactor) % instances.size();
+            BigInteger rangeStart = new 
BigInteger((instances.get(disjointReplica)).token);
+            BigInteger rangeEnd = new BigInteger(instance.token);
+            if (rangeStart.compareTo(rangeEnd) >= 0)
+            {
+                tokenRanges.put(instance, Range.openClosed(rangeStart, 
partitioner.maximumToken().toBigInteger()));
+                if 
(!rangeEnd.equals(partitioner.minimumToken().toBigInteger()))
+                    tokenRanges.put(instance, 
Range.openClosed(partitioner.minimumToken().toBigInteger(), rangeEnd));
+            }
+            else
+            {
+                tokenRanges.put(instance, Range.openClosed(rangeStart, 
rangeEnd));
+            }
+        }
+        return tokenRanges;
+    }
+
+    /**
+     * Class to encapsule Cassandra instance data
+     */
+    private static class CassandraInstance

Review Comment:
   It is used by `protected` methods (and by the `public` `calculateTokenRanges(List<CassandraInstance>, int, Partitioner)`). For those methods to remain usable outside this class, you need to widen this class's access to at least `protected`.



##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.coordination;
+
+import java.util.concurrent.CompletableFuture;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.client.retry.BasicRetryPolicy;
+import org.apache.cassandra.sidecar.common.client.SidecarInstance;
+import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
+import org.apache.cassandra.sidecar.common.response.HealthResponse;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration;
+import org.apache.cassandra.sidecar.utils.SidecarClientProvider;
+
+
+/**
+ * Provides the health of a Sidecar instance over HTTP API, retrying to
+ * confirm Sidecar is DOWN for extended period of time.
+ */
+@Singleton
+public class SidecarHttpHealthProvider implements SidecarPeerHealthProvider
+{
+    private final SidecarPeerHealthConfiguration config;
+    private final SidecarClientProvider clientProvider;
+
+    @Inject
+    public SidecarHttpHealthProvider(SidecarConfiguration 
sidecarConfiguration, SidecarClientProvider clientProvider)
+    {
+        this.config = sidecarConfiguration.sidecarPeerHealthConfiguration();
+        this.clientProvider = clientProvider;
+    }
+
+    @Override
+    public Future<Health> health(SidecarInstance instance)
+    {
+        try
+        {
+            SidecarClient client = clientProvider.get();
+            CompletableFuture<HealthResponse> healthRequest = 
client.executeRequestAsync(client.requestBuilder()
+                                                                               
                .singleInstanceSelectionPolicy(
+                                                                               
                new SidecarInstanceImpl(instance.hostname(),
+                                                                               
                                        instance.port()))
+                                                                               
                .retryPolicy(retryPolicy())
+                                                                               
                .sidecarHealthRequest()

Review Comment:
   I think this request-building block belongs in `SidecarClient.java`.



##########
server/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java:
##########
@@ -88,6 +88,25 @@ public enum SidecarServerEvents
      */
     ON_SIDECAR_SCHEMA_INITIALIZED,
 
+    /**
+     * The {@link io.vertx.core.eventbus.EventBus} address where events will 
be published when the health of a Sidecar
+     * peer instance is marked as DOWN.
+     */
+    ON_SIDECAR_PEER_DOWN,
+
+    /**
+     * The {@link io.vertx.core.eventbus.EventBus} address where events will 
be published when the health of a Sidecar
+     * peer instance is marked as UP.
+     */
+    ON_SIDECAR_PEER_UP,
+
+    // LeadershipEvents
+    ON_TOKEN_RANGE_GAINED,
+    ON_TOKEN_RANGE_LOST,
+
+    // RangeManagerEvents
+    ON_TOKEN_RANGE_CHANGED,

Review Comment:
   The token-range events (`ON_TOKEN_RANGE_GAINED`, `ON_TOKEN_RANGE_LOST`, `ON_TOKEN_RANGE_CHANGED`) are not used. Are they needed by this patch? If not, please do not include these unrelated changes.



##########
server/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java:
##########
@@ -88,6 +88,25 @@ public enum SidecarServerEvents
      */
     ON_SIDECAR_SCHEMA_INITIALIZED,
 
+    /**
+     * The {@link io.vertx.core.eventbus.EventBus} address where events will 
be published when the health of a Sidecar
+     * peer instance is marked as DOWN.
+     */
+    ON_SIDECAR_PEER_DOWN,
+
+    /**
+     * The {@link io.vertx.core.eventbus.EventBus} address where events will 
be published when the health of a Sidecar
+     * peer instance is marked as UP.
+     */
+    ON_SIDECAR_PEER_UP,

Review Comment:
   Please enrich the javadoc: what is the message payload format of these two events?



##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.coordination;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.codecs.SidecarInstanceCodecs;
+import org.apache.cassandra.sidecar.common.client.SidecarInstance;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_PEER_DOWN;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_PEER_UP;
+
+/**
+ * Pings other 'peer' Sidecar(s) that are relevant to this Sidecar over HTTP 
and notifies
+ * listeners when other Sidecar(s) goes DOWN or OK.
+ */
+@Singleton
+public class SidecarPeerHealthMonitorTask implements PeriodicTask
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SidecarPeerHealthMonitorTask.class);
+
+    private final Vertx vertx;
+    private final SidecarPeerHealthConfiguration config;
+    private final SidecarPeerProvider sidecarPeerProvider;
+    private final SidecarPeerHealthProvider healthProvider;
+
+    private final Map<SidecarInstance, SidecarPeerHealthProvider.Health> 
status = new ConcurrentHashMap<>();
+
+    @Inject
+    public SidecarPeerHealthMonitorTask(Vertx vertx,
+                                        SidecarConfiguration 
sidecarConfiguration,
+                                        SidecarPeerProvider 
sidecarPeerProvider,
+                                        SidecarPeerHealthProvider 
healthProvider,
+                                        SidecarInstanceCodecs 
sidecarInstanceCodecs)
+    {
+        this.vertx = vertx;
+        this.config = sidecarConfiguration.sidecarPeerHealthConfiguration();
+        this.sidecarPeerProvider = sidecarPeerProvider;
+        this.healthProvider = healthProvider;
+        vertx.eventBus().registerDefaultCodec(SidecarInstance.class, 
sidecarInstanceCodecs);
+    }
+
+    @NotNull
+    public SidecarPeerHealthProvider.Health status(InstanceMetadata instance)
+    {
+        return status.getOrDefault(instance, 
SidecarPeerHealthProvider.Health.UNKNOWN);
+    }
+
+    @Override
+    public ScheduleDecision scheduleDecision()
+    {
+        return config.enabled() ? ScheduleDecision.EXECUTE : 
ScheduleDecision.SKIP;
+    }
+
+    @Override
+    public DurationSpec delay()
+    {
+        return config.executeInterval();
+    }
+
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        try
+        {
+            run().onComplete(v -> promise.tryComplete()).wait(10000);

Review Comment:
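   1. Please remove `.wait(10000)`. You do not want to block the thread, and since this is `Object.wait` called without holding the future's monitor, it throws `IllegalMonitorStateException`.
   2. `onComplete(v -> promise.tryComplete())` ignores a failure from `run()`; the promise should fail when `run()` fails.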
   
   
   ```suggestion
               run()
               .onSuccess(v -> promise.tryComplete())
               .onFailure(promise::tryFail);
   ```
   



##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.coordination;
+
+import java.util.concurrent.CompletableFuture;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.client.retry.BasicRetryPolicy;
+import org.apache.cassandra.sidecar.common.client.SidecarInstance;
+import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
+import org.apache.cassandra.sidecar.common.response.HealthResponse;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration;
+import org.apache.cassandra.sidecar.utils.SidecarClientProvider;
+
+
+/**
+ * Provides the health of a Sidecar instance over HTTP API, retrying to
+ * confirm Sidecar is DOWN for extended period of time.
+ */
+@Singleton
+public class SidecarHttpHealthProvider implements SidecarPeerHealthProvider
+{
+    private final SidecarPeerHealthConfiguration config;
+    private final SidecarClientProvider clientProvider;
+
+    @Inject
+    public SidecarHttpHealthProvider(SidecarConfiguration 
sidecarConfiguration, SidecarClientProvider clientProvider)
+    {
+        this.config = sidecarConfiguration.sidecarPeerHealthConfiguration();
+        this.clientProvider = clientProvider;
+    }
+
+    @Override
+    public Future<Health> health(SidecarInstance instance)
+    {
+        try
+        {
+            SidecarClient client = clientProvider.get();
+            CompletableFuture<HealthResponse> healthRequest = 
client.executeRequestAsync(client.requestBuilder()
+                                                                               
                .singleInstanceSelectionPolicy(
+                                                                               
                new SidecarInstanceImpl(instance.hostname(),
+                                                                               
                                        instance.port()))
+                                                                               
                .retryPolicy(retryPolicy())
+                                                                               
                .sidecarHealthRequest()
+                                                                               
                .build());
+            return Future.fromCompletionStage(healthRequest)
+                         .map(healthResponse -> healthResponse.isOk()
+                                                ? Health.OK
+                                                : Health.DOWN);
+        }
+        catch (Exception e)
+        {
+            return Future.succeededFuture(Health.DOWN);

Review Comment:
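   It looks like `UNKNOWN` is never returned by this provider (only `OK` and `DOWN` are); it only appears as the default value in `SidecarPeerHealthMonitorTask#status`. Can we remove it? If it makes sense to keep `UNKNOWN`, can you use it here?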



##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.coordination;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.codecs.SidecarInstanceCodecs;
+import org.apache.cassandra.sidecar.common.client.SidecarInstance;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_PEER_DOWN;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_PEER_UP;
+
+/**
+ * Pings other 'peer' Sidecar(s) that are relevant to this Sidecar over HTTP 
and notifies
+ * listeners when other Sidecar(s) goes DOWN or OK.
+ */
+@Singleton
+public class SidecarPeerHealthMonitorTask implements PeriodicTask
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SidecarPeerHealthMonitorTask.class);
+
+    private final Vertx vertx;
+    private final SidecarPeerHealthConfiguration config;
+    private final SidecarPeerProvider sidecarPeerProvider;
+    private final SidecarPeerHealthProvider healthProvider;
+
+    private final Map<SidecarInstance, SidecarPeerHealthProvider.Health> 
status = new ConcurrentHashMap<>();
+
+    @Inject
+    public SidecarPeerHealthMonitorTask(Vertx vertx,
+                                        SidecarConfiguration 
sidecarConfiguration,
+                                        SidecarPeerProvider 
sidecarPeerProvider,
+                                        SidecarPeerHealthProvider 
healthProvider,
+                                        SidecarInstanceCodecs 
sidecarInstanceCodecs)
+    {
+        this.vertx = vertx;
+        this.config = sidecarConfiguration.sidecarPeerHealthConfiguration();
+        this.sidecarPeerProvider = sidecarPeerProvider;
+        this.healthProvider = healthProvider;
+        vertx.eventBus().registerDefaultCodec(SidecarInstance.class, 
sidecarInstanceCodecs);
+    }
+
+    @NotNull
+    public SidecarPeerHealthProvider.Health status(InstanceMetadata instance)
+    {
+        return status.getOrDefault(instance, 
SidecarPeerHealthProvider.Health.UNKNOWN);
+    }
+
+    @Override
+    public ScheduleDecision scheduleDecision()
+    {
+        return config.enabled() ? ScheduleDecision.EXECUTE : 
ScheduleDecision.SKIP;
+    }
+
+    @Override
+    public DurationSpec delay()
+    {
+        return config.executeInterval();
+    }
+
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        try
+        {
+            run().onComplete(v -> promise.tryComplete()).wait(10000);
+        }
+        catch (Throwable t)
+        {
+            LOGGER.error("Unexpected error running down detector", t);
+            promise.fail(t);
+        }
+    }
+
+    // internal methods
+    protected Future<CompositeFuture> run()
+    {
+        Set<SidecarInstance> sidecarPeers = sidecarPeerProvider.get();
+        if (sidecarPeers.isEmpty())
+        {
+            LOGGER.warn("No Sidecar sidecarPeers detected");
+            return Future.succeededFuture();
+        }
+
+        List<Future<SidecarPeerHealthProvider.Health>> futures =
+        sidecarPeers.stream()
+                    .map(instance -> healthProvider.health(instance)
+                                                   .onSuccess(health -> {
+                                                       updateHealth(instance, 
health);
+                                                   })
+                                                   .onFailure(throwable -> {
+                                                       LOGGER.error("Failed to 
run health check, marking instance as DOWN host={} port={}",
+                                                                    
instance.hostname(), instance.port(), throwable);
+                                                       markDown(instance);
+                                                   }))
+                    .collect(Collectors.toList());

Review Comment:
   Neither `onSuccess` nor `onFailure` is guaranteed to have run by the time `Future.all(...)` completes, so there could be a race condition between the block at lines 135-144 and the lambda at line 125.
   
   To ensure the completion handling runs before each per-peer future completes, use `andThen`, which returns a future that completes only after its handler has executed. Update the code to the following:
   
   ```suggestion
           sidecarPeers.stream()
                       .map(instance ->
                            healthProvider.health(instance)
                                          .andThen(ar -> {
                                              if (ar.succeeded())
                                              {
                                                  updateHealth(instance, 
ar.result());
                                              }
                                              else
                                              {
                                                  LOGGER.error("Failed to run 
health check, marking instance as DOWN host={} port={}",
                                                               
instance.hostname(), instance.port(), ar.cause());
                                                  markDown(instance);
                                              }
                                          }))
                       .collect(Collectors.toList());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org


Reply via email to