yifan-c commented on code in PR #74:
URL: https://github.com/apache/cassandra-sidecar/pull/74#discussion_r1406476449


##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java:
##########
@@ -90,20 +99,43 @@ public Metadata metadata()
     @Nullable
     public NodeSettings nodeSettings()
     {
-        Session activeSession = session.localCql();
-        if (activeSession == null)
-        {
-            return null;
-        }
+        ResultSet rs = executeLocal("select release_version, partitioner from system.local");
+        if (rs == null) return null;

Review Comment:
   I don't think the inlined `if` statement is the preferred style. Can you break it into multiple lines?



##########
common/src/main/java/com/datastax/driver/core/DriverExtensions.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datastax.driver.core;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A collection of methods that require access to package-private members in the datastax driver.
+ */
+public class DriverExtensions

Review Comment:
   nit: `DriverUtils` might be a more conventional name.



##########
common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraAdapter.java:
##########
@@ -33,8 +38,35 @@ public interface ICassandraAdapter
      */
     Metadata metadata();
 
+    /**
+     * The {@link NodeSettings} instance for this instance.

Review Comment:
   nit: it is a bit hard to read when "instance" refers to different things. I would drop the first "instance" in the sentence.



##########
src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -166,8 +177,18 @@ private void healthCheckInternal()
 
         try
         {
-            Row oneResult = activeSession.execute("select release_version, partitioner from system.local")
-                                         .one();
+            // NOTE: We cannot use `executeLocal` here as there may be no adapter yet.
+            SimpleStatement healthCheckStatement =
+            new SimpleStatement("select release_version, partitioner from system.local");
+            Host host = DriverExtensions.getHost(activeSession.getCluster().getMetadata(), localNativeTransportAddress);
+            if (host == null)
+            {
+                LOGGER.warn("Could not find host in cluster metadata by address and port {}",
+                            localNativeTransportAddress);
+                return;
+            }
+            healthCheckStatement.setHost(host);

Review Comment:
   Consistency level needs to be `ONE`



##########
src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -284,7 +314,7 @@ public boolean isUp()
     public void close()
     {
         markAsDownAndMaybeNotify();
-        Session activeSession = cqlSessionProvider.close();
+        Session activeSession = cqlSessionProvider.getIfConnected();

Review Comment:
   where is the session closed?
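   For reference, here is a minimal sketch of a `close()` that both detaches and closes the held session, so that a later `getIfConnected()` returns `null`. It has no dependencies. `SessionHolder` is hypothetical, and a plain `AutoCloseable` stands in for the driver `Session`. Like the removed `CQLSessionProvider.close()`, it swaps the reference out under the lock and closes it outside the lock. The real implementation would presumably also close the `Cluster` with a bounded wait, as the old code did.

```java
import java.util.function.Supplier;

// Hypothetical provider-like holder; S stands in for the driver's Session.
class SessionHolder<S extends AutoCloseable>
{
    private final Supplier<S> connector;
    private volatile S session;

    SessionHolder(Supplier<S> connector)
    {
        this.connector = connector;
    }

    // Connects lazily, like CQLSessionProvider#get()
    synchronized S get()
    {
        if (session == null)
        {
            session = connector.get();
        }
        return session;
    }

    // Never connects, like CQLSessionProvider#getIfConnected(); may return null
    S getIfConnected()
    {
        return session;
    }

    // Detaches the session under the lock, then closes it outside the lock
    void close()
    {
        S previous;
        synchronized (this)
        {
            previous = session;
            session = null;
        }
        if (previous == null)
        {
            return;
        }
        try
        {
            previous.close();
        }
        catch (Exception e)
        {
            if (e instanceof InterruptedException)
            {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("Failed to close session", e);
        }
    }
}
```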



##########
src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java:
##########
@@ -0,0 +1,48 @@
+
+package org.apache.cassandra.sidecar.config;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public interface DriverConfiguration
+{
+    /**
+     * A list of contact points to use for initial connection to Cassandra.
+     * At least 2 non-replica nodes are recommended.
+     * @return a list of contact points
+     */
+    List<InetSocketAddress> contactPoints();
+
+    /**
+     * The number of connections other than locally-managed nodes to use.
+     * The minimum is 2 - if your value is &lt; 2, the Sidecar will use 2.

Review Comment:
   I would just use 'less than' instead of the HTML-escaped characters.
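   Spelling it out also sidesteps Javadoc's HTML parsing, where a bare `<` would need the `&lt;` escape. The documented behavior matches the clamp in the `SidecarLoadBalancingPolicy` constructor, which also logs a warning. Below is a standalone sketch of that rule with the reworded Javadoc; the class and method names are hypothetical.

```java
// Hypothetical helper mirroring the clamp in SidecarLoadBalancingPolicy's constructor.
final class AdditionalConnections
{
    static final int MIN_ADDITIONAL_CONNECTIONS = 2;

    private AdditionalConnections()
    {
    }

    /**
     * The number of connections other than locally-managed nodes to use.
     * The minimum is 2; if your value is less than 2, the Sidecar will use 2.
     *
     * @param requested the configured number of additional connections
     * @return the number of additional connections that will actually be used
     */
    static int effectiveAdditionalConnections(int requested)
    {
        return Math.max(requested, MIN_ADDITIONAL_CONNECTIONS);
    }
}
```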



##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java:
##########
@@ -43,13 +45,16 @@ public CassandraFactory(DnsResolver dnsResolver, String sidecarVersion)
     /**
      * Returns a new adapter for Cassandra 4.0 clusters.
      *
-     * @param session   the session to the Cassandra database
-     * @param jmxClient the JMX client to connect to the Cassandra database
+     * @param session                     the session to the Cassandra database
+     * @param jmxClient                   the JMX client to connect to the Cassandra database
+     * @param localNativeTransportAddress the address and port on which this instance is configured to listen

Review Comment:
   There is no need to align the parameter descriptions; aligning them leads to unrelated changes whenever a new param is added.



##########
common/src/main/java/com/datastax/driver/core/DriverExtensions.java:
##########
@@ -0,0 +1,67 @@
+
+package com.datastax.driver.core;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A collection of methods that require access to package-private members in the datastax driver.
+ */
+public class DriverExtensions
+{
+    /**
+     * Check if a host has active connections
+     *
+     * @param host the host to check
+     * @return true if the host has active connections, false otherwise
+     */
+    public static boolean hasActiveConnections(Host host)

Review Comment:
   +1 on adding the annotation



##########
src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java:
##########
@@ -0,0 +1,48 @@
+
+package org.apache.cassandra.sidecar.config;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public interface DriverConfiguration
+{
+    /**
+     * A list of contact points to use for initial connection to Cassandra.
+     * At least 2 non-replica nodes are recommended.

Review Comment:
   Does "non-replica nodes" imply that the config is heterogeneous across Sidecar instances? My understanding is that the contact points are a subset of the seed nodes and are the same across all Sidecar instances.



##########
src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,217 @@
+
+package org.apache.cassandra.sidecar.cluster;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+    public static final int MIN_ADDITIONAL_CONNECTIONS = 2;
+    private static final Logger LOGGER = LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
+    private final Set<Host> selectedHosts = new HashSet<>();
+    private final Set<InetSocketAddress> localHostAddresses;
+    private final LoadBalancingPolicy childPolicy;
+    private final int totalRequestedConnections;
+    private final Random secureRandom = new SecureRandom();
+    private final HashSet<Host> allHosts = new HashSet<>();
+    private Cluster cluster;
+
+    public SidecarLoadBalancingPolicy(List<InetSocketAddress> localHostAddresses,
+                                      String localDc,
+                                      int numAdditionalConnections)
+    {
+        this.childPolicy = createChildPolicy(localDc);
+        this.localHostAddresses = new HashSet<>(localHostAddresses);
+        if (numAdditionalConnections < MIN_ADDITIONAL_CONNECTIONS)
+        {
+            LOGGER.warn("Additional instances requested was {}, which is less than the minimum of {}. Using {}.",
+                        numAdditionalConnections, MIN_ADDITIONAL_CONNECTIONS, MIN_ADDITIONAL_CONNECTIONS);
+            numAdditionalConnections = MIN_ADDITIONAL_CONNECTIONS;
+        }
+        this.totalRequestedConnections = this.localHostAddresses.size() + numAdditionalConnections;
+    }
+
+    @Override
+    public void init(Cluster cluster, Collection<Host> hosts)
+    {
+        this.cluster = cluster;
+        this.allHosts.addAll(hosts);
+        recalculateSelectedHosts();
+        childPolicy.init(cluster, hosts);
+    }
+
+    @Override
+    public HostDistance distance(Host host)
+    {
+        if (!selectedHosts.contains(host))
+        {
+            return HostDistance.IGNORED;
+        }
+        return childPolicy.distance(host);
+    }
+
+    @Override
+    public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
+    {
+        Iterator<Host> child = childPolicy.newQueryPlan(loggedKeyspace, statement);
+        // Filter the child policy to only selected hosts
+        return Iterators.filter(child, selectedHosts::contains);
+    }
+
+    @Override
+    public synchronized void onAdd(Host host)
+    {
+        onUp(host);
+        childPolicy.onAdd(host);
+    }
+
+    @Override
+    public synchronized void onUp(Host host)
+    {
+        this.allHosts.add(host); // replace existing reference if there is one
+        if (selectedHosts.size() < totalRequestedConnections)
+        {
+            recalculateSelectedHosts();
+        }
+        childPolicy.onUp(host);
+    }
+
+    @Override
+    public synchronized void onDown(Host host)
+    {
+        // Don't remove local addresses from the selected host list
+        if (localHostAddresses.contains(host.getBroadcastRpcAddress()))
+        {
+            return;
+        }

Review Comment:
   Why prevent removing a down local host? It cannot serve queries. 
   Besides that, how about adding a warn log?
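   Here is a minimal sketch of the warn-log suggestion. It keeps the PR's current behavior of leaving local hosts selected; whether they should be removed instead is the open question above. It models hosts as `InetSocketAddress` rather than the driver `Host`. It uses `java.util.logging` only to stay dependency-free; the real policy would use its SLF4J `LOGGER.warn(...)`. `HostSelection` and its methods are hypothetical.

```java
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical model of the host-selection bookkeeping in SidecarLoadBalancingPolicy.
class HostSelection
{
    private static final Logger LOGGER = Logger.getLogger(HostSelection.class.getName());

    private final Set<InetSocketAddress> localHostAddresses;
    private final Set<InetSocketAddress> selectedHosts = new HashSet<>();

    HostSelection(Collection<InetSocketAddress> localHostAddresses)
    {
        this.localHostAddresses = new HashSet<>(localHostAddresses);
        // Local hosts always start out selected
        this.selectedHosts.addAll(localHostAddresses);
    }

    synchronized void select(InetSocketAddress host)
    {
        selectedHosts.add(host);
    }

    synchronized void onDown(InetSocketAddress host)
    {
        if (localHostAddresses.contains(host))
        {
            // Local hosts stay selected, but the decision is now visible to operators
            LOGGER.warning("Local host " + host + " is down; keeping it in the selected host list");
            return;
        }
        selectedHosts.remove(host);
    }

    synchronized boolean isSelected(InetSocketAddress host)
    {
        return selectedHosts.contains(host);
    }
}
```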



##########
src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -196,7 +217,6 @@ private void healthCheckInternal()
             // Unregister the host listener and nullify the session in order to get a new object.
             markAsDownAndMaybeNotify();
             maybeUnregisterHostListener(activeSession);
-            cqlSessionProvider.close();

Review Comment:
   is it removed by mistake?



##########
common/src/main/java/com/datastax/driver/core/DriverExtensions.java:
##########
@@ -0,0 +1,67 @@
+
+package com.datastax.driver.core;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A collection of methods that require access to package-private members in the datastax driver.
+ */
+public class DriverExtensions
+{
+    /**
+     * Check if a host has active connections
+     *
+     * @param host the host to check
+     * @return true if the host has active connections, false otherwise
+     */
+    public static boolean hasActiveConnections(Host host)
+    {
+        return host.convictionPolicy.hasActiveConnections();
+    }
+
+    /**
+     * Start attempting to reconnect to the given host, as hosts with `IGNORED` distance aren't attempted
+     * and the SidecarLoadBalancingPolicy marks non-selected nodes as IGNORED until they need to rotate in.
+     *
+     * @param cluster The cluster object
+     * @param host    the host to which reconnect attempts will be made
+     */
+    public static void startPeriodicReconnectionAttempt(Cluster cluster, Host host)
+    {
+        cluster.manager.startPeriodicReconnectionAttempt(host, false);
+    }
+
+    /**
+     * Gets a Host instance from metadata based on the native transport address
+     *
+     * @param metadata                    the {@link Metadata} instance to search for the host
+     * @param localNativeTransportAddress the native transport ip address and port for the host to find
+     * @return the {@link Host}           instance if found, else null
+     */
+    public static Host getHost(Metadata metadata, InetSocketAddress localNativeTransportAddress)
+    {
+        // Because the driver can sometimes mess up the broadcast address, we need to search by endpoint
+        // which is what it actually uses to connect to the cluster. Therefore, create a TranslatedAddressEndpoint
+        // to use for searching. It has to be one of these because that's what the driver is using internally,
+        // and the `.equals` method used when searching checks the type explicitly.
+        TranslatedAddressEndPoint endPoint = new TranslatedAddressEndPoint(localNativeTransportAddress);
+        return metadata.getHost(endPoint);

Review Comment:
   Consider caching the lookup result? The `metadata.getHost` lookup is an O(n) operation.
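   One way to cache it is sketched below without the driver. It uses a `ConcurrentHashMap` keyed by the native transport address, where `computeIfAbsent` runs the lookup only on a miss. `HostLookupCache` is hypothetical, and its `lookup` function stands in for `DriverExtensions.getHost(metadata, address)`. Because `computeIfAbsent` records no mapping when the function returns `null`, a host that is not yet in the metadata is looked up again on the next call instead of being cached as missing.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache in front of an expensive host lookup; H stands in for the driver's Host.
class HostLookupCache<H>
{
    private final Map<InetSocketAddress, H> cache = new ConcurrentHashMap<>();
    private final Function<InetSocketAddress, H> lookup;

    HostLookupCache(Function<InetSocketAddress, H> lookup)
    {
        this.lookup = lookup;
    }

    // Returns the cached host, running the lookup only on a miss.
    // A null result is not stored, so a missing host is retried on the next call.
    H get(InetSocketAddress address)
    {
        return cache.computeIfAbsent(address, lookup);
    }

    // Drops a stale entry, e.g. when the host is removed from the cluster
    void invalidate(InetSocketAddress address)
    {
        cache.remove(address);
    }
}
```

   Cached entries would still need invalidating when a host is removed, or when it is re-added with a new `Host` object. A host state listener's `onRemove` callback is one place to do that.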



##########
common/src/main/java/org/apache/cassandra/sidecar/common/CQLSessionProvider.java:
##########
@@ -18,159 +18,36 @@
 
 package org.apache.cassandra.sidecar.common;
 
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.NettyOptions;
-import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.DriverException;
-import com.datastax.driver.core.exceptions.DriverInternalError;
-import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
-import com.datastax.driver.core.policies.ReconnectionPolicy;
-import com.datastax.driver.core.policies.RoundRobinPolicy;
-import com.datastax.driver.core.policies.WhiteListPolicy;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Provides connections to the local Cassandra cluster as defined in the Configuration. Currently, it only supports
- * returning the local connection.
+ * A provider of a CQL Session. The session should be connected to:
+ * <ul>
+ *     <li>All locally-managed instances.</li>
+ *     <li>At least one non-local instance. Preferably, at least two non-replica instances.</li>
+ * </ul>
  */
-public class CQLSessionProvider
+public interface CQLSessionProvider
 {
-    private static final Logger logger = LoggerFactory.getLogger(CQLSessionProvider.class);
-
-    @Nullable
-    private Session localSession;
-    private final InetSocketAddress inet;
-    private final WhiteListPolicy wlp;
-    private final NettyOptions nettyOptions;
-    private final QueryOptions queryOptions;
-    private final ReconnectionPolicy reconnectionPolicy;
-
-    public CQLSessionProvider(String host, int port, int healthCheckInterval)
-    {
-        // this was originally using unresolved Inet addresses, but it would fail when trying to
-        // connect to a docker container
-        logger.info("Connecting to {} on port {}", host, port);
-        inet = new InetSocketAddress(host, port);
-
-        wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet));
-        this.nettyOptions = new NettyOptions();
-        this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
-        this.reconnectionPolicy = new ExponentialReconnectionPolicy(1000, healthCheckInterval);
-    }
-
-    public CQLSessionProvider(InetSocketAddress target, NettyOptions options)
-    {
-        inet = target;
-        wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet));
-        this.nettyOptions = options;
-        this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
-        reconnectionPolicy = new ExponentialReconnectionPolicy(100, 1000);
-    }
-
     /**
-     * Provides a Session connected only to the local node from configuration. If null it means the connection was
-     * not able to be established. The session still might throw a NoHostAvailableException if the local host goes
-     * offline or otherwise unavailable.
+     * Provides a Session connected to the cluster. If null, it means the connection
+     * could not be established. The session still might throw a NoHostAvailableException if the
+     * cluster is otherwise unreachable.
      *
      * @return Session
      */
-    @Nullable
-    public synchronized Session localCql()
-    {
-        Cluster cluster = null;
-        try
-        {
-            if (localSession == null)
-            {
-                logger.info("Connecting to {}", inet);
-                cluster = Cluster.builder()
-                                 .addContactPointsWithPorts(inet)
-                                 .withLoadBalancingPolicy(wlp)
-                                 .withQueryOptions(queryOptions)
-                                 .withReconnectionPolicy(reconnectionPolicy)
-                                 .withoutMetrics()
-                                 // tests can create a lot of these Cluster objects, to avoid creating HWTs and
-                                 // event thread pools for each we have the override
-                                 .withNettyOptions(nettyOptions)
-                                 .build();
-                localSession = cluster.connect();
-                logger.info("Successfully connected to Cassandra instance!");
-            }
-        }
-        catch (Exception e)
-        {
-            logger.error("Failed to reach Cassandra", e);
-            if (cluster != null)
-            {
-                try
-                {
-                    cluster.close();
-                }
-                catch (Exception ex)
-                {
-                    logger.error("Failed to close cluster in cleanup", ex);
-                }
-            }
-        }
-        return localSession;
-    }
+    @Nullable Session get();
 
-    public Session close()
-    {
-        Session localSession;
-        synchronized (this)
-        {
-            localSession = this.localSession;
-            this.localSession = null;
-        }
-
-        if (localSession != null)
-        {
-            try
-            {
-                localSession.getCluster().closeAsync().get(1, TimeUnit.MINUTES);
-                localSession.closeAsync().get(1, TimeUnit.MINUTES);
-            }
-            catch (InterruptedException e)
-            {
-                Thread.currentThread().interrupt();
-            }
-            catch (TimeoutException e)
-            {
-                logger.warn("Unable to close session after 1 minute for provider {}", this, e);
-            }
-            catch (ExecutionException e)
-            {
-                throw propagateCause(e);
-            }
-        }
-        return localSession;
-    }
-
-    static RuntimeException propagateCause(ExecutionException e)
-    {
-        Throwable cause = e.getCause();
-
-        if (cause instanceof Error) throw ((Error) cause);
+    /**
+     * Closes the CQLSessionProvider
+     */
+    void close();
 
-        // We could just rethrow e.getCause(). However, the cause of the ExecutionException has likely
-        // been
-        // created on the I/O thread receiving the response. Which means that the stacktrace associated
-        // with said cause will make no mention of the current thread. This is painful for say, finding
-        // out which execute() statement actually raised the exception. So instead, we re-create the
-        // exception.
-        if (cause instanceof DriverException) throw ((DriverException) cause).copy();
-        else throw new DriverInternalError("Unexpected exception thrown", cause);
-    }
+    /**
+     * Gets the current Session object if it already exists.
+     * Otherwise, returns null.
+     * @return the connected {@link Session} object if available. Null otherwise.
+     */
+    Session getIfConnected();

Review Comment:
   Add `@Nullable` too?



##########
src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java:
##########
@@ -0,0 +1,197 @@
+
+package org.apache.cassandra.sidecar.cluster;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.exceptions.DriverInternalError;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.apache.cassandra.sidecar.config.DriverConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Provides connections to the local Cassandra cluster as defined in the Configuration. Currently, it only supports
+ * returning the local connection.
+ */
+public class CQLSessionProviderImpl implements CQLSessionProvider
+{
+    private static final Logger logger = LoggerFactory.getLogger(CQLSessionProviderImpl.class);
+    private final List<InetSocketAddress> contactPoints;
+    private final int numConnections;
+    private final String localDc;
+    private final NettyOptions nettyOptions;
+    private final ReconnectionPolicy reconnectionPolicy;
+    private final List<InetSocketAddress> localInstances;
+    @Nullable
+    private volatile Session session;
+
+    @VisibleForTesting
+    public CQLSessionProviderImpl(List<InetSocketAddress> contactPoints,
+                                  List<InetSocketAddress> localInstances,
+                                  int healthCheckFrequencyMillis,
+                                  String localDc,
+                                  int numConnections,
+                                  NettyOptions options)
+    {
+        this.contactPoints = contactPoints;
+        this.localInstances = localInstances;
+        this.localDc = localDc;
+        this.numConnections = numConnections;
+        this.nettyOptions = options;
+        this.reconnectionPolicy = new ExponentialReconnectionPolicy(500, healthCheckFrequencyMillis);
+    }
+
+    public CQLSessionProviderImpl(SidecarConfiguration configuration,
+                                  NettyOptions options)
+    {
+        DriverConfiguration driverConfiguration = configuration.driverConfiguration();
+        this.contactPoints = driverConfiguration.contactPoints();
+        this.localInstances = configuration.cassandraInstances()
+                                           .stream()
+                                           .map(i -> new InetSocketAddress(i.host(), i.port()))
+                                           .collect(Collectors.toList());
+        this.localDc = driverConfiguration.localDc();
+        this.numConnections = driverConfiguration.numConnections();
+        this.nettyOptions = options;
+        int maxDelayMs = configuration.healthCheckConfiguration().checkIntervalMillis();
+        this.reconnectionPolicy = new ExponentialReconnectionPolicy(500, maxDelayMs);
+    }
+
+    static RuntimeException propagateCause(ExecutionException e)
+    {
+        Throwable cause = e.getCause();
+
+        if (cause instanceof Error) throw ((Error) cause);
+
+        // We could just rethrow e.getCause(). However, the cause of the ExecutionException has likely
+        // been
+        // created on the I/O thread receiving the response. Which means that the stacktrace associated
+        // with said cause will make no mention of the current thread. This is painful for say, finding
+        // out which execute() statement actually raised the exception. So instead, we re-create the
+        // exception.
+        if (cause instanceof DriverException) throw ((DriverException) cause).copy();
+        else throw new DriverInternalError("Unexpected exception thrown", cause);
+    }
+
+    /**
+     * Provides a Session connected to the cluster. If null, it means the connection
+     * could not be established. The session still might throw a NoHostAvailableException if the
+     * cluster is otherwise unreachable.
+     *
+     * @return Session
+     */
+    @Nullable
+    public synchronized Session get()
+    {
+        Cluster cluster = null;
+        try
+        {
+            if (session == null)
+            {
+                logger.info("Connecting to cluster using contact points {}", contactPoints);
+
+                LoadBalancingPolicy lbp = new SidecarLoadBalancingPolicy(localInstances, localDc, numConnections);
+                // Prevent spurious reconnects of ignored down nodes on `onUp` events
+                QueryOptions queryOptions = new QueryOptions().setReprepareOnUp(false);
+                cluster = Cluster.builder()
+                                 .addContactPointsWithPorts(contactPoints)
+                                 .withReconnectionPolicy(reconnectionPolicy)
+                                 .withoutMetrics()
+                                 .withLoadBalancingPolicy(lbp)
+                                 .withQueryOptions(queryOptions)
+                                 // tests can create a lot of these Cluster objects, to avoid creating HWTs and
+                                 // event thread pools for each we have the override
+                                 .withNettyOptions(nettyOptions)
+                                 .build();
+                session = cluster.connect();
+                logger.info("Successfully connected to Cassandra!");
+            }
+        }
+        catch (Exception e)
+        {
+            logger.error("Failed to reach Cassandra", e);
+            if (cluster != null)
+            {
+                try
+                {
+                    cluster.close();
+                }
+                catch (Exception ex)
+                {
+                    logger.error("Failed to close cluster in cleanup", ex);
+                }
+            }
+        }
+        return session;
+    }

Review Comment:
   How about minimizing the try-catch block by returning the non-null session early?
   
   ```suggestion
       public synchronized Session get()
       {
           Cluster cluster = null;
           if (session != null)
           {
               return session;
           }
   
           try
           {
               logger.info("Connecting to cluster using contact points {}", contactPoints);
   
               LoadBalancingPolicy lbp = new SidecarLoadBalancingPolicy(localInstances, localDc, numConnections);
               // Prevent spurious reconnects of ignored down nodes on `onUp` events
               QueryOptions queryOptions = new QueryOptions().setReprepareOnUp(false);
               cluster = Cluster.builder()
                                .addContactPointsWithPorts(contactPoints)
                                .withReconnectionPolicy(reconnectionPolicy)
                                .withoutMetrics()
                                .withLoadBalancingPolicy(lbp)
                                .withQueryOptions(queryOptions)
                                // tests can create a lot of these Cluster objects, to avoid creating HWTs and
                                // event thread pools for each we have the override
                                .withNettyOptions(nettyOptions)
                                .build();
               session = cluster.connect();
               logger.info("Successfully connected to Cassandra!");
           }
           catch (Exception e)
           {
               logger.error("Failed to reach Cassandra", e);
               if (cluster != null)
               {
                   try
                   {
                       cluster.close();
                   }
                   catch (Exception ex)
                   {
                       logger.error("Failed to close cluster in cleanup", ex);
                   }
               }
           }
           return session;
       }
   ```
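The suggested control flow can be sketched without the driver. This is a stdlib-only illustration, not the PR's code: `ClusterHandle` and `LazySessionProvider` are hypothetical stand-ins for `Cluster` and `CQLSessionProvider`, and `connect()` stands in for building the `Cluster` and calling `cluster.connect()`. It shows the three paths the suggestion separates: a cached session returns before any `try`, a failed attempt closes the half-built resource and returns `null`, and a later call retries.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the driver's Cluster: built per attempt, closed on failure.
interface ClusterHandle extends AutoCloseable
{
    // Stand-in for cluster.connect(); returns a "session" or throws if unreachable
    String connect() throws Exception;

    // Narrowed to throw nothing, so cleanup needs no nested try/catch in this sketch
    @Override
    void close();
}

// Hypothetical stand-in for CQLSessionProvider: connects lazily and caches a successful session.
class LazySessionProvider
{
    private final Supplier<ClusterHandle> factory;
    private String session; // null until a connection attempt succeeds

    LazySessionProvider(Supplier<ClusterHandle> factory)
    {
        this.factory = factory;
    }

    synchronized String get()
    {
        // Fast path: an existing session is returned without entering the try block
        if (session != null)
        {
            return session;
        }

        ClusterHandle cluster = null;
        try
        {
            cluster = factory.get();
            session = cluster.connect();
        }
        catch (Exception e)
        {
            // Mirror the original cleanup: close the half-built cluster and leave session null
            if (cluster != null)
            {
                cluster.close();
            }
        }
        return session;
    }
}
```

Since `get()` stays `synchronized`, the early return does not change locking; it only keeps the already-connected path out of the exception handling.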



##########
src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -166,8 +177,18 @@ private void healthCheckInternal()
 
         try
         {
-            Row oneResult = activeSession.execute("select release_version, partitioner from system.local")
-                                         .one();
+            // NOTE: We cannot use `executeLocal` here as there may be no adapter yet.
+            SimpleStatement healthCheckStatement =
+            new SimpleStatement("select release_version, partitioner from system.local");
+            Host host = DriverExtensions.getHost(activeSession.getCluster().getMetadata(), localNativeTransportAddress);

Review Comment:
   This indicates that the helper methods, such as `executeLocal`, do not belong in the adapter. How about moving them into a utility class so the code can be reused? For example:
   
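   ```java
   import java.net.InetSocketAddress;
   import java.util.Collections;
   
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import com.datastax.driver.core.ConsistencyLevel;
   import com.datastax.driver.core.DriverExtensions;
   import com.datastax.driver.core.Host;
   import com.datastax.driver.core.ResultSet;
   import com.datastax.driver.core.Session;
   import com.datastax.driver.core.SimpleStatement;
   import com.datastax.driver.core.Statement;
   import com.datastax.driver.core.exceptions.NoHostAvailableException;
   
   public class SessionUtils
   {
       private static final Logger LOGGER = LoggerFactory.getLogger(SessionUtils.class);
   
       public static ResultSet executeLocal(Session session,
                                            InetSocketAddress localNativeTransportAddress,
                                            String query)
       {
           return executeLocal(session, localNativeTransportAddress, new SimpleStatement(query));
       }
   
       public static ResultSet executeLocal(Session session,
                                            InetSocketAddress localNativeTransportAddress,
                                            Statement statement)
       {
           // Route the statement to the local instance only; fail fast if the driver does not know it
           Host host = DriverExtensions.getHost(session.getCluster().getMetadata(), localNativeTransportAddress);
           if (host == null)
           {
               LOGGER.warn("Could not find host in cluster metadata by address and port {}",
                           localNativeTransportAddress);
               throw new NoHostAvailableException(Collections.emptyMap());
           }
           statement.setHost(host);
           statement.setConsistencyLevel(ConsistencyLevel.ONE);
           return session.execute(statement);
       }
   }
   ```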
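To illustrate the reuse argument without the driver, here is a minimal sketch assuming hypothetical `NodeRef` and `LocalQueryUtils` types in place of `Host` and the proposed `SessionUtils`, and a caller-supplied `BiFunction` in place of `session.execute`. Because every dependency is a parameter, the health check (which may run before any adapter exists) can call the same helper as the adapter.

```java
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

// Hypothetical stand-in for the driver's Host; only the native transport address matters here.
final class NodeRef
{
    final InetSocketAddress address;

    NodeRef(InetSocketAddress address)
    {
        this.address = address;
    }
}

// Hypothetical stateless helper: all dependencies are parameters, nothing depends on an adapter.
final class LocalQueryUtils
{
    private LocalQueryUtils()
    {
    }

    static NodeRef requireLocalHost(Collection<NodeRef> knownHosts, InetSocketAddress localAddress)
    {
        for (NodeRef host : knownHosts)
        {
            if (host.address.equals(localAddress))
            {
                return host;
            }
        }
        // The PR throws NoHostAvailableException here; a stdlib exception stands in for it
        throw new NoSuchElementException("No host in metadata for " + localAddress);
    }

    static <R> R executeLocal(Collection<NodeRef> knownHosts,
                              InetSocketAddress localAddress,
                              String query,
                              BiFunction<NodeRef, String, R> executor)
    {
        // Resolve the local host first, then hand it to the caller's execution strategy
        NodeRef host = requireLocalHost(knownHosts, localAddress);
        return executor.apply(host, query);
    }
}
```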



##########
src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cluster;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+    public static final int MIN_ADDITIONAL_CONNECTIONS = 2;

Review Comment:
   "Additional" is relative and implicit. How about `MIN_NON_LOCAL_CONNECTIONS`? Alternatively, add a comment explaining what the additional connections are.
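With the suggested name, the clamping done in the policy's constructor reads more directly. A small sketch of that arithmetic, assuming a hypothetical `ConnectionBudget` holder class (only the constant name comes from this suggestion):

```java
// Hypothetical holder for the constructor arithmetic of SidecarLoadBalancingPolicy,
// using the suggested constant name.
final class ConnectionBudget
{
    // Minimum number of hosts, beyond the Sidecar-managed (local) instances, that keep
    // driver connections open so cluster metadata stays current when local nodes are down
    static final int MIN_NON_LOCAL_CONNECTIONS = 2;

    private ConnectionBudget()
    {
    }

    static int totalRequestedConnections(int localHostCount, int requestedNonLocal)
    {
        // Values below the minimum are raised to it, as the PR's constructor does (with a warning)
        int nonLocal = Math.max(requestedNonLocal, MIN_NON_LOCAL_CONNECTIONS);
        return localHostCount + nonLocal;
    }
}
```

With two managed instances and a configured value below the minimum, `totalRequestedConnections(2, 1)` yields 4, which matches the test's `SIDECAR_MANAGED_INSTANCES + MIN_ADDITIONAL_CONNECTIONS` expectation.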



##########
src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,217 @@
+
+package org.apache.cassandra.sidecar.cluster;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+    public static final int MIN_ADDITIONAL_CONNECTIONS = 2;
+    private static final Logger LOGGER = LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
+    private final Set<Host> selectedHosts = new HashSet<>();
+    private final Set<InetSocketAddress> localHostAddresses;
+    private final LoadBalancingPolicy childPolicy;
+    private final int totalRequestedConnections;
+    private final Random secureRandom = new SecureRandom();

Review Comment:
   I do not think `SecureRandom` is needed. The random instance is only used to shuffle hosts, which does not require cryptographically strong output. Using `Random` should suffice.
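A sketch of the shuffle with an injected `java.util.Random`; `HostShuffler` is a hypothetical helper, not part of the PR. Passing the `Random` in, rather than hard-coding `SecureRandom`, also makes the host order reproducible in tests via a fixed seed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper: shuffles a copy of the candidate hosts with an injected Random.
final class HostShuffler
{
    private HostShuffler()
    {
    }

    static <T> List<T> shuffled(List<T> hosts, Random random)
    {
        List<T> copy = new ArrayList<>(hosts); // leave the caller's list untouched
        Collections.shuffle(copy, random);
        return copy;
    }
}
```

In production a plain `new Random()` would do; a test can pass `new Random(42)` to pin the order.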



##########
src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cluster;
+
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.Assertions;
+
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * A test for the SidecarLoadBalancingPolicy
+ */
+public class SidecarLoadBalancingPolicyTest extends IntegrationTestBase
+{
+
+    public static final int SIDECAR_MANAGED_INSTANCES = 2;
+
+    private static List<Host> getConnectedHosts(Set<Host> hosts)
+    {
+        return hosts.stream()
+                    .filter(DriverExtensions::hasActiveConnections)
+                    .collect(Collectors.toList());
+    }
+
+    protected int getNumInstancesToManage(int clusterSize)
+    {
+        return SIDECAR_MANAGED_INSTANCES; // we only want to manage the first 2 instances in the "cluster"
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 6)
+    public void shouldMaintainMinimumConnections() throws ExecutionException, InterruptedException
+    {
+        Set<Host> hosts = sidecarTestContext.session().getCluster().getMetadata().getAllHosts();
+        List<Host> connectedHosts = getConnectedHosts(hosts);
+        // We manage 2 hosts, and ask for an additional 4 (the default) for connections.

Review Comment:
   ask for an additional 2?



##########
src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java:
##########
@@ -0,0 +1,133 @@
+
+package org.apache.cassandra.sidecar.cluster;
+
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.Assertions;
+
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * A test for the SidecarLoadBalancingPolicy
+ */
+public class SidecarLoadBalancingPolicyTest extends IntegrationTestBase
+{
+
+    public static final int SIDECAR_MANAGED_INSTANCES = 2;
+
+    private static List<Host> getConnectedHosts(Set<Host> hosts)
+    {
+        return hosts.stream()
+                    .filter(DriverExtensions::hasActiveConnections)
+                    .collect(Collectors.toList());
+    }
+
+    protected int getNumInstancesToManage(int clusterSize)
+    {
+        return SIDECAR_MANAGED_INSTANCES; // we only want to manage the first 2 instances in the "cluster"
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 6)
+    public void shouldMaintainMinimumConnections() throws ExecutionException, InterruptedException
+    {
+        Set<Host> hosts = sidecarTestContext.session().getCluster().getMetadata().getAllHosts();
+        List<Host> connectedHosts = getConnectedHosts(hosts);
+        // We manage 2 hosts, and ask for an additional 4 (the default) for connections.
+        // Therefore, we expect 4 hosts to have connections at startup.
+        int expectedConnections = SIDECAR_MANAGED_INSTANCES + SidecarLoadBalancingPolicy.MIN_ADDITIONAL_CONNECTIONS;
+        assertThat(connectedHosts.size()).isEqualTo(expectedConnections);
+        // Now, shut down one of the hosts and make sure that we connect to a different node
+        UpgradeableCluster cluster = sidecarTestContext.cluster();
+        IUpgradeableInstance inst = shutDownNonLocalInstance(
+        cluster,
+        sidecarTestContext.instancesConfig().instances());

Review Comment:
   There is no indentation for the method parameters. 
   
   ```suggestion
           IUpgradeableInstance inst = shutDownNonLocalInstance(sidecarTestContext.cluster(),
                                                                sidecarTestContext.instancesConfig().instances());
   ```



##########
src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,217 @@
+
+package org.apache.cassandra.sidecar.cluster;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+    public static final int MIN_ADDITIONAL_CONNECTIONS = 2;
+    private static final Logger LOGGER = LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
+    private final Set<Host> selectedHosts = new HashSet<>();
+    private final Set<InetSocketAddress> localHostAddresses;
+    private final LoadBalancingPolicy childPolicy;
+    private final int totalRequestedConnections;
+    private final Random secureRandom = new SecureRandom();
+    private final HashSet<Host> allHosts = new HashSet<>();
+    private Cluster cluster;
+
+    public SidecarLoadBalancingPolicy(List<InetSocketAddress> localHostAddresses,
+                                      String localDc,
+                                      int numAdditionalConnections)
+    {
+        this.childPolicy = createChildPolicy(localDc);
+        this.localHostAddresses = new HashSet<>(localHostAddresses);
+        if (numAdditionalConnections < MIN_ADDITIONAL_CONNECTIONS)
+        {
+            LOGGER.warn("Additional instances requested was {}, which is less than the minimum of {}. Using {}.",
+                        numAdditionalConnections, MIN_ADDITIONAL_CONNECTIONS, MIN_ADDITIONAL_CONNECTIONS);
+            numAdditionalConnections = MIN_ADDITIONAL_CONNECTIONS;
+        }
+        this.totalRequestedConnections = this.localHostAddresses.size() + numAdditionalConnections;
+    }
+
+    @Override
+    public void init(Cluster cluster, Collection<Host> hosts)
+    {
+        this.cluster = cluster;
+        this.allHosts.addAll(hosts);
+        recalculateSelectedHosts();
+        childPolicy.init(cluster, hosts);
+    }
+
+    @Override
+    public HostDistance distance(Host host)
+    {
+        if (!selectedHosts.contains(host))
+        {
+            return HostDistance.IGNORED;
+        }
+        return childPolicy.distance(host);
+    }
+
+    @Override
+    public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
+    {
+        Iterator<Host> child = childPolicy.newQueryPlan(loggedKeyspace, statement);
+        // Filter the child policy to only selected hosts
+        return Iterators.filter(child, selectedHosts::contains);
+    }
+
+    @Override
+    public synchronized void onAdd(Host host)
+    {
+        onUp(host);
+        childPolicy.onAdd(host);
+    }
+
+    @Override
+    public synchronized void onUp(Host host)
+    {
+        this.allHosts.add(host); // replace existing reference if there is one
+        if (selectedHosts.size() < totalRequestedConnections)
+        {
+            recalculateSelectedHosts();
+        }
+        childPolicy.onUp(host);
+    }
+
+    @Override
+    public synchronized void onDown(Host host)
+    {
+        // Don't remove local addresses from the selected host list
+        if (localHostAddresses.contains(host.getBroadcastRpcAddress()))
+        {
+            return;
+        }
+
+        boolean wasSelected = selectedHosts.remove(host);
+        if (!wasSelected)
+        {
+            // Non-selected nodes have been marked with HostDistance.IGNORED
+            // even if they may otherwise be useful. This has a side effect
+            // of preventing the driver from trying to reconnect to them
+            // if we miss the `onUp` event, so we need to schedule reconnects
+            // for these hosts explicitly unless we have active connections.
+            DriverExtensions.startPeriodicReconnectionAttempt(cluster, host);
+        }
+        recalculateSelectedHosts();
+        childPolicy.onDown(host);
+    }
+
+    @Override
+    public synchronized void onRemove(Host host)
+    {
+        this.allHosts.remove(host);
+        onDown(host);
+        childPolicy.onRemove(host);
+    }
+
+    @Override
+    public void close()
+    {
+        childPolicy.close();
+    }
+
+
+    /**
+     * Creates the child policy based on the presence of a local datacenter
+     * @param localDc the local datacenter to use, or null
+     * @return a {@link LoadBalancingPolicy}
+     */
+    private LoadBalancingPolicy createChildPolicy(String localDc)
+    {
+        if (localDc != null)
+        {
+            return DCAwareRoundRobinPolicy.builder().withLocalDc(localDc).build();
+        }
+        return new RoundRobinPolicy();
+    }
+
+    /**
+     * Adds any local hosts to the selected host list
+     *
+     * @param sourceHosts the list of hosts provided by the control connection
+     */
+    private void addLocalHostsToSelected(List<Host> sourceHosts)
+    {
+        Iterator<Host> hostIterator = sourceHosts.iterator();
+        while (hostIterator.hasNext())
+        {
+            Host host = hostIterator.next();
+            if (localHostAddresses.contains(host.getEndPoint().resolve()))
+            {
+                selectedHosts.add(host);
+                hostIterator.remove();
+            }
+        }
+    }
+
+    private synchronized void recalculateSelectedHosts()
+    {
+        // Copy the list to allow us to remove hosts as we build the list
+        List<Host> sourceHosts = new ArrayList<>(this.allHosts);
+        addLocalHostsToSelected(sourceHosts);
+        Collections.shuffle(sourceHosts, this.secureRandom);
+        Iterator<Host> hostIterator = sourceHosts.iterator();
+        while (selectedHosts.size() < totalRequestedConnections && hostIterator.hasNext())
+        {
+            Host host = hostIterator.next();
+            if (!selectedHosts.contains(host) && host.isUp())
+            {
+                selectedHosts.add(host);
+                hostIterator.remove();
+            }
+        }

Review Comment:
   The implementation would be easier to read if `this.allHosts` were split into local and non-local host lists, and the first N hosts were added from the shuffled non-local list.
   The current implementation iterates through the list and removes items (from different methods), which is a bit harder to digest.
   
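   ```java
           // partitioningBy always returns both the true and false keys, possibly with empty lists
           Map<Boolean, List<Host>> partitionedHosts =
               allHosts.stream()
                       .collect(Collectors.partitioningBy(host -> localHostAddresses.contains(host.getEndPoint().resolve())));
           List<Host> localHosts = partitionedHosts.get(true);
           if (localHosts.isEmpty())
           {
               LOGGER.warn("None of the local host addresses {} were found in the cluster metadata", localHostAddresses);
           }
           selectedHosts.addAll(localHosts);
           // Copy before shuffling: the collector makes no guarantee that its lists are mutable
           List<Host> nonLocalHosts = new ArrayList<>(partitionedHosts.get(false));
           if (nonLocalHosts.isEmpty())
           {
               LOGGER.warn("No non-local hosts are available to select");
               return;
           }
   
           Collections.shuffle(nonLocalHosts, this.secureRandom);
           // add the first N hosts that are up, until the requested total is reached
           for (Host host : nonLocalHosts)
           {
               if (selectedHosts.size() >= totalRequestedConnections)
               {
                   break;
               }
               if (host.isUp())
               {
                   selectedHosts.add(host);
               }
           }
   ```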
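A stdlib-only sketch of the partition-then-fill approach, assuming a hypothetical `Candidate` type in place of `Host` and a caller-supplied `isLocal` predicate in place of the `localHostAddresses` lookup. Like the PR, it keeps hosts that are already selected, always includes local hosts (up or not), and only adds non-local hosts that are up.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for the driver's Host: an address plus an up/down flag.
final class Candidate
{
    final String address;
    final boolean up;

    Candidate(String address, boolean up)
    {
        this.address = address;
        this.up = up;
    }
}

// Hypothetical helper showing the partition-then-fill selection.
final class HostSelector
{
    private HostSelector()
    {
    }

    static void recalculate(Set<Candidate> selected,
                            Collection<Candidate> allHosts,
                            Predicate<Candidate> isLocal,
                            int totalRequested,
                            Random random)
    {
        // partitioningBy always returns both keys, so the lists may be empty but never null
        Map<Boolean, List<Candidate>> partitioned =
            allHosts.stream().collect(Collectors.partitioningBy(isLocal));

        // Local hosts are always selected, whether or not they are currently up
        selected.addAll(partitioned.get(true));

        // Shuffle the non-local hosts not selected yet, then take up hosts until the budget is met
        List<Candidate> nonLocal = new ArrayList<>(partitioned.get(false));
        nonLocal.removeAll(selected);
        Collections.shuffle(nonLocal, random);
        for (Candidate host : nonLocal)
        {
            if (selected.size() >= totalRequested)
            {
                break;
            }
            if (host.up)
            {
                selected.add(host);
            }
        }
    }
}
```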



##########
src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public interface DriverConfiguration
+{
+    /**
+     * A list of contact points to use for initial connection to Cassandra.
+     * At least 2 non-replica nodes are recommended.
+     * @return a list of contact points
+     */
+    List<InetSocketAddress> contactPoints();
+
+    /**
+     * The number of connections other than locally-managed nodes to use.
+     * The minimum is 2 - if your value is &lt; 2, the Sidecar will use 2.
+     * @return the number of connections to make to the cluster.
+     */
+    int numConnections();
+
+    /**
+     * The local datacenter to use for non-local queries to the cluster.
+     * @return the local datacenter, or null if no local datacenter is specified.
+     */
+    String localDc();

Review Comment:
   `String localDc()` means that Sidecar instances in different DCs need different values for this field, which could be painful operationally.
   
   Sidecar should be able to determine the local DC name via the Java driver. The config here could instead be `boolean shouldQueryLocalDcOnly()` (or a similar name) that, when true, prevents queries to other DCs.
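A sketch of the boolean-flag alternative, assuming the DC of each host is available as a plain map (in driver 3.x, `Host.getDatacenter()` exposes it); `LocalDcResolver` and its methods are hypothetical. The DC is inferred from the Sidecar-managed instances, and a configuration whose managed instances span several DCs is rejected instead of silently picking one.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for a `shouldQueryLocalDcOnly()`-style flag: the DC is derived, not configured.
final class LocalDcResolver
{
    private LocalDcResolver()
    {
    }

    // Infers the local DC from the DCs of the Sidecar-managed instances; empty if none are known yet
    static Optional<String> inferLocalDc(Map<String, String> dcByAddress, Set<String> localAddresses)
    {
        Set<String> dcs = new TreeSet<>();
        for (String address : localAddresses)
        {
            String dc = dcByAddress.get(address);
            if (dc != null)
            {
                dcs.add(dc);
            }
        }
        if (dcs.size() > 1)
        {
            throw new IllegalStateException("Sidecar-managed instances span multiple DCs: " + dcs);
        }
        return dcs.stream().findFirst();
    }

    // Returns the DC for a DC-aware child policy, or null to fall back to plain round robin
    static String dcForChildPolicy(boolean shouldQueryLocalDcOnly,
                                   Map<String, String> dcByAddress,
                                   Set<String> localAddresses)
    {
        if (!shouldQueryLocalDcOnly)
        {
            return null;
        }
        return inferLocalDc(dcByAddress, localAddresses).orElse(null);
    }
}
```

One caveat: host DCs are only known once the driver has cluster metadata, so a policy using this would need to create its DC-aware child policy lazily (for example in `init`) rather than in the constructor as the PR currently does.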



##########
src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.Promise;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.codec.BodyCodec;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.CassandraTestContext;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test CQLSessionProvider in a variety of cluster states
+ */
+@ExtendWith(VertxExtension.class)
+public class CQLSessionProviderTest extends IntegrationTestBase
+{
+
+    public static final String OK_KEYSPACE_RESPONSE_START = "{\"schema\":\"CREATE KEYSPACE ";
+    public static final String KEYSPACE_FAILED_RESPONSE_START = "{\"status\":\"Service Unavailable\",";
+
+    @CassandraIntegrationTest(nodesPerDc = 2, startCluster = false)
+    void testCqlSessionProviderWorksAsExpected(VertxTestContext context, CassandraTestContext cassandraTestContext)
+    throws Exception
+    {
+        UpgradeableCluster cluster = cassandraTestContext.getCluster();
+        testWithClient(context, false, webClient -> {
+
+                           // To start, both instances are stopped, so we should get 503s for both
+                           buildInstanceHealthRequest(webClient, "1")
+                           .send()
+                           .onSuccess(response -> assertHealthCheckFailed(response, context))
+                           .compose(_ignored ->
+                                    buildInstanceHealthRequest(webClient, "2")
+                                    .send()
+                                    .onSuccess(response -> assertHealthCheckFailed(response, context)))
+                           .compose(_ignored ->
+                                    buildKeyspaceRequest(webClient)
+                                    .send()
+                                    // With no instances available in the cluster, keyspace requests should fail
+                                    .onSuccess(response -> assertKeyspaceFailed(response, context)))
+                           .compose(_ignored -> {
+                               // Start instance 1 and check both again
+                               Promise<Void> promise = Promise.promise();
+                               new Thread(() -> {
+                                   cluster.get(1).startup();
+                                   // Instance 1 should now be up - wait for reconnect before testing
+                                   Uninterruptibles.sleepUninterruptibly(5000, TimeUnit.MILLISECONDS);
+                                   promise.complete();
+                               }
+                               ).start();
+                               return promise.future();
+                           })
+                           .compose(_ignored ->
+                                    buildInstanceHealthRequest(webClient, "1")
+                                    .send()
+                                    .onSuccess(response -> assertHealthCheckOk(response, context)))
+                           .compose(_ignored ->
+                                    buildInstanceHealthRequest(webClient, "2")
+                                    .send()
+                                    .onSuccess(response -> assertHealthCheckFailed(response, context))
+                           )
+                           .compose(_ignored ->
+                                    // Even with only 1 instance connected/up, we should still have keyspace metadata
+                                    buildKeyspaceRequest(webClient)
+                                    .send()
+                                    .onSuccess(response -> assertKeyspaceOk(response, context)))
+                           .compose(_ignored -> {
+                               // Start instance 2 and check both again
+                               Promise<Void> promise = Promise.promise();
+                               new Thread(() -> {
+                                   cluster.get(2).startup();
+                                   ClusterUtils.assertInRing(cluster.get(1), cluster.get(2));
+                                   // Instance 2 should now be up - wait for gossip
+                                   // and reconnect before testing
+                                   Uninterruptibles.sleepUninterruptibly(5000, TimeUnit.MILLISECONDS);
+                                   promise.complete();
+                               }
+                               ).start();
+                               return promise.future();
+                           })
+                           .compose(_ignored ->
+                                    buildInstanceHealthRequest(webClient, "1")
+                                    .send()
+                                    .onSuccess(response -> assertHealthCheckOk(response, context)))
+                           .compose(_ignored ->
+                                    buildInstanceHealthRequest(webClient, "2")
+                                    .send()
+                                    .onSuccess(response -> assertHealthCheckOk(response, context))
+                           )
+                           .onSuccess(_ignored -> context.completeNow())
+                           .onFailure(context::failNow);
+                       }
+        );
+    }
+
+

Review Comment:
   - Reduce the indentation
   - Use `Future.future` instead of creating a new thread inside `compose`
   ```suggestion
           testWithClient(context, false, webClient -> {
   
               // To start, both instances are stopped, so we should get 503s for both
               buildInstanceHealthRequest(webClient, "1")
               .send()
               .onSuccess(response -> assertHealthCheckFailed(response, context))
               .compose(_ignored ->
                        buildInstanceHealthRequest(webClient, "2")
                        .send()
                        .onSuccess(response -> assertHealthCheckFailed(response, context)))
               .compose(_ignored ->
                        buildKeyspaceRequest(webClient)
                        .send()
                        // With no instances available in the cluster, keyspace requests should fail
                        .onSuccess(response -> assertKeyspaceFailed(response, context)))
               .compose(_ignored -> {
                   // Start instance 1 and check both again
                   return Future.future(promise -> {
                       cluster.get(1).startup();
                       // Instance 1 should now be up - wait for reconnect before testing
                       Uninterruptibles.sleepUninterruptibly(5000, TimeUnit.MILLISECONDS);
                       promise.complete();
                   });
               })
               .compose(_ignored ->
                        buildInstanceHealthRequest(webClient, "1")
                        .send()
                        .onSuccess(response -> assertHealthCheckOk(response, context)))
               .compose(_ignored ->
                        buildInstanceHealthRequest(webClient, "2")
                        .send()
                        .onSuccess(response -> assertHealthCheckFailed(response, context))
               )
               .compose(_ignored ->
                        // Even with only 1 instance connected/up, we should still have keyspace metadata
                        buildKeyspaceRequest(webClient)
                        .send()
                        .onSuccess(response -> assertKeyspaceOk(response, context)))
               .compose(_ignored -> {
                   // Start instance 2 and check both again
                   return Future.future(promise -> {
                       cluster.get(2).startup();
                       ClusterUtils.assertInRing(cluster.get(1), cluster.get(2));
                       // Instance 2 should now be up - wait for gossip
                       // and reconnect before testing
                       Uninterruptibles.sleepUninterruptibly(5000, TimeUnit.MILLISECONDS);
                       promise.complete();
                   });
               })
               .compose(_ignored ->
                        buildInstanceHealthRequest(webClient, "1")
                        .send()
                        .onSuccess(response -> assertHealthCheckOk(response, context)))
               .compose(_ignored ->
                        buildInstanceHealthRequest(webClient, "2")
                        .send()
                        .onSuccess(response -> assertHealthCheckOk(response, context))
               )
               .onSuccess(_ignored -> context.completeNow())
               .onFailure(context::failNow);
           });
       }
   ```
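One behavioral detail to check before adopting the suggestion: in Vert.x 4, `Future.future(handler)` calls the handler immediately on the calling thread and only then returns the future. Inside `compose`, that means `cluster.get(n).startup()` and the 5-second sleep would run on whichever thread completed the previous step (typically the Vert.x event loop), whereas the original `new Thread(...)` moves that blocking work off it. If that matters here, Vert.x's `executeBlocking` is the usual way to offload blocking code. The sketch below is a minimal illustration using only `java.util.concurrent`, not Vert.x: the `future` helper is a hypothetical stand-in that mimics the synchronous-handler behavior, and `BlockingStepSketch`, `inlineStepThread`, and `offloadedStepThread` are illustrative names that are not part of the sidecar codebase.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class BlockingStepSketch
{
    // Hypothetical stand-in for Vert.x's Future.future(handler): like the real method,
    // it invokes the handler right away on the calling thread and only then returns.
    static <T> CompletableFuture<T> future(Consumer<CompletableFuture<T>> handler)
    {
        CompletableFuture<T> promise = new CompletableFuture<>();
        try
        {
            handler.accept(promise);
        }
        catch (Throwable t)
        {
            promise.completeExceptionally(t);
        }
        return promise;
    }

    // Simulates a blocking step (such as instance startup) run via future(...)
    // and returns the name of the thread that actually executed it.
    static String inlineStepThread()
    {
        CompletableFuture<String> f = future(p -> p.complete(Thread.currentThread().getName()));
        return f.join();
    }

    // Runs the same step on a separate worker pool, comparable to the original
    // new Thread(...) approach or to Vert.x's executeBlocking.
    static String offloadedStepThread(ExecutorService workers)
    {
        return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), workers).join();
    }

    public static void main(String[] args)
    {
        ExecutorService workers = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        try
        {
            System.out.println("caller:    " + Thread.currentThread().getName());
            System.out.println("inline:    " + inlineStepThread());
            System.out.println("offloaded: " + offloadedStepThread(workers));
        }
        finally
        {
            workers.shutdown();
        }
    }
}
```

Running `main` shows the inline step reporting the caller's own thread name, while the offloaded step reports `worker-1`. Applied to the test, the same distinction decides whether the startup and sleep hold up the thread that drives the `compose` chain.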



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

