joao-r-reis commented on code in PR #1929:
URL: 
https://github.com/apache/cassandra-gocql-driver/pull/1929#discussion_r2924829809


##########
cluster.go:
##########
@@ -394,6 +397,60 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
        return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
 }
 
+// MetadataConfig configures driver's internal metadata caching and event 
listening.
+type MetadataConfig struct {
+       CacheMode MetadataCacheMode
+
+       // HostListener configures event listeners for host state and topology 
changes.
+       HostListener HostListenersConfig
+
+       // SchemaListener configures event listeners for schema changes.
+       //
+       // If CacheMode is Full, listeners will be notified about all schema 
changes.
+       //
+       // If CacheMode is KeyspaceOnly, only KeyspaceChangeListener will be 
notified,
+       // having other listeners registered will result in an error during 
session creation.
+       //
+       // If CacheMode is Disabled, having these listeners will result in an 
error during session creation.
+       SchemaListener SchemaListenersConfig
+
+       // SessionReadyListener will be notified when the session is ready to 
be used.

Review Comment:
   ```
   // SessionReadyListener will be notified when the session is ready to be 
used. This is meant to be implemented by Host and Schema listeners but it can 
also be used as a generic callback for when the session is ready regardless of 
whether a metadata listener is implemented or not.
   ```



##########
cluster.go:
##########
@@ -394,6 +397,60 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
        return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
 }
 
+// MetadataConfig configures driver's internal metadata caching and event 
listening.
+type MetadataConfig struct {
+       CacheMode MetadataCacheMode
+
+       // HostListener configures event listeners for host state and topology 
changes.
+       HostListener HostListenersConfig
+
+       // SchemaListener configures event listeners for schema changes.
+       //
+       // If CacheMode is Full, listeners will be notified about all schema 
changes.
+       //
+       // If CacheMode is KeyspaceOnly, only KeyspaceChangeListener will be 
notified,
+       // having other listeners registered will result in an error during 
session creation.
+       //
+       // If CacheMode is Disabled, having these listeners will result in an 
error during session creation.
+       SchemaListener SchemaListenersConfig
+
+       // SessionReadyListener will be notified when the session is ready to 
be used.
+       SessionReadyListener SessionReadyListener
+}
+
+type HostListenersConfig struct {

Review Comment:
   I think we can remove the `Config` suffix from these: 
   
   ```
   type MetadataConfig struct {
        CacheMode MetadataCacheMode
   
        // HostListener configures event listeners for host state and topology 
changes.
        HostListeners HostListeners
   
        // SchemaListener configures event listeners for schema changes.
        //
        // If CacheMode is Full, listeners will be notified about all schema 
changes.
        //
        // If CacheMode is KeyspaceOnly, only KeyspaceChangeListener will be 
notified,
        // having other listeners registered will result in an error during 
session creation.
        //
        // If CacheMode is Disabled, having these listeners will result in an 
error during session creation.
        SchemaListeners SchemaListeners
   
        // SessionReadyListener will be notified when the session is ready to 
be used.
        SessionReadyListener SessionReadyListener
   }
   ```



##########
cluster.go:
##########
@@ -394,6 +397,60 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
        return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
 }
 
+// MetadataConfig configures driver's internal metadata caching and event 
listening.
+type MetadataConfig struct {
+       CacheMode MetadataCacheMode
+
+       // HostListener configures event listeners for host state and topology 
changes.
+       HostListener HostListenersConfig
+
+       // SchemaListener configures event listeners for schema changes.
+       //
+       // If CacheMode is Full, listeners will be notified about all schema 
changes.
+       //
+       // If CacheMode is KeyspaceOnly, only KeyspaceChangeListener will be 
notified,
+       // having other listeners registered will result in an error during 
session creation.
+       //
+       // If CacheMode is Disabled, having these listeners will result in an 
error during session creation.

Review Comment:
   We should probably move these docs about the cache mode to the cache mode 
field itself



##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be 
used.
+type SessionReadyListener interface {
+       OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+       OnNewHost(event NewHostEvent)
+       OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+       Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+       Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+       OnHostUp(event HostUpEvent)
+       OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+       Host *HostInfo
+}
+
+type HostDownEvent struct {
+       Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+       OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+       OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+       OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+       OnTableCreated(event OnTableCreatedEvent)
+       OnTableUpdated(event OnTableUpdatedEvent)
+       OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+       OnUserTypeCreated(event OnUserTypeCreatedEvent)
+       OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+       OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+       OnFunctionCreated(event OnFunctionCreatedEvent)
+       OnFunctionUpdated(event OnFunctionUpdatedEvent)
+       OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+       OnAggregateCreated(event OnAggregateCreatedEvent)
+       OnAggregateUpdated(event OnAggregateUpdatedEvent)
+       OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+       Old *KeyspaceMetadata
+       New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+       Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+       Old *TableMetadata
+       New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+       Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+       Old *UserTypeMetadata
+       New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+       Old *FunctionMetadata
+       New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+       Old *AggregateMetadata
+       New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+       Keyspaces  []KeyspaceChangeListener
+       Tables     []TableChangeListener
+       UserTypes  []UserTypeChangeListener
+       Functions  []FunctionChangeListener
+       Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event 
OnKeyspaceCreatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event 
OnKeyspaceUpdatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event 
OnKeyspaceDroppedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event 
OnUserTypeCreatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event 
OnUserTypeUpdatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event 
OnUserTypeDroppedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event 
OnFunctionCreatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event 
OnFunctionUpdatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event 
OnFunctionDroppedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event 
OnAggregateCreatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event 
OnAggregateUpdatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event 
OnAggregateDroppedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateDropped(event)
+       }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology 
change listeners.
+// Allows to register multiple listeners for the same type of host state and 
topology change.
+type HostStateChangeListenersMux struct {

Review Comment:
   HostListenersMux



##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be 
used.
+type SessionReadyListener interface {
+       OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+       OnNewHost(event NewHostEvent)
+       OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+       Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+       Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+       OnHostUp(event HostUpEvent)
+       OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+       Host *HostInfo
+}
+
+type HostDownEvent struct {
+       Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+       OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+       OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+       OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+       OnTableCreated(event OnTableCreatedEvent)
+       OnTableUpdated(event OnTableUpdatedEvent)
+       OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+       OnUserTypeCreated(event OnUserTypeCreatedEvent)
+       OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+       OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+       OnFunctionCreated(event OnFunctionCreatedEvent)
+       OnFunctionUpdated(event OnFunctionUpdatedEvent)
+       OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+       OnAggregateCreated(event OnAggregateCreatedEvent)
+       OnAggregateUpdated(event OnAggregateUpdatedEvent)
+       OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+       Old *KeyspaceMetadata
+       New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+       Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+       Old *TableMetadata
+       New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+       Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+       Old *UserTypeMetadata
+       New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+       Old *FunctionMetadata
+       New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+       Old *AggregateMetadata
+       New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+       Keyspaces  []KeyspaceChangeListener
+       Tables     []TableChangeListener
+       UserTypes  []UserTypeChangeListener
+       Functions  []FunctionChangeListener
+       Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event 
OnKeyspaceCreatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event 
OnKeyspaceUpdatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event 
OnKeyspaceDroppedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event 
OnUserTypeCreatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event 
OnUserTypeUpdatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event 
OnUserTypeDroppedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event 
OnFunctionCreatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event 
OnFunctionUpdatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event 
OnFunctionDroppedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event 
OnAggregateCreatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event 
OnAggregateUpdatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event 
OnAggregateDroppedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateDropped(event)
+       }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology 
change listeners.
+// Allows to register multiple listeners for the same type of host state and 
topology change.
+type HostStateChangeListenersMux struct {
+       HostStateChangeListeners []HostStatusChangeListener
+       TopologyChangeListeners  []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostUp(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostDown(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnNewHost(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnRemovedHost(event)
+       }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is 
initialized.
+type internalHostStateAndTopologyChangeListener struct {
+       hostStateChangeListener HostStatusChangeListener
+       topologyChangeListener  TopologyChangeListener
+
+       // To avoid calling the listeners before the session is initialized, we 
need to track if the session is initialized.
+       // Updated by the sessionReady method.
+       sessionInitialized bool
+}
+
+func newInternalHostStateAndTopologyChangeListener(hostStateChangeListener 
HostStatusChangeListener, topologyChangeListener TopologyChangeListener) 
*internalHostStateAndTopologyChangeListener {
+       return &internalHostStateAndTopologyChangeListener{
+               hostStateChangeListener: hostStateChangeListener,
+               topologyChangeListener:  topologyChangeListener,
+               sessionInitialized:      false,
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostUp(event 
HostUpEvent) {
+       if l.hostStateChangeListener != nil && l.sessionInitialized {
+               l.hostStateChangeListener.OnHostUp(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostDown(event 
HostDownEvent) {
+       if l.hostStateChangeListener != nil && l.sessionInitialized {
+               l.hostStateChangeListener.OnHostDown(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnNewHost(event 
NewHostEvent) {
+       if l.topologyChangeListener != nil && l.sessionInitialized {
+               l.topologyChangeListener.OnNewHost(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnRemovedHost(event 
RemovedHostEvent) {
+       if l.topologyChangeListener != nil && l.sessionInitialized {
+               l.topologyChangeListener.OnRemovedHost(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnSessionReady() {

Review Comment:
   We should avoid internal components using the public API like this in 
general, just makes for a clearer boundary. Either we provide the session 
pointer to this struct or we just provide a simple `func() bool`



##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be 
used.
+type SessionReadyListener interface {
+       OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+       OnNewHost(event NewHostEvent)
+       OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+       Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+       Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+       OnHostUp(event HostUpEvent)
+       OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+       Host *HostInfo
+}
+
+type HostDownEvent struct {
+       Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+       OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+       OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+       OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+       OnTableCreated(event OnTableCreatedEvent)
+       OnTableUpdated(event OnTableUpdatedEvent)
+       OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+       OnUserTypeCreated(event OnUserTypeCreatedEvent)
+       OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+       OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+       OnFunctionCreated(event OnFunctionCreatedEvent)
+       OnFunctionUpdated(event OnFunctionUpdatedEvent)
+       OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+       OnAggregateCreated(event OnAggregateCreatedEvent)
+       OnAggregateUpdated(event OnAggregateUpdatedEvent)
+       OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+       Old *KeyspaceMetadata
+       New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+       Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+       Old *TableMetadata
+       New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+       Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+       Old *UserTypeMetadata
+       New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+       Old *FunctionMetadata
+       New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+       Old *AggregateMetadata
+       New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+       Keyspaces  []KeyspaceChangeListener
+       Tables     []TableChangeListener
+       UserTypes  []UserTypeChangeListener
+       Functions  []FunctionChangeListener
+       Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event 
OnKeyspaceCreatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event 
OnKeyspaceUpdatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event 
OnKeyspaceDroppedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event 
OnUserTypeCreatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event 
OnUserTypeUpdatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event 
OnUserTypeDroppedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event 
OnFunctionCreatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event 
OnFunctionUpdatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event 
OnFunctionDroppedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event 
OnAggregateCreatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event 
OnAggregateUpdatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event 
OnAggregateDroppedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateDropped(event)
+       }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology 
change listeners.
+// Allows to register multiple listeners for the same type of host state and 
topology change.
+type HostStateChangeListenersMux struct {
+       HostStateChangeListeners []HostStatusChangeListener
+       TopologyChangeListeners  []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostUp(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostDown(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnNewHost(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnRemovedHost(event)
+       }
+}

Review Comment:
   We're missing the SessionReadyListenersMux no?



##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be 
used.
+type SessionReadyListener interface {
+       OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+       OnNewHost(event NewHostEvent)
+       OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+       Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+       Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+       OnHostUp(event HostUpEvent)
+       OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+       Host *HostInfo
+}
+
+type HostDownEvent struct {
+       Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+       OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+       OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+       OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+       OnTableCreated(event OnTableCreatedEvent)
+       OnTableUpdated(event OnTableUpdatedEvent)
+       OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+       OnUserTypeCreated(event OnUserTypeCreatedEvent)
+       OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+       OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+       OnFunctionCreated(event OnFunctionCreatedEvent)
+       OnFunctionUpdated(event OnFunctionUpdatedEvent)
+       OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+       OnAggregateCreated(event OnAggregateCreatedEvent)
+       OnAggregateUpdated(event OnAggregateUpdatedEvent)
+       OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+       Old *KeyspaceMetadata
+       New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+       Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+       Old *TableMetadata
+       New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+       Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+       Old *UserTypeMetadata
+       New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+       Old *FunctionMetadata
+       New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+       Old *AggregateMetadata
+       New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+       Keyspaces  []KeyspaceChangeListener
+       Tables     []TableChangeListener
+       UserTypes  []UserTypeChangeListener
+       Functions  []FunctionChangeListener
+       Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event 
OnKeyspaceCreatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event 
OnKeyspaceUpdatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event 
OnKeyspaceDroppedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event 
OnUserTypeCreatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event 
OnUserTypeUpdatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event 
OnUserTypeDroppedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event 
OnFunctionCreatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event 
OnFunctionUpdatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event 
OnFunctionDroppedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event 
OnAggregateCreatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event 
OnAggregateUpdatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event 
OnAggregateDroppedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateDropped(event)
+       }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology 
change listeners.
+// Allows to register multiple listeners for the same type of host state and 
topology change.
+type HostStateChangeListenersMux struct {
+       HostStateChangeListeners []HostStatusChangeListener
+       TopologyChangeListeners  []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostUp(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostDown(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnNewHost(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnRemovedHost(event)
+       }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is 
initialized.
+type internalHostStateAndTopologyChangeListener struct {
+       hostStateChangeListener HostStatusChangeListener
+       topologyChangeListener  TopologyChangeListener
+
+       // To avoid calling the listeners before the session is initialized, we 
need to track if the session is initialized.
+       // Updated by the sessionReady method.
+       sessionInitialized bool
+}
+
+func newInternalHostStateAndTopologyChangeListener(hostStateChangeListener 
HostStatusChangeListener, topologyChangeListener TopologyChangeListener) 
*internalHostStateAndTopologyChangeListener {
+       return &internalHostStateAndTopologyChangeListener{
+               hostStateChangeListener: hostStateChangeListener,
+               topologyChangeListener:  topologyChangeListener,
+               sessionInitialized:      false,
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostUp(event 
HostUpEvent) {
+       if l.hostStateChangeListener != nil && l.sessionInitialized {
+               l.hostStateChangeListener.OnHostUp(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostDown(event 
HostDownEvent) {
+       if l.hostStateChangeListener != nil && l.sessionInitialized {
+               l.hostStateChangeListener.OnHostDown(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnNewHost(event 
NewHostEvent) {
+       if l.topologyChangeListener != nil && l.sessionInitialized {
+               l.topologyChangeListener.OnNewHost(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnRemovedHost(event 
RemovedHostEvent) {
+       if l.topologyChangeListener != nil && l.sessionInitialized {
+               l.topologyChangeListener.OnRemovedHost(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnSessionReady() {
+       l.sessionInitialized = true
+}
+
+// Wrapper around the session ready listeners.
+type sessionReadyListeners struct {
+       sessionReadyListeners []SessionReadyListener
+}
+
+func (mux *sessionReadyListeners) OnSessionReady() {

Review Comment:
   no need for pointers



##########
cluster.go:
##########
@@ -394,6 +397,60 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
        return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
 }
 
+// MetadataConfig configures driver's internal metadata caching and event 
listening.
+type MetadataConfig struct {
+       CacheMode MetadataCacheMode
+
+       // HostListener configures event listeners for host state and topology 
changes.
+       HostListener HostListenersConfig

Review Comment:
   We should mention and reference the Mux types on these API docs comments 
(both on each config field + the struct docs + the individual listener field 
docs. And we need a new section on `doc.go` to document this new feature (maybe 
improve an existing Metadata section if it already exists otherwise create it)



##########
cluster.go:
##########
@@ -394,6 +397,60 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
        return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
 }
 
+// MetadataConfig configures driver's internal metadata caching and event 
listening.
+type MetadataConfig struct {
+       CacheMode MetadataCacheMode
+
+       // HostListener configures event listeners for host state and topology 
changes.
+       HostListener HostListenersConfig
+
+       // SchemaListener configures event listeners for schema changes.
+       //
+       // If CacheMode is Full, listeners will be notified about all schema 
changes.
+       //
+       // If CacheMode is KeyspaceOnly, only KeyspaceChangeListener will be 
notified,
+       // having other listeners registered will result in an error during 
session creation.
+       //
+       // If CacheMode is Disabled, having these listeners will result in an 
error during session creation.
+       SchemaListener SchemaListenersConfig
+
+       // SessionReadyListener will be notified when the session is ready to 
be used.
+       SessionReadyListener SessionReadyListener
+}
+
+type HostListenersConfig struct {
+       // HostStateChangeListener will be notified about host state events 
(UP, DOWN).
+       HostStateChangeListener HostStatusChangeListener
+
+       // TopologyChangeListener will be notified about topology change events
+       // (NEW_NODE, REMOVED_NODE).
+       TopologyChangeListener TopologyChangeListener
+}
+
+type SchemaListenersConfig struct {
+       KeyspaceChangeListener  KeyspaceChangeListener
+       TableChangeListener     TableChangeListener
+       UserTypeChangeListener  UserTypeChangeListener
+       FunctionChangeListener  FunctionChangeListener
+       AggregateChangeListener AggregateChangeListener
+}
+
+func (cfg *SchemaListenersConfig) hasSchemaChangeListeners() bool {
+       return cfg.hasKeyspaceListener() || 
cfg.hasNonKeyspaceSchemaChangeListeners()
+}
+
+func (cfg *SchemaListenersConfig) hasKeyspaceListener() bool {
+       return cfg.KeyspaceChangeListener != nil
+}
+
+// hasNonKeyspaceSchemaChangeListeners returns true if any schema change 
listener is set except for keyspace one.
+func (cfg *SchemaListenersConfig) hasNonKeyspaceSchemaChangeListeners() bool {
+       return cfg.TableChangeListener != nil ||
+               cfg.UserTypeChangeListener != nil ||
+               cfg.FunctionChangeListener != nil ||
+               cfg.AggregateChangeListener != nil
+}

Review Comment:
   We can move all of these `SchemaListenersConfig` methods to a new internal 
schemalisteners object like you already did for the host listeners and 
sessionready listener



##########
metadata.go:
##########
@@ -2049,3 +2101,319 @@ func isIdentifierChar(c byte) bool {
                c == '_' ||
                c == '&'
 }
+
+// Handles the full schema changes including table, user type, function, 
aggregate.
+func handleFullSchemaChanges(session *Session, oldKeyspaces, newKeyspaces 
map[string]*KeyspaceMetadata) {
+       for _, oldKsMeta := range oldKeyspaces {
+               newKsMeta, ok := newKeyspaces[oldKsMeta.Name]
+               if !ok {
+                       // Skip, KeyspaceChangeListener is already notified in 
refreshSchemas()
+                       continue
+               }
+
+               if session.schemaChangeListeners.TableChangeListener != nil {

Review Comment:
   with the internal schema listeners object we can replace this with just 
`hasTableListener` and all the nil checks will be contained inside the internal 
schema listeners struct code (and 0 risk of panic'ing on nil references)



##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be 
used.
+type SessionReadyListener interface {
+       OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+       OnNewHost(event NewHostEvent)
+       OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+       Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+       Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+       OnHostUp(event HostUpEvent)
+       OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+       Host *HostInfo
+}
+
+type HostDownEvent struct {
+       Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+       OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+       OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+       OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+       OnTableCreated(event OnTableCreatedEvent)
+       OnTableUpdated(event OnTableUpdatedEvent)
+       OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+       OnUserTypeCreated(event OnUserTypeCreatedEvent)
+       OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+       OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+       OnFunctionCreated(event OnFunctionCreatedEvent)
+       OnFunctionUpdated(event OnFunctionUpdatedEvent)
+       OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+       OnAggregateCreated(event OnAggregateCreatedEvent)
+       OnAggregateUpdated(event OnAggregateUpdatedEvent)
+       OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+       Old *KeyspaceMetadata
+       New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+       Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+       Old *TableMetadata
+       New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+       Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+       Old *UserTypeMetadata
+       New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+       Old *FunctionMetadata
+       New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+       Old *AggregateMetadata
+       New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+       Keyspaces  []KeyspaceChangeListener
+       Tables     []TableChangeListener
+       UserTypes  []UserTypeChangeListener
+       Functions  []FunctionChangeListener
+       Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event 
OnKeyspaceCreatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event 
OnKeyspaceUpdatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event 
OnKeyspaceDroppedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event 
OnUserTypeCreatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event 
OnUserTypeUpdatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event 
OnUserTypeDroppedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event 
OnFunctionCreatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event 
OnFunctionUpdatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event 
OnFunctionDroppedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event 
OnAggregateCreatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event 
OnAggregateUpdatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event 
OnAggregateDroppedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateDropped(event)
+       }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology 
change listeners.
+// Allows to register multiple listeners for the same type of host state and 
topology change.
+type HostStateChangeListenersMux struct {
+       HostStateChangeListeners []HostStatusChangeListener
+       TopologyChangeListeners  []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostUp(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostDown(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnNewHost(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnRemovedHost(event)
+       }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is 
initialized.
+type internalHostStateAndTopologyChangeListener struct {

Review Comment:
   hostListenersWrapper? internalHostListeners?



##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be 
used.
+type SessionReadyListener interface {
+       OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+       OnNewHost(event NewHostEvent)
+       OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+       Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+       Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+       OnHostUp(event HostUpEvent)
+       OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+       Host *HostInfo
+}
+
+type HostDownEvent struct {
+       Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+       OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+       OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+       OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+       OnTableCreated(event OnTableCreatedEvent)
+       OnTableUpdated(event OnTableUpdatedEvent)
+       OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+       OnUserTypeCreated(event OnUserTypeCreatedEvent)
+       OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+       OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+       OnFunctionCreated(event OnFunctionCreatedEvent)
+       OnFunctionUpdated(event OnFunctionUpdatedEvent)
+       OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+       OnAggregateCreated(event OnAggregateCreatedEvent)
+       OnAggregateUpdated(event OnAggregateUpdatedEvent)
+       OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+       Old *KeyspaceMetadata
+       New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+       Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+       Old *TableMetadata
+       New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+       Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+       Old *UserTypeMetadata
+       New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+       Old *FunctionMetadata
+       New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+       Old *AggregateMetadata
+       New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+       Keyspaces  []KeyspaceChangeListener
+       Tables     []TableChangeListener
+       UserTypes  []UserTypeChangeListener
+       Functions  []FunctionChangeListener
+       Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event 
OnKeyspaceCreatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event 
OnKeyspaceUpdatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event 
OnKeyspaceDroppedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event 
OnUserTypeCreatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event 
OnUserTypeUpdatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event 
OnUserTypeDroppedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event 
OnFunctionCreatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event 
OnFunctionUpdatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event 
OnFunctionDroppedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event 
OnAggregateCreatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event 
OnAggregateUpdatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event 
OnAggregateDroppedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateDropped(event)
+       }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology 
change listeners.
+// Allows to register multiple listeners for the same type of host state and 
topology change.
+type HostStateChangeListenersMux struct {
+       HostStateChangeListeners []HostStatusChangeListener
+       TopologyChangeListeners  []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostUp(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostDown(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnNewHost(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnRemovedHost(event)
+       }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is 
initialized.
+type internalHostStateAndTopologyChangeListener struct {
+       hostStateChangeListener HostStatusChangeListener
+       topologyChangeListener  TopologyChangeListener
+
+       // To avoid calling the listeners before the session is initialized, we 
need to track if the session is initialized.
+       // Updated by the sessionReady method.
+       sessionInitialized bool
+}
+
+func newInternalHostStateAndTopologyChangeListener(hostStateChangeListener 
HostStatusChangeListener, topologyChangeListener TopologyChangeListener) 
*internalHostStateAndTopologyChangeListener {
+       return &internalHostStateAndTopologyChangeListener{
+               hostStateChangeListener: hostStateChangeListener,
+               topologyChangeListener:  topologyChangeListener,
+               sessionInitialized:      false,
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostUp(event 
HostUpEvent) {
+       if l.hostStateChangeListener != nil && l.sessionInitialized {
+               l.hostStateChangeListener.OnHostUp(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostDown(event 
HostDownEvent) {
+       if l.hostStateChangeListener != nil && l.sessionInitialized {
+               l.hostStateChangeListener.OnHostDown(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnNewHost(event 
NewHostEvent) {
+       if l.topologyChangeListener != nil && l.sessionInitialized {
+               l.topologyChangeListener.OnNewHost(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnRemovedHost(event 
RemovedHostEvent) {
+       if l.topologyChangeListener != nil && l.sessionInitialized {
+               l.topologyChangeListener.OnRemovedHost(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnSessionReady() {
+       l.sessionInitialized = true
+}
+

Review Comment:
   Why no `internalSchemaListeners` ?



##########
metadata.go:
##########
@@ -483,6 +504,43 @@ func refreshSchemas(session *Session) error {
                        
session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: updatedKeyspace, 
Change: SchemaChangeTypeUpdated})
                }
        }
+
+       // Notify the keyspace change listeners if they are set.
+       if session.schemaChangeListeners.hasKeyspaceListener() && 
sessionInitialized {
+               keyspaceListener := 
session.schemaChangeListeners.KeyspaceChangeListener
+               for _, createdKeyspace := range newKeyspaces {
+                       
keyspaceListener.OnKeyspaceCreated(OnKeyspaceCreatedEvent{Keyspace: 
keyspaceMeta[createdKeyspace]})
+
+               }
+               for _, droppedKeyspace := range droppedKeyspaces {
+                       
keyspaceListener.OnKeyspaceDropped(OnKeyspaceDroppedEvent{Keyspace: 
oldKeyspaceMeta[droppedKeyspace]})
+
+               }
+               for _, updatedKeyspace := range updatedKeyspaces {

Review Comment:
   we may already have looped through all the keyspaces beforehand, shouldn't 
we try to merge these to optimize a bit? Rough code example:
   
   ```
   // Check notification capabilities once
   notifier, supportsRefresh := session.policy.(schemaRefreshNotifier)
   hasListener := session.schemaChangeListeners.hasKeyspaceListener() && 
sessionInitialized
   keyspaceListener := session.schemaChangeListeners.KeyspaceChangeListener
   
   if supportsRefresh {
       notifier.schemaRefreshed(sd.getSchemaMetaForRead())
   }
   
   // If we don't support schemaRefreshed OR we have listeners, we need to loop
   if !supportsRefresh || hasListener {
       for _, name := range newKeyspaces {
           if !supportsRefresh {
               session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: 
name, Change: SchemaChangeTypeCreated})
           }
           if hasListener {
               
keyspaceListener.OnKeyspaceCreated(OnKeyspaceCreatedEvent{Keyspace: 
keyspaceMeta[name]})
           }
       }
   
       for _, name := range droppedKeyspaces {
           if !supportsRefresh {
               session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: 
name, Change: SchemaChangeTypeDropped})
           }
           if hasListener {
               
keyspaceListener.OnKeyspaceDropped(OnKeyspaceDroppedEvent{Keyspace: 
oldKeyspaceMeta[name]})
           }
       }
   
       for _, name := range updatedKeyspaces {
           if !supportsRefresh {
               session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: 
name, Change: SchemaChangeTypeUpdated})
           }
           if hasListener {
               keyspaceListener.OnKeyspaceUpdated(OnKeyspaceUpdatedEvent{
                   Old: oldKeyspaceMeta[name],
                   New: keyspaceMeta[name],
               })
           }
       }
   }
   ```



##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be 
used.
+type SessionReadyListener interface {
+       OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+       OnNewHost(event NewHostEvent)
+       OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+       Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+       Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+       OnHostUp(event HostUpEvent)
+       OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+       Host *HostInfo
+}
+
+type HostDownEvent struct {
+       Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+       OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+       OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+       OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+       OnTableCreated(event OnTableCreatedEvent)
+       OnTableUpdated(event OnTableUpdatedEvent)
+       OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+       OnUserTypeCreated(event OnUserTypeCreatedEvent)
+       OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+       OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+       OnFunctionCreated(event OnFunctionCreatedEvent)
+       OnFunctionUpdated(event OnFunctionUpdatedEvent)
+       OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+       OnAggregateCreated(event OnAggregateCreatedEvent)
+       OnAggregateUpdated(event OnAggregateUpdatedEvent)
+       OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+       Old *KeyspaceMetadata
+       New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+       Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+       Old *TableMetadata
+       New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+       Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+       Old *UserTypeMetadata
+       New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+       Old *FunctionMetadata
+       New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+       Old *AggregateMetadata
+       New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+       Keyspaces  []KeyspaceChangeListener
+       Tables     []TableChangeListener
+       UserTypes  []UserTypeChangeListener
+       Functions  []FunctionChangeListener
+       Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event 
OnKeyspaceCreatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event 
OnKeyspaceUpdatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event 
OnKeyspaceDroppedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event 
OnUserTypeCreatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event 
OnUserTypeUpdatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event 
OnUserTypeDroppedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event 
OnFunctionCreatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event 
OnFunctionUpdatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event 
OnFunctionDroppedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event 
OnAggregateCreatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event 
OnAggregateUpdatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event 
OnAggregateDroppedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateDropped(event)
+       }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology 
change listeners.
+// Allows to register multiple listeners for the same type of host state and 
topology change.
+type HostStateChangeListenersMux struct {
+       HostStateChangeListeners []HostStatusChangeListener
+       TopologyChangeListeners  []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostUp(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostDown(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnNewHost(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnRemovedHost(event)
+       }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is 
initialized.
+type internalHostStateAndTopologyChangeListener struct {
+       hostStateChangeListener HostStatusChangeListener
+       topologyChangeListener  TopologyChangeListener
+
+       // To avoid calling the listeners before the session is initialized, we 
need to track if the session is initialized.
+       // Updated by the sessionReady method.
+       sessionInitialized bool
+}
+
+func newInternalHostStateAndTopologyChangeListener(hostStateChangeListener 
HostStatusChangeListener, topologyChangeListener TopologyChangeListener) 
*internalHostStateAndTopologyChangeListener {

Review Comment:
   When we remove the `sessionInitialized` field we can make this return a 
value instead of pointer (and change the receivers)



##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be 
used.
+type SessionReadyListener interface {
+       OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+       OnNewHost(event NewHostEvent)
+       OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+       Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+       Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+       OnHostUp(event HostUpEvent)
+       OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+       Host *HostInfo
+}
+
+type HostDownEvent struct {
+       Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+       OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+       OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+       OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+       OnTableCreated(event OnTableCreatedEvent)
+       OnTableUpdated(event OnTableUpdatedEvent)
+       OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+       OnUserTypeCreated(event OnUserTypeCreatedEvent)
+       OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+       OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+       OnFunctionCreated(event OnFunctionCreatedEvent)
+       OnFunctionUpdated(event OnFunctionUpdatedEvent)
+       OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+       OnAggregateCreated(event OnAggregateCreatedEvent)
+       OnAggregateUpdated(event OnAggregateUpdatedEvent)
+       OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+       Old *KeyspaceMetadata
+       New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+       Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+       Old *TableMetadata
+       New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+       Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+       Old *UserTypeMetadata
+       New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+       Old *FunctionMetadata
+       New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+       Old *AggregateMetadata
+       New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+       Keyspaces  []KeyspaceChangeListener
+       Tables     []TableChangeListener
+       UserTypes  []UserTypeChangeListener
+       Functions  []FunctionChangeListener
+       Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event 
OnKeyspaceCreatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event 
OnKeyspaceUpdatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event 
OnKeyspaceDroppedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event 
OnUserTypeCreatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event 
OnUserTypeUpdatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event 
OnUserTypeDroppedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event 
OnFunctionCreatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event 
OnFunctionUpdatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event 
OnFunctionDroppedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event 
OnAggregateCreatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event 
OnAggregateUpdatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event 
OnAggregateDroppedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateDropped(event)
+       }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology 
change listeners.
+// Allows to register multiple listeners for the same type of host state and 
topology change.
+type HostStateChangeListenersMux struct {
+       HostStateChangeListeners []HostStatusChangeListener
+       TopologyChangeListeners  []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostUp(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostDown(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnNewHost(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnRemovedHost(event)
+       }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is 
initialized.
+type internalHostStateAndTopologyChangeListener struct {
+       hostStateChangeListener HostStatusChangeListener
+       topologyChangeListener  TopologyChangeListener
+
+       // To avoid calling the listeners before the session is initialized, we 
need to track if the session is initialized.
+       // Updated by the sessionReady method.
+       sessionInitialized bool
+}
+
+func newInternalHostStateAndTopologyChangeListener(hostStateChangeListener 
HostStatusChangeListener, topologyChangeListener TopologyChangeListener) 
*internalHostStateAndTopologyChangeListener {
+       return &internalHostStateAndTopologyChangeListener{
+               hostStateChangeListener: hostStateChangeListener,
+               topologyChangeListener:  topologyChangeListener,
+               sessionInitialized:      false,
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostUp(event 
HostUpEvent) {
+       if l.hostStateChangeListener != nil && l.sessionInitialized {
+               l.hostStateChangeListener.OnHostUp(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostDown(event 
HostDownEvent) {
+       if l.hostStateChangeListener != nil && l.sessionInitialized {
+               l.hostStateChangeListener.OnHostDown(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnNewHost(event 
NewHostEvent) {
+       if l.topologyChangeListener != nil && l.sessionInitialized {
+               l.topologyChangeListener.OnNewHost(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnRemovedHost(event 
RemovedHostEvent) {
+       if l.topologyChangeListener != nil && l.sessionInitialized {
+               l.topologyChangeListener.OnRemovedHost(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnSessionReady() {
+       l.sessionInitialized = true
+}
+
+// Wrapper around the session ready listeners.
+type sessionReadyListeners struct {
+       sessionReadyListeners []SessionReadyListener
+}
+
+func (mux *sessionReadyListeners) OnSessionReady() {
+       for _, listener := range mux.sessionReadyListeners {
+               listener.OnSessionReady()
+       }
+}
+
+func (mux *sessionReadyListeners) addListener(listener SessionReadyListener) {

Review Comment:
   make it consistent with the other internal listeners object above by adding 
a `newSessionReadyListeners` function instead of `addListener` when we replace 
the slice field



##########
metadata.go:
##########
@@ -483,6 +504,43 @@ func refreshSchemas(session *Session) error {
                        
session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: updatedKeyspace, 
Change: SchemaChangeTypeUpdated})
                }
        }
+
+       // Notify the keyspace change listeners if they are set.
+       if session.schemaChangeListeners.hasKeyspaceListener() && 
sessionInitialized {
+               keyspaceListener := 
session.schemaChangeListeners.KeyspaceChangeListener
+               for _, createdKeyspace := range newKeyspaces {
+                       
keyspaceListener.OnKeyspaceCreated(OnKeyspaceCreatedEvent{Keyspace: 
keyspaceMeta[createdKeyspace]})
+
+               }
+               for _, droppedKeyspace := range droppedKeyspaces {
+                       
keyspaceListener.OnKeyspaceDropped(OnKeyspaceDroppedEvent{Keyspace: 
oldKeyspaceMeta[droppedKeyspace]})
+
+               }
+               for _, updatedKeyspace := range updatedKeyspaces {
+                       
keyspaceListener.OnKeyspaceUpdated(OnKeyspaceUpdatedEvent{
+                               Old: oldKeyspaceMeta[updatedKeyspace],
+                               New: keyspaceMeta[updatedKeyspace],
+                       })
+               }
+       }
+
+       // If we have full metadata cache mode, notify the non-keyspace change 
listeners if they are set.
+       if session.cfg.Metadata.CacheMode == Full &&
+               
session.cfg.Metadata.SchemaListener.hasNonKeyspaceSchemaChangeListeners() &&
+               sessionInitialized {
+               session.logger.Debug("Processing full schema changes.")

Review Comment:
   I'd say we can remove this log message, one is enough ("Finished calling 
schema change listeners")



##########
metadata.go:
##########
@@ -2049,3 +2101,319 @@ func isIdentifierChar(c byte) bool {
                c == '_' ||
                c == '&'
 }
+
+// Handles the full schema changes including table, user type, function, 
aggregate.
+func handleFullSchemaChanges(session *Session, oldKeyspaces, newKeyspaces 
map[string]*KeyspaceMetadata) {
+       for _, oldKsMeta := range oldKeyspaces {
+               newKsMeta, ok := newKeyspaces[oldKsMeta.Name]
+               if !ok {
+                       // Skip, KeyspaceChangeListener is already notified in 
refreshSchemas()
+                       continue
+               }
+
+               if session.schemaChangeListeners.TableChangeListener != nil {
+                       handleSchemaTableChanges(session, oldKsMeta, newKsMeta)
+               }
+               if session.schemaChangeListeners.UserTypeChangeListener != nil {
+                       handleSchemaUserTypeChanges(session, oldKsMeta, 
newKsMeta)
+               }
+               if session.schemaChangeListeners.FunctionChangeListener != nil {
+                       handleSchemaFunctionChanges(session, oldKsMeta, 
newKsMeta)
+               }
+               if session.schemaChangeListeners.AggregateChangeListener != nil 
{
+                       handleSchemaAggregateChanges(session, oldKsMeta, 
newKsMeta)
+               }
+       }
+}
+
+// Computes the schema table changes and notifies the event listener if it is 
set.
+// It is expected that the TableChangeListener is set on the Session.
+func handleSchemaTableChanges(session *Session, oldKeyspace, newKeyspace 
*KeyspaceMetadata) {
+       var createdEvents []OnTableCreatedEvent
+       var droppedEvents []OnTableDroppedEvent
+       var updatedEvents []OnTableUpdatedEvent
+
+       for _, oldTableMeta := range oldKeyspace.Tables {
+               newTableMeta, ok := newKeyspace.Tables[oldTableMeta.Name]
+               if !ok {
+                       droppedEvents = append(droppedEvents, 
OnTableDroppedEvent{Table: oldTableMeta})
+               } else {
+                       if !compareTablesMetadata(oldTableMeta, newTableMeta) {
+                               updatedEvents = append(updatedEvents, 
OnTableUpdatedEvent{Old: oldTableMeta, New: newTableMeta})
+                       }
+               }
+       }
+
+       for _, newTableMeta := range newKeyspace.Tables {
+               _, ok := oldKeyspace.Tables[newTableMeta.Name]
+               if !ok {
+                       createdEvents = append(createdEvents, 
OnTableCreatedEvent{Table: newTableMeta})
+               }
+       }
+
+       session.logger.Debug("Computed schema table change events",
+               NewLogFieldInt("created_table_events_count", 
len(createdEvents)),
+               NewLogFieldInt("updated_table_events_count", 
len(updatedEvents)),
+               NewLogFieldInt("dropped_table_events_count", 
len(droppedEvents)),

Review Comment:
   I like this for debugging, maybe we should try to do this for the keyspace 
events too? Even if a keyspace listener isn't set the policy might require the 
events so it would be useful during debug to see if the number of events that 
are being provided to the hostselectionpolicy (or the keyspace listener) seem 
normal



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to