yifan-c commented on code in PR #74:
URL: https://github.com/apache/cassandra-sidecar/pull/74#discussion_r1406476449
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java:
##########
@@ -90,20 +99,43 @@ public Metadata metadata()
@Nullable
public NodeSettings nodeSettings()
{
- Session activeSession = session.localCql();
- if (activeSession == null)
- {
- return null;
- }
+ ResultSet rs = executeLocal("select release_version, partitioner from system.local");
+ if (rs == null) return null;
Review Comment:
I do not think the inlined if statement is preferred. Can you break it into
multiple lines?
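For illustration, the multi-line form being asked for might look like the sketch below. `NodeSettingsSketch` and the `List<String>` standing in for the driver's `ResultSet` are hypothetical, chosen only so the snippet compiles without the driver on the classpath.

```java
import java.util.List;

final class NodeSettingsSketch
{
    // Hypothetical stand-in for nodeSettings(): `rows` plays the role of the ResultSet.
    static String releaseVersion(List<String> rows)
    {
        // Multi-line form of `if (rows == null) return null;`
        if (rows == null)
        {
            return null;
        }
        return rows.isEmpty() ? null : rows.get(0);
    }
}
```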
##########
common/src/main/java/com/datastax/driver/core/DriverExtensions.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datastax.driver.core;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A collection of methods that require access to package-private members in the datastax driver.
+ */
+public class DriverExtensions
Review Comment:
nit: `DriverUtils` might be a more conventional name.
##########
common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraAdapter.java:
##########
@@ -33,8 +38,35 @@ public interface ICassandraAdapter
*/
Metadata metadata();
+ /**
+ * The {@link NodeSettings} instance for this instance.
Review Comment:
nit: it is a bit hard to read with "instance" referring to different things.
I would drop the first "instance" in the sentence.
##########
src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -166,8 +177,18 @@ private void healthCheckInternal()
try
{
- Row oneResult = activeSession.execute("select release_version, partitioner from system.local")
- .one();
+ // NOTE: We cannot use `executeLocal` here as there may be no adapter yet.
+ SimpleStatement healthCheckStatement =
+ new SimpleStatement("select release_version, partitioner from system.local");
+ Host host = DriverExtensions.getHost(activeSession.getCluster().getMetadata(), localNativeTransportAddress);
+ if (host == null)
+ {
+ LOGGER.warn("Could not find host in cluster metadata by address and port {}",
+ localNativeTransportAddress);
+ return;
+ }
+ healthCheckStatement.setHost(host);
Review Comment:
The consistency level of the health check statement needs to be `ONE`.
##########
src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -284,7 +314,7 @@ public boolean isUp()
public void close()
{
markAsDownAndMaybeNotify();
- Session activeSession = cqlSessionProvider.close();
+ Session activeSession = cqlSessionProvider.getIfConnected();
Review Comment:
Where is the session closed now that `close()` calls `getIfConnected()` instead of `cqlSessionProvider.close()`?
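To make the concern concrete: if `getIfConnected()` only returns the current session, something still has to close it. The sketch below shows one way a provider could own that step. `SessionHolderSketch` and its `AutoCloseable` type parameter are hypothetical stand-ins, not the Sidecar's or the driver's actual types.

```java
final class SessionHolderSketch<S extends AutoCloseable>
{
    private S session;

    SessionHolderSketch(S session)
    {
        this.session = session;
    }

    // Returns the current session without creating one; null once closed.
    synchronized S getIfConnected()
    {
        return session;
    }

    // Detaches the session under the lock, then closes it outside the lock.
    void close()
    {
        S toClose;
        synchronized (this)
        {
            toClose = session;
            session = null;
        }
        if (toClose == null)
        {
            return;
        }
        try
        {
            toClose.close();
        }
        catch (Exception e)
        {
            throw new IllegalStateException("Failed to close session", e);
        }
    }
}
```

Calling `close()` a second time is a no-op in this sketch, because the reference has already been cleared.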
##########
src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java:
##########
@@ -0,0 +1,48 @@
+
+package org.apache.cassandra.sidecar.config;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public interface DriverConfiguration
+{
+ /**
+ * A list of contact points to use for initial connection to Cassandra.
+ * At least 2 non-replica nodes are recommended.
+ * @return a list of contact points
+ */
+ List<InetSocketAddress> contactPoints();
+
+ /**
+ * The number of connections other than locally-managed nodes to use.
+ * The minimum is 2 - if your value is &lt; 2, the Sidecar will use 2.
Review Comment:
I would just use 'less than' instead of the HTML-escaped characters.
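A minimal sketch of how the Javadoc could read with the wording spelled out, together with the clamping rule it describes. `DriverConfigurationSketch` and `effectiveNumConnections` are hypothetical names; only the minimum of 2 comes from the PR.

```java
final class DriverConfigurationSketch
{
    static final int MIN_ADDITIONAL_CONNECTIONS = 2;

    /**
     * The number of connections other than locally-managed nodes to use.
     * The minimum is 2; if the value is less than 2, the Sidecar will use 2.
     *
     * @param requested the configured number of additional connections
     * @return the number of additional connections that will actually be used
     */
    static int effectiveNumConnections(int requested)
    {
        return Math.max(requested, MIN_ADDITIONAL_CONNECTIONS);
    }
}
```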
##########
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java:
##########
@@ -43,13 +45,16 @@ public CassandraFactory(DnsResolver dnsResolver, String sidecarVersion)
/**
* Returns a new adapter for Cassandra 4.0 clusters.
*
- * @param session the session to the Cassandra database
- * @param jmxClient the JMX client to connect to the Cassandra database
+ * @param session the session to the Cassandra database
+ * @param jmxClient the JMX client to connect to the Cassandra database
+ * @param localNativeTransportAddress the address and port on which this instance is configured to listen
Review Comment:
There is no need to align the parameter descriptions. Alignment leads to
unrelated changes whenever a new param is added.
##########
common/src/main/java/com/datastax/driver/core/DriverExtensions.java:
##########
@@ -0,0 +1,67 @@
+
+package com.datastax.driver.core;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A collection of methods that require access to package-private members in the datastax driver.
+ */
+public class DriverExtensions
+{
+ /**
+ * Check if a host has active connections
+ *
+ * @param host the host to check
+ * @return true if the host has active connections, false otherwise
+ */
+ public static boolean hasActiveConnections(Host host)
Review Comment:
+1 on adding the annotation
##########
src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java:
##########
@@ -0,0 +1,48 @@
+
+package org.apache.cassandra.sidecar.config;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public interface DriverConfiguration
+{
+ /**
+ * A list of contact points to use for initial connection to Cassandra.
+ * At least 2 non-replica nodes are recommended.
Review Comment:
Does "non-replica nodes" imply that the config is heterogeneous across
sidecar instances? My understanding is that the contact points are a subset
of the seed nodes and are the same across all sidecar instances.
##########
src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,217 @@
+
+package org.apache.cassandra.sidecar.cluster;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+ public static final int MIN_ADDITIONAL_CONNECTIONS = 2;
+ private static final Logger LOGGER = LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
+ private final Set<Host> selectedHosts = new HashSet<>();
+ private final Set<InetSocketAddress> localHostAddresses;
+ private final LoadBalancingPolicy childPolicy;
+ private final int totalRequestedConnections;
+ private final Random secureRandom = new SecureRandom();
+ private final HashSet<Host> allHosts = new HashSet<>();
+ private Cluster cluster;
+
+ public SidecarLoadBalancingPolicy(List<InetSocketAddress> localHostAddresses,
+ String localDc,
+ int numAdditionalConnections)
+ {
+ this.childPolicy = createChildPolicy(localDc);
+ this.localHostAddresses = new HashSet<>(localHostAddresses);
+ if (numAdditionalConnections < MIN_ADDITIONAL_CONNECTIONS)
+ {
+ LOGGER.warn("Additional instances requested was {}, which is less than the minimum of {}. Using {}.",
+ numAdditionalConnections, MIN_ADDITIONAL_CONNECTIONS, MIN_ADDITIONAL_CONNECTIONS);
+ numAdditionalConnections = MIN_ADDITIONAL_CONNECTIONS;
+ }
+ this.totalRequestedConnections = this.localHostAddresses.size() + numAdditionalConnections;
+ }
+
+ @Override
+ public void init(Cluster cluster, Collection<Host> hosts)
+ {
+ this.cluster = cluster;
+ this.allHosts.addAll(hosts);
+ recalculateSelectedHosts();
+ childPolicy.init(cluster, hosts);
+ }
+
+ @Override
+ public HostDistance distance(Host host)
+ {
+ if (!selectedHosts.contains(host))
+ {
+ return HostDistance.IGNORED;
+ }
+ return childPolicy.distance(host);
+ }
+
+ @Override
+ public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
+ {
+ Iterator<Host> child = childPolicy.newQueryPlan(loggedKeyspace, statement);
+ // Filter the child policy to only selected hosts
+ return Iterators.filter(child, selectedHosts::contains);
+ }
+
+ @Override
+ public synchronized void onAdd(Host host)
+ {
+ onUp(host);
+ childPolicy.onAdd(host);
+ }
+
+ @Override
+ public synchronized void onUp(Host host)
+ {
+ this.allHosts.add(host); // replace existing reference if there is one
+ if (selectedHosts.size() < totalRequestedConnections)
+ {
+ recalculateSelectedHosts();
+ }
+ childPolicy.onUp(host);
+ }
+
+ @Override
+ public synchronized void onDown(Host host)
+ {
+ // Don't remove local addresses from the selected host list
+ if (localHostAddresses.contains(host.getBroadcastRpcAddress()))
+ {
+ return;
+ }
Review Comment:
Why prevent removing a down local host? It cannot serve queries.
Besides that, how about adding a warn log?
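As a rough sketch of the second suggestion only, the early return could log a warning before skipping the removal. It leaves open the first question of whether a down local host should stay selected at all. `SelectedHostsSketch` is hypothetical: it tracks plain `InetSocketAddress` values instead of driver `Host` objects and uses `java.util.logging` in place of the SLF4J logger so that it runs on the standard library alone.

```java
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

final class SelectedHostsSketch
{
    private static final Logger LOGGER = Logger.getLogger(SelectedHostsSketch.class.getName());
    private final Set<InetSocketAddress> localHostAddresses;
    private final Set<InetSocketAddress> selectedHosts = new HashSet<>();

    SelectedHostsSketch(Set<InetSocketAddress> localHostAddresses)
    {
        this.localHostAddresses = new HashSet<>(localHostAddresses);
        this.selectedHosts.addAll(localHostAddresses);
    }

    synchronized void select(InetSocketAddress address)
    {
        selectedHosts.add(address);
    }

    synchronized boolean isSelected(InetSocketAddress address)
    {
        return selectedHosts.contains(address);
    }

    synchronized void onDown(InetSocketAddress address)
    {
        if (localHostAddresses.contains(address))
        {
            // Same behavior as the PR (the local host stays selected), but the event is now visible in the logs
            LOGGER.warning("Local host " + address + " is down; keeping it in the selected hosts");
            return;
        }
        selectedHosts.remove(address);
    }
}
```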
##########
src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -196,7 +217,6 @@ private void healthCheckInternal()
// Unregister the host listener and nullify the session in order to get a new object.
markAsDownAndMaybeNotify();
maybeUnregisterHostListener(activeSession);
- cqlSessionProvider.close();
Review Comment:
Was the `cqlSessionProvider.close()` call removed by mistake?
##########
common/src/main/java/com/datastax/driver/core/DriverExtensions.java:
##########
@@ -0,0 +1,67 @@
+
+package com.datastax.driver.core;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A collection of methods that require access to package-private members in the datastax driver.
+ */
+public class DriverExtensions
+{
+ /**
+ * Check if a host has active connections
+ *
+ * @param host the host to check
+ * @return true if the host has active connections, false otherwise
+ */
+ public static boolean hasActiveConnections(Host host)
+ {
+ return host.convictionPolicy.hasActiveConnections();
+ }
+
+ /**
+ * Start attempting to reconnect to the given host, as hosts with `IGNORED` distance aren't attempted
+ * and the SidecarLoadBalancingPolicy marks non-selected nodes as IGNORED until they need to rotate in.
+ *
+ * @param cluster The cluster object
+ * @param host the host to which reconnect attempts will be made
+ */
+ public static void startPeriodicReconnectionAttempt(Cluster cluster, Host host)
+ {
+ cluster.manager.startPeriodicReconnectionAttempt(host, false);
+ }
+
+ /**
+ * Gets a Host instance from metadata based on the native transport address
+ *
+ * @param metadata the {@link Metadata} instance to search for the host
+ * @param localNativeTransportAddress the native transport ip address and port for the host to find
+ * @return the {@link Host} instance if found, else null
+ */
+ public static Host getHost(Metadata metadata, InetSocketAddress localNativeTransportAddress)
+ {
+ // Because the driver can sometimes mess up the broadcast address, we need to search by endpoint
+ // which is what it actually uses to connect to the cluster. Therefore, create a TranslatedAddressEndpoint
+ // to use for searching. It has to be one of these because that's what the driver is using internally,
+ // and the `.equals` method used when searching checks the type explicitly.
+ TranslatedAddressEndPoint endPoint = new TranslatedAddressEndPoint(localNativeTransportAddress);
+ return metadata.getHost(endPoint);
Review Comment:
Consider caching the lookup result? The `metadata.getHost` lookup is an O(n)
operation.
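One possible shape for such a cache is sketched below, under the assumption that the slow lookup can be passed in as a function. `CachingHostLookupSketch` is a hypothetical helper, generic over the host type so it runs without the driver. A cached entry can go stale when the topology changes, so the sketch also exposes `invalidate` for the caller to use on host removal.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class CachingHostLookupSketch<H>
{
    private final Map<InetSocketAddress, H> cache = new ConcurrentHashMap<>();
    // Stand-in for the O(n) search, e.g. a call that scans all hosts in the metadata
    private final Function<InetSocketAddress, H> slowLookup;

    CachingHostLookupSketch(Function<InetSocketAddress, H> slowLookup)
    {
        this.slowLookup = slowLookup;
    }

    // Runs the slow lookup at most once per address; a null result is not cached,
    // so a host that is not yet known is looked up again on the next call.
    H getHost(InetSocketAddress address)
    {
        return cache.computeIfAbsent(address, slowLookup);
    }

    // Drops a cached entry, for example after the host leaves the cluster.
    void invalidate(InetSocketAddress address)
    {
        cache.remove(address);
    }
}
```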
##########
common/src/main/java/org/apache/cassandra/sidecar/common/CQLSessionProvider.java:
##########
@@ -18,159 +18,36 @@
package org.apache.cassandra.sidecar.common;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.NettyOptions;
-import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.DriverException;
-import com.datastax.driver.core.exceptions.DriverInternalError;
-import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
-import com.datastax.driver.core.policies.ReconnectionPolicy;
-import com.datastax.driver.core.policies.RoundRobinPolicy;
-import com.datastax.driver.core.policies.WhiteListPolicy;
import org.jetbrains.annotations.Nullable;
/**
- * Provides connections to the local Cassandra cluster as defined in the Configuration. Currently, it only supports
- * returning the local connection.
+ * A provider of a CQL Session. The session should be connected to:
+ * <ul>
+ * <li>All locally-managed instances.</li>
+ * <li>At least one non-local instance. Preferably, at least two non-replica instances.</li>
+ * </ul>
*/
-public class CQLSessionProvider
+public interface CQLSessionProvider
{
- private static final Logger logger =
LoggerFactory.getLogger(CQLSessionProvider.class);
-
- @Nullable
- private Session localSession;
- private final InetSocketAddress inet;
- private final WhiteListPolicy wlp;
- private final NettyOptions nettyOptions;
- private final QueryOptions queryOptions;
- private final ReconnectionPolicy reconnectionPolicy;
-
- public CQLSessionProvider(String host, int port, int healthCheckInterval)
- {
- // this was originally using unresolved Inet addresses, but it would
fail when trying to
- // connect to a docker container
- logger.info("Connecting to {} on port {}", host, port);
- inet = new InetSocketAddress(host, port);
-
- wlp = new WhiteListPolicy(new RoundRobinPolicy(),
Collections.singletonList(inet));
- this.nettyOptions = new NettyOptions();
- this.queryOptions = new
QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
- this.reconnectionPolicy = new ExponentialReconnectionPolicy(1000,
healthCheckInterval);
- }
-
- public CQLSessionProvider(InetSocketAddress target, NettyOptions options)
- {
- inet = target;
- wlp = new WhiteListPolicy(new RoundRobinPolicy(),
Collections.singletonList(inet));
- this.nettyOptions = options;
- this.queryOptions = new
QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
- reconnectionPolicy = new ExponentialReconnectionPolicy(100, 1000);
- }
-
/**
- * Provides a Session connected only to the local node from configuration. If null it means the connection was
- * not able to be established. The session still might throw a NoHostAvailableException if the local host goes
- * offline or otherwise unavailable.
+ * Provides a Session connected to the cluster. If null it means the connection was
+ * could not be established. The session still might throw a NoHostAvailableException if the
+ * cluster is otherwise unreachable.
*
* @return Session
*/
- @Nullable
- public synchronized Session localCql()
- {
- Cluster cluster = null;
- try
- {
- if (localSession == null)
- {
- logger.info("Connecting to {}", inet);
- cluster = Cluster.builder()
- .addContactPointsWithPorts(inet)
- .withLoadBalancingPolicy(wlp)
- .withQueryOptions(queryOptions)
- .withReconnectionPolicy(reconnectionPolicy)
- .withoutMetrics()
- // tests can create a lot of these Cluster
objects, to avoid creating HWTs and
- // event thread pools for each we have the
override
- .withNettyOptions(nettyOptions)
- .build();
- localSession = cluster.connect();
- logger.info("Successfully connected to Cassandra instance!");
- }
- }
- catch (Exception e)
- {
- logger.error("Failed to reach Cassandra", e);
- if (cluster != null)
- {
- try
- {
- cluster.close();
- }
- catch (Exception ex)
- {
- logger.error("Failed to close cluster in cleanup", ex);
- }
- }
- }
- return localSession;
- }
+ @Nullable Session get();
- public Session close()
- {
- Session localSession;
- synchronized (this)
- {
- localSession = this.localSession;
- this.localSession = null;
- }
-
- if (localSession != null)
- {
- try
- {
- localSession.getCluster().closeAsync().get(1,
TimeUnit.MINUTES);
- localSession.closeAsync().get(1, TimeUnit.MINUTES);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- catch (TimeoutException e)
- {
- logger.warn("Unable to close session after 1 minute for
provider {}", this, e);
- }
- catch (ExecutionException e)
- {
- throw propagateCause(e);
- }
- }
- return localSession;
- }
-
- static RuntimeException propagateCause(ExecutionException e)
- {
- Throwable cause = e.getCause();
-
- if (cause instanceof Error) throw ((Error) cause);
+ /**
+ * Closes the CQLSessionProvider
+ */
+ void close();
- // We could just rethrow e.getCause(). However, the cause of the
ExecutionException has likely
- // been
- // created on the I/O thread receiving the response. Which means that
the stacktrace associated
- // with said cause will make no mention of the current thread. This is
painful for say, finding
- // out which execute() statement actually raised the exception. So
instead, we re-create the
- // exception.
- if (cause instanceof DriverException) throw ((DriverException)
cause).copy();
- else throw new DriverInternalError("Unexpected exception thrown",
cause);
- }
+ /**
+ * Gets the current Session object if it already exists.
+ * Otherwise, returns null.
+ * @return the connected {@link Session} object if available. Null otherwise.
+ */
+ Session getIfConnected();
Review Comment:
Add `@Nullable` too?
##########
src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java:
##########
@@ -0,0 +1,197 @@
+
+package org.apache.cassandra.sidecar.cluster;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.exceptions.DriverInternalError;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.apache.cassandra.sidecar.config.DriverConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Provides connections to the local Cassandra cluster as defined in the
Configuration. Currently, it only supports
+ * returning the local connection.
+ */
+public class CQLSessionProviderImpl implements CQLSessionProvider
+{
+ private static final Logger logger =
LoggerFactory.getLogger(CQLSessionProviderImpl.class);
+ private final List<InetSocketAddress> contactPoints;
+ private final int numConnections;
+ private final String localDc;
+ private final NettyOptions nettyOptions;
+ private final ReconnectionPolicy reconnectionPolicy;
+ private final List<InetSocketAddress> localInstances;
+ @Nullable
+ private volatile Session session;
+
+ @VisibleForTesting
+ public CQLSessionProviderImpl(List<InetSocketAddress> contactPoints,
+ List<InetSocketAddress> localInstances,
+ int healthCheckFrequencyMillis,
+ String localDc,
+ int numConnections,
+ NettyOptions options)
+ {
+ this.contactPoints = contactPoints;
+ this.localInstances = localInstances;
+ this.localDc = localDc;
+ this.numConnections = numConnections;
+ this.nettyOptions = options;
+ this.reconnectionPolicy = new ExponentialReconnectionPolicy(500,
healthCheckFrequencyMillis);
+ }
+
+ public CQLSessionProviderImpl(SidecarConfiguration configuration,
+ NettyOptions options)
+ {
+ DriverConfiguration driverConfiguration =
configuration.driverConfiguration();
+ this.contactPoints = driverConfiguration.contactPoints();
+ this.localInstances = configuration.cassandraInstances()
+ .stream()
+ .map(i -> new
InetSocketAddress(i.host(), i.port()))
+ .collect(Collectors.toList());
+ this.localDc = driverConfiguration.localDc();
+ this.numConnections = driverConfiguration.numConnections();
+ this.nettyOptions = options;
+ int maxDelayMs =
configuration.healthCheckConfiguration().checkIntervalMillis();
+ this.reconnectionPolicy = new ExponentialReconnectionPolicy(500,
maxDelayMs);
+ }
+
+ static RuntimeException propagateCause(ExecutionException e)
+ {
+ Throwable cause = e.getCause();
+
+ if (cause instanceof Error) throw ((Error) cause);
+
+ // We could just rethrow e.getCause(). However, the cause of the
ExecutionException has likely
+ // been
+ // created on the I/O thread receiving the response. Which means that
the stacktrace associated
+ // with said cause will make no mention of the current thread. This is
painful for say, finding
+ // out which execute() statement actually raised the exception. So
instead, we re-create the
+ // exception.
+ if (cause instanceof DriverException) throw ((DriverException)
cause).copy();
+ else throw new DriverInternalError("Unexpected exception thrown",
cause);
+ }
+
+ /**
+ * Provides a Session connected to the cluster. If null it means the
connection was
+ * could not be established. The session still might throw a
NoHostAvailableException if the
+ * cluster is otherwise unreachable.
+ *
+ * @return Session
+ */
+ @Nullable
+ public synchronized Session get()
+ {
+ Cluster cluster = null;
+ try
+ {
+ if (session == null)
+ {
+ logger.info("Connecting to cluster using contact points {}", contactPoints);
+
+ LoadBalancingPolicy lbp = new SidecarLoadBalancingPolicy(localInstances, localDc, numConnections);
+ // Prevent spurious reconnects of ignored down nodes on `onUp` events
+ QueryOptions queryOptions = new QueryOptions().setReprepareOnUp(false);
+ cluster = Cluster.builder()
+ .addContactPointsWithPorts(contactPoints)
+ .withReconnectionPolicy(reconnectionPolicy)
+ .withoutMetrics()
+ .withLoadBalancingPolicy(lbp)
+ .withQueryOptions(queryOptions)
+ // tests can create a lot of these Cluster objects, to avoid creating HWTs and
+ // event thread pools for each we have the override
+ .withNettyOptions(nettyOptions)
+ .build();
+ session = cluster.connect();
+ logger.info("Successfully connected to Cassandra!");
+ }
+ }
+ catch (Exception e)
+ {
+ logger.error("Failed to reach Cassandra", e);
+ if (cluster != null)
+ {
+ try
+ {
+ cluster.close();
+ }
+ catch (Exception ex)
+ {
+ logger.error("Failed to close cluster in cleanup", ex);
+ }
+ }
+ }
+ return session;
+ }
Review Comment:
How about minimizing the try-catch block by returning the non-null session
early?
```suggestion
    public synchronized Session get()
    {
        Cluster cluster = null;
        if (session != null)
        {
            return session;
        }
        try
        {
            logger.info("Connecting to cluster using contact points {}", contactPoints);
            LoadBalancingPolicy lbp = new SidecarLoadBalancingPolicy(localInstances, localDc, numConnections);
            // Prevent spurious reconnects of ignored down nodes on `onUp` events
            QueryOptions queryOptions = new QueryOptions().setReprepareOnUp(false);
            cluster = Cluster.builder()
                             .addContactPointsWithPorts(contactPoints)
                             .withReconnectionPolicy(reconnectionPolicy)
                             .withoutMetrics()
                             .withLoadBalancingPolicy(lbp)
                             .withQueryOptions(queryOptions)
                             // tests can create a lot of these Cluster objects, to avoid creating HWTs and
                             // event thread pools for each we have the override
                             .withNettyOptions(nettyOptions)
                             .build();
            session = cluster.connect();
            logger.info("Successfully connected to Cassandra!");
        }
        catch (Exception e)
        {
            logger.error("Failed to reach Cassandra", e);
            if (cluster != null)
            {
                try
                {
                    cluster.close();
                }
                catch (Exception ex)
                {
                    logger.error("Failed to close cluster in cleanup", ex);
                }
            }
        }
        return session;
    }
```
##########
src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -166,8 +177,18 @@ private void healthCheckInternal()
try
{
- Row oneResult = activeSession.execute("select release_version, partitioner from system.local")
- .one();
+ // NOTE: We cannot use `executeLocal` here as there may be no adapter yet.
+ SimpleStatement healthCheckStatement =
+ new SimpleStatement("select release_version, partitioner from system.local");
+ Host host = DriverExtensions.getHost(activeSession.getCluster().getMetadata(), localNativeTransportAddress);
Review Comment:
This is an indicator that the `executeLocal` helper methods do not belong to
the adapter. How about moving them into a utility class so that the code can be
reused more easily? For example, an alternative is:
```java
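import java.net.InetSocketAddress;
import java.util.Collections;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.DriverExtensions;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.NoHostAvailableException;

public class SessionUtils
{
    private static final Logger LOGGER = LoggerFactory.getLogger(SessionUtils.class);

    public static ResultSet executeLocal(Session session,
                                         InetSocketAddress localNativeTransportAddress,
                                         String query)
    {
        return executeLocal(session, localNativeTransportAddress, new SimpleStatement(query));
    }

    public static ResultSet executeLocal(Session session,
                                         InetSocketAddress localNativeTransportAddress,
                                         Statement statement)
    {
        Host host = DriverExtensions.getHost(session.getCluster().getMetadata(), localNativeTransportAddress);
        if (host == null)
        {
            LOGGER.warn("Could not find host in cluster metadata by address and port {}",
                        localNativeTransportAddress);
            throw new NoHostAvailableException(Collections.emptyMap());
        }
        // Pin the statement to the local host and read at consistency level ONE
        statement.setHost(host);
        statement.setConsistencyLevel(ConsistencyLevel.ONE);
        return session.execute(statement);
    }
}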
```
##########
src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,217 @@
+
+package org.apache.cassandra.sidecar.cluster;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra
Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections
to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport
disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+ public static final int MIN_ADDITIONAL_CONNECTIONS = 2;
Review Comment:
"Additional" is relative and implicit. How about `MIN_NON_LOCAL_CONNECTIONS`?
Alternatively, add a comment explaining what the additional connections are.
##########
src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,217 @@
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
+ private final Set<Host> selectedHosts = new HashSet<>();
+ private final Set<InetSocketAddress> localHostAddresses;
+ private final LoadBalancingPolicy childPolicy;
+ private final int totalRequestedConnections;
+ private final Random secureRandom = new SecureRandom();
Review Comment:
I do not think `SecureRandom` is needed here. The random instance is only used
to shuffle, which does not require cryptographically strong output. Using
`Random` should suffice.
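To illustrate, here is a minimal, self-contained sketch (not code from this PR;
`ShuffleSketch` and `shuffledCopy` are hypothetical names) showing that
`Collections.shuffle` accepts any `java.util.Random`, so a plain `Random` can be
passed where `SecureRandom` is used today:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical illustration only; the class and method names are not from the PR.
final class ShuffleSketch
{
    // Returns a shuffled copy of the input; the input list itself is left untouched.
    static <T> List<T> shuffledCopy(List<T> items, Random random)
    {
        List<T> copy = new ArrayList<>(items);
        Collections.shuffle(copy, random);
        return copy;
    }

    public static void main(String[] args)
    {
        List<String> hosts = Arrays.asList("n3", "n4", "n5", "n6");
        // A non-cryptographic Random is enough to pick hosts in arbitrary order
        System.out.println(shuffledCopy(hosts, new Random()));
    }
}
```

The field could then be declared as `private final Random random = new Random();`
with the `SecureRandom` import dropped.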
##########
src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cluster;
+
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.Assertions;
+
+import com.datastax.driver.core.DriverExtensions;
+import com.datastax.driver.core.Host;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * A test for the SidecarLoadBalancingPolicy
+ */
+public class SidecarLoadBalancingPolicyTest extends IntegrationTestBase
+{
+
+ public static final int SIDECAR_MANAGED_INSTANCES = 2;
+
+ private static List<Host> getConnectedHosts(Set<Host> hosts)
+ {
+ return hosts.stream()
+ .filter(DriverExtensions::hasActiveConnections)
+ .collect(Collectors.toList());
+ }
+
+ protected int getNumInstancesToManage(int clusterSize)
+ {
+ return SIDECAR_MANAGED_INSTANCES; // we only want to manage the first
2 instances in the "cluster"
+ }
+
+ @CassandraIntegrationTest(nodesPerDc = 6)
+ public void shouldMaintainMinimumConnections() throws ExecutionException,
InterruptedException
+ {
+ Set<Host> hosts =
sidecarTestContext.session().getCluster().getMetadata().getAllHosts();
+ List<Host> connectedHosts = getConnectedHosts(hosts);
+ // We manage 2 hosts, and ask for an additional 4 (the default) for
connections.
Review Comment:
Should this say "ask for an additional 2"? `MIN_ADDITIONAL_CONNECTIONS` is 2.
##########
src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java:
##########
@@ -0,0 +1,133 @@
+ // Therefore, we expect 4 hosts to have connections at startup.
+ int expectedConnections = SIDECAR_MANAGED_INSTANCES +
SidecarLoadBalancingPolicy.MIN_ADDITIONAL_CONNECTIONS;
+ assertThat(connectedHosts.size()).isEqualTo(expectedConnections);
+ // Now, shut down one of the hosts and make sure that we connect to a
different node
+ UpgradeableCluster cluster = sidecarTestContext.cluster();
+ IUpgradeableInstance inst = shutDownNonLocalInstance(
+ cluster,
+ sidecarTestContext.instancesConfig().instances());
Review Comment:
There is no indentation for the method parameters.
```suggestion
IUpgradeableInstance inst =
shutDownNonLocalInstance(sidecarTestContext.cluster(),
sidecarTestContext.instancesConfig().instances());
```
##########
src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java:
##########
@@ -0,0 +1,217 @@
+ private final HashSet<Host> allHosts = new HashSet<>();
+ private Cluster cluster;
+
+ public SidecarLoadBalancingPolicy(List<InetSocketAddress>
localHostAddresses,
+ String localDc,
+ int numAdditionalConnections)
+ {
+ this.childPolicy = createChildPolicy(localDc);
+ this.localHostAddresses = new HashSet<>(localHostAddresses);
+ if (numAdditionalConnections < MIN_ADDITIONAL_CONNECTIONS)
+ {
+ LOGGER.warn("Additional instances requested was {}, which is less
than the minimum of {}. Using {}.",
+ numAdditionalConnections, MIN_ADDITIONAL_CONNECTIONS,
MIN_ADDITIONAL_CONNECTIONS);
+ numAdditionalConnections = MIN_ADDITIONAL_CONNECTIONS;
+ }
+ this.totalRequestedConnections = this.localHostAddresses.size() +
numAdditionalConnections;
+ }
+
+ @Override
+ public void init(Cluster cluster, Collection<Host> hosts)
+ {
+ this.cluster = cluster;
+ this.allHosts.addAll(hosts);
+ recalculateSelectedHosts();
+ childPolicy.init(cluster, hosts);
+ }
+
+ @Override
+ public HostDistance distance(Host host)
+ {
+ if (!selectedHosts.contains(host))
+ {
+ return HostDistance.IGNORED;
+ }
+ return childPolicy.distance(host);
+ }
+
+ @Override
+ public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement
statement)
+ {
+ Iterator<Host> child = childPolicy.newQueryPlan(loggedKeyspace,
statement);
+ // Filter the child policy to only selected hosts
+ return Iterators.filter(child, selectedHosts::contains);
+ }
+
+ @Override
+ public synchronized void onAdd(Host host)
+ {
+ onUp(host);
+ childPolicy.onAdd(host);
+ }
+
+ @Override
+ public synchronized void onUp(Host host)
+ {
+ this.allHosts.add(host); // replace existing reference if there is one
+ if (selectedHosts.size() < totalRequestedConnections)
+ {
+ recalculateSelectedHosts();
+ }
+ childPolicy.onUp(host);
+ }
+
+ @Override
+ public synchronized void onDown(Host host)
+ {
+ // Don't remove local addresses from the selected host list
+ if (localHostAddresses.contains(host.getBroadcastRpcAddress()))
+ {
+ return;
+ }
+
+ boolean wasSelected = selectedHosts.remove(host);
+ if (!wasSelected)
+ {
+ // Non-selected nodes have been marked with HostDistance.IGNORED
+ // even if they may otherwise be useful. This has a side effect
+ // of preventing the driver from trying to reconnect to them
+ // if we miss the `onUp` event, so we need to schedule reconnects
+ // for these hosts explicitly unless we have active connections.
+ DriverExtensions.startPeriodicReconnectionAttempt(cluster, host);
+ }
+ recalculateSelectedHosts();
+ childPolicy.onDown(host);
+ }
+
+ @Override
+ public synchronized void onRemove(Host host)
+ {
+ this.allHosts.remove(host);
+ onDown(host);
+ childPolicy.onRemove(host);
+ }
+
+ @Override
+ public void close()
+ {
+ childPolicy.close();
+ }
+
+
+ /**
+ * Creates the child policy based on the presence of a local datacenter
+ * @param localDc the local datacenter to use, or null
+ * @return a {@link LoadBalancingPolicy}
+ */
+ private LoadBalancingPolicy createChildPolicy(String localDc)
+ {
+ if (localDc != null)
+ {
+ return
DCAwareRoundRobinPolicy.builder().withLocalDc(localDc).build();
+ }
+ return new RoundRobinPolicy();
+ }
+
+ /**
+ * Adds any local hosts to the selected host list
+ *
+ * @param sourceHosts the list of hosts provided by the control connection
+ */
+ private void addLocalHostsToSelected(List<Host> sourceHosts)
+ {
+ Iterator<Host> hostIterator = sourceHosts.iterator();
+ while (hostIterator.hasNext())
+ {
+ Host host = hostIterator.next();
+ if (localHostAddresses.contains(host.getEndPoint().resolve()))
+ {
+ selectedHosts.add(host);
+ hostIterator.remove();
+ }
+ }
+ }
+
+ private synchronized void recalculateSelectedHosts()
+ {
+ // Copy the list to allow us to remove hosts as we build the list
+ List<Host> sourceHosts = new ArrayList<>(this.allHosts);
+ addLocalHostsToSelected(sourceHosts);
+ Collections.shuffle(sourceHosts, this.secureRandom);
+ Iterator<Host> hostIterator = sourceHosts.iterator();
+ while (selectedHosts.size() < totalRequestedConnections &&
hostIterator.hasNext())
+ {
+ Host host = hostIterator.next();
+ if (!selectedHosts.contains(host) && host.isUp())
+ {
+ selectedHosts.add(host);
+ hostIterator.remove();
+ }
+ }
Review Comment:
The implementation would be easier to read if it split `this.allHosts` into
local and non-local host lists and then added the first N hosts from the
shuffled non-local host list.
The current one iterates through the list and removes items (from different
methods), which is a bit harder to digest. For example:
```java
Map<Boolean, List<Host>> partitionedHosts = allHosts.stream()
    .collect(Collectors.partitioningBy(host -> localHostAddresses.contains(host.getEndPoint().resolve())));
// partitioningBy always maps both true and false, so check for empty lists rather than null
List<Host> localHosts = partitionedHosts.get(true);
if (localHosts.isEmpty())
{
    // log warn
}
else
{
    selectedHosts.addAll(localHosts);
}
List<Host> nonLocalHosts = partitionedHosts.get(false);
if (nonLocalHosts.isEmpty())
{
    // log warn and exit
    return;
}
Collections.shuffle(nonLocalHosts, this.secureRandom);
// add first N hosts
```
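A runnable sketch of the same idea, under simplifying assumptions: hosts are
plain strings instead of driver `Host` objects, the `isUp()` check is left out,
and `HostSelectionSketch` / `selectHosts` are hypothetical names rather than
anything in the PR. It only shows the partition, shuffle, take-first-N flow:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for the policy; strings play the role of driver Host objects.
final class HostSelectionSketch
{
    static Set<String> selectHosts(Set<String> allHosts,
                                   Set<String> localHosts,
                                   int additional,
                                   Random random)
    {
        // Split into local (true) and non-local (false); both keys are always present
        Map<Boolean, List<String>> partitioned = allHosts.stream()
                .collect(Collectors.partitioningBy(localHosts::contains));

        // Local hosts are always selected
        Set<String> selected = new HashSet<>(partitioned.get(true));

        // Shuffle a copy of the non-local hosts and take at most `additional` of them
        List<String> nonLocal = new ArrayList<>(partitioned.get(false));
        Collections.shuffle(nonLocal, random);
        selected.addAll(nonLocal.subList(0, Math.min(additional, nonLocal.size())));
        return selected;
    }

    public static void main(String[] args)
    {
        Set<String> all = new HashSet<>(Arrays.asList("n1", "n2", "n3", "n4", "n5", "n6"));
        Set<String> local = new HashSet<>(Arrays.asList("n1", "n2"));
        Set<String> selected = selectHosts(all, local, 2, new Random());
        System.out.println(selected.size() + " hosts selected, local included: "
                           + selected.containsAll(local));
    }
}
```

With this shape no method removes items from a list while another iterates over
it, and the "first N" step is a single `subList` call.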
##########
src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public interface DriverConfiguration
+{
+ /**
+ * A list of contact points to use for initial connection to Cassandra.
+ * At least 2 non-replica nodes are recommended.
+ * @return a list of contact points
+ */
+ List<InetSocketAddress> contactPoints();
+
+ /**
+ * The number of connections other than locally-managed nodes to use.
+ * The minimum is 2 - if your value is < 2, the Sidecar will use 2.
+ * @return the number of connections to make to the cluster.
+ */
+ int numConnections();
+
+ /**
+ * The local datacenter to use for non-local queries to the cluster.
+ * @return the local datacenter, or null if no local datacenter is
specified.
+ */
+ String localDc();
Review Comment:
`String localDc()` means that Sidecar instances in different DCs need different
values for this field, which could be a pain for operations.
Sidecar should be able to figure out the name of the local DC via the Java
driver. The config here could instead be `boolean shouldQueryLocalDcOnly()` (or
some other name) that, when true, prevents queries to the other DCs.
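A rough sketch of how the lookup could work, assuming the datacenter of each
node is available from driver metadata (for example via `Host.getDatacenter()`).
To keep it self-contained, a hypothetical `Map` of address to datacenter name
stands in for the driver, and `LocalDcSketch` / `resolveLocalDc` are
illustrative names only:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; the map stands in for what cluster metadata would report.
final class LocalDcSketch
{
    // Returns the datacenter shared by all locally managed instances,
    // or null when it is unknown or the instances span several datacenters.
    static String resolveLocalDc(Map<String, String> dcByAddress, Collection<String> localAddresses)
    {
        Set<String> datacenters = new HashSet<>();
        for (String address : localAddresses)
        {
            String dc = dcByAddress.get(address);
            if (dc != null)
            {
                datacenters.add(dc);
            }
        }
        if (datacenters.size() != 1)
        {
            return null;
        }
        return datacenters.iterator().next();
    }

    public static void main(String[] args)
    {
        Map<String, String> dcByAddress = new HashMap<>();
        dcByAddress.put("10.0.0.1:9042", "dc1");
        dcByAddress.put("10.0.0.2:9042", "dc1");
        dcByAddress.put("10.0.1.1:9042", "dc2");
        System.out.println(resolveLocalDc(dcByAddress, Arrays.asList("10.0.0.1:9042", "10.0.0.2:9042")));
    }
}
```

When `shouldQueryLocalDcOnly()` is true and a datacenter is resolved, the child
policy could be the DC-aware one; otherwise it could fall back to plain round
robin, as `createChildPolicy` already does for a null `localDc`.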
##########
src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.Promise;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.codec.BodyCodec;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.CassandraTestContext;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static
io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test CQLSessionProvider in a variety of cluster states
+ */
+@ExtendWith(VertxExtension.class)
+public class CQLSessionProviderTest extends IntegrationTestBase
+{
+
+ public static final String OK_KEYSPACE_RESPONSE_START =
"{\"schema\":\"CREATE KEYSPACE ";
+ public static final String KEYSPACE_FAILED_RESPONSE_START =
"{\"status\":\"Service Unavailable\",";
+
+ @CassandraIntegrationTest(nodesPerDc = 2, startCluster = false)
+ void testCqlSessionProviderWorksAsExpected(VertxTestContext context,
CassandraTestContext cassandraTestContext)
+ throws Exception
+ {
+ UpgradeableCluster cluster = cassandraTestContext.getCluster();
+ testWithClient(context, false, webClient -> {
+
+ // To start, both instances are stopped, so we
should get 503s for both
+ buildInstanceHealthRequest(webClient, "1")
+ .send()
+ .onSuccess(response ->
assertHealthCheckFailed(response, context))
+ .compose(_ignored ->
+ buildInstanceHealthRequest(webClient, "2")
+ .send()
+ .onSuccess(response ->
assertHealthCheckFailed(response, context)))
+ .compose(_ignored ->
+ buildKeyspaceRequest(webClient)
+ .send()
+ // With no instances available in the
cluster, keyspace requests should fail
+ .onSuccess(response ->
assertKeyspaceFailed(response, context)))
+ .compose(_ignored -> {
+ // Start instance 1 and check both again
+ Promise<Void> promise = Promise.promise();
+ new Thread(() -> {
+ cluster.get(1).startup();
+ // Instance 1 should now be up - wait for
reconnect before testing
+ Uninterruptibles.sleepUninterruptibly(5000,
TimeUnit.MILLISECONDS);
+ promise.complete();
+ }
+ ).start();
+ return promise.future();
+ })
+ .compose(_ignored ->
+ buildInstanceHealthRequest(webClient, "1")
+ .send()
+ .onSuccess(response ->
assertHealthCheckOk(response, context)))
+ .compose(_ignored ->
+ buildInstanceHealthRequest(webClient, "2")
+ .send()
+ .onSuccess(response ->
assertHealthCheckFailed(response, context))
+ )
+ .compose(_ignored ->
+ // Even with only 1 instance connected/up,
we should still have keyspace metadata
+ buildKeyspaceRequest(webClient)
+ .send()
+ .onSuccess(response ->
assertKeyspaceOk(response, context)))
+ .compose(_ignored -> {
+ // Start instance 2 and check both again
+ Promise<Void> promise = Promise.promise();
+ new Thread(() -> {
+ cluster.get(2).startup();
+ ClusterUtils.assertInRing(cluster.get(1),
cluster.get(2));
+ // Instance 2 should now be up - wait for
gossip
+ // and reconnect before testing
+ Uninterruptibles.sleepUninterruptibly(5000,
TimeUnit.MILLISECONDS);
+ promise.complete();
+ }
+ ).start();
+ return promise.future();
+ })
+ .compose(_ignored ->
+ buildInstanceHealthRequest(webClient, "1")
+ .send()
+ .onSuccess(response ->
assertHealthCheckOk(response, context)))
+ .compose(_ignored ->
+ buildInstanceHealthRequest(webClient, "2")
+ .send()
+ .onSuccess(response ->
assertHealthCheckOk(response, context))
+ )
+ .onSuccess(_ignored -> context.completeNow())
+ .onFailure(context::failNow);
+ }
+ );
+ }
+
+
Review Comment:
- Use less indentation.
- Use `Future.future` instead of creating a new thread inside `compose`.
```suggestion
testWithClient(context, false, webClient -> {
// To start, both instances are stopped, so we should get 503s
for both
buildInstanceHealthRequest(webClient, "1")
.send()
.onSuccess(response -> assertHealthCheckFailed(response,
context))
.compose(_ignored ->
buildInstanceHealthRequest(webClient, "2")
.send()
.onSuccess(response ->
assertHealthCheckFailed(response, context)))
.compose(_ignored ->
buildKeyspaceRequest(webClient)
.send()
// With no instances available in the cluster, keyspace
requests should fail
.onSuccess(response -> assertKeyspaceFailed(response,
context)))
.compose(_ignored -> {
// Start instance 1 and check both again
return Future.future(promise -> {
cluster.get(1).startup();
// Instance 1 should now be up - wait for reconnect
before testing
Uninterruptibles.sleepUninterruptibly(5000,
TimeUnit.MILLISECONDS);
promise.complete();
});
})
.compose(_ignored ->
buildInstanceHealthRequest(webClient, "1")
.send()
.onSuccess(response -> assertHealthCheckOk(response,
context)))
.compose(_ignored ->
buildInstanceHealthRequest(webClient, "2")
.send()
.onSuccess(response ->
assertHealthCheckFailed(response, context))
)
.compose(_ignored ->
// Even with only 1 instance connected/up, we should
still have keyspace metadata
buildKeyspaceRequest(webClient)
.send()
.onSuccess(response -> assertKeyspaceOk(response,
context)))
.compose(_ignored -> {
// Start instance 2 and check both again
return Future.future(promise -> {
cluster.get(2).startup();
ClusterUtils.assertInRing(cluster.get(1),
cluster.get(2));
// Instance 2 should now be up - wait for gossip
// and reconnect before testing
Uninterruptibles.sleepUninterruptibly(5000,
TimeUnit.MILLISECONDS);
promise.complete();
});
})
.compose(_ignored ->
buildInstanceHealthRequest(webClient, "1")
.send()
.onSuccess(response -> assertHealthCheckOk(response,
context)))
.compose(_ignored ->
buildInstanceHealthRequest(webClient, "2")
.send()
.onSuccess(response -> assertHealthCheckOk(response,
context))
)
.onSuccess(_ignored -> context.completeNow())
.onFailure(context::failNow);
});
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]