frankgh commented on code in PR #74:
URL: https://github.com/apache/cassandra-sidecar/pull/74#discussion_r1378018015


##########
src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java:
##########
@@ -103,7 +117,11 @@ void setup(AbstractCassandraTestContext 
cassandraTestContext) throws Interrupted
 
         server.start()
               .onSuccess(s -> {
+                  // TODO: DTR - Do we still need the new scheduleHealthCheck 
here (and the sleep below)?
+                  scheduleHealthCheck(5000); // Check health frequently so 
adapters are created in time for tests
                   
sidecarTestContext.registerInstanceConfigListener(this::healthCheck);
+                  // Give everything a moment to get started and connected
+                  Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
                   if (!sidecarTestContext.isClusterBuilt())
                   {
                       context.completeNow();

Review Comment:
   ```suggestion
                     if (!sidecarTestContext.isClusterBuilt())
                     {
                         vertx.setTimer(TimeUnit.SECONDS.toMillis(1), id -> 
context.completeNow());
   ```



##########
common/src/main/java/com/datastax/driver/core/DriverExtensions.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datastax.driver.core;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A collection of methods that require access to package-private members in 
the datastax driver.
+ */
+

Review Comment:
   nit : remove extra empty line



##########
src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java:
##########
@@ -292,6 +306,18 @@ public Builder 
healthCheckConfiguration(HealthCheckConfiguration healthCheckConf
             return update(b -> b.healthCheckConfiguration = 
healthCheckConfiguration);
         }
 
+        /**
+         * Sets the {@code driverConfiguration} and returns a reference to 
this Builder enabling
+         * method chaining.
+         *
+         * @param driverConfiguration the {@code 
cassandraInputValidationConfiguration} to set
+         * @return a reference to this Builder
+         */
+        public Builder 
cassandraInputValidationConfiguration(DriverConfiguration driverConfiguration)

Review Comment:
   ```suggestion
           public Builder driverConfiguration(DriverConfiguration 
driverConfiguration)
   ```



##########
src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public interface DriverConfiguration
+{
+    @JsonProperty("contact_points")
+    List<InetSocketAddress> contactPoints();
+
+    @JsonProperty("num_connections")

Review Comment:
   Annotations under the implementation instead?
   ```suggestion
   ```



##########
common/src/main/java/org/apache/cassandra/sidecar/common/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra 
Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections 
to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport 
disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+    public static final int MIN_ADDITIONAL_CONNECTIONS = 2;
+    private static final Logger logger = 
LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
+    private final Set<Host> selectedHosts = new HashSet<>();
+    private final Set<InetSocketAddress> localHostAddresses;
+    private final LoadBalancingPolicy childPolicy;
+    private final int totalRequestedConnections;
+    private final Random secureRandom = new SecureRandom();
+    private final HashSet<Host> allHosts = new HashSet<>();
+    private Cluster cluster;
+
+    public SidecarLoadBalancingPolicy(List<InetSocketAddress> 
localHostAddresses,
+                                      String localDc,
+                                      int numAdditionalConnections)
+    {
+        this.childPolicy = getChildPolicy(localDc);
+        this.localHostAddresses = new HashSet<>(localHostAddresses);
+        if (numAdditionalConnections < MIN_ADDITIONAL_CONNECTIONS)
+        {
+            logger.warn("Additional instances requested was {}, which is less 
than the minimum of {}. Using {}.",
+                        numAdditionalConnections, MIN_ADDITIONAL_CONNECTIONS, 
MIN_ADDITIONAL_CONNECTIONS);
+            numAdditionalConnections = MIN_ADDITIONAL_CONNECTIONS;
+        }
+        this.totalRequestedConnections = this.localHostAddresses.size() + 
numAdditionalConnections;
+    }
+
+    private LoadBalancingPolicy getChildPolicy(String localDc)
+    {
+        if (localDc != null)
+        {
+            return 
DCAwareRoundRobinPolicy.builder().withLocalDc(localDc).build();
+        }
+        return new RoundRobinPolicy();
+    }
+
+    public void init(Cluster cluster, Collection<Host> hosts)
+    {
+        this.cluster = cluster;
+        this.allHosts.addAll(hosts);
+        recalculateSelectedHosts();
+        childPolicy.init(cluster, hosts);
+    }
+
+    public HostDistance distance(Host host)
+    {
+        if (!selectedHosts.contains(host))
+        {
+            return HostDistance.IGNORED;
+        }
+        return childPolicy.distance(host);
+    }
+
+    public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement 
statement)
+    {
+        Iterator<Host> child = childPolicy.newQueryPlan(loggedKeyspace, 
statement);
+        // Filter the child policy to only selected hosts
+        return Iterators.filter(child, selectedHosts::contains);
+    }
+
+    public synchronized void onAdd(Host host)
+    {
+        this.allHosts.add(host);
+        childPolicy.onAdd(host);
+        onUp(host);
+    }
+
+    public synchronized void onUp(Host host)
+    {
+        if (selectedHosts.size() < totalRequestedConnections)
+        {
+            recalculateSelectedHosts();
+        }
+        childPolicy.onUp(host);
+    }
+
+    public synchronized void onDown(Host host)
+    {
+        // Don't remove local addresses from the selected host list
+        if (localHostAddresses.contains(host.getBroadcastRpcAddress()))
+        {
+            return;
+        }
+
+        boolean wasSelected = selectedHosts.remove(host);
+        if (!wasSelected)
+        {
+            // Non-selected nodes have been marked with HostDistance.IGNORED
+            // even if they may otherwise be useful. This has a side effect
+            // of preventing the driver from trying to reconnect to them
+            // if we miss the `onUp` event, so we need to schedule reconnects
+            // for these hosts explicitly unless we have active connections.
+            DriverExtensions.startPeriodicReconnectionAttempt(cluster, host);
+        }
+        recalculateSelectedHosts();
+        childPolicy.onDown(host);
+    }
+
+    public synchronized void onRemove(Host host)
+    {
+        this.allHosts.remove(host);
+        onDown(host);
+        childPolicy.onRemove(host);
+    }
+
+    public void close()
+    {
+        childPolicy.close();
+    }
+
+    /**
+     * Adds any local hosts to the selected host list
+     *
+     * @param hosts the list of hosts provided by the control connection
+     */
+    private void addLocalHosts(List<Host> hosts)
+    {
+        Iterator<Host> hostIterator = hosts.iterator();
+        while (hostIterator.hasNext())
+        {
+            Host h = hostIterator.next();
+            if (localHostAddresses.contains(h.getEndPoint().resolve()))
+            {
+                selectedHosts.add(h);
+                hostIterator.remove();
+            }
+        }
+    }
+
+    private synchronized void recalculateSelectedHosts()
+    {
+        // Copy the list to allow us to remove hosts as we build the list
+        List<Host> hosts = new ArrayList<>(this.allHosts);

Review Comment:
   should this be a set instead? given that we will add local hosts in the line 
below, we should avoid adding the local hosts twice if we ever encounter that 
situation



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java:
##########
@@ -42,16 +49,18 @@ public class CassandraAdapter implements ICassandraAdapter
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraAdapter.class);
     protected final DnsResolver dnsResolver;
     protected final JmxClient jmxClient;
-    private final CQLSessionProvider session;
+    private final CQLSessionProvider cqlSessionProvider;
     private final String sidecarVersion;
+    private final InetSocketAddress broadcastRpcAddress;
 
-    public CassandraAdapter(DnsResolver dnsResolver, JmxClient jmxClient, 
CQLSessionProvider session,
-                            String sidecarVersion)
+    public CassandraAdapter(DnsResolver dnsResolver, JmxClient jmxClient, 
CQLSessionProvider cqlSessionProvider,
+                            String sidecarVersion, InetSocketAddress 
broadcastRpcAddress)

Review Comment:
   I feel we need a better name for the `broadcastRpcAddress` variable



##########
src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java:
##########
@@ -68,28 +70,40 @@
  */
 public abstract class IntegrationTestBase
 {
-    protected Logger logger = LoggerFactory.getLogger(this.getClass());
-    protected Vertx vertx;
-    protected Server server;
-
     protected static final String TEST_KEYSPACE = "testkeyspace";
-    private static final String TEST_TABLE_PREFIX = "testtable";
-
     protected static final int DEFAULT_RF = 3;
+    private static final String TEST_TABLE_PREFIX = "testtable";
     private static final AtomicInteger TEST_TABLE_ID = new AtomicInteger(0);
+    protected Logger logger = LoggerFactory.getLogger(this.getClass());
+    protected Vertx vertx;
+    protected Server server;
     protected CassandraSidecarTestContext sidecarTestContext;
+    protected Injector injector;
+    private long healthCheckTimerId;
+    private InstancesConfig instancesConfig;
+
+    private static QualifiedTableName uniqueTestTableFullName()

Review Comment:
   private static methods should go at the end



##########
common/src/main/java/org/apache/cassandra/sidecar/common/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra 
Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections 
to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport 
disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+    public static final int MIN_ADDITIONAL_CONNECTIONS = 2;
+    private static final Logger logger = 
LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);

Review Comment:
   ```suggestion
       private static final Logger LOGGER = 
LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
   ```



##########
src/main/java/org/apache/cassandra/sidecar/server/MainModule.java:
##########
@@ -365,22 +381,21 @@ public SidecarStats sidecarStats()
      * @param vertx                      the vertx instance
      * @param cassandraInstance          the cassandra instance configuration
      * @param versionProvider            a Cassandra version provider
-     * @param healthCheckFrequencyMillis the health check frequency 
configuration in milliseconds
      * @param sidecarVersion             the version of the Sidecar from the 
current binary
      * @param jmxConfiguration           the configuration for the JMX Client
+     * @param session                         the CQL Session provider

Review Comment:
   nit
   ```suggestion
        * @param session                    the CQL Session provider
   ```



##########
common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java:
##########
@@ -26,9 +28,10 @@ public interface ICassandraFactory
     /**
      * Creates a new {@link ICassandraAdapter} with the provided {@link 
CQLSessionProvider} and {@link JmxClient}
      *
-     * @param session the session to the Cassandra database
-     * @param client  the JMX client to connect to the Cassandra database
+     * @param session             the session to the Cassandra database
+     * @param client              the JMX client to connect to the Cassandra 
database
+     * @param broadcastRpcAddress the native transport address and port of the 
instance

Review Comment:
   should we call this parameter `localInstanceNativeAddress` instead?



##########
src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config.yaml;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.config.DriverConfiguration;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public class DriverConfigurationImpl implements DriverConfiguration
+{
+    private final List<InetSocketAddress> contactPoints = new ArrayList<>();
+    private String localDc;
+    private int numConnections;
+
+    @JsonProperty("contact_points")
+    public List<InetSocketAddress> contactPoints()
+    {
+        return contactPoints;
+    }
+
+    @JsonProperty("num_connections")
+    public int numConnections()
+    {
+        return numConnections;

Review Comment:
   Should we do validation of the input? i.e. do not allow 0 or negative 
numbers? or what is the expectation to handle those values?



##########
src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -75,6 +80,7 @@ public class CassandraAdapterDelegate implements 
ICassandraAdapter, Host.StateLi
     private volatile NodeSettings nodeSettings = null;
     private final AtomicBoolean registered = new AtomicBoolean(false);
     private final AtomicBoolean isHealthCheckActive = new AtomicBoolean(false);
+    private InetSocketAddress broadcastRpcAddress;

Review Comment:
   ```suggestion
       private final InetSocketAddress broadcastRpcAddress;
   ```



##########
common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraAdapter.java:
##########
@@ -35,6 +40,15 @@ public interface ICassandraAdapter
 
     NodeSettings nodeSettings();
 
+    default ResultSet executeLocal(String query)

Review Comment:
   can we please add javadocs for all the new methods in the interface?



##########
src/main/java/org/apache/cassandra/sidecar/server/MainModule.java:
##########
@@ -395,7 +410,9 @@ private static InstanceMetadata buildInstanceMetadata(Vertx 
vertx,
                                                                          
versionProvider,
                                                                          
session,
                                                                          
jmxClient,
-                                                                         
sidecarVersion);
+                                                                         
sidecarVersion,
+                                                                         
cassandraInstance.host(),
+                                                                         
cassandraInstance.port());

Review Comment:
   ```suggestion
                                                                            
host,
                                                                            
port);
   ```



##########
src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public interface DriverConfiguration
+{
+    @JsonProperty("contact_points")
+    List<InetSocketAddress> contactPoints();
+
+    @JsonProperty("num_connections")
+    int numConnections();
+
+    @JsonProperty("local_dc")

Review Comment:
   Annotations under the implementation instead?
   ```suggestion
   ```



##########
src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java:
##########
@@ -292,6 +306,18 @@ public Builder 
healthCheckConfiguration(HealthCheckConfiguration healthCheckConf
             return update(b -> b.healthCheckConfiguration = 
healthCheckConfiguration);
         }
 
+        /**
+         * Sets the {@code driverConfiguration} and returns a reference to 
this Builder enabling
+         * method chaining.
+         *
+         * @param driverConfiguration the {@code 
cassandraInputValidationConfiguration} to set

Review Comment:
   ```suggestion
            * @param driverConfiguration the {@code driverConfiguration} to set
   ```



##########
src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java:
##########
@@ -114,6 +132,34 @@ void setup(AbstractCassandraTestContext 
cassandraTestContext) throws Interrupted
         context.awaitCompletion(5, TimeUnit.SECONDS);
     }
 
+    /**
+     * Some tests may want to "manage" fewer instances than the complete 
cluster.
+     * Therefore, override this if your test wants to manage fewer than the 
complete cluster size.
+     * The Sidecar will be configured to manage the first N instances in the 
cluster by instance number.
+     * Defaults to the entire cluster.
+     *
+     * @param clusterSize the size of the cluster as defined by the 
integration test
+     * @return the number of instances to manage
+     */
+    protected int getNumInstancesToManage(int clusterSize)
+    {
+        return clusterSize;
+    }
+
+    protected InstancesConfig getInstancesConfig()
+    {
+        return injector.getInstance(InstancesConfig.class);
+    }
+
+    private void scheduleHealthCheck(long intervalMillis)

Review Comment:
   this is done already by the `HealthCheckPeriodicTask` but you are probably 
facing issues because of CASSANDRASC-80  



##########
src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java:
##########
@@ -67,10 +69,24 @@ public void close()
             {
                 cluster.close();
             }
-            catch (ShutdownException shutdownException)
+            // ShutdownException may be thrown from a different classloader, 
and therefore the standard
+            // `catch (ShutdownException)` won't always work - compare the 
canonical names in stead.

Review Comment:
   ```suggestion
               // `catch (ShutdownException)` won't always work - compare the 
canonical names instead.
   ```



##########
common/src/main/java/org/apache/cassandra/sidecar/common/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra 
Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections 
to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport 
disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+    public static final int MIN_ADDITIONAL_CONNECTIONS = 2;
+    private static final Logger logger = 
LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
+    private final Set<Host> selectedHosts = new HashSet<>();
+    private final Set<InetSocketAddress> localHostAddresses;
+    private final LoadBalancingPolicy childPolicy;
+    private final int totalRequestedConnections;
+    private final Random secureRandom = new SecureRandom();
+    private final HashSet<Host> allHosts = new HashSet<>();
+    private Cluster cluster;
+
+    public SidecarLoadBalancingPolicy(List<InetSocketAddress> 
localHostAddresses,
+                                      String localDc,
+                                      int numAdditionalConnections)
+    {
+        this.childPolicy = getChildPolicy(localDc);
+        this.localHostAddresses = new HashSet<>(localHostAddresses);
+        if (numAdditionalConnections < MIN_ADDITIONAL_CONNECTIONS)
+        {
+            logger.warn("Additional instances requested was {}, which is less 
than the minimum of {}. Using {}.",
+                        numAdditionalConnections, MIN_ADDITIONAL_CONNECTIONS, 
MIN_ADDITIONAL_CONNECTIONS);
+            numAdditionalConnections = MIN_ADDITIONAL_CONNECTIONS;
+        }
+        this.totalRequestedConnections = this.localHostAddresses.size() + 
numAdditionalConnections;
+    }
+
+    private LoadBalancingPolicy getChildPolicy(String localDc)
+    {
+        if (localDc != null)
+        {
+            return 
DCAwareRoundRobinPolicy.builder().withLocalDc(localDc).build();
+        }
+        return new RoundRobinPolicy();
+    }
+
+    public void init(Cluster cluster, Collection<Host> hosts)
+    {
+        this.cluster = cluster;
+        this.allHosts.addAll(hosts);
+        recalculateSelectedHosts();
+        childPolicy.init(cluster, hosts);
+    }
+
+    public HostDistance distance(Host host)
+    {
+        if (!selectedHosts.contains(host))
+        {
+            return HostDistance.IGNORED;
+        }
+        return childPolicy.distance(host);
+    }
+
+    public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement 
statement)
+    {
+        Iterator<Host> child = childPolicy.newQueryPlan(loggedKeyspace, 
statement);
+        // Filter the child policy to only selected hosts
+        return Iterators.filter(child, selectedHosts::contains);
+    }
+
+    public synchronized void onAdd(Host host)
+    {
+        this.allHosts.add(host);
+        childPolicy.onAdd(host);
+        onUp(host);
+    }
+
+    public synchronized void onUp(Host host)
+    {
+        if (selectedHosts.size() < totalRequestedConnections)
+        {
+            recalculateSelectedHosts();
+        }
+        childPolicy.onUp(host);
+    }
+
+    public synchronized void onDown(Host host)
+    {
+        // Don't remove local addresses from the selected host list
+        if (localHostAddresses.contains(host.getBroadcastRpcAddress()))
+        {
+            return;
+        }
+
+        boolean wasSelected = selectedHosts.remove(host);
+        if (!wasSelected)
+        {
+            // Non-selected nodes have been marked with HostDistance.IGNORED
+            // even if they may otherwise be useful. This has a side effect
+            // of preventing the driver from trying to reconnect to them
+            // if we miss the `onUp` event, so we need to schedule reconnects
+            // for these hosts explicitly unless we have active connections.
+            DriverExtensions.startPeriodicReconnectionAttempt(cluster, host);
+        }
+        recalculateSelectedHosts();
+        childPolicy.onDown(host);
+    }
+
+    public synchronized void onRemove(Host host)
+    {
+        this.allHosts.remove(host);
+        onDown(host);
+        childPolicy.onRemove(host);
+    }
+
+    public void close()
+    {
+        childPolicy.close();
+    }
+
+    /**
+     * Adds any local hosts to the selected host list
+     *
+     * @param hosts the list of hosts provided by the control connection
+     */
+    private void addLocalHosts(List<Host> hosts)
+    {
+        Iterator<Host> hostIterator = hosts.iterator();
+        while (hostIterator.hasNext())
+        {
+            Host h = hostIterator.next();
+            if (localHostAddresses.contains(h.getEndPoint().resolve()))
+            {
+                selectedHosts.add(h);
+                hostIterator.remove();
+            }
+        }
+    }
+
+    private synchronized void recalculateSelectedHosts()
+    {
+        // Copy the list to allow us to remove hosts as we build the list
+        List<Host> hosts = new ArrayList<>(this.allHosts);
+        addLocalHosts(hosts);
+        Collections.shuffle(hosts, this.secureRandom);
+        Iterator<Host> hostIterator = hosts.iterator();
+        while (selectedHosts.size() < totalRequestedConnections && 
hostIterator.hasNext())
+        {
+            Host h = hostIterator.next();

Review Comment:
   nit
   ```suggestion
               Host host = hostIterator.next();
   ```



##########
common/src/main/java/org/apache/cassandra/sidecar/common/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra 
Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections 
to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport 
disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+    public static final int MIN_ADDITIONAL_CONNECTIONS = 2;
+    private static final Logger logger = 
LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
+    private final Set<Host> selectedHosts = new HashSet<>();
+    private final Set<InetSocketAddress> localHostAddresses;
+    private final LoadBalancingPolicy childPolicy;
+    private final int totalRequestedConnections;
+    private final Random secureRandom = new SecureRandom();
+    private final HashSet<Host> allHosts = new HashSet<>();
+    private Cluster cluster;
+
+    public SidecarLoadBalancingPolicy(List<InetSocketAddress> 
localHostAddresses,
+                                      String localDc,
+                                      int numAdditionalConnections)
+    {
+        this.childPolicy = getChildPolicy(localDc);
+        this.localHostAddresses = new HashSet<>(localHostAddresses);
+        if (numAdditionalConnections < MIN_ADDITIONAL_CONNECTIONS)
+        {
+            logger.warn("Additional instances requested was {}, which is less 
than the minimum of {}. Using {}.",
+                        numAdditionalConnections, MIN_ADDITIONAL_CONNECTIONS, 
MIN_ADDITIONAL_CONNECTIONS);
+            numAdditionalConnections = MIN_ADDITIONAL_CONNECTIONS;
+        }
+        this.totalRequestedConnections = this.localHostAddresses.size() + 
numAdditionalConnections;
+    }
+
+    private LoadBalancingPolicy getChildPolicy(String localDc)
+    {
+        if (localDc != null)
+        {
+            return 
DCAwareRoundRobinPolicy.builder().withLocalDc(localDc).build();
+        }
+        return new RoundRobinPolicy();
+    }
+
+    public void init(Cluster cluster, Collection<Host> hosts)
+    {
+        this.cluster = cluster;
+        this.allHosts.addAll(hosts);
+        recalculateSelectedHosts();
+        childPolicy.init(cluster, hosts);
+    }
+
+    public HostDistance distance(Host host)
+    {
+        if (!selectedHosts.contains(host))
+        {
+            return HostDistance.IGNORED;
+        }
+        return childPolicy.distance(host);
+    }
+
+    public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement 
statement)
+    {
+        Iterator<Host> child = childPolicy.newQueryPlan(loggedKeyspace, 
statement);
+        // Filter the child policy to only selected hosts
+        return Iterators.filter(child, selectedHosts::contains);
+    }
+
+    public synchronized void onAdd(Host host)
+    {
+        this.allHosts.add(host);
+        childPolicy.onAdd(host);
+        onUp(host);
+    }
+
+    public synchronized void onUp(Host host)
+    {
+        if (selectedHosts.size() < totalRequestedConnections)
+        {
+            recalculateSelectedHosts();
+        }
+        childPolicy.onUp(host);
+    }
+
+    public synchronized void onDown(Host host)
+    {
+        // Don't remove local addresses from the selected host list
+        if (localHostAddresses.contains(host.getBroadcastRpcAddress()))
+        {
+            return;
+        }
+
+        boolean wasSelected = selectedHosts.remove(host);
+        if (!wasSelected)
+        {
+            // Non-selected nodes have been marked with HostDistance.IGNORED
+            // even if they may otherwise be useful. This has a side effect
+            // of preventing the driver from trying to reconnect to them
+            // if we miss the `onUp` event, so we need to schedule reconnects
+            // for these hosts explicitly unless we have active connections.
+            DriverExtensions.startPeriodicReconnectionAttempt(cluster, host);
+        }
+        recalculateSelectedHosts();
+        childPolicy.onDown(host);
+    }
+
+    public synchronized void onRemove(Host host)
+    {
+        this.allHosts.remove(host);
+        onDown(host);
+        childPolicy.onRemove(host);
+    }
+
+    public void close()
+    {
+        childPolicy.close();
+    }
+
+    /**
+     * Adds any local hosts to the selected host list
+     *
+     * @param hosts the list of hosts provided by the control connection
+     */
+    private void addLocalHosts(List<Host> hosts)
+    {
+        Iterator<Host> hostIterator = hosts.iterator();
+        while (hostIterator.hasNext())
+        {
+            Host h = hostIterator.next();

Review Comment:
   nit
   ```suggestion
               Host host = hostIterator.next();
   ```



##########
src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java:
##########
@@ -304,6 +330,8 @@ public Builder 
cassandraInputValidationConfiguration(CassandraInputValidationCon
             return update(b -> b.cassandraInputValidationConfiguration = 
configuration);
         }
 
+

Review Comment:
   Nit: extra line break
   ```suggestion
   ```



##########
common/src/main/java/org/apache/cassandra/sidecar/common/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra 
Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections 
to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport 
disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+    public static final int MIN_ADDITIONAL_CONNECTIONS = 2;
+    private static final Logger logger = 
LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
+    private final Set<Host> selectedHosts = new HashSet<>();
+    private final Set<InetSocketAddress> localHostAddresses;
+    private final LoadBalancingPolicy childPolicy;
+    private final int totalRequestedConnections;
+    private final Random secureRandom = new SecureRandom();
+    private final HashSet<Host> allHosts = new HashSet<>();
+    private Cluster cluster;
+
+    public SidecarLoadBalancingPolicy(List<InetSocketAddress> 
localHostAddresses,
+                                      String localDc,
+                                      int numAdditionalConnections)
+    {
+        this.childPolicy = getChildPolicy(localDc);
+        this.localHostAddresses = new HashSet<>(localHostAddresses);
+        if (numAdditionalConnections < MIN_ADDITIONAL_CONNECTIONS)
+        {
+            logger.warn("Additional instances requested was {}, which is less 
than the minimum of {}. Using {}.",
+                        numAdditionalConnections, MIN_ADDITIONAL_CONNECTIONS, 
MIN_ADDITIONAL_CONNECTIONS);
+            numAdditionalConnections = MIN_ADDITIONAL_CONNECTIONS;
+        }
+        this.totalRequestedConnections = this.localHostAddresses.size() + 
numAdditionalConnections;
+    }
+
+    private LoadBalancingPolicy getChildPolicy(String localDc)

Review Comment:
   nit
   ```suggestion
       private LoadBalancingPolicy childPolicy(String localDc)
   ```



##########
src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public interface DriverConfiguration
+{
+    @JsonProperty("contact_points")
+    List<InetSocketAddress> contactPoints();

Review Comment:
   can we please add javadocs here as well?



##########
src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java:
##########
@@ -68,28 +70,40 @@
  */
 public abstract class IntegrationTestBase
 {
-    protected Logger logger = LoggerFactory.getLogger(this.getClass());
-    protected Vertx vertx;
-    protected Server server;
-
     protected static final String TEST_KEYSPACE = "testkeyspace";
-    private static final String TEST_TABLE_PREFIX = "testtable";
-
     protected static final int DEFAULT_RF = 3;
+    private static final String TEST_TABLE_PREFIX = "testtable";
     private static final AtomicInteger TEST_TABLE_ID = new AtomicInteger(0);
+    protected Logger logger = LoggerFactory.getLogger(this.getClass());
+    protected Vertx vertx;
+    protected Server server;
     protected CassandraSidecarTestContext sidecarTestContext;
+    protected Injector injector;
+    private long healthCheckTimerId;
+    private InstancesConfig instancesConfig;
+
+    private static QualifiedTableName uniqueTestTableFullName()
+    {
+        return new QualifiedTableName(TEST_KEYSPACE, TEST_TABLE_PREFIX + 
TEST_TABLE_ID.getAndIncrement());
+    }
 
     @BeforeEach
-    void setup(AbstractCassandraTestContext cassandraTestContext) throws 
InterruptedException
+    void setup(AbstractCassandraTestContext cassandraTestContext, TestInfo 
testInfo) throws InterruptedException
     {
         IntegrationTestModule integrationTestModule = new 
IntegrationTestModule();
-        Injector injector = Guice.createInjector(Modules.override(new 
MainModule()).with(integrationTestModule));
+        System.setProperty("cassandra.testtag", 
testInfo.getTestClass().get().getCanonicalName());
+        System.setProperty("suitename", testInfo.getDisplayName() + ": " + 
cassandraTestContext.version);
+        int clusterSize = cassandraTestContext.clusterSize();
+        injector = Guice.createInjector(Modules.override(new 
MainModule()).with(integrationTestModule));
         vertx = injector.getInstance(Vertx.class);
-        sidecarTestContext = CassandraSidecarTestContext.from(vertx, 
cassandraTestContext, DnsResolver.DEFAULT);
+        sidecarTestContext = CassandraSidecarTestContext.from(vertx, 
cassandraTestContext, DnsResolver.DEFAULT,
+                                                              
getNumInstancesToManage(clusterSize));
+
         integrationTestModule.setCassandraTestContext(sidecarTestContext);
 
+        instancesConfig = getInstancesConfig();
         server = injector.getInstance(Server.class);
-
+        vertx = injector.getInstance(Vertx.class);

Review Comment:
   we already have the vertx instance
   ```suggestion
   ```



##########
common/src/main/java/org/apache/cassandra/sidecar/common/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra 
Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections 
to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport 
disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+    public static final int MIN_ADDITIONAL_CONNECTIONS = 2;
+    private static final Logger logger = 
LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
+    private final Set<Host> selectedHosts = new HashSet<>();
+    private final Set<InetSocketAddress> localHostAddresses;
+    private final LoadBalancingPolicy childPolicy;
+    private final int totalRequestedConnections;
+    private final Random secureRandom = new SecureRandom();
+    private final HashSet<Host> allHosts = new HashSet<>();
+    private Cluster cluster;
+
+    public SidecarLoadBalancingPolicy(List<InetSocketAddress> 
localHostAddresses,
+                                      String localDc,
+                                      int numAdditionalConnections)
+    {
+        this.childPolicy = getChildPolicy(localDc);
+        this.localHostAddresses = new HashSet<>(localHostAddresses);
+        if (numAdditionalConnections < MIN_ADDITIONAL_CONNECTIONS)
+        {
+            logger.warn("Additional instances requested was {}, which is less 
than the minimum of {}. Using {}.",
+                        numAdditionalConnections, MIN_ADDITIONAL_CONNECTIONS, 
MIN_ADDITIONAL_CONNECTIONS);
+            numAdditionalConnections = MIN_ADDITIONAL_CONNECTIONS;
+        }
+        this.totalRequestedConnections = this.localHostAddresses.size() + 
numAdditionalConnections;
+    }
+
+    private LoadBalancingPolicy getChildPolicy(String localDc)
+    {
+        if (localDc != null)
+        {
+            return 
DCAwareRoundRobinPolicy.builder().withLocalDc(localDc).build();
+        }
+        return new RoundRobinPolicy();
+    }
+
+    public void init(Cluster cluster, Collection<Host> hosts)
+    {
+        this.cluster = cluster;
+        this.allHosts.addAll(hosts);
+        recalculateSelectedHosts();
+        childPolicy.init(cluster, hosts);
+    }
+
+    public HostDistance distance(Host host)
+    {
+        if (!selectedHosts.contains(host))
+        {
+            return HostDistance.IGNORED;
+        }
+        return childPolicy.distance(host);
+    }
+
+    public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement 
statement)
+    {
+        Iterator<Host> child = childPolicy.newQueryPlan(loggedKeyspace, 
statement);
+        // Filter the child policy to only selected hosts
+        return Iterators.filter(child, selectedHosts::contains);
+    }
+
+    public synchronized void onAdd(Host host)
+    {
+        this.allHosts.add(host);
+        childPolicy.onAdd(host);
+        onUp(host);
+    }
+
+    public synchronized void onUp(Host host)
+    {
+        if (selectedHosts.size() < totalRequestedConnections)
+        {
+            recalculateSelectedHosts();
+        }
+        childPolicy.onUp(host);
+    }
+
+    public synchronized void onDown(Host host)

Review Comment:
   can we add override annotations in this class to identify methods from super 
classes/interfaces?



##########
src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public interface DriverConfiguration
+{
+    @JsonProperty("contact_points")

Review Comment:
   Annotations under the implementation instead?
   ```suggestion
   ```



##########
src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -332,6 +361,11 @@ protected void markAsDownAndMaybeNotify()
     @Nullable
     private <T> T fromAdapter(Function<ICassandraAdapter, T> getter)
     {
+        if (this.adapter == null)

Review Comment:
   I am a strong -1 on this change. This basically defeats the purpose of the 
health checks, which periodically checks for connection state. This is a way to 
prevent from a thundering herd situation. The client should retry on 503 
(Service Unavailable) which is returned when we encounter this situation.



##########
src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java:
##########
@@ -103,7 +117,11 @@ void setup(AbstractCassandraTestContext 
cassandraTestContext) throws Interrupted
 
         server.start()
               .onSuccess(s -> {
+                  // TODO: DTR - Do we still need the new scheduleHealthCheck 
here (and the sleep below)?
+                  scheduleHealthCheck(5000); // Check health frequently so 
adapters are created in time for tests

Review Comment:
   this is an issue that should be fixed in CASSANDRASC-80



##########
common/src/main/java/com/datastax/driver/core/DriverExtensions.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datastax.driver.core;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A collection of methods that require access to package-private members in 
the datastax driver.
+ */
+
+public class DriverExtensions
+{
+    /**
+     * Check if a host has active connections
+     *
+     * @param host the host to check
+     * @return true if the host has active connections, false otherwise
+     */
+    public static boolean hasActiveConnections(Host host)
+    {
+        return host.convictionPolicy.hasActiveConnections();
+    }
+
+    /**
+     * Start attempting to reconnect to the given host, as hosts with 
`IGNORED` distance aren't attempted
+     * and the SidecarLoadBalancingPolicy marks non-selected nodes as IGNORED 
until they need to rotate in.
+     *
+     * @param cluster The cluster object
+     * @param host    the host to which reconnect attempts will be made
+     */
+    public static void startPeriodicReconnectionAttempt(Cluster cluster, Host 
host)
+    {
+        cluster.manager.startPeriodicReconnectionAttempt(host, false);
+    }
+
+    /**
+     * Gets a Host instance from metadata based on the native

Review Comment:
   incomplete sentence maybe? based on the native socket address? 



##########
src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java:
##########
@@ -158,9 +216,30 @@ protected void createTestKeyspace()
 
     protected void createTestKeyspace(Map<String, Integer> rf)
     {
-        Session session = maybeGetSession();
-        session.execute("CREATE KEYSPACE " + TEST_KEYSPACE +
-                        " WITH REPLICATION = { 'class' : 
'NetworkTopologyStrategy', " + generateRfString(rf) + " };");
+        int attempts = 1;
+        ArrayList<Throwable> thrown = new ArrayList<>(5);
+        while (attempts <= 5)
+        {
+            try
+            {
+                Session session = maybeGetSession();
+
+                session.execute("CREATE KEYSPACE IF NOT EXISTS " + 
TEST_KEYSPACE +

Review Comment:
   why is create keyspace failing? I haven't seen this before



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to