joao-r-reis commented on code in PR #1929:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1929#discussion_r2924829809
##########
cluster.go:
##########
@@ -394,6 +397,60 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
}
+// MetadataConfig configures driver's internal metadata caching and event
listening.
+type MetadataConfig struct {
+ CacheMode MetadataCacheMode
+
+ // HostListener configures event listeners for host state and topology
changes.
+ HostListener HostListenersConfig
+
+ // SchemaListener configures event listeners for schema changes.
+ //
+ // If CacheMode is Full, listeners will be notified about all schema
changes.
+ //
+ // If CacheMode is KeyspaceOnly, only KeyspaceChangeListener will be
notified,
+ // having other listeners registered will result in an error during
session creation.
+ //
+ // If CacheMode is Disabled, having these listeners will result in an
error during session creation.
+ SchemaListener SchemaListenersConfig
+
+ // SessionReadyListener will be notified when the session is ready to
be used.
Review Comment:
```
// SessionReadyListener will be notified when the session is ready to be
used. This is meant to be implemented by Host and Schema listeners but it can
also be used as a generic callback for when the session is ready regardless of
whether a metadata listener is implemented or not.
```
##########
cluster.go:
##########
@@ -394,6 +397,60 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
}
+// MetadataConfig configures driver's internal metadata caching and event
listening.
+type MetadataConfig struct {
+ CacheMode MetadataCacheMode
+
+ // HostListener configures event listeners for host state and topology
changes.
+ HostListener HostListenersConfig
+
+ // SchemaListener configures event listeners for schema changes.
+ //
+ // If CacheMode is Full, listeners will be notified about all schema
changes.
+ //
+ // If CacheMode is KeyspaceOnly, only KeyspaceChangeListener will be
notified,
+ // having other listeners registered will result in an error during
session creation.
+ //
+ // If CacheMode is Disabled, having these listeners will result in an
error during session creation.
+ SchemaListener SchemaListenersConfig
+
+ // SessionReadyListener will be notified when the session is ready to
be used.
+ SessionReadyListener SessionReadyListener
+}
+
+type HostListenersConfig struct {
Review Comment:
I think we can remove the `Config` suffix from these:
```
type MetadataConfig struct {
CacheMode MetadataCacheMode
// HostListener configures event listeners for host state and topology
changes.
HostListeners HostListeners
// SchemaListener configures event listeners for schema changes.
//
// If CacheMode is Full, listeners will be notified about all schema
changes.
//
// If CacheMode is KeyspaceOnly, only KeyspaceChangeListener will be
notified,
// having other listeners registered will result in an error during
session creation.
//
// If CacheMode is Disabled, having these listeners will result in an
error during session creation.
SchemaListeners SchemaListeners
// SessionReadyListener will be notified when the session is ready to
be used.
SessionReadyListener SessionReadyListener
}
```
##########
cluster.go:
##########
@@ -394,6 +397,60 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
}
+// MetadataConfig configures driver's internal metadata caching and event
listening.
+type MetadataConfig struct {
+ CacheMode MetadataCacheMode
+
+ // HostListener configures event listeners for host state and topology
changes.
+ HostListener HostListenersConfig
+
+ // SchemaListener configures event listeners for schema changes.
+ //
+ // If CacheMode is Full, listeners will be notified about all schema
changes.
+ //
+ // If CacheMode is KeyspaceOnly, only KeyspaceChangeListener will be
notified,
+ // having other listeners registered will result in an error during
session creation.
+ //
+ // If CacheMode is Disabled, having these listeners will result in an
error during session creation.
Review Comment:
We should probably move these docs about the cache mode to the cache mode
field itself
##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be
used.
+type SessionReadyListener interface {
+ OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+ OnNewHost(event NewHostEvent)
+ OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+ Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+ Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+ OnHostUp(event HostUpEvent)
+ OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+ Host *HostInfo
+}
+
+type HostDownEvent struct {
+ Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+ OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+ OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+ OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+ OnTableCreated(event OnTableCreatedEvent)
+ OnTableUpdated(event OnTableUpdatedEvent)
+ OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+ OnUserTypeCreated(event OnUserTypeCreatedEvent)
+ OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+ OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+ OnFunctionCreated(event OnFunctionCreatedEvent)
+ OnFunctionUpdated(event OnFunctionUpdatedEvent)
+ OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+ OnAggregateCreated(event OnAggregateCreatedEvent)
+ OnAggregateUpdated(event OnAggregateUpdatedEvent)
+ OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+ Old *KeyspaceMetadata
+ New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+ Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+ Old *TableMetadata
+ New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+ Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+ Old *UserTypeMetadata
+ New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+ Old *FunctionMetadata
+ New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+ Old *AggregateMetadata
+ New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+ Keyspaces []KeyspaceChangeListener
+ Tables []TableChangeListener
+ UserTypes []UserTypeChangeListener
+ Functions []FunctionChangeListener
+ Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event
OnKeyspaceCreatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event
OnKeyspaceUpdatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event
OnKeyspaceDroppedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event
OnUserTypeCreatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event
OnUserTypeUpdatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event
OnUserTypeDroppedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event
OnFunctionCreatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event
OnFunctionUpdatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event
OnFunctionDroppedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event
OnAggregateCreatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event
OnAggregateUpdatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event
OnAggregateDroppedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateDropped(event)
+ }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology
change listeners.
+// Allows to register multiple listeners for the same type of host state and
topology change.
+type HostStateChangeListenersMux struct {
Review Comment:
HostListenersMux
##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be
used.
+type SessionReadyListener interface {
+ OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+ OnNewHost(event NewHostEvent)
+ OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+ Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+ Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+ OnHostUp(event HostUpEvent)
+ OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+ Host *HostInfo
+}
+
+type HostDownEvent struct {
+ Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+ OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+ OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+ OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+ OnTableCreated(event OnTableCreatedEvent)
+ OnTableUpdated(event OnTableUpdatedEvent)
+ OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+ OnUserTypeCreated(event OnUserTypeCreatedEvent)
+ OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+ OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+ OnFunctionCreated(event OnFunctionCreatedEvent)
+ OnFunctionUpdated(event OnFunctionUpdatedEvent)
+ OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+ OnAggregateCreated(event OnAggregateCreatedEvent)
+ OnAggregateUpdated(event OnAggregateUpdatedEvent)
+ OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+ Old *KeyspaceMetadata
+ New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+ Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+ Old *TableMetadata
+ New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+ Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+ Old *UserTypeMetadata
+ New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+ Old *FunctionMetadata
+ New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+ Old *AggregateMetadata
+ New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+ Keyspaces []KeyspaceChangeListener
+ Tables []TableChangeListener
+ UserTypes []UserTypeChangeListener
+ Functions []FunctionChangeListener
+ Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event
OnKeyspaceCreatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event
OnKeyspaceUpdatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event
OnKeyspaceDroppedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event
OnUserTypeCreatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event
OnUserTypeUpdatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event
OnUserTypeDroppedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event
OnFunctionCreatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event
OnFunctionUpdatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event
OnFunctionDroppedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event
OnAggregateCreatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event
OnAggregateUpdatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event
OnAggregateDroppedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateDropped(event)
+ }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology
change listeners.
+// Allows to register multiple listeners for the same type of host state and
topology change.
+type HostStateChangeListenersMux struct {
+ HostStateChangeListeners []HostStatusChangeListener
+ TopologyChangeListeners []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostUp(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostDown(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnNewHost(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnRemovedHost(event)
+ }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is
initialized.
+type internalHostStateAndTopologyChangeListener struct {
+ hostStateChangeListener HostStatusChangeListener
+ topologyChangeListener TopologyChangeListener
+
+ // To avoid calling the listeners before the session is initialized, we
need to track if the session is initialized.
+ // Updated by the sessionReady method.
+ sessionInitialized bool
+}
+
+func newInternalHostStateAndTopologyChangeListener(hostStateChangeListener
HostStatusChangeListener, topologyChangeListener TopologyChangeListener)
*internalHostStateAndTopologyChangeListener {
+ return &internalHostStateAndTopologyChangeListener{
+ hostStateChangeListener: hostStateChangeListener,
+ topologyChangeListener: topologyChangeListener,
+ sessionInitialized: false,
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostUp(event
HostUpEvent) {
+ if l.hostStateChangeListener != nil && l.sessionInitialized {
+ l.hostStateChangeListener.OnHostUp(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostDown(event
HostDownEvent) {
+ if l.hostStateChangeListener != nil && l.sessionInitialized {
+ l.hostStateChangeListener.OnHostDown(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnNewHost(event
NewHostEvent) {
+ if l.topologyChangeListener != nil && l.sessionInitialized {
+ l.topologyChangeListener.OnNewHost(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnRemovedHost(event
RemovedHostEvent) {
+ if l.topologyChangeListener != nil && l.sessionInitialized {
+ l.topologyChangeListener.OnRemovedHost(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnSessionReady() {
Review Comment:
We should avoid internal components using the public API like this in
general, just makes for a clearer boundary. Either we provide the session
pointer to this struct or we just provide a simple `func() bool`
##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be
used.
+type SessionReadyListener interface {
+ OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+ OnNewHost(event NewHostEvent)
+ OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+ Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+ Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+ OnHostUp(event HostUpEvent)
+ OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+ Host *HostInfo
+}
+
+type HostDownEvent struct {
+ Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+ OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+ OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+ OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+ OnTableCreated(event OnTableCreatedEvent)
+ OnTableUpdated(event OnTableUpdatedEvent)
+ OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+ OnUserTypeCreated(event OnUserTypeCreatedEvent)
+ OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+ OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+ OnFunctionCreated(event OnFunctionCreatedEvent)
+ OnFunctionUpdated(event OnFunctionUpdatedEvent)
+ OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+ OnAggregateCreated(event OnAggregateCreatedEvent)
+ OnAggregateUpdated(event OnAggregateUpdatedEvent)
+ OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+ Old *KeyspaceMetadata
+ New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+ Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+ Old *TableMetadata
+ New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+ Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+ Old *UserTypeMetadata
+ New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+ Old *FunctionMetadata
+ New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+ Old *AggregateMetadata
+ New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+ Keyspaces []KeyspaceChangeListener
+ Tables []TableChangeListener
+ UserTypes []UserTypeChangeListener
+ Functions []FunctionChangeListener
+ Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event
OnKeyspaceCreatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event
OnKeyspaceUpdatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event
OnKeyspaceDroppedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event
OnUserTypeCreatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event
OnUserTypeUpdatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event
OnUserTypeDroppedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event
OnFunctionCreatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event
OnFunctionUpdatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event
OnFunctionDroppedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event
OnAggregateCreatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event
OnAggregateUpdatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event
OnAggregateDroppedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateDropped(event)
+ }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology
change listeners.
+// Allows to register multiple listeners for the same type of host state and
topology change.
+type HostStateChangeListenersMux struct {
+ HostStateChangeListeners []HostStatusChangeListener
+ TopologyChangeListeners []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostUp(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostDown(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnNewHost(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnRemovedHost(event)
+ }
+}
Review Comment:
We're missing the SessionReadyListenersMux no?
##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be
used.
+type SessionReadyListener interface {
+ OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+ OnNewHost(event NewHostEvent)
+ OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+ Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+ Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+ OnHostUp(event HostUpEvent)
+ OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+ Host *HostInfo
+}
+
+type HostDownEvent struct {
+ Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+ OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+ OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+ OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+ OnTableCreated(event OnTableCreatedEvent)
+ OnTableUpdated(event OnTableUpdatedEvent)
+ OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+ OnUserTypeCreated(event OnUserTypeCreatedEvent)
+ OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+ OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+ OnFunctionCreated(event OnFunctionCreatedEvent)
+ OnFunctionUpdated(event OnFunctionUpdatedEvent)
+ OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+ OnAggregateCreated(event OnAggregateCreatedEvent)
+ OnAggregateUpdated(event OnAggregateUpdatedEvent)
+ OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+ Old *KeyspaceMetadata
+ New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+ Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+ Old *TableMetadata
+ New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+ Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+ Old *UserTypeMetadata
+ New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+ Old *FunctionMetadata
+ New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+ Old *AggregateMetadata
+ New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+ Keyspaces []KeyspaceChangeListener
+ Tables []TableChangeListener
+ UserTypes []UserTypeChangeListener
+ Functions []FunctionChangeListener
+ Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event
OnKeyspaceCreatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event
OnKeyspaceUpdatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event
OnKeyspaceDroppedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event
OnUserTypeCreatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event
OnUserTypeUpdatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event
OnUserTypeDroppedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event
OnFunctionCreatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event
OnFunctionUpdatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event
OnFunctionDroppedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event
OnAggregateCreatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event
OnAggregateUpdatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event
OnAggregateDroppedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateDropped(event)
+ }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology
change listeners.
+// Allows to register multiple listeners for the same type of host state and
topology change.
+type HostStateChangeListenersMux struct {
+ HostStateChangeListeners []HostStatusChangeListener
+ TopologyChangeListeners []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostUp(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostDown(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnNewHost(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnRemovedHost(event)
+ }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is
initialized.
+type internalHostStateAndTopologyChangeListener struct {
+ hostStateChangeListener HostStatusChangeListener
+ topologyChangeListener TopologyChangeListener
+
+ // To avoid calling the listeners before the session is initialized, we
need to track if the session is initialized.
+ // Updated by the sessionReady method.
+ sessionInitialized bool
+}
+
+func newInternalHostStateAndTopologyChangeListener(hostStateChangeListener
HostStatusChangeListener, topologyChangeListener TopologyChangeListener)
*internalHostStateAndTopologyChangeListener {
+ return &internalHostStateAndTopologyChangeListener{
+ hostStateChangeListener: hostStateChangeListener,
+ topologyChangeListener: topologyChangeListener,
+ sessionInitialized: false,
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostUp(event
HostUpEvent) {
+ if l.hostStateChangeListener != nil && l.sessionInitialized {
+ l.hostStateChangeListener.OnHostUp(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostDown(event
HostDownEvent) {
+ if l.hostStateChangeListener != nil && l.sessionInitialized {
+ l.hostStateChangeListener.OnHostDown(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnNewHost(event
NewHostEvent) {
+ if l.topologyChangeListener != nil && l.sessionInitialized {
+ l.topologyChangeListener.OnNewHost(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnRemovedHost(event
RemovedHostEvent) {
+ if l.topologyChangeListener != nil && l.sessionInitialized {
+ l.topologyChangeListener.OnRemovedHost(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnSessionReady() {
+ l.sessionInitialized = true
+}
+
+// Wrapper around the session ready listeners.
+type sessionReadyListeners struct {
+ sessionReadyListeners []SessionReadyListener
+}
+
+func (mux *sessionReadyListeners) OnSessionReady() {
Review Comment:
no need for pointers
##########
cluster.go:
##########
@@ -394,6 +397,60 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
}
+// MetadataConfig configures driver's internal metadata caching and event
listening.
+type MetadataConfig struct {
+ CacheMode MetadataCacheMode
+
+ // HostListener configures event listeners for host state and topology
changes.
+ HostListener HostListenersConfig
Review Comment:
We should mention and reference the Mux types on these API docs comments
(both on each config field + the struct docs + the individual listener field
docs. And we need a new section on `doc.go` to document this new feature (maybe
improve an existing Metadata section if it already exists otherwise create it)
##########
cluster.go:
##########
@@ -394,6 +397,60 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
}
+// MetadataConfig configures driver's internal metadata caching and event
listening.
+type MetadataConfig struct {
+ CacheMode MetadataCacheMode
+
+ // HostListener configures event listeners for host state and topology
changes.
+ HostListener HostListenersConfig
+
+ // SchemaListener configures event listeners for schema changes.
+ //
+ // If CacheMode is Full, listeners will be notified about all schema
changes.
+ //
+ // If CacheMode is KeyspaceOnly, only KeyspaceChangeListener will be
notified,
+ // having other listeners registered will result in an error during
session creation.
+ //
+ // If CacheMode is Disabled, having these listeners will result in an
error during session creation.
+ SchemaListener SchemaListenersConfig
+
+ // SessionReadyListener will be notified when the session is ready to
be used.
+ SessionReadyListener SessionReadyListener
+}
+
+type HostListenersConfig struct {
+ // HostStateChangeListener will be notified about host state events
(UP, DOWN).
+ HostStateChangeListener HostStatusChangeListener
+
+ // TopologyChangeListener will be notified about topology change events
+ // (NEW_NODE, REMOVED_NODE).
+ TopologyChangeListener TopologyChangeListener
+}
+
+type SchemaListenersConfig struct {
+ KeyspaceChangeListener KeyspaceChangeListener
+ TableChangeListener TableChangeListener
+ UserTypeChangeListener UserTypeChangeListener
+ FunctionChangeListener FunctionChangeListener
+ AggregateChangeListener AggregateChangeListener
+}
+
+func (cfg *SchemaListenersConfig) hasSchemaChangeListeners() bool {
+ return cfg.hasKeyspaceListener() ||
cfg.hasNonKeyspaceSchemaChangeListeners()
+}
+
+func (cfg *SchemaListenersConfig) hasKeyspaceListener() bool {
+ return cfg.KeyspaceChangeListener != nil
+}
+
+// hasNonKeyspaceSchemaChangeListeners returns true if any schema change
listener is set except for keyspace one.
+func (cfg *SchemaListenersConfig) hasNonKeyspaceSchemaChangeListeners() bool {
+ return cfg.TableChangeListener != nil ||
+ cfg.UserTypeChangeListener != nil ||
+ cfg.FunctionChangeListener != nil ||
+ cfg.AggregateChangeListener != nil
+}
Review Comment:
We can move all of these `SchemaListenersConfig` methods to a new internal
schemalisteners object like you already did for the host listeners and
sessionready listener
##########
metadata.go:
##########
@@ -2049,3 +2101,319 @@ func isIdentifierChar(c byte) bool {
c == '_' ||
c == '&'
}
+
+// Handles the full schema changes including table, user type, function,
aggregate.
+func handleFullSchemaChanges(session *Session, oldKeyspaces, newKeyspaces
map[string]*KeyspaceMetadata) {
+ for _, oldKsMeta := range oldKeyspaces {
+ newKsMeta, ok := newKeyspaces[oldKsMeta.Name]
+ if !ok {
+ // Skip, KeyspaceChangeListener is already notified in
refreshSchemas()
+ continue
+ }
+
+ if session.schemaChangeListeners.TableChangeListener != nil {
Review Comment:
with the internal schema listeners object we can replace this with just
`hasTableListener` and all the nil checks will be contained inside the internal
schema listeners struct code (and 0 risk of panic'ing on nil references)
##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be
used.
+type SessionReadyListener interface {
+ OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+ OnNewHost(event NewHostEvent)
+ OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+ Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+ Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+ OnHostUp(event HostUpEvent)
+ OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+ Host *HostInfo
+}
+
+type HostDownEvent struct {
+ Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+ OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+ OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+ OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+ OnTableCreated(event OnTableCreatedEvent)
+ OnTableUpdated(event OnTableUpdatedEvent)
+ OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+ OnUserTypeCreated(event OnUserTypeCreatedEvent)
+ OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+ OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+ OnFunctionCreated(event OnFunctionCreatedEvent)
+ OnFunctionUpdated(event OnFunctionUpdatedEvent)
+ OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+ OnAggregateCreated(event OnAggregateCreatedEvent)
+ OnAggregateUpdated(event OnAggregateUpdatedEvent)
+ OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+ Old *KeyspaceMetadata
+ New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+ Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+ Old *TableMetadata
+ New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+ Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+ Old *UserTypeMetadata
+ New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+ Old *FunctionMetadata
+ New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+ Old *AggregateMetadata
+ New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+ Keyspaces []KeyspaceChangeListener
+ Tables []TableChangeListener
+ UserTypes []UserTypeChangeListener
+ Functions []FunctionChangeListener
+ Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event
OnKeyspaceCreatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event
OnKeyspaceUpdatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event
OnKeyspaceDroppedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event
OnUserTypeCreatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event
OnUserTypeUpdatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event
OnUserTypeDroppedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event
OnFunctionCreatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event
OnFunctionUpdatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event
OnFunctionDroppedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event
OnAggregateCreatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event
OnAggregateUpdatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event
OnAggregateDroppedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateDropped(event)
+ }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology
change listeners.
+// Allows to register multiple listeners for the same type of host state and
topology change.
+type HostStateChangeListenersMux struct {
+ HostStateChangeListeners []HostStatusChangeListener
+ TopologyChangeListeners []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostUp(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostDown(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnNewHost(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnRemovedHost(event)
+ }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is
initialized.
+type internalHostStateAndTopologyChangeListener struct {
Review Comment:
hostListenersWrapper? internalHostListeners?
##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be
used.
+type SessionReadyListener interface {
+ OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+ OnNewHost(event NewHostEvent)
+ OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+ Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+ Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+ OnHostUp(event HostUpEvent)
+ OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+ Host *HostInfo
+}
+
+type HostDownEvent struct {
+ Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+ OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+ OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+ OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+ OnTableCreated(event OnTableCreatedEvent)
+ OnTableUpdated(event OnTableUpdatedEvent)
+ OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+ OnUserTypeCreated(event OnUserTypeCreatedEvent)
+ OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+ OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+ OnFunctionCreated(event OnFunctionCreatedEvent)
+ OnFunctionUpdated(event OnFunctionUpdatedEvent)
+ OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+ OnAggregateCreated(event OnAggregateCreatedEvent)
+ OnAggregateUpdated(event OnAggregateUpdatedEvent)
+ OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+ Old *KeyspaceMetadata
+ New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+ Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+ Old *TableMetadata
+ New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+ Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+ Old *UserTypeMetadata
+ New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+ Old *FunctionMetadata
+ New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+ Old *AggregateMetadata
+ New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+ Keyspaces []KeyspaceChangeListener
+ Tables []TableChangeListener
+ UserTypes []UserTypeChangeListener
+ Functions []FunctionChangeListener
+ Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event
OnKeyspaceCreatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event
OnKeyspaceUpdatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event
OnKeyspaceDroppedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event
OnUserTypeCreatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event
OnUserTypeUpdatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event
OnUserTypeDroppedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event
OnFunctionCreatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event
OnFunctionUpdatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event
OnFunctionDroppedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event
OnAggregateCreatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event
OnAggregateUpdatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event
OnAggregateDroppedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateDropped(event)
+ }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology
change listeners.
+// Allows to register multiple listeners for the same type of host state and
topology change.
+type HostStateChangeListenersMux struct {
+ HostStateChangeListeners []HostStatusChangeListener
+ TopologyChangeListeners []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostUp(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostDown(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnNewHost(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnRemovedHost(event)
+ }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is
initialized.
+type internalHostStateAndTopologyChangeListener struct {
+ hostStateChangeListener HostStatusChangeListener
+ topologyChangeListener TopologyChangeListener
+
+ // To avoid calling the listeners before the session is initialized, we
need to track if the session is initialized.
+ // Updated by the sessionReady method.
+ sessionInitialized bool
+}
+
+func newInternalHostStateAndTopologyChangeListener(hostStateChangeListener
HostStatusChangeListener, topologyChangeListener TopologyChangeListener)
*internalHostStateAndTopologyChangeListener {
+ return &internalHostStateAndTopologyChangeListener{
+ hostStateChangeListener: hostStateChangeListener,
+ topologyChangeListener: topologyChangeListener,
+ sessionInitialized: false,
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostUp(event
HostUpEvent) {
+ if l.hostStateChangeListener != nil && l.sessionInitialized {
+ l.hostStateChangeListener.OnHostUp(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostDown(event
HostDownEvent) {
+ if l.hostStateChangeListener != nil && l.sessionInitialized {
+ l.hostStateChangeListener.OnHostDown(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnNewHost(event
NewHostEvent) {
+ if l.topologyChangeListener != nil && l.sessionInitialized {
+ l.topologyChangeListener.OnNewHost(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnRemovedHost(event
RemovedHostEvent) {
+ if l.topologyChangeListener != nil && l.sessionInitialized {
+ l.topologyChangeListener.OnRemovedHost(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnSessionReady() {
+ l.sessionInitialized = true
+}
+
Review Comment:
Why no `internalSchemaListeners` ?
##########
metadata.go:
##########
@@ -483,6 +504,43 @@ func refreshSchemas(session *Session) error {
session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: updatedKeyspace,
Change: SchemaChangeTypeUpdated})
}
}
+
+ // Notify the keyspace change listeners if they are set.
+ if session.schemaChangeListeners.hasKeyspaceListener() &&
sessionInitialized {
+ keyspaceListener :=
session.schemaChangeListeners.KeyspaceChangeListener
+ for _, createdKeyspace := range newKeyspaces {
+
keyspaceListener.OnKeyspaceCreated(OnKeyspaceCreatedEvent{Keyspace:
keyspaceMeta[createdKeyspace]})
+
+ }
+ for _, droppedKeyspace := range droppedKeyspaces {
+
keyspaceListener.OnKeyspaceDropped(OnKeyspaceDroppedEvent{Keyspace:
oldKeyspaceMeta[droppedKeyspace]})
+
+ }
+ for _, updatedKeyspace := range updatedKeyspaces {
Review Comment:
we may already have looped through all the keyspaces beforehand, shouldn't
we try to merge these to optimize a bit? Rough code example:
```
// Check notification capabilities once
notifier, supportsRefresh := session.policy.(schemaRefreshNotifier)
hasListener := session.schemaChangeListeners.hasKeyspaceListener() &&
sessionInitialized
keyspaceListener := session.schemaChangeListeners.KeyspaceChangeListener
if supportsRefresh {
notifier.schemaRefreshed(sd.getSchemaMetaForRead())
}
// If we don't support schemaRefreshed OR we have listeners, we need to loop
if !supportsRefresh || hasListener {
for _, name := range newKeyspaces {
if !supportsRefresh {
session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace:
name, Change: SchemaChangeTypeCreated})
}
if hasListener {
keyspaceListener.OnKeyspaceCreated(OnKeyspaceCreatedEvent{Keyspace:
keyspaceMeta[name]})
}
}
for _, name := range droppedKeyspaces {
if !supportsRefresh {
session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace:
name, Change: SchemaChangeTypeDropped})
}
if hasListener {
keyspaceListener.OnKeyspaceDropped(OnKeyspaceDroppedEvent{Keyspace:
oldKeyspaceMeta[name]})
}
}
for _, name := range updatedKeyspaces {
if !supportsRefresh {
session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace:
name, Change: SchemaChangeTypeUpdated})
}
if hasListener {
keyspaceListener.OnKeyspaceUpdated(OnKeyspaceUpdatedEvent{
Old: oldKeyspaceMeta[name],
New: keyspaceMeta[name],
})
}
}
}
```
##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be
used.
+type SessionReadyListener interface {
+ OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+ OnNewHost(event NewHostEvent)
+ OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+ Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+ Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+ OnHostUp(event HostUpEvent)
+ OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+ Host *HostInfo
+}
+
+type HostDownEvent struct {
+ Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+ OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+ OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+ OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+ OnTableCreated(event OnTableCreatedEvent)
+ OnTableUpdated(event OnTableUpdatedEvent)
+ OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+ OnUserTypeCreated(event OnUserTypeCreatedEvent)
+ OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+ OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+ OnFunctionCreated(event OnFunctionCreatedEvent)
+ OnFunctionUpdated(event OnFunctionUpdatedEvent)
+ OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+ OnAggregateCreated(event OnAggregateCreatedEvent)
+ OnAggregateUpdated(event OnAggregateUpdatedEvent)
+ OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+ Old *KeyspaceMetadata
+ New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+ Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+ Old *TableMetadata
+ New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+ Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+ Old *UserTypeMetadata
+ New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+ Old *FunctionMetadata
+ New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+ Old *AggregateMetadata
+ New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+ Keyspaces []KeyspaceChangeListener
+ Tables []TableChangeListener
+ UserTypes []UserTypeChangeListener
+ Functions []FunctionChangeListener
+ Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event
OnKeyspaceCreatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event
OnKeyspaceUpdatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event
OnKeyspaceDroppedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event
OnUserTypeCreatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event
OnUserTypeUpdatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event
OnUserTypeDroppedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event
OnFunctionCreatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event
OnFunctionUpdatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event
OnFunctionDroppedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event
OnAggregateCreatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event
OnAggregateUpdatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event
OnAggregateDroppedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateDropped(event)
+ }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology
change listeners.
+// Allows to register multiple listeners for the same type of host state and
topology change.
+type HostStateChangeListenersMux struct {
+ HostStateChangeListeners []HostStatusChangeListener
+ TopologyChangeListeners []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostUp(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostDown(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnNewHost(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnRemovedHost(event)
+ }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is
initialized.
+type internalHostStateAndTopologyChangeListener struct {
+ hostStateChangeListener HostStatusChangeListener
+ topologyChangeListener TopologyChangeListener
+
+ // To avoid calling the listeners before the session is initialized, we
need to track if the session is initialized.
+ // Updated by the sessionReady method.
+ sessionInitialized bool
+}
+
+func newInternalHostStateAndTopologyChangeListener(hostStateChangeListener
HostStatusChangeListener, topologyChangeListener TopologyChangeListener)
*internalHostStateAndTopologyChangeListener {
Review Comment:
When we remove the `sessionInitialized` field we can make this return a
value instead of pointer (and change the receivers)
##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be
used.
+type SessionReadyListener interface {
+ OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+ OnNewHost(event NewHostEvent)
+ OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+ Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+ Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+ OnHostUp(event HostUpEvent)
+ OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+ Host *HostInfo
+}
+
+type HostDownEvent struct {
+ Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+ OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+ OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+ OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+ OnTableCreated(event OnTableCreatedEvent)
+ OnTableUpdated(event OnTableUpdatedEvent)
+ OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+ OnUserTypeCreated(event OnUserTypeCreatedEvent)
+ OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+ OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+ OnFunctionCreated(event OnFunctionCreatedEvent)
+ OnFunctionUpdated(event OnFunctionUpdatedEvent)
+ OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+ OnAggregateCreated(event OnAggregateCreatedEvent)
+ OnAggregateUpdated(event OnAggregateUpdatedEvent)
+ OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+ Old *KeyspaceMetadata
+ New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+ Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+ Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+ Old *TableMetadata
+ New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+ Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+ Old *UserTypeMetadata
+ New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+ UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+ Old *FunctionMetadata
+ New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+ Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+ Old *AggregateMetadata
+ New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+ Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+ Keyspaces []KeyspaceChangeListener
+ Tables []TableChangeListener
+ UserTypes []UserTypeChangeListener
+ Functions []FunctionChangeListener
+ Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event
OnKeyspaceCreatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event
OnKeyspaceUpdatedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event
OnKeyspaceDroppedEvent) {
+ for _, listener := range mux.Keyspaces {
+ listener.OnKeyspaceDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent)
{
+ for _, listener := range mux.Tables {
+ listener.OnTableDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event
OnUserTypeCreatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event
OnUserTypeUpdatedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event
OnUserTypeDroppedEvent) {
+ for _, listener := range mux.UserTypes {
+ listener.OnUserTypeDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event
OnFunctionCreatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event
OnFunctionUpdatedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event
OnFunctionDroppedEvent) {
+ for _, listener := range mux.Functions {
+ listener.OnFunctionDropped(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event
OnAggregateCreatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateCreated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event
OnAggregateUpdatedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateUpdated(event)
+ }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event
OnAggregateDroppedEvent) {
+ for _, listener := range mux.Aggregates {
+ listener.OnAggregateDropped(event)
+ }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology
change listeners.
+// Allows to register multiple listeners for the same type of host state and
topology change.
+type HostStateChangeListenersMux struct {
+ HostStateChangeListeners []HostStatusChangeListener
+ TopologyChangeListeners []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostUp(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+ for _, listener := range mux.HostStateChangeListeners {
+ listener.OnHostDown(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnNewHost(event)
+ }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+ for _, listener := range mux.TopologyChangeListeners {
+ listener.OnRemovedHost(event)
+ }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is
initialized.
+type internalHostStateAndTopologyChangeListener struct {
+ hostStateChangeListener HostStatusChangeListener
+ topologyChangeListener TopologyChangeListener
+
+ // To avoid calling the listeners before the session is initialized, we
need to track if the session is initialized.
+ // Updated by the sessionReady method.
+ sessionInitialized bool
+}
+
+func newInternalHostStateAndTopologyChangeListener(hostStateChangeListener
HostStatusChangeListener, topologyChangeListener TopologyChangeListener)
*internalHostStateAndTopologyChangeListener {
+ return &internalHostStateAndTopologyChangeListener{
+ hostStateChangeListener: hostStateChangeListener,
+ topologyChangeListener: topologyChangeListener,
+ sessionInitialized: false,
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostUp(event
HostUpEvent) {
+ if l.hostStateChangeListener != nil && l.sessionInitialized {
+ l.hostStateChangeListener.OnHostUp(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostDown(event
HostDownEvent) {
+ if l.hostStateChangeListener != nil && l.sessionInitialized {
+ l.hostStateChangeListener.OnHostDown(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnNewHost(event
NewHostEvent) {
+ if l.topologyChangeListener != nil && l.sessionInitialized {
+ l.topologyChangeListener.OnNewHost(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnRemovedHost(event
RemovedHostEvent) {
+ if l.topologyChangeListener != nil && l.sessionInitialized {
+ l.topologyChangeListener.OnRemovedHost(event)
+ }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnSessionReady() {
+ l.sessionInitialized = true
+}
+
+// Wrapper around the session ready listeners.
+type sessionReadyListeners struct {
+ sessionReadyListeners []SessionReadyListener
+}
+
+func (mux *sessionReadyListeners) OnSessionReady() {
+ for _, listener := range mux.sessionReadyListeners {
+ listener.OnSessionReady()
+ }
+}
+
+func (mux *sessionReadyListeners) addListener(listener SessionReadyListener) {
Review Comment:
make it consistent with the other internal listeners object above by adding
a `newSessionReadyListeners` function instead of `addListener` when we replace
the slice field
##########
metadata.go:
##########
@@ -483,6 +504,43 @@ func refreshSchemas(session *Session) error {
session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: updatedKeyspace,
Change: SchemaChangeTypeUpdated})
}
}
+
+ // Notify the keyspace change listeners if they are set.
+ if session.schemaChangeListeners.hasKeyspaceListener() &&
sessionInitialized {
+ keyspaceListener :=
session.schemaChangeListeners.KeyspaceChangeListener
+ for _, createdKeyspace := range newKeyspaces {
+
keyspaceListener.OnKeyspaceCreated(OnKeyspaceCreatedEvent{Keyspace:
keyspaceMeta[createdKeyspace]})
+
+ }
+ for _, droppedKeyspace := range droppedKeyspaces {
+
keyspaceListener.OnKeyspaceDropped(OnKeyspaceDroppedEvent{Keyspace:
oldKeyspaceMeta[droppedKeyspace]})
+
+ }
+ for _, updatedKeyspace := range updatedKeyspaces {
+
keyspaceListener.OnKeyspaceUpdated(OnKeyspaceUpdatedEvent{
+ Old: oldKeyspaceMeta[updatedKeyspace],
+ New: keyspaceMeta[updatedKeyspace],
+ })
+ }
+ }
+
+ // If we have full metadata cache mode, notify the non-keyspace change
listeners if they are set.
+ if session.cfg.Metadata.CacheMode == Full &&
+
session.cfg.Metadata.SchemaListener.hasNonKeyspaceSchemaChangeListeners() &&
+ sessionInitialized {
+ session.logger.Debug("Processing full schema changes.")
Review Comment:
I'd say we can remove this log message, one is enough ("Finished calling
schema change listeners")
##########
metadata.go:
##########
@@ -2049,3 +2101,319 @@ func isIdentifierChar(c byte) bool {
c == '_' ||
c == '&'
}
+
+// Handles the full schema changes including table, user type, function,
aggregate.
+func handleFullSchemaChanges(session *Session, oldKeyspaces, newKeyspaces
map[string]*KeyspaceMetadata) {
+ for _, oldKsMeta := range oldKeyspaces {
+ newKsMeta, ok := newKeyspaces[oldKsMeta.Name]
+ if !ok {
+ // Skip, KeyspaceChangeListener is already notified in
refreshSchemas()
+ continue
+ }
+
+ if session.schemaChangeListeners.TableChangeListener != nil {
+ handleSchemaTableChanges(session, oldKsMeta, newKsMeta)
+ }
+ if session.schemaChangeListeners.UserTypeChangeListener != nil {
+ handleSchemaUserTypeChanges(session, oldKsMeta,
newKsMeta)
+ }
+ if session.schemaChangeListeners.FunctionChangeListener != nil {
+ handleSchemaFunctionChanges(session, oldKsMeta,
newKsMeta)
+ }
+ if session.schemaChangeListeners.AggregateChangeListener != nil
{
+ handleSchemaAggregateChanges(session, oldKsMeta,
newKsMeta)
+ }
+ }
+}
+
+// Computes the schema table changes and notifies the event listener if it is
set.
+// It is expected that the TableChangeListener is set on the Session.
+func handleSchemaTableChanges(session *Session, oldKeyspace, newKeyspace
*KeyspaceMetadata) {
+ var createdEvents []OnTableCreatedEvent
+ var droppedEvents []OnTableDroppedEvent
+ var updatedEvents []OnTableUpdatedEvent
+
+ for _, oldTableMeta := range oldKeyspace.Tables {
+ newTableMeta, ok := newKeyspace.Tables[oldTableMeta.Name]
+ if !ok {
+ droppedEvents = append(droppedEvents,
OnTableDroppedEvent{Table: oldTableMeta})
+ } else {
+ if !compareTablesMetadata(oldTableMeta, newTableMeta) {
+ updatedEvents = append(updatedEvents,
OnTableUpdatedEvent{Old: oldTableMeta, New: newTableMeta})
+ }
+ }
+ }
+
+ for _, newTableMeta := range newKeyspace.Tables {
+ _, ok := oldKeyspace.Tables[newTableMeta.Name]
+ if !ok {
+ createdEvents = append(createdEvents,
OnTableCreatedEvent{Table: newTableMeta})
+ }
+ }
+
+ session.logger.Debug("Computed schema table change events",
+ NewLogFieldInt("created_table_events_count",
len(createdEvents)),
+ NewLogFieldInt("updated_table_events_count",
len(updatedEvents)),
+ NewLogFieldInt("dropped_table_events_count",
len(droppedEvents)),
Review Comment:
I like this for debugging, maybe we should try to do this for the keyspace
events too? Even if a keyspace listener isn't set the policy might require the
events so it would be useful during debug to see if the number of events that
are being provided to the hostselectionpolicy (or the keyspace listener) seem
normal
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]