yifan-c commented on code in PR #83:
URL: https://github.com/apache/cassandra-sidecar/pull/83#discussion_r1424162396
##########
src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java:
##########
@@ -205,46 +241,84 @@ private void healthCheckInternal()
}
healthCheckStatement.setHost(host);
healthCheckStatement.setConsistencyLevel(ConsistencyLevel.ONE);
- Row oneResult = activeSession.execute(healthCheckStatement).one();
-
- // Note that within the scope of this method, we should keep on
using the local releaseVersion
- String releaseVersion =
oneResult.getString(RELEASE_VERSION_COLUMN_NAME);
- NodeSettings newNodeSettings = NodeSettings.builder()
-
.releaseVersion(releaseVersion)
-
.partitioner(oneResult.getString(PARTITIONER_COLUMN_NAME))
-
.sidecarVersion(sidecarVersion)
-
.datacenter(oneResult.getString(DATA_CENTER_COLUMN_NAME))
-
.tokens(oneResult.getSet(TOKENS_COLUMN_NAME, String.class))
-
.rpcAddress(oneResult.getInet(RPC_ADDRESS_COLUMN_NAME))
-
.rpcPort(oneResult.getInt(RPC_PORT_COLUMN_NAME))
- .build();
-
- if (!newNodeSettings.equals(nodeSettings))
- {
- // Update the nodeSettings cache
- SimpleCassandraVersion previousVersion = currentVersion;
- currentVersion = SimpleCassandraVersion.create(releaseVersion);
- adapter = versionProvider.cassandra(releaseVersion)
- .create(cqlSessionProvider,
jmxClient, localNativeTransportAddress);
- nodeSettings = newNodeSettings;
- LOGGER.info("Cassandra version change detected (from={} to={})
for cassandraInstanceId={}. " +
- "New adapter loaded={}", previousVersion,
currentVersion, cassandraInstanceId, adapter);
+ Row row = activeSession.execute(healthCheckStatement).one();
- notifyCqlConnection();
+ if (row != null)
+ {
+ if (!isNativeUp)
+ {
+ isNativeUp = true;
+ LOGGER.info("Cassandra native connectivity established for
cassandraInstanceId={}",
+ cassandraInstanceId);
+ notifyNativeConnection();
+ }
+ }
+ else
+ {
+ // This should never happen but added for completeness
+ LOGGER.error("Expected to query the release_version from
system.local but encountered null {}",
+ cassandraInstanceId);
+ // The cassandra native protocol connection to the node is
down.
+ markNativeDownAndMaybeNotifyDisconnection();
+ // Unregister the host listener.
+ maybeUnregisterHostListener(activeSession);
}
- LOGGER.debug("Cassandra version {}", releaseVersion);
}
catch (IllegalArgumentException | NoHostAvailableException e)
{
- LOGGER.error("Unexpected error connecting to Cassandra instance
{}", cassandraInstanceId, e);
- // The cassandra node is down.
+ LOGGER.error("Unexpected error querying Cassandra instance {}",
cassandraInstanceId, e);
+ // The cassandra native protocol connection to the node is down.
+ markNativeDownAndMaybeNotifyDisconnection();
// Unregister the host listener.
- markAsDownAndMaybeNotify();
maybeUnregisterHostListener(activeSession);
}
}
- private Host getHost(Metadata metadata)
+ protected NodeSettings newNodeSettingsFromJmx()
+ {
+ LimitedStorageOperations storageOperations =
+ jmxClient.proxy(LimitedStorageOperations.class,
STORAGE_SERVICE_OBJ_NAME);
+ LimitedEndpointSnitchOperations endpointSnitchOperations =
+ jmxClient.proxy(LimitedEndpointSnitchOperations.class,
ENDPOINT_SNITCH_INFO_OBJ_NAME);
+
+ String releaseVersion = storageOperations.getReleaseVersion();
+ String partitionerName = storageOperations.getPartitionerName();
+ List<String> tokens = maybeGetTokens(storageOperations);
+ String dataCenter = endpointSnitchOperations.getDatacenter();
+
+ return NodeSettings.builder()
+ .releaseVersion(releaseVersion)
+ .partitioner(partitionerName)
+ .sidecarVersion(sidecarVersion)
+ .datacenter(dataCenter)
+ .tokens(new LinkedHashSet<>(tokens))
+
.rpcAddress(localNativeTransportAddress.getAddress())
+ .rpcPort(localNativeTransportAddress.getPort())
+ .build();
+ }
+
+ /**
+ * Attempts to return the tokens assigned to the Cassandra instance.
+ *
+ * @param storageOperations the interface to perform the operations
+ * @return the list of tokens assigned to the Cassandra instance
+ */
+ protected List<String> maybeGetTokens(LimitedStorageOperations
storageOperations)
+ {
+ try
+ {
+ return storageOperations.getTokens();
+ }
+ catch (AssertionError aex)
+ {
+ // On a joining node, the JMX call will with an AssertionError; we
catch this scenario to prevent failure
Review Comment:
```suggestion
// On a joining node, the JMX call will fail with an
AssertionError; we catch this scenario to prevent failure
```
##########
src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java:
##########
@@ -125,11 +125,115 @@ void
testCqlSessionProviderWorksAsExpected(VertxTestContext context, CassandraTe
);
}
+// @CassandraIntegrationTest(nodesPerDc = 2, newNodesPerDc = 1)
Review Comment:
It this chunk to be removed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]