Similarityoung commented on code in PR #746: URL: https://github.com/apache/dubbo-go-pixiu/pull/746#discussion_r2464804697
########## pkg/adapter/llmregistry/registry/nacos/listener.go: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "encoding/json" + "reflect" + "strconv" + "strings" + "sync" + "time" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/remoting" + + "github.com/creasty/defaults" + + "github.com/hashicorp/go-uuid" + + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + nacosModel "github.com/nacos-group/nacos-sdk-go/model" + "github.com/nacos-group/nacos-sdk-go/vo" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common" + "github.com/apache/dubbo-go-pixiu/pkg/common/constant" + "github.com/apache/dubbo-go-pixiu/pkg/logger" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + +const ( + // ServicePollingInterval How often to poll for the list of available services (to discover new ones). + ServicePollingInterval = 30 * time.Second +) + +// listener monitors Nacos for service changes. +type listener struct { + client naming_client.INamingClient + // Caches the last known instances for a given service. Key: ServiceName, Value: *sync.Map + instanceCache sync.Map + // Caches the set of services we are currently subscribed to. Key: ServiceName, Value: bool + subscribedServices sync.Map + regConf *model.Registry + adapterListener common.RegistryEventListener // The callback to notify the gateway core. + + exit chan struct{} + wg sync.WaitGroup +} + +// newNacosListener creates a new Nacos service listener. +func newNacosListener(client naming_client.INamingClient, regConf *model.Registry, adapterListener common.RegistryEventListener) *listener { + return &listener{ + client: client, + exit: make(chan struct{}), + regConf: regConf, + adapterListener: adapterListener, + } +} + +// WatchAndHandle starts the background goroutine to watch for service changes. +func (l *listener) WatchAndHandle() { + l.wg.Add(1) + go l.watchForServices() +} + +// watchForServices periodically polls Nacos to discover new services to subscribe to. +// The actual instance updates for subscribed services are push-based via the callback. +func (l *listener) watchForServices() { + defer l.wg.Done() + + ticker := time.NewTicker(ServicePollingInterval) + defer ticker.Stop() + + // Perform an initial check immediately. + l.discoverAndSubscribe() + + for { + select { + case <-l.exit: + logger.Info("Nacos listener is stopping...") + l.unsubscribeAll() + return + case <-ticker.C: + l.discoverAndSubscribe() + } + } +} + +func (l *listener) discoverAndSubscribe() { + serviceList, err := l.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{ + GroupName: l.regConf.Group, + NameSpace: l.regConf.Namespace, + }) + if err != nil { + logger.Warnf("Failed to get service list from Nacos: %v", err) + return + } + + currentServices := make(map[string]struct{}) + for _, serviceName := range serviceList.Doms { + currentServices[serviceName] = struct{}{} + // If we aren't already subscribed to this service, subscribe now. + if _, loaded := l.subscribedServices.LoadOrStore(serviceName, true); !loaded { + err := l.client.Subscribe(&vo.SubscribeParam{ + ServiceName: serviceName, + GroupName: l.regConf.Group, + SubscribeCallback: l.serviceCallback, + }) + if err != nil { + logger.Errorf("Failed to subscribe to Nacos service %s: %v", serviceName, err) + l.subscribedServices.Delete(serviceName) // Remove from map to retry next time. + } else { + logger.Infof("Successfully subscribed to Nacos service: %s", serviceName) + } + } + } + + // Unsubscribe from services that no longer exist. + l.subscribedServices.Range(func(key, value any) bool { + serviceName := key.(string) + if _, exists := currentServices[serviceName]; !exists { + err := l.client.Unsubscribe(&vo.SubscribeParam{ + ServiceName: serviceName, + GroupName: l.regConf.Group, + }) + if err != nil { + logger.Errorf("Failed to unsubscribe from Nacos service %s: %v", serviceName, err) + } else { + logger.Infof("Successfully unsubscribed from Nacos service: %s", serviceName) + l.subscribedServices.Delete(serviceName) + l.instanceCache.Delete(serviceName) + } + } + return true + }) +} + +// serviceCallback is the function that Nacos SDK invokes when there's a change +// in the instances of a subscribed service. This is the injected callback. +func (l *listener) serviceCallback(services []nacosModel.SubscribeService, err error) { + if err != nil { + logger.Errorf("Nacos subscribe callback received an error: %v", err) + return + } + if len(services) == 0 { + logger.Warn("Nacos callback received an empty list of services, which might indicate all instances are offline.") + // The logic to handle removal of a service with zero instances + // is handled by the polling `discoverAndSubscribe` loop. + return + } + + serviceName := services[0].ServiceName + logger.Debugf("Received callback for service: %s with %d instances", serviceName, len(services)) + + oldCache, _ := l.instanceCache.LoadOrStore(serviceName, &sync.Map{}) + oldInstanceMap := oldCache.(*sync.Map) + + newInstanceMap := &sync.Map{} + newEndpoints := make(map[string]*model.Endpoint) + + // Process the new list from Nacos. + for i := range services { + // Also check for health + if !services[i].Enable || !services[i].Healthy { + continue + } + instance := generateInstance(services[i]) + endpoint := generateEndpoint(instance) + if endpoint == nil { + continue + } + key := serviceName + constant.At + endpoint.ID + newInstanceMap.Store(key, instance) + newEndpoints[key] = endpoint + } Review Comment: 也行,那就维持原样吧 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
