joao-r-reis commented on code in PR #1929:
URL: 
https://github.com/apache/cassandra-gocql-driver/pull/1929#discussion_r2930997467


##########
event_listeners.go:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
+ * Copyright (c) 2012, The Gocql authors,
+ * provided under the BSD-3-Clause License.
+ * See the NOTICE file distributed with this work for additional information.
+ */
+
+package gocql
+
+// SessionReadyListener is notified when the session is ready to be used.
+// This is useful for users who need to know when the session is ready to be 
used.
+type SessionReadyListener interface {
+       OnSessionReady()
+}
+
+// TopologyChangeListener receives topology change events.
+// Host may be nil if the node is not yet known to the ring.
+type TopologyChangeListener interface {
+       OnNewHost(event NewHostEvent)
+       OnRemovedHost(event RemovedHostEvent)
+}
+
+type NewHostEvent struct {
+       Host *HostInfo
+}
+
+type RemovedHostEvent struct {
+       Host *HostInfo
+}
+
+type HostStatusChangeListener interface {
+       OnHostUp(event HostUpEvent)
+       OnHostDown(event HostDownEvent)
+}
+
+type HostUpEvent struct {
+       Host *HostInfo
+}
+
+type HostDownEvent struct {
+       Host *HostInfo
+}
+
+type KeyspaceChangeListener interface {
+       OnKeyspaceCreated(event OnKeyspaceCreatedEvent)
+       OnKeyspaceUpdated(event OnKeyspaceUpdatedEvent)
+       OnKeyspaceDropped(event OnKeyspaceDroppedEvent)
+}
+
+type TableChangeListener interface {
+       OnTableCreated(event OnTableCreatedEvent)
+       OnTableUpdated(event OnTableUpdatedEvent)
+       OnTableDropped(event OnTableDroppedEvent)
+}
+
+type UserTypeChangeListener interface {
+       OnUserTypeCreated(event OnUserTypeCreatedEvent)
+       OnUserTypeUpdated(event OnUserTypeUpdatedEvent)
+       OnUserTypeDropped(event OnUserTypeDroppedEvent)
+}
+
+type FunctionChangeListener interface {
+       OnFunctionCreated(event OnFunctionCreatedEvent)
+       OnFunctionUpdated(event OnFunctionUpdatedEvent)
+       OnFunctionDropped(event OnFunctionDroppedEvent)
+}
+
+type AggregateChangeListener interface {
+       OnAggregateCreated(event OnAggregateCreatedEvent)
+       OnAggregateUpdated(event OnAggregateUpdatedEvent)
+       OnAggregateDropped(event OnAggregateDroppedEvent)
+}
+
+type OnKeyspaceCreatedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnKeyspaceUpdatedEvent struct {
+       Old *KeyspaceMetadata
+       New *KeyspaceMetadata
+}
+
+type OnKeyspaceDroppedEvent struct {
+       Keyspace *KeyspaceMetadata
+}
+
+type OnTableCreatedEvent struct {
+       Table *TableMetadata
+}
+
+type OnTableUpdatedEvent struct {
+       Old *TableMetadata
+       New *TableMetadata
+}
+
+type OnTableDroppedEvent struct {
+       Table *TableMetadata
+}
+
+type OnUserTypeCreatedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnUserTypeUpdatedEvent struct {
+       Old *UserTypeMetadata
+       New *UserTypeMetadata
+}
+
+type OnUserTypeDroppedEvent struct {
+       UserType *UserTypeMetadata
+}
+
+type OnFunctionCreatedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnFunctionUpdatedEvent struct {
+       Old *FunctionMetadata
+       New *FunctionMetadata
+}
+
+type OnFunctionDroppedEvent struct {
+       Function *FunctionMetadata
+}
+
+type OnAggregateCreatedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+type OnAggregateUpdatedEvent struct {
+       Old *AggregateMetadata
+       New *AggregateMetadata
+}
+
+type OnAggregateDroppedEvent struct {
+       Aggregate *AggregateMetadata
+}
+
+// SchemaChangeListenersMux is a multiplexer for schema change listeners.
+// Allows to register multiple listeners for the same type of schema change.
+type SchemaChangeListenersMux struct {
+       Keyspaces  []KeyspaceChangeListener
+       Tables     []TableChangeListener
+       UserTypes  []UserTypeChangeListener
+       Functions  []FunctionChangeListener
+       Aggregates []AggregateChangeListener
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceCreated(event 
OnKeyspaceCreatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceUpdated(event 
OnKeyspaceUpdatedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnKeyspaceDropped(event 
OnKeyspaceDroppedEvent) {
+       for _, listener := range mux.Keyspaces {
+               listener.OnKeyspaceDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableCreated(event OnTableCreatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableUpdated(event OnTableUpdatedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnTableDropped(event OnTableDroppedEvent) 
{
+       for _, listener := range mux.Tables {
+               listener.OnTableDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeCreated(event 
OnUserTypeCreatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeUpdated(event 
OnUserTypeUpdatedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnUserTypeDropped(event 
OnUserTypeDroppedEvent) {
+       for _, listener := range mux.UserTypes {
+               listener.OnUserTypeDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionCreated(event 
OnFunctionCreatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionUpdated(event 
OnFunctionUpdatedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnFunctionDropped(event 
OnFunctionDroppedEvent) {
+       for _, listener := range mux.Functions {
+               listener.OnFunctionDropped(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateCreated(event 
OnAggregateCreatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateCreated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateUpdated(event 
OnAggregateUpdatedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateUpdated(event)
+       }
+}
+
+func (mux *SchemaChangeListenersMux) OnAggregateDropped(event 
OnAggregateDroppedEvent) {
+       for _, listener := range mux.Aggregates {
+               listener.OnAggregateDropped(event)
+       }
+}
+
+// HostStateChangeListenersMux is a multiplexer for host state and topology 
change listeners.
+// Allows to register multiple listeners for the same type of host state and 
topology change.
+type HostStateChangeListenersMux struct {
+       HostStateChangeListeners []HostStatusChangeListener
+       TopologyChangeListeners  []TopologyChangeListener
+}
+
+func (mux *HostStateChangeListenersMux) OnHostUp(event HostUpEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostUp(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnHostDown(event HostDownEvent) {
+       for _, listener := range mux.HostStateChangeListeners {
+               listener.OnHostDown(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnNewHost(event NewHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnNewHost(event)
+       }
+}
+
+func (mux *HostStateChangeListenersMux) OnRemovedHost(event RemovedHostEvent) {
+       for _, listener := range mux.TopologyChangeListeners {
+               listener.OnRemovedHost(event)
+       }
+}
+
+// Wrapper around the host topology and state change listeners.
+// Provides nil checks for the listeners and tracks if the session is 
initialized.
+type internalHostStateAndTopologyChangeListener struct {
+       hostStateChangeListener HostStatusChangeListener
+       topologyChangeListener  TopologyChangeListener
+
+       // To avoid calling the listeners before the session is initialized, we 
need to track if the session is initialized.
+       // Updated by the sessionReady method.
+       sessionInitialized bool
+}
+
+func newInternalHostStateAndTopologyChangeListener(hostStateChangeListener 
HostStatusChangeListener, topologyChangeListener TopologyChangeListener) 
*internalHostStateAndTopologyChangeListener {
+       return &internalHostStateAndTopologyChangeListener{
+               hostStateChangeListener: hostStateChangeListener,
+               topologyChangeListener:  topologyChangeListener,
+               sessionInitialized:      false,
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostUp(event 
HostUpEvent) {
+       if l.hostStateChangeListener != nil && l.sessionInitialized {
+               l.hostStateChangeListener.OnHostUp(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnHostDown(event 
HostDownEvent) {
+       if l.hostStateChangeListener != nil && l.sessionInitialized {
+               l.hostStateChangeListener.OnHostDown(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnNewHost(event 
NewHostEvent) {
+       if l.topologyChangeListener != nil && l.sessionInitialized {
+               l.topologyChangeListener.OnNewHost(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnRemovedHost(event 
RemovedHostEvent) {
+       if l.topologyChangeListener != nil && l.sessionInitialized {
+               l.topologyChangeListener.OnRemovedHost(event)
+       }
+}
+
+func (l *internalHostStateAndTopologyChangeListener) OnSessionReady() {

Review Comment:
   Hmm, I think that for memory-visibility reasons you'd have to at least use 
`atomic` here, no? If I'm not mistaken, goroutines spawned during the init 
process can trigger these host events. Because those goroutines are started 
before the flag is set, there is no "happens-before" relationship between the 
write to `sessionInitialized` and their reads of it, so nothing guarantees 
that they will ever observe the updated value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to