robocanic commented on code in PR #1367: URL: https://github.com/apache/dubbo-admin/pull/1367#discussion_r2616247241
########## pkg/core/discovery/subscriber/nacos_service.go: ########## @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package subscriber + +import ( + "encoding/json" + "fmt" + "reflect" + "regexp" + "strconv" + "strings" + "time" + + "github.com/duke-git/lancet/v2/convertor" + set "github.com/duke-git/lancet/v2/datastructure/set" + "github.com/duke-git/lancet/v2/maputil" + "github.com/duke-git/lancet/v2/slice" + "k8s.io/client-go/tools/cache" + + meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1" + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/store" + "github.com/apache/dubbo-admin/pkg/core/store/index" +) + +type NacosServiceEventSubscriber struct { + emitter events.Emitter + storeRouter store.Router +} + +func NewNacosServiceEventSubscriber(eventEmitter events.Emitter, storeRouter store.Router) *NacosServiceEventSubscriber { + return 
&NacosServiceEventSubscriber{ + emitter: eventEmitter, + storeRouter: storeRouter, + } +} + +func (n *NacosServiceEventSubscriber) ResourceKind() coremodel.ResourceKind { + return meshresource.NacosServiceKind +} + +func (n *NacosServiceEventSubscriber) Name() string { + return "Nacos2Discovery-" + n.ResourceKind().ToString() +} + +func (n *NacosServiceEventSubscriber) ProcessEvent(event events.Event) error { + newObj, ok := event.NewObj().(*meshresource.NacosServiceResource) + if !ok && event.NewObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(newObj), event.NewObj()) + } + oldObj, ok := event.OldObj().(*meshresource.NacosServiceResource) + if !ok && event.OldObj() != nil { + return bizerror.NewAssertionError(reflect.TypeOf(oldObj), event.OldObj()) + } + var processErr error + switch event.Type() { + case cache.Added, cache.Updated, cache.Replaced, cache.Sync: + if newObj == nil { + errStr := "process nacos service upsert event, but new obj is nil, skipped processing" + logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + processErr = n.processUpsert(newObj) + case cache.Deleted: + if oldObj == nil { + errStr := "process nacos service delete event, but old obj is nil, skipped processing" + logger.Errorf(errStr) + return bizerror.New(bizerror.EventError, errStr) + } + processErr = n.processDelete(oldObj) + } + if processErr != nil { + logger.Errorf("process nacos service event failed, cause: %s, event: %s", processErr.Error(), event.String()) + return processErr + } + logger.Infof("process nacos service event successfully, event: %s", event.String()) + return nil +} + +func (n *NacosServiceEventSubscriber) processUpsert(serviceRes *meshresource.NacosServiceResource) error { + providerRe := regexp.MustCompile(`^providers:[\w.]+(?::[\w.]*:|::[\w.]*)?$`) + consumerRe := regexp.MustCompile(`^consumers:[\w.]+(?::[\w.]*:|::[\w.]*)?$`) + if providerRe.MatchString(serviceRes.Name) { + logger.Infof("interface-level service ignored, 
service name: %s", serviceRes.Name) + return nil + } + if consumerRe.MatchString(serviceRes.Name) { + return n.processConsumerMetadataUpsert(serviceRes) + } + return n.processRPCInstanceUpsert(serviceRes) + +} + +func (n *NacosServiceEventSubscriber) processConsumerMetadataUpsert(serviceRes *meshresource.NacosServiceResource) error { + serviceName, err := parseServiceName(serviceRes.Name) + if err != nil { + return bizerror.Wrap(err, bizerror.UnknownError, "parse service name error, raw nacos service is"+serviceRes.String()) + } + convertFunc := func(i int, instance *meshproto.NacosInstance) maputil.Entry[string, *meshresource.ServiceConsumerMetadataResource] { + metadataJsonStr, err := convertor.ToJson(instance.Metadata) + if err != nil { + logger.Errorf("skipping convert service consumer %s:%d of %s to ServiceConsumerMetadata "+ + "because it cannot convert to valid json string", instance.Ip, instance.Port, serviceRes.Name) + return maputil.Entry[string, *meshresource.ServiceConsumerMetadataResource]{} + } + consumerAppName, exists := instance.Metadata[constants.Application] + if !exists { + logger.Errorf("service consumer %s:%d of %s is invalid because no application name found, raw metadata: %s", + instance.Ip, instance.Port, serviceRes.Name, metadataJsonStr) + return maputil.Entry[string, *meshresource.ServiceConsumerMetadataResource]{} + } + serviceName, exists := instance.Metadata[constants.InterfaceKey] + if !exists { + logger.Errorf("service consumer %s:%d of %s is invalid because no service name found, raw metadata: %s", + instance.Ip, instance.Port, serviceRes.Name, metadataJsonStr) + return maputil.Entry[string, *meshresource.ServiceConsumerMetadataResource]{} + } + version := instance.Metadata[constants.VersionKey] + group := instance.Metadata[constants.GroupKey] + resKey := consumerAppName + ":" + serviceRes.Name + res := meshresource.NewServiceConsumerMetadataResourceWithAttributes(resKey, serviceRes.Mesh) + res.Spec = 
&meshproto.ServiceConsumerMetadata{ + ServiceName: serviceName, + ConsumerAppName: consumerAppName, + Version: version, + Group: group, + Metadata: instance.Metadata, + } + return maputil.Entry[string, *meshresource.ServiceConsumerMetadataResource]{ + Key: res.ResourceKey(), + Value: res, + } + } + newConsumers := maputil.FromEntries(slice.Map(serviceRes.Spec.Instances, convertFunc)) + st, err := n.storeRouter.ResourceKindRoute(meshresource.ServiceConsumerMetadataKind) + if err != nil { + logger.Errorf("process service consumer metadata upsert event, but cannot route to service consumer metadata resource, cause: %v", err) + return err + } + resources, err := st.ListByIndexes(map[string]string{ + index.ByServiceConsumerServiceName: serviceName, + }) + if err != nil { + logger.Errorf("process service consumer metadata upsert event, but cannot list service consumer metadata resource of %s, cause: %v", serviceRes.Name, err) + return err + } + oldConsumers := make(map[string]*meshresource.ServiceConsumerMetadataResource) + for _, res := range resources { + oldConsumer, ok := res.(*meshresource.ServiceConsumerMetadataResource) + if !ok { + return bizerror.NewAssertionError(meshresource.ServiceConsumerMetadataKind, reflect.TypeOf(res).Name()) + } + oldConsumers[oldConsumer.ResourceKey()] = oldConsumer + } + // Find offline consumers and delete them + offlineConsumers := maputil.Minus(oldConsumers, newConsumers) + slice.ForEach(maputil.Values(offlineConsumers), func(_ int, item *meshresource.ServiceConsumerMetadataResource) { + err := st.Delete(item) + if err != nil { + logger.Errorf("delete service consumer metadata %s failed, cause: %v", item.ResourceKey(), err) + return + } + n.emitter.Send(events.NewResourceChangedEvent(cache.Deleted, item, nil)) + }) + // Find consumers need to add and add them + addConsumers := maputil.Minus(newConsumers, oldConsumers) + slice.ForEach(maputil.Values(addConsumers), func(_ int, item *meshresource.ServiceConsumerMetadataResource) { + 
err := st.Add(item) + if err != nil { + logger.Errorf("add service consumer metadata %s failed, cause: %v", item.ResourceKey(), err) + return + } + n.emitter.Send(events.NewResourceChangedEvent(cache.Added, nil, item)) + }) + // Find consumers need to update and update them + updateConsumers := maputil.Intersect(newConsumers, oldConsumers) + slice.ForEach(maputil.Values(updateConsumers), func(_ int, item *meshresource.ServiceConsumerMetadataResource) { + err := st.Update(item) + if err != nil { + logger.Errorf("update service consumer metadata %s failed, cause: %v", item.ResourceKey(), err) + return + } + n.emitter.Send(events.NewResourceChangedEvent(cache.Updated, item, item)) + }) + + return nil +} + +func parseServiceName(s string) (string, error) { + const prefix = "consumers:" + if !strings.HasPrefix(s, prefix) { + return "", fmt.Errorf("invalid prefix") + } + + parts := strings.SplitN(s[len(prefix):], ":", 3) + + if len(parts) < 1 { + return "", fmt.Errorf("invalid format") + } + + return parts[0], nil +} + +func (n *NacosServiceEventSubscriber) processRPCInstanceUpsert(serviceRes *meshresource.NacosServiceResource) error { + convertFunc := func(i int, instance *meshproto.NacosInstance) maputil.Entry[string, *meshresource.RPCInstanceResource] { + resName := meshresource.BuildInstanceResName(serviceRes.Name, instance.Ip, instance.Port) + res := meshresource.NewRPCInstanceResourceWithAttributes(resName, serviceRes.Mesh) + var registerTime string + timestamp, err := strconv.ParseInt(instance.Metadata[constants.TimestampKey], 10, 64) + if err != nil { + registerTime = "" Review Comment: fixed ########## pkg/common/constants/constants.go: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package constants + +const ( + DubboPropertyKey = "dubbo.properties" + RegistryAddressKey = "dubbo.registry.address" + MetadataReportAddressKey = "dubbo.metadata-report.address" +) + +const ( + RegistryKey = "registry" + RegistryClusterKey = "REGISTRY_CLUSTER" + RegisterModeKey = "register-mode" + RegistryClusterTypeKey = "registry-cluster-type" + RemoteClientNameKey = "remote-client-name" + DefaultRegisterModeInterface = "interface" + DefaultRegisterModeInstance = "instance" + DefaultRegisterModeAll = "all" + MetadataStorageTypeKey = "dubbo.metadata.storage-type" + TimestampKey = "timestamp" + EndpointsKey = "dubbo.endpoints" + URLParamsKey = "dubbo.metadata-service.url-params" + MetadataRevisionKey = "dubbo.metadata.revision" + AnyValue = "*" + AnyHostValue = "0.0.0.0" + InterfaceKey = "interface" + GroupKey = "group" + VersionKey = "version" + ClassifierKey = "classifier" + CategoryKey = "category" + ProvidersCategory = "providers" + ConsumersCategory = "consumers" + RoutersCategory = "routers" + ConfiguratorsCategory = "configurators" + ConfiguratorRuleSuffix = ".configurators" + EnabledKey = "enabled" + CheckKey = "check" + AdminProtocol = "admin" + Side = "side" + ConsumerSide = "consumer" + ProviderSide = "provider" + ConsumerProtocol = "consumer" + EmptyProtocol = "empty" + OverrideProtocol = "override" + DefaultGroup = "dubbo" + DynamicKey = "dynamic" + SerializationKey = 
"serialization" + TimeoutKey = "timeout" + DefaultTimeout = 1000 + WeightKey = "weight" + BalancingKey = "balancing" + DefaultWeight = 100 + OwnerKey = "owner" + + ConfigFileEnvKey = "conf" // config file path + RegistryAll = "ALL" + RegistryInterface = "INTERFACE" + RegistryInstance = "INSTANCE" + RegistryType = "TYPE" + NamespaceKey = "namespace" +) + +const ( + Application = "application" + Instance = "instance" + Service = "service" + + StatefulSet = "StatefulSet" + Deployment = "Deployment" +) + +const ( + DubboVersionKey = "dubbo" + WorkLoadKey = "workLoad" + ReleaseKey = "release" +) + +const ( + Stateful = "有状态" Review Comment: in next prs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
