Copilot commented on code in PR #1371: URL: https://github.com/apache/dubbo-admin/pull/1371#discussion_r2636723650
########## pkg/discovery/zk/listerwatcher/listerwatcher.go: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package listerwatcher + +import ( + "fmt" + + "github.com/dubbogo/go-zookeeper/zk" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery" + "github.com/apache/dubbo-admin/pkg/core/logger" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/discovery/zk/zkwatcher" +) + +type ToUpsertResourceFunc func(mesh, nodePath, nodeData string) coremodel.Resource + +type ToDeleteResourceFunc func(mesh, nodePath string) coremodel.Resource + +type ListerWatcher[T coremodel.Resource] struct { + basePath string + rk coremodel.ResourceKind + conn *zk.Conn + cfg *discoverycfg.Config + toUpsertResourceFunc ToUpsertResourceFunc + toDeleteResourceFunc ToDeleteResourceFunc + newResourceFunc coremodel.NewResourceFunc + newResListFunc coremodel.NewResourceListFunc + watcher *zkwatcher.RecursiveWatcher + resultChan chan watch.Event + stopChan chan struct{} +} + +func NewListerWatcher( + rk coremodel.ResourceKind, + toResourceFunc ToUpsertResourceFunc, + toDeleteResourceFunc ToDeleteResourceFunc, + basePath string, + conn *zk.Conn, + cfg *discoverycfg.Config) (*ListerWatcher[coremodel.Resource], error) { + newResourceFunc, err := coremodel.ResourceSchemaRegistry().NewResourceFunc(rk) + if err != nil { + return nil, err + } + newResListFunc, err := coremodel.ResourceSchemaRegistry().NewResourceListFunc(rk) + if err != nil { + return nil, err + } + return &ListerWatcher[coremodel.Resource]{ + rk: rk, + toUpsertResourceFunc: toResourceFunc, + toDeleteResourceFunc: toDeleteResourceFunc, + basePath: basePath, + conn: conn, + cfg: cfg, + newResourceFunc: newResourceFunc, + newResListFunc: newResListFunc, + resultChan: make(chan watch.Event, 1000), + stopChan: make(chan struct{}), + }, nil +} + +func (lw *ListerWatcher[T]) List(_ metav1.ListOptions) (k8sruntime.Object, error) { + resList, err := lw.listRecursively(lw.basePath) + if err != nil { + logger.Errorf("list all %s under path %s in zk %s failed, cause: %v", lw.rk, lw.basePath, lw.zkAddr(), err) + return nil, err + } + resListObj := lw.newResListFunc() + resListObj.SetItems(resList) + return resListObj, nil +} + +func (lw *ListerWatcher[T]) listRecursively(path string) ([]coremodel.Resource, error) { + resList := make([]coremodel.Resource, 0) + nodeData, _, err := lw.conn.Get(path) + if err != nil { + errStr := fmt.Sprintf("get %s from zk failed, path: %s, addr: %s, cause: %v", lw.rk, path, lw.zkAddr(), err) + logger.Errorf(errStr) + return nil, bizerror.Wrap(err, bizerror.ZKError, errStr) + } + res := lw.toUpsertResourceFunc(lw.mesh(), path, string(nodeData)) + if res != nil { + resList = append(resList, res) + } + children, _, err := lw.conn.Children(path) + if err != nil { + errStr := fmt.Sprintf("list %s from zk failed, path: %s, addr: %s, cause: %v", lw.rk, path, lw.zkAddr(), err) + logger.Errorf(errStr) + return nil, bizerror.Wrap(err, bizerror.ZKError, errStr) + } + if len(children) == 0 { + return resList, nil + } + for _, childPath := range children { + resources, err := lw.listRecursively(path + constants.PathSeparator + childPath) + if err != nil { + return nil, err + } + resList = append(resList, resources...) + } + return resList, nil +} + +func (lw *ListerWatcher[T]) Watch(_ metav1.ListOptions) (watch.Interface, error) { + watcher := zkwatcher.NewRecursiveWatcher(lw.conn, lw.basePath) + go func() { + for { + select { + case event, ok := <-watcher.EventChan(): + if !ok { + logger.Warnf("zookeeper watcher stopped, path: %s, addr: %s", lw.basePath, lw.zkAddr()) + return + } + lw.handleEvent(event) + case <-lw.stopChan: + logger.Warnf("stop watching %s in %s,", lw.basePath, lw.zkAddr()) + return + } + + } + }() + err := watcher.StartAsync() + lw.watcher = watcher + if err != nil { + lw.Stop() + return nil, err + } + return lw, nil +} + +func (lw *ListerWatcher[T]) handleEvent(event zkwatcher.ZookeeperEvent) { + switch event.Type { + case zkwatcher.NodeCreated: + res := lw.toUpsertResourceFunc(lw.mesh(), event.Path, event.Data) + if res == nil { + logger.Warnf("skip creating resource, parse zookeeper node data to %s failed, path: %s, zk: %s, event: %v", + lw.rk, event.Path, lw.zkAddr(), event) + return + } + logger.Infof("%s added, rsKey: %s", lw.rk, res.ResourceKey()) + lw.resultChan <- watch.Event{ + Type: watch.Added, + Object: res, + } + case zkwatcher.NodeChanged: + res := lw.toUpsertResourceFunc(lw.mesh(), event.Path, event.Data) + if res == nil { + logger.Warnf("skip updating resource, parse zookeeper node data to %s failed, path: %s, zk: %s, event: %v", + lw.rk, event.Path, lw.zkAddr(), event) + return + } + logger.Infof("%s modified, rsKey: %s", lw.rk, res.ResourceKey()) + lw.resultChan <- watch.Event{ + Type: watch.Modified, + Object: res, + } + case zkwatcher.NodeDeleted: + res := lw.toDeleteResourceFunc(lw.mesh(), event.Path) + if res == nil { + logger.Warnf("skip deleting resource, parse zookeeper node data to %s failed, path: %s, zk: %s, event: %v", + lw.rk, event.Path, lw.zkAddr(), event) + return + } + logger.Infof("%s deleted, rsKey: %s", lw.rk, res.ResourceKey()) + lw.resultChan <- watch.Event{ + Type: watch.Deleted, + Object: res, + } + default: + logger.Warnf("unknown event type, event: %v", event) + } +} + +func (lw *ListerWatcher[T]) zkAddr() string { + return lw.cfg.Address.Registry +} + +func (lw *ListerWatcher[T]) mesh() string { + return lw.cfg.ID +} + +func (lw *ListerWatcher[T]) ResourceKind() coremodel.ResourceKind { + return lw.rk +} + +func (lw *ListerWatcher[T]) TransformFunc() cache.TransformFunc { + return nil +} + +func (lw *ListerWatcher[T]) Stop() { + if lw.watcher != nil { + lw.watcher.Stop() + } + close(lw.stopChan) +} Review Comment: The channel `resultChan` is not being closed in the `Stop` method. When `Stop` is called, it closes `stopChan` and calls `watcher.Stop()`, which closes `eventChan`, but `resultChan` remains open. This could lead to goroutine leaks if code is waiting on `resultChan` after the watcher has stopped. ########## pkg/core/resource/apis/mesh/v1alpha1/zkconfig_types.go: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// Generated by tools/resourcegen +// Run "make generate" to update this file. + +// nolint:whitespace +package v1alpha1 + +import ( + "encoding/json" + + "google.golang.org/protobuf/proto" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + + meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1" + "github.com/apache/dubbo-admin/pkg/core/logger" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" +) + +const ZKConfigKind coremodel.ResourceKind = "ZKConfig" + +func init() { + coremodel.RegisterResourceSchema(ZKConfigKind, NewZKConfigResource, NewZKConfigResourceList) +} + +type ZKConfigResource struct { + metav1.TypeMeta `json:",inline"` + + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Mesh is the name of the dubbo mesh this resource belongs to. + // It may be omitted for cluster-scoped resources. + Mesh string `json:"mesh,omitempty"` + + // Spec is the specification of the Dubbo ZKConfig resource. + Spec *meshproto.ZKConfig `json:"spec,omitempty"` + + // Status is the status of the Dubbo ZKConfig resource. + Status ZKConfigResourceStatus `json:"status,omitempty"` +} + +type ZKConfigResourceStatus struct { + // define resource-specific status here +} + +func (r *ZKConfigResource) ResourceKind() coremodel.ResourceKind { + return ZKConfigKind +} + +func (r *ZKConfigResource) MeshName() string { + return r.Mesh +} + +func (r *ZKConfigResource) ResourceKey() string { + return coremodel.BuildResourceKey(r.Mesh, r.Name) +} + +func (r *ZKConfigResource) ResourceMeta() metav1.ObjectMeta { + return r.ObjectMeta +} + +func (r *ZKConfigResource) ResourceSpec() coremodel.ResourceSpec { + return r.Spec +} + +func (r *ZKConfigResource) DeepCopyObject() k8sruntime.Object { + out := &ZKConfigResource{ + TypeMeta: r.TypeMeta, + Mesh: r.Mesh, + Status: r.Status, + } + + r.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + + if r.Spec != nil { + spec, ok := proto.Clone(r.Spec).(*meshproto.ZKConfig) + if !ok { + logger.Warnf("failed to clone spec %v, spec is not conformed to %s", r.Spec, r.ResourceKind()) + return out + } + out.Spec = spec + } + + return out +} + +func (r *ZKConfigResource) String() string { + jsonStr, err := json.Marshal(r) + if err != nil { + logger.Errorf("failed to encode ZKConfigResource: %s to json, err: %w", r.ResourceKey(), err) Review Comment: The error format verb `%w` is used but `err` is not being wrapped. The function uses `fmt.Sprintf` which doesn't support `%w`. Should use `%v` instead, or wrap the error using `fmt.Errorf` if error wrapping is intended. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + LeafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// StartAsync begin watching +func (rw *RecursiveWatcher) StartAsync() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + logger.Infof("Stop watching path: %s", rw.basePath) + rw.conn.Close() + close(rw.stopChan) + close(rw.eventChan) +} Review Comment: The connection closing should be handled more carefully. Calling `rw.conn.Close()` may affect other watchers using the same connection. Consider whether this connection is shared across multiple watchers, and if so, connection lifecycle should be managed at a higher level rather than in the Stop method. ########## pkg/console/model/mesh.go: ########## @@ -18,6 +18,7 @@ package model Review Comment: The ID field is added to the mesh response, but it's unclear from the code whether ID and Name are expected to be different values or if they serve different purposes. Consider adding documentation or a comment explaining the relationship between these two fields to avoid confusion. ```suggestion // MeshResp represents a mesh as returned by the API. // ID is the stable, unique identifier of the mesh, while Name is a human-readable // display name. ID and Name may differ and serve different purposes. ``` ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + LeafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// StartAsync begin watching +func (rw *RecursiveWatcher) StartAsync() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + logger.Infof("Stop watching path: %s", rw.basePath) + rw.conn.Close() + close(rw.stopChan) + close(rw.eventChan) +} + +// EventChan return event channel +func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent { + return rw.eventChan +} + +// watchPathRecursively watch path recursively +func (rw *RecursiveWatcher) watchPathRecursively(path string) error { + // Watch data changes for the current path + go rw.watchDataChanges(path) + + // Get children of current path and watch + children, _, err := rw.conn.Children(path) + if err != nil { + logger.Errorf("Failed to get children of path: %s, cause: %v", path, err) + return err + } + // Watch children list changes + go rw.watchChildrenChanges(path) + + // Recursively watch child nodes + for _, child := range children { + childPath := path + "/" + child + if err := rw.watchPathRecursively(childPath); err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + } + return nil +} + +// watchDataChanges watch node data changes +func (rw *RecursiveWatcher) watchDataChanges(path string) { + logger.Debugf("Watching data changes for path: %s", path) + for { + + select { + case <-rw.stopChan: + return + default: + } + + // Get data and set up watch + data, stat, watcher, err := rw.conn.GetW(path) + if err != nil { + logger.Errorf("Failed to watch data changes for path %s, cause: %v", path, err) + time.Sleep(time.Second) + continue + } + rw.eventChan <- ZookeeperEvent{ + Type: NodeCreated, + Path: path, + Data: string(data), + LeafNode: stat.NumChildren == 0, + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeDataChanged { + logger.Debugf("node data changed: %s", path) + + // Re-watch data changes + go rw.watchDataChanges(path) + return + } else if event.Type == zk.EventNodeDeleted { + // Node deleted, stop watching + logger.Debugf("node deleted: %s", path) + rw.eventChan <- ZookeeperEvent{ + Type: NodeDeleted, + Path: path, + LeafNode: stat.NumChildren == 0, + } + return + } + case <-rw.stopChan: + return + } + + // If node exists but no changes, continue watching + if stat != nil { + continue + } + } +} + +// watchChildrenChanges watch child node changes +func (rw *RecursiveWatcher) watchChildrenChanges(path string) { + logger.Debugf("Watching child node changes for path: %s", path) + for { + // Check if stopped + select { + case <-rw.stopChan: + return + default: + } + + // Get child nodes and set up watch + children, stat, watcher, err := rw.conn.ChildrenW(path) + if err != nil { + logger.Errorf("Failed to watch child node changes for path %s: %v", path, err) + time.Sleep(time.Second) + continue + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeChildrenChanged { + logger.Debugf("Child node list changed: %s", path) + + // Get current child nodes + newChildren, _, err := rw.conn.Children(path) + if err != nil { + logger.Debugf("Failed to get new child nodes: %v", err) + continue + } + // Calculate added and deleted child nodes + newChildSet := set.FromSlice(newChildren) + oldChildSet := set.FromSlice(children) + addedChildren := newChildSet.Minus(oldChildSet) + deletedChildren := oldChildSet.Minus(newChildSet) + + // watch newly added child nodes + for _, c := range addedChildren.ToSlice() { + childPath := path + "/" + c + logger.Debugf("New child node added: %s", childPath) + // Recursively watch new node + go func() { + err := rw.watchPathRecursively(childPath) + if err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + }() Review Comment: The goroutine spawned here captures the loop variable `childPath` in a closure without proper handling. While this appears to be done correctly (the variable is passed as a parameter or captured at the right time), the error handling logs `childPath` which may have changed by the time the goroutine executes. Consider using the parameter directly in the error message instead of capturing from the outer scope. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + LeafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// StartAsync begin watching +func (rw *RecursiveWatcher) StartAsync() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + logger.Infof("Stop watching path: %s", rw.basePath) + rw.conn.Close() + close(rw.stopChan) + close(rw.eventChan) +} + +// EventChan return event channel +func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent { + return rw.eventChan +} + +// watchPathRecursively watch path recursively +func (rw *RecursiveWatcher) watchPathRecursively(path string) error { + // Watch data changes for the current path + go rw.watchDataChanges(path) + + // Get children of current path and watch + children, _, err := rw.conn.Children(path) + if err != nil { + logger.Errorf("Failed to get children of path: %s, cause: %v", path, err) + return err + } + // Watch children list changes + go rw.watchChildrenChanges(path) + + // Recursively watch child nodes + for _, child := range children { + childPath := path + "/" + child + if err := rw.watchPathRecursively(childPath); err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + } + return nil +} + +// watchDataChanges watch node data changes +func (rw *RecursiveWatcher) watchDataChanges(path string) { + logger.Debugf("Watching data changes for path: %s", path) + for { + + select { + case <-rw.stopChan: + return + default: + } + + // Get data and set up watch + data, stat, watcher, err := rw.conn.GetW(path) + if err != nil { + logger.Errorf("Failed to watch data changes for path %s, cause: %v", path, err) + time.Sleep(time.Second) + continue + } + rw.eventChan <- ZookeeperEvent{ + Type: NodeCreated, + Path: path, + Data: string(data), + LeafNode: stat.NumChildren == 0, + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeDataChanged { + logger.Debugf("node data changed: %s", path) + + // Re-watch data changes + go rw.watchDataChanges(path) + return + } else if event.Type == zk.EventNodeDeleted { + // Node deleted, stop watching + logger.Debugf("node deleted: %s", path) + rw.eventChan <- ZookeeperEvent{ + Type: NodeDeleted, + Path: path, + LeafNode: stat.NumChildren == 0, + } + return + } + case <-rw.stopChan: + return + } + + // If node exists but no changes, continue watching + if stat != nil { + continue + } + } +} + +// watchChildrenChanges watch child node changes +func (rw *RecursiveWatcher) watchChildrenChanges(path string) { + logger.Debugf("Watching child node changes for path: %s", path) + for { + // Check if stopped + select { + case <-rw.stopChan: + return + default: + } + + // Get child nodes and set up watch + children, stat, watcher, err := rw.conn.ChildrenW(path) + if err != nil { + logger.Errorf("Failed to watch child node changes for path %s: %v", path, err) + time.Sleep(time.Second) + continue + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeChildrenChanged { + logger.Debugf("Child node list changed: %s", path) + + // Get current child nodes + newChildren, _, err := rw.conn.Children(path) + if err != nil { + logger.Debugf("Failed to get new child nodes: %v", err) + continue + } + // Calculate added and deleted child nodes + newChildSet := set.FromSlice(newChildren) + oldChildSet := set.FromSlice(children) + addedChildren := newChildSet.Minus(oldChildSet) + deletedChildren := oldChildSet.Minus(newChildSet) + + // watch newly added child nodes + for _, c := range addedChildren.ToSlice() { Review Comment: The variable name `c` in the loop iteration is not descriptive. Consider using a more meaningful name like `childName` or `child` to improve code readability. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + LeafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// StartAsync begin watching +func (rw *RecursiveWatcher) StartAsync() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + logger.Infof("Stop watching path: %s", rw.basePath) + rw.conn.Close() + close(rw.stopChan) + close(rw.eventChan) +} + +// EventChan return event channel +func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent { + return rw.eventChan +} + +// watchPathRecursively watch path recursively +func (rw *RecursiveWatcher) watchPathRecursively(path string) error { + // Watch data changes for the current path + go rw.watchDataChanges(path) + + // Get children of current path and watch + children, _, err := rw.conn.Children(path) + if err != nil { + logger.Errorf("Failed to get children of path: %s, cause: %v", path, err) + return err + } + // Watch children list changes + go rw.watchChildrenChanges(path) + + // Recursively watch child nodes + for _, child := range children { + childPath := path + "/" + child + if err := rw.watchPathRecursively(childPath); err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + } + return nil +} + +// watchDataChanges watch node data changes +func (rw *RecursiveWatcher) watchDataChanges(path string) { + logger.Debugf("Watching data changes for path: %s", path) + for { + + select { + case <-rw.stopChan: + return + default: + } + + // Get data and set up watch + data, stat, watcher, err := rw.conn.GetW(path) + if err != nil { + logger.Errorf("Failed to watch data changes for path %s, cause: %v", path, err) + time.Sleep(time.Second) + continue + } + rw.eventChan <- ZookeeperEvent{ + Type: NodeCreated, + Path: path, + Data: string(data), + LeafNode: stat.NumChildren == 0, + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeDataChanged { + logger.Debugf("node data changed: %s", path) + + // Re-watch data changes + go rw.watchDataChanges(path) + return + } else if event.Type == zk.EventNodeDeleted { + // Node deleted, stop watching + logger.Debugf("node deleted: %s", path) + rw.eventChan <- ZookeeperEvent{ + Type: NodeDeleted, + Path: path, + LeafNode: stat.NumChildren == 0, + } Review Comment: The channel send on line 148 is not protected against the channel being closed. If `Stop()` is called from another goroutine, sending to a closed channel will cause a panic. Consider using a select statement with `stopChan` to check if stopping before sending. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + LeafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// StartAsync begin watching +func (rw *RecursiveWatcher) StartAsync() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + logger.Infof("Stop watching path: %s", rw.basePath) + rw.conn.Close() + close(rw.stopChan) + close(rw.eventChan) +} + +// EventChan return event channel +func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent { + return rw.eventChan +} + +// watchPathRecursively watch path recursively +func (rw *RecursiveWatcher) watchPathRecursively(path string) error { + // Watch data changes for the current path + go rw.watchDataChanges(path) + + // Get children of current path and watch + children, _, err := rw.conn.Children(path) + if err != nil { + logger.Errorf("Failed to get children of path: %s, cause: %v", path, err) + return err + } + // Watch children list changes + go rw.watchChildrenChanges(path) + + // Recursively watch child nodes + for _, child := range children { + childPath := path + "/" + child + if err := rw.watchPathRecursively(childPath); err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + } + return nil +} + +// watchDataChanges watch node data changes +func (rw *RecursiveWatcher) watchDataChanges(path string) { + logger.Debugf("Watching data changes for path: %s", path) + for { + + select { + case <-rw.stopChan: + return + default: + } + + // Get data and set up watch + data, stat, watcher, err := rw.conn.GetW(path) + if err != nil { + logger.Errorf("Failed to watch data changes for path %s, cause: %v", path, err) + time.Sleep(time.Second) + continue + } + rw.eventChan <- ZookeeperEvent{ + Type: NodeCreated, + Path: path, + Data: string(data), + LeafNode: stat.NumChildren == 0, + } Review Comment: The channel send on line 130 is not protected against the channel being closed. If `Stop()` is called from another goroutine while this send is executing, it could cause a panic when sending to a closed channel. Consider using a select statement with `stopChan` to check if stopping before sending. ########## pkg/discovery/zk/factory.go: ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zk + +import ( + "encoding/json" + "net/url" + "strconv" + "strings" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + "github.com/duke-git/lancet/v2/strutil" + + meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1" + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + discoverycfg "github.com/apache/dubbo-admin/pkg/config/discovery" + "github.com/apache/dubbo-admin/pkg/core/controller" + "github.com/apache/dubbo-admin/pkg/core/discovery" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/discovery/zk/listerwatcher" +) + +func init() { + discovery.RegisterListWatcherFactory(&Factory{}) +} + +type Factory struct { +} + +func (f *Factory) Support(typ discoverycfg.Type) bool { + return discoverycfg.Zookeeper == typ +} + +func (f *Factory) NewListWatchers(config *discoverycfg.Config) ([]controller.ResourceListerWatcher, error) { + address := config.Address.Registry + zkUrl, err := url.Parse(address) + if err != nil { + return nil, err + } + conn, _, err := zk.Connect([]string{zkUrl.Host}, time.Second*1, func(c *zk.Conn) { + c.SetLogger(&zkLogger{}) + }) + if err != nil { + logger.Fatalf("connect to %s failed", address) + return nil, bizerror.Wrap(err, bizerror.ZKError, "connect to zookeeper failed, addr: "+address) + } + mappingLw, err := listerwatcher.NewListerWatcher( + meshresource.ServiceProviderMappingKind, + toUpsertMappingResource, + toDeleteMappingResource, + "/dubbo/mapping", + conn, + config, + ) + if err != nil { + return nil, err + } + rpcInstanceLW, err := listerwatcher.NewListerWatcher( + meshresource.RPCInstanceKind, + toUpsertRPCInstanceResource, + toDeleteRPCInstanceResource, + "/services", + conn, + config, + ) + if err != nil { + return nil, err + } + configLW, err := listerwatcher.NewListerWatcher( + meshresource.ZKConfigKind, + toUpsertZKConfigResource, + toDeleteZKConfigResource, + "/dubbo/config", + conn, + config, + ) + if err != nil { + return nil, err + } + metadataLW, err := listerwatcher.NewListerWatcher( + meshresource.ZKMetadataKind, + toUpsertZKMetadataResource, + toDeleteZKMetadataResource, + "/dubbo/metadata", + conn, + config, + ) + if err != nil { + return nil, err + } + + return []controller.ResourceListerWatcher{ + mappingLw, + rpcInstanceLW, + configLW, + metadataLW, + }, nil Review Comment: The same ZooKeeper connection is being shared across multiple ListerWatcher instances (mappingLw, rpcInstanceLW, configLW, metadataLW). When any of these watchers calls `Stop()`, it will close the shared connection via `rw.conn.Close()` in line 78 of watcher.go, affecting all other watchers. This will cause failures in other active watchers. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + LeafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// StartAsync begin watching +func (rw *RecursiveWatcher) StartAsync() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + logger.Infof("Stop watching path: %s", rw.basePath) + rw.conn.Close() + close(rw.stopChan) + close(rw.eventChan) Review Comment: Potential race condition: multiple goroutines spawned by `watchPathRecursively` are accessing and sending to `rw.eventChan` without synchronization. If `Stop()` is called while these goroutines are running, there's a race between closing the channel and goroutines attempting to send to it, which could cause a panic. ########## pkg/core/resource/apis/mesh/v1alpha1/zkmetadata_types.go: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// Generated by tools/resourcegen +// Run "make generate" to update this file. + +// nolint:whitespace +package v1alpha1 + +import ( + "encoding/json" + + "google.golang.org/protobuf/proto" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + + meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1" + "github.com/apache/dubbo-admin/pkg/core/logger" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" +) + +const ZKMetadataKind coremodel.ResourceKind = "ZKMetadata" + +func init() { + coremodel.RegisterResourceSchema(ZKMetadataKind, NewZKMetadataResource, NewZKMetadataResourceList) +} + +type ZKMetadataResource struct { + metav1.TypeMeta `json:",inline"` + + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Mesh is the name of the dubbo mesh this resource belongs to. + // It may be omitted for cluster-scoped resources. + Mesh string `json:"mesh,omitempty"` + + // Spec is the specification of the Dubbo ZKMetadata resource. + Spec *meshproto.ZKMetadata `json:"spec,omitempty"` + + // Status is the status of the Dubbo ZKMetadata resource. + Status ZKMetadataResourceStatus `json:"status,omitempty"` +} + +type ZKMetadataResourceStatus struct { + // define resource-specific status here +} + +func (r *ZKMetadataResource) ResourceKind() coremodel.ResourceKind { + return ZKMetadataKind +} + +func (r *ZKMetadataResource) MeshName() string { + return r.Mesh +} + +func (r *ZKMetadataResource) ResourceKey() string { + return coremodel.BuildResourceKey(r.Mesh, r.Name) +} + +func (r *ZKMetadataResource) ResourceMeta() metav1.ObjectMeta { + return r.ObjectMeta +} + +func (r *ZKMetadataResource) ResourceSpec() coremodel.ResourceSpec { + return r.Spec +} + +func (r *ZKMetadataResource) DeepCopyObject() k8sruntime.Object { + out := &ZKMetadataResource{ + TypeMeta: r.TypeMeta, + Mesh: r.Mesh, + Status: r.Status, + } + + r.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + + if r.Spec != nil { + spec, ok := proto.Clone(r.Spec).(*meshproto.ZKMetadata) + if !ok { + logger.Warnf("failed to clone spec %v, spec is not conformed to %s", r.Spec, r.ResourceKind()) + return out + } + out.Spec = spec + } + + return out +} + +func (r *ZKMetadataResource) String() string { + jsonStr, err := json.Marshal(r) + if err != nil { + logger.Errorf("failed to encode ZKMetadataResource: %s to json, err: %w", r.ResourceKey(), err) Review Comment: The error format verb `%w` is used but `err` is not being wrapped. The function uses `fmt.Sprintf` which doesn't support `%w`. Should use `%v` instead, or wrap the error using `fmt.Errorf` if error wrapping is intended. ########## pkg/discovery/zk/zkwatcher/watcher.go: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zkwatcher + +import ( + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + set "github.com/duke-git/lancet/v2/datastructure/set" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +type EventType string + +const ( + NodeCreated EventType = "NodeCreated" + NodeDeleted EventType = "NodeDeleted" + NodeChanged EventType = "NodeChanged" +) + +// ZookeeperEvent represents a zookeeper event +type ZookeeperEvent struct { + Type EventType + // node path + Path string + // node data + Data string + // whether it is a leaf node + LeafNode bool +} + +// RecursiveWatcher recursive watcher +type RecursiveWatcher struct { + conn *zk.Conn + basePath string + eventChan chan ZookeeperEvent + stopChan chan struct{} + mu sync.Mutex +} + +// NewRecursiveWatcher create a new recursive watcher +func NewRecursiveWatcher(conn *zk.Conn, basePath string) *RecursiveWatcher { + return &RecursiveWatcher{ + conn: conn, + basePath: basePath, + eventChan: make(chan ZookeeperEvent, 1000), + stopChan: make(chan struct{}), + } +} + +// StartAsync begin watching +func (rw *RecursiveWatcher) StartAsync() error { + logger.Infof("Start watching path: %s", rw.basePath) + // Recursively watch the initial path + return rw.watchPathRecursively(rw.basePath) +} + +// Stop watching +func (rw *RecursiveWatcher) Stop() { + logger.Infof("Stop watching path: %s", rw.basePath) + rw.conn.Close() + close(rw.stopChan) + close(rw.eventChan) +} + +// EventChan return event channel +func (rw *RecursiveWatcher) EventChan() <-chan ZookeeperEvent { + return rw.eventChan +} + +// watchPathRecursively watch path recursively +func (rw *RecursiveWatcher) watchPathRecursively(path string) error { + // Watch data changes for the current path + go rw.watchDataChanges(path) + + // Get children of current path and watch + children, _, err := rw.conn.Children(path) + if err != nil { + logger.Errorf("Failed to get children of path: %s, cause: %v", path, err) + return err + } + // Watch children list changes + go rw.watchChildrenChanges(path) + + // Recursively watch child nodes + for _, child := range children { + childPath := path + "/" + child + if err := rw.watchPathRecursively(childPath); err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + } + return nil +} + +// watchDataChanges watch node data changes +func (rw *RecursiveWatcher) watchDataChanges(path string) { + logger.Debugf("Watching data changes for path: %s", path) + for { + + select { + case <-rw.stopChan: + return + default: + } + + // Get data and set up watch + data, stat, watcher, err := rw.conn.GetW(path) + if err != nil { + logger.Errorf("Failed to watch data changes for path %s, cause: %v", path, err) + time.Sleep(time.Second) + continue + } + rw.eventChan <- ZookeeperEvent{ + Type: NodeCreated, + Path: path, + Data: string(data), + LeafNode: stat.NumChildren == 0, + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeDataChanged { + logger.Debugf("node data changed: %s", path) + + // Re-watch data changes + go rw.watchDataChanges(path) + return + } else if event.Type == zk.EventNodeDeleted { + // Node deleted, stop watching + logger.Debugf("node deleted: %s", path) + rw.eventChan <- ZookeeperEvent{ + Type: NodeDeleted, + Path: path, + LeafNode: stat.NumChildren == 0, + } + return + } + case <-rw.stopChan: + return + } + + // If node exists but no changes, continue watching + if stat != nil { + continue + } + } +} + +// watchChildrenChanges watch child node changes +func (rw *RecursiveWatcher) watchChildrenChanges(path string) { + logger.Debugf("Watching child node changes for path: %s", path) + for { + // Check if stopped + select { + case <-rw.stopChan: + return + default: + } + + // Get child nodes and set up watch + children, stat, watcher, err := rw.conn.ChildrenW(path) + if err != nil { + logger.Errorf("Failed to watch child node changes for path %s: %v", path, err) + time.Sleep(time.Second) + continue + } + + select { + case event := <-watcher.EvtCh: + if event.Type == zk.EventNodeChildrenChanged { + logger.Debugf("Child node list changed: %s", path) + + // Get current child nodes + newChildren, _, err := rw.conn.Children(path) + if err != nil { + logger.Debugf("Failed to get new child nodes: %v", err) + continue + } + // Calculate added and deleted child nodes + newChildSet := set.FromSlice(newChildren) + oldChildSet := set.FromSlice(children) + addedChildren := newChildSet.Minus(oldChildSet) + deletedChildren := oldChildSet.Minus(newChildSet) + + // watch newly added child nodes + for _, c := range addedChildren.ToSlice() { + childPath := path + "/" + c + logger.Debugf("New child node added: %s", childPath) + // Recursively watch new node + go func() { + err := rw.watchPathRecursively(childPath) + if err != nil { + logger.Errorf("Failed to watch subpath %s: %v", childPath, err) + } + }() + } + + // Check for deleted child nodes + for _, c := range deletedChildren.ToSlice() { Review Comment: The variable name `c` in the loop iteration is not descriptive. Consider using a more meaningful name like `childName` or `child` to improve code readability. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
