joao-r-reis commented on code in PR #1926:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1926#discussion_r2842639316
##########
metadata.go:
##########
@@ -243,95 +246,257 @@ const (
// queries the cluster for schema information for a specific keyspace
type schemaDescriber struct {
- session *Session
- mu sync.Mutex
+ session *Session
+ schemaRefresher *refreshDebouncer
+ schemaMeta atomic.Value // *schemaMeta
+}
+
+// Schema change type constants as defined in the Cassandra Native Protocol
specification.
+// These values indicate the nature of schema modifications that occurred.
+//
+// See:
https://cassandra.apache.org/doc/latest/cassandra/reference/native-protocol.html
+//
+// Schema change events are server-initiated messages sent to clients that
have registered
+// for schema change notifications. These events indicate modifications to
keyspaces, tables,
+// user-defined types, functions, or aggregates.
+const (
+ SchemaChangeTypeCreated = "CREATED" // Schema object was created
+ SchemaChangeTypeUpdated = "UPDATED" // Schema object was modified
+ SchemaChangeTypeDropped = "DROPPED" // Schema object was removed
+)
- cache map[string]*KeyspaceMetadata
+type schemaMeta struct {
+ keyspaceMeta map[string]*KeyspaceMetadata
}
// creates a session bound schema describer which will query and cache
// keyspace metadata
-func newSchemaDescriber(session *Session) *schemaDescriber {
- return &schemaDescriber{
+func newSchemaDescriber(session *Session, schemaRefresher *refreshDebouncer)
*schemaDescriber {
+ meta := new(schemaMeta)
+ describer := &schemaDescriber{
session: session,
- cache: map[string]*KeyspaceMetadata{},
}
+ describer.schemaMeta.Store(meta)
+ describer.schemaRefresher = schemaRefresher
+ return describer
+}
+
+func (s *schemaDescriber) getSchemaMetaForRead() *schemaMeta {
+ meta, _ := s.schemaMeta.Load().(*schemaMeta)
+ return meta
+}
+
+func (s *schemaDescriber) getSchemaMetaForUpdate() *schemaMeta {
+ meta := s.getSchemaMetaForRead()
+ metaNew := new(schemaMeta)
+ if meta != nil {
+ *metaNew = *meta
+ }
+ return metaNew
}
// returns the cached KeyspaceMetadata held by the describer for the named
// keyspace.
func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata,
error) {
- s.mu.Lock()
- defer s.mu.Unlock()
+ if s.session.cfg.MetadataCacheMode == Disabled {
+ return s.fetchSchema(keyspaceName)
+ }
+ metadata, found := s.getSchemaMetaForRead().keyspaceMeta[keyspaceName]
- metadata, found := s.cache[keyspaceName]
if !found {
- // refresh the cache for this keyspace
- err := s.refreshSchema(keyspaceName)
- if err != nil {
- return nil, err
- }
-
- metadata = s.cache[keyspaceName]
+ return nil, ErrKeyspaceDoesNotExist
}
return metadata, nil
}
-// clears the already cached keyspace metadata
-func (s *schemaDescriber) clearSchema(keyspaceName string) {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- delete(s.cache, keyspaceName)
-}
-
-// forcibly updates the current KeyspaceMetadata held by the schema describer
-// for a given named keyspace.
-func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
+func (s *schemaDescriber) fetchSchema(keyspaceName string) (*KeyspaceMetadata,
error) {
var err error
// query the system keyspace for schema data
// TODO retrieve concurrently
keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
tables, err := getTableMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
columns, err := getColumnMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
functions, err := getFunctionsMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
aggregates, err := getAggregatesMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
userTypes, err := getUserTypeMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
materializedViews, err := getMaterializedViewsMetadata(s.session,
keyspaceName)
if err != nil {
- return err
+ return nil, err
}
// organize the schema data
compileMetadata(s.session, keyspace, tables, columns, functions,
aggregates, userTypes,
materializedViews)
- // update the cache
- s.cache[keyspaceName] = keyspace
+ return keyspace, nil
+}
+// forcibly updates the current KeyspaceMetadata held by the schema describer
+// for all the keyspaces.
+// This function is called via schemaRefresher refreshDebouncer to batch and
+// debounce schema refresh requests.
+func refreshSchemas(session *Session) error {
+ start := time.Now()
+ defer func() {
+ elapsed := time.Since(start)
+ session.logger.Debug("Schema refresh completed",
Review Comment:
I just noticed while troubleshooting that this log message will be
misleading when the refresh fails, because it's always printed the same way
regardless of whether this function fails or not.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]