OleksiienkoMykyta commented on code in PR #1664:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1664#discussion_r1987457168
##########
session.go:
##########
@@ -553,6 +562,12 @@ func (s *Session) KeyspaceMetadata(keyspace string)
(*KeyspaceMetadata, error) {
return s.schemaDescriber.getSchema(keyspace)
}
+// ClusterMetadata returns the cluster metadata.
+// The returned value is a snapshot of the metadata at the time of the call.
Do not store for later reuse.
Review Comment:
We should provide more details to the user here, since this is a potential weak spot.
Something like:
```
// ClusterMetadata returns a snapshot of the current cluster metadata at the
time of the call.
// The metadata may change over time, so storing the returned pointer for
later reuse can lead to inconsistencies.
// Potential Data Race Risk:
// If metaMngr is modified by another goroutine while being read, a data
race may occur.
```
##########
cluster_metadata.go:
##########
@@ -0,0 +1,180 @@
+package gocql
+
+import (
+ "sync"
+ "sync/atomic"
+)
+
+// ClusterMetadata holds metadata about cluster topology.
+// It is used inside atomic.Value and shallow copies are used when replacing
it,
+// so fields should not be modified in-place. Instead, to modify a field a
copy of the field should be made
+// and the pointer in ClusterMetadata updated to point to the new value.
+type ClusterMetadata struct {
+ // replicas is map[keyspace]map[Token]hosts
+ replicas map[string]tokenRingReplicas
+ tokenRing *TokenRing
+}
+
+// TokenRing returns the token ring.
+// Please note that the token ring is only available if at least one cluster
node is known and up.
+// Several [ClusterConfig] parameters can affect the availability or
reliability of the token ring:
+// * DisableInitialHostLookup will disable host discovery and therefore the
token ring availability.
+// * Events.DisableNodeStatusEvents will turn off processing of STATUS_CHANGE
events,
+// therefore the token ring will not be updated in response to host UP/DOWN
events.
+// * Events.DisableTopologyEvents will turn off processing of TOPOLOGY_CHANGE
events,
+// therefore the token ring will not be updated in response to cluster
topology changes.
+func (m *ClusterMetadata) TokenRing() *TokenRing {
+ return m.tokenRing
+}
+
+// resetTokenRing creates a new TokenRing.
+// It must be called with t.mu locked.
+func (m *ClusterMetadata) resetTokenRing(partitioner string, hosts
[]*HostInfo, logger StdLogger) {
+ if partitioner == "" {
+ // partitioner not yet set
+ return
+ }
+
+ // create a new Token ring
+ tokenRing, err := newTokenRing(partitioner, hosts)
+ if err != nil {
+ logger.Printf("Unable to update the token ring due to error:
%s", err)
+ return
+ }
+
+ // replace the Token ring
+ m.tokenRing = tokenRing
+}
+
+// clusterMetadataManager manages cluster metadata.
+type clusterMetadataManager struct {
+ getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
+ getKeyspaceName func() string
+
+ // mu protects writes to hosts, partitioner, metadata.
+ // reads can be unlocked as long as they are not used for updating
state later.
+ mu sync.Mutex
+ hosts cowHostList
+ partitioner string
+ metadata atomic.Value // *ClusterMetadata
+
+ logger StdLogger
+}
+
+func (m *clusterMetadataManager) init(s *Session) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if m.getKeyspaceMetadata != nil {
+ // Init was already called.
+ // See https://github.com/scylladb/gocql/issues/94.
+ panic("sharing token aware host selection policy between
sessions is not supported")
+ }
+ m.getKeyspaceMetadata = s.KeyspaceMetadata
+ m.getKeyspaceName = func() string { return s.cfg.Keyspace }
+ m.logger = s.logger
+}
+
+func (m *clusterMetadataManager) keyspaceChanged(update KeyspaceUpdateEvent) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ meta := m.getMetadataForUpdate()
+ m.updateReplicas(meta, update.Keyspace)
+ m.metadata.Store(meta)
+}
+
+func (m *clusterMetadataManager) setPartitioner(partitioner string) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if m.partitioner != partitioner {
+ m.partitioner = partitioner
+ meta := m.getMetadataForUpdate()
+ meta.resetTokenRing(m.partitioner, m.hosts.get(), m.logger)
+ m.updateReplicas(meta, m.getKeyspaceName())
+ m.metadata.Store(meta)
+ }
+}
+
+func (m *clusterMetadataManager) addHost(host *HostInfo) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if m.hosts.add(host) {
+ meta := m.getMetadataForUpdate()
+ meta.resetTokenRing(m.partitioner, m.hosts.get(), m.logger)
+ m.updateReplicas(meta, m.getKeyspaceName())
+ m.metadata.Store(meta)
+ }
+}
+
+func (m *clusterMetadataManager) addHosts(hosts []*HostInfo) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ for _, host := range hosts {
+ m.hosts.add(host)
+ }
+
+ meta := m.getMetadataForUpdate()
+ meta.resetTokenRing(m.partitioner, m.hosts.get(), m.logger)
+ m.updateReplicas(meta, m.getKeyspaceName())
+ m.metadata.Store(meta)
+}
+
+func (m *clusterMetadataManager) removeHost(host *HostInfo) {
Review Comment:
To be more consistent and help future developers, it would be nice to have
all methods/functions commented on.
##########
token.go:
##########
@@ -202,7 +217,99 @@ func (t *tokenRing) String() string {
return string(buf.Bytes())
}
-func (t *tokenRing) GetHostForToken(token token) (host *HostInfo, endToken
token) {
+// Tokens returns the token range corresponding to the primary replica.
+// The elements are sorted by token ascending.
+// The range for a given item starts after preceding range and ends with the
token at the current position.
+// The end token is part of the range.
+// The lowest (i.e. index 0) range wraps around the ring (its preceding range
is the one with the largest index).
+// You can obtain the owner host/vnode of the range by calling HostForToken
with the end token.
+//
+// The following example constructs one TOKEN-based query for each token range:
+// func buildTokenQueries(s *Session, t *TokenRing) []*Query {
Review Comment:
As I understand it, you added the buildTokenQueries function only to
demonstrate how to use token-based queries. I think the commented example
is enough on its own, so the unexported function (line 276)
`func buildTokenQueries(s *Session, t *TokenRing) []*Query {` should either
be deleted or used for testing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]