This is an automated email from the ASF dual-hosted git repository. blerer pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 138569b Open java driver connections in CQLTester in a lazy way 138569b is described below commit 138569b079b3d17b1020a24463adabecd903b79f Author: Benjamin Lerer <b.le...@gmail.com> AuthorDate: Mon Sep 6 13:36:14 2021 +0200 Open java driver connections in CQLTester in a lazy way patch by Benjamin Lerer; reviewed by Andrés de la Peña for CASSANDRA-16918 --- .../apache/cassandra/transport/DriverBurnTest.java | 2 +- test/unit/org/apache/cassandra/cql3/CQLTester.java | 122 +++++++++++---------- .../cassandra/cql3/PreparedStatementsTest.java | 10 +- .../metrics/ClientRequestSizeMetricsTest.java | 4 +- .../transport/ClientNotificiationsTest.java | 3 +- .../transport/ClientResourceLimitsTest.java | 5 +- .../cassandra/transport/RateLimitingTest.java | 5 +- 7 files changed, 80 insertions(+), 71 deletions(-) diff --git a/test/burn/org/apache/cassandra/transport/DriverBurnTest.java b/test/burn/org/apache/cassandra/transport/DriverBurnTest.java index 37ebec1..8aaf87e 100644 --- a/test/burn/org/apache/cassandra/transport/DriverBurnTest.java +++ b/test/burn/org/apache/cassandra/transport/DriverBurnTest.java @@ -62,7 +62,7 @@ public class DriverBurnTest extends CQLTester } }; - requireNetwork((builder) -> builder.withPipelineConfigurator(configurator)); + requireNetwork(builder -> builder.withPipelineConfigurator(configurator), builder -> {}); } @Test diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index f3c279c..56be6f6 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -125,7 +125,9 @@ public abstract class CQLTester protected static final InetAddress nativeAddr; protected static final Set<InetAddressAndPort> remoteAddrs = new HashSet<>(); private static final Map<ProtocolVersion, Cluster> clusters = new HashMap<>(); - protected static final Map<ProtocolVersion, Session> sessions = new HashMap<>(); + private static final Map<ProtocolVersion, Session> sessions = new HashMap<>(); + + private static Consumer<Cluster.Builder> clusterBuilderConfigurator; public static final List<ProtocolVersion> PROTOCOL_VERSIONS = new ArrayList<>(ProtocolVersion.SUPPORTED.size()); @@ -411,26 +413,27 @@ public abstract class CQLTester return allArgs; } - protected static void requireNetworkWithoutDriver() - { - startServices(); - startServer(server -> {}); - } - - // lazy initialization for all tests that require Java Driver + /** + * Initialize Native Transport for test that need it. + */ protected static void requireNetwork() throws ConfigurationException { - requireNetwork(server -> {}); + requireNetwork(server -> {}, cluster -> {}); } - // lazy initialization for all tests that require Java Driver - protected static void requireNetwork(Consumer<Server.Builder> decorator) throws ConfigurationException + /** + * Initialize Native Transport for the tests that need it. + */ + protected static void requireNetwork(Consumer<Server.Builder> serverConfigurator, + Consumer<Cluster.Builder> clusterConfigurator) throws ConfigurationException { if (server != null) return; + clusterBuilderConfigurator = clusterConfigurator; + startServices(); - initializeNetwork(decorator, null); + startServer(serverConfigurator); } private static void startServices() @@ -443,10 +446,11 @@ public abstract class CQLTester protected static void reinitializeNetwork() { - reinitializeNetwork(null); + reinitializeNetwork(server -> {}, cluster -> {}); } - protected static void reinitializeNetwork(Consumer<Cluster.Builder> clusterConfigurator) + protected static void reinitializeNetwork(Consumer<Server.Builder> serverConfigurator, + Consumer<Cluster.Builder> clusterConfigurator) { if (server != null && server.isRunning()) { @@ -462,54 +466,49 @@ public abstract class CQLTester clusters.clear(); sessions.clear(); - initializeNetwork(server -> {}, clusterConfigurator); + clusterBuilderConfigurator = clusterConfigurator; + + startServer(serverConfigurator); } - private static void initializeNetwork(Consumer<Server.Builder> decorator, Consumer<Cluster.Builder> clusterConfigurator) + private static void startServer(Consumer<Server.Builder> decorator) { - startServer(decorator); - - for (ProtocolVersion version : PROTOCOL_VERSIONS) - { - if (clusters.containsKey(version)) - continue; + Server.Builder serverBuilder = new Server.Builder().withHost(nativeAddr).withPort(nativePort); + decorator.accept(serverBuilder); + server = serverBuilder.build(); + ClientMetrics.instance.init(Collections.singleton(server)); + server.start(); + } - SocketOptions socketOptions = new SocketOptions() - .setConnectTimeoutMillis(Integer.getInteger("cassandra.test.driver.connection_timeout_ms", DEFAULT_CONNECT_TIMEOUT_MILLIS)) // default is 5000 - .setReadTimeoutMillis(Integer.getInteger("cassandra.test.driver.read_timeout_ms", DEFAULT_READ_TIMEOUT_MILLIS)); // default is 12000 + private static Cluster initClientCluster(ProtocolVersion version) + { + SocketOptions socketOptions = + new SocketOptions().setConnectTimeoutMillis(Integer.getInteger("cassandra.test.driver.connection_timeout_ms", + DEFAULT_CONNECT_TIMEOUT_MILLIS)) // default is 5000 + .setReadTimeoutMillis(Integer.getInteger("cassandra.test.driver.read_timeout_ms", + DEFAULT_READ_TIMEOUT_MILLIS)); // default is 12000 - logger.info("Timeouts: {} / {}", socketOptions.getConnectTimeoutMillis(), socketOptions.getReadTimeoutMillis()); + logger.info("Timeouts: {} / {}", socketOptions.getConnectTimeoutMillis(), socketOptions.getReadTimeoutMillis()); - Cluster.Builder builder = Cluster.builder() - .withoutJMXReporting() - .addContactPoints(nativeAddr) - .withClusterName("Test Cluster") - .withPort(nativePort) - .withSocketOptions(socketOptions); + Cluster.Builder builder = Cluster.builder() + .withoutJMXReporting() + .addContactPoints(nativeAddr) + .withClusterName("Test Cluster") + .withPort(nativePort) + .withSocketOptions(socketOptions); - if (clusterConfigurator != null) - clusterConfigurator.accept(builder); + if (version.isBeta()) + builder = builder.allowBetaProtocolVersion(); + else + builder = builder.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt())); - if (version.isBeta()) - builder = builder.allowBetaProtocolVersion(); - else - builder = builder.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt())); + clusterBuilderConfigurator.accept(builder); - Cluster cluster = builder.build(); - clusters.put(version, cluster); - sessions.put(version, cluster.connect()); + Cluster cluster = builder.build(); - logger.info("Started Java Driver instance for protocol version {}", version); - } - } + logger.info("Started Java Driver instance for protocol version {}", version); - private static void startServer(Consumer<Server.Builder> decorator) - { - Server.Builder serverBuilder = new Server.Builder().withHost(nativeAddr).withPort(nativePort); - decorator.accept(serverBuilder); - server = serverBuilder.build(); - ClientMetrics.instance.init(Collections.singleton(server)); - server.start(); + return cluster; } protected void dropPerTestKeyspace() throws Throwable @@ -1023,7 +1022,18 @@ public abstract class CQLTester { requireNetwork(); - return sessions.get(protocolVersion); + return getSession(protocolVersion); + } + + private Session getSession(ProtocolVersion protocolVersion) + { + Cluster cluster = getCluster(protocolVersion); + return sessions.computeIfAbsent(protocolVersion, userProto -> cluster.connect()); + } + + private Cluster getCluster(ProtocolVersion protocolVersion) + { + return clusters.computeIfAbsent(protocolVersion, userProto -> initClientCluster(protocolVersion)); } protected SimpleClient newSimpleClient(ProtocolVersion version) throws IOException @@ -1125,9 +1135,9 @@ public abstract class CQLTester for (int j = 0; j < meta.size(); j++) { DataType type = meta.getType(j); - com.datastax.driver.core.TypeCodec<Object> codec = clusters.get(protocolVersion).getConfiguration() - .getCodecRegistry() - .codecFor(type); + com.datastax.driver.core.TypeCodec<Object> codec = getCluster(protocolVersion).getConfiguration() + .getCodecRegistry() + .codecFor(type); ByteBuffer expectedByteValue = codec.serialize(expected[j], com.datastax.driver.core.ProtocolVersion.fromInt(protocolVersion.asInt())); int expectedBytes = expectedByteValue == null ? -1 : expectedByteValue.remaining(); ByteBuffer actualValue = actual.getBytesUnsafe(meta.getName(j)); @@ -1825,7 +1835,7 @@ public abstract class CQLTester protected com.datastax.driver.core.TupleType tupleTypeOf(ProtocolVersion protocolVersion, com.datastax.driver.core.DataType...types) { requireNetwork(); - return clusters.get(protocolVersion).getMetadata().newTupleType(types); + return getCluster(protocolVersion).getMetadata().newTupleType(types); } @SuppressWarnings({ "rawtypes", "unchecked" }) diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java index ef705bd..5b3ae3e 100644 --- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java +++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java @@ -58,7 +58,7 @@ public class PreparedStatementsTest extends CQLTester @Test public void testInvalidatePreparedStatementsOnDrop() { - Session session = sessions.get(ProtocolVersion.V5); + Session session = sessionNet(ProtocolVersion.V5); session.execute(dropKsStatement); session.execute(createKsStatement); @@ -102,7 +102,7 @@ public class PreparedStatementsTest extends CQLTester private void testInvalidatePreparedStatementOnAlter(ProtocolVersion version, boolean supportsMetadataChange) { - Session session = sessions.get(version); + Session session = sessionNet(version); String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (a int PRIMARY KEY, b int, c int);"; String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;"; @@ -162,7 +162,7 @@ public class PreparedStatementsTest extends CQLTester private void testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion version) { - Session session = sessions.get(version); + Session session = sessionNet(version); String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (a int PRIMARY KEY, b int, c int);"; String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;"; @@ -200,7 +200,7 @@ public class PreparedStatementsTest extends CQLTester @Test public void testStatementRePreparationOnReconnect() { - Session session = sessions.get(ProtocolVersion.V5); + Session session = sessionNet(ProtocolVersion.V5); session.execute("USE " + keyspace()); session.execute(dropKsStatement); @@ -241,7 +241,7 @@ public class PreparedStatementsTest extends CQLTester @Test public void prepareAndExecuteWithCustomExpressions() throws Throwable { - Session session = sessions.get(ProtocolVersion.V5); + Session session = sessionNet(ProtocolVersion.V5); session.execute(dropKsStatement); session.execute(createKsStatement); diff --git a/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java b/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java index 1f2f771..f19fca5 100644 --- a/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java +++ b/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java @@ -69,7 +69,9 @@ public class ClientRequestSizeMetricsTest extends CQLTester // We explicitly disable scheme fetching to avoid that effect try { - reinitializeNetwork(builder -> builder.withQueryOptions(new QueryOptions().setMetadataEnabled(false))); + reinitializeNetwork(builder -> {}, builder -> builder.withQueryOptions(new QueryOptions().setMetadataEnabled(false))); + sessionNet(version); // Ensure that the connection is open + // We want to ignore all the messages sent by the driver upon connection as well as // the event sent upon schema updates clearMetrics(); diff --git a/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java index bd1ec63..4ff844a 100644 --- a/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java +++ b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.transport; import java.util.Collections; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.junit.Before; @@ -43,7 +42,7 @@ public class ClientNotificiationsTest extends CQLTester @Before public void setup() { - requireNetwork(builder -> builder.withEventNotifier(notifier)); + requireNetwork(builder -> builder.withEventNotifier(notifier), builder -> {}); } @Parameterized.Parameter(0) diff --git a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java index 5cea90c..1ffd9b4 100644 --- a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java +++ b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java @@ -71,9 +71,8 @@ public class ClientResourceLimitsTest extends CQLTester DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1); DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(LOW_LIMIT); DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(LOW_LIMIT); - - // The driver control connections would send queries that might interfere with the tests. - requireNetworkWithoutDriver(); + + requireNetwork(); } @AfterClass diff --git a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java index 8497c01..215542d 100644 --- a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java +++ b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java @@ -76,9 +76,8 @@ public class RateLimitingTest extends CQLTester // If we don't exceed the queue capacity, we won't actually use the global/endpoint // bytes-in-flight limits, and the assertions we make below around releasing them would be useless. DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1); - - // The driver control connections would send queries that might interfere with the tests. - requireNetworkWithoutDriver(); + + requireNetwork(); } @Before --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org