Copilot commented on code in PR #1340:
URL: https://github.com/apache/dubbo-admin/pull/1340#discussion_r2446485675


##########
pkg/engine/kubernetes/engine.go:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kubernetes
+
+import (
+       "fmt"
+       "reflect"
+
+       "github.com/duke-git/lancet/v2/slice"
+       "github.com/duke-git/lancet/v2/strutil"
+       v1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/fields"
+       k8sruntime "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/watch"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/client-go/tools/cache"
+
+       meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1"
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       enginecfg "github.com/apache/dubbo-admin/pkg/config/engine"
+       "github.com/apache/dubbo-admin/pkg/core/consts"
+       "github.com/apache/dubbo-admin/pkg/core/controller"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+)
+
+type PodListerWatcher struct {
+       cfg *enginecfg.Config
+       lw  cache.ListerWatcher
+}
+
+var _ controller.ResourceListerWatcher = &PodListerWatcher{}
+
+func NewPodListWatcher(clientset *kubernetes.Clientset, cfg *enginecfg.Config) 
(*PodListerWatcher, error) {
+       var selector fields.Selector
+       s := cfg.Properties.PodWatchSelector
+       if strutil.IsBlank(s) {
+               selector = fields.Everything()
+       }
+       selector, err := fields.ParseSelector(s)
+       if err != nil {
+               return nil, fmt.Errorf("parse selector %s failed: %v", s, err)
+       }

Review Comment:
   Variable `selector` is assigned twice: it is set to `fields.Everything()` on 
line 55 when `s` is blank, and then unconditionally reassigned on line 57 
because the `if` has no `else` branch. If `s` is blank, line 57 will overwrite 
the `fields.Everything()` value with the result of parsing an empty string, 
which may not be the intended behavior. Remove the assignment on line 55 or use 
a conditional to skip parsing when `s` is blank.
   ```suggestion
        } else {
                var err error
                selector, err = fields.ParseSelector(s)
                if err != nil {
                        return nil, fmt.Errorf("parse selector %s failed: %v", 
s, err)
                }
        }
   ```



##########
pkg/core/runtime/runtime.go:
##########
@@ -115,13 +115,18 @@ func (rt *runtime) Add(components ...Component) {
 func (rt *runtime) Start(stop <-chan struct{}) error {
        components := maputil.Values(rt.components)
        slice.SortBy(components, func(a, b Component) bool {
-               return a.Order() < b.Order()
+               return a.Order() > b.Order()
        })
        for _, com := range components {
-               err := com.Start(rt, stop)
-               if err != nil {
-                       return err
-               }
+               go func() {
+                       err := com.Start(rt, stop)
+                       if err != nil {
+                               panic("component " + com.Type() + " running 
failed with error: " + err.Error())
+                       }
+               }()

Review Comment:
   Starting each component in a separate goroutine without synchronization 
means errors won't be returned to the caller, and a panic in one component's 
goroutine won't be caught by recover in the parent. Consider using an error 
group (e.g., `errgroup.Group`) to properly propagate errors and coordinate 
shutdown.



##########
pkg/core/controller/informer.go:
##########
@@ -91,13 +102,11 @@ type informer struct {
        transform cache.TransformFunc
 }
 
-func NewInformerWithOptions(lw cache.ListerWatcher, emitter events.Emitter, 
store store.ResourceStore,
-       exampleObject runtime.Object, options Options) Informer {
+func NewInformerWithOptions(lw cache.ListerWatcher, emitter events.Emitter, 
store store.ResourceStore, options Options) Informer {
        return &informer{
                indexer:           store,
                listerWatcher:     lw,
                emitter:           emitter,

Review Comment:
   The `objectType` field is no longer set in the constructor (previously set 
on line 109 before removal), but it may still be referenced elsewhere in the 
`informer` struct. Either verify that removing this initialization doesn't 
break functionality and remove the now-unused field from the struct definition, 
or keep initializing it as suggested below.
   ```suggestion
   func NewInformerWithOptions(lw cache.ListerWatcher, emitter events.Emitter, 
store store.ResourceStore, objectType runtime.Object, options Options) Informer 
{
        return &informer{
                indexer:           store,
                listerWatcher:     lw,
                emitter:           emitter,
                objectType:        objectType,
   ```



##########
pkg/core/engine/subscriber/runtime_instance.go:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package subscriber
+
+import (
+       "errors"
+       "reflect"
+
+       "github.com/duke-git/lancet/v2/strutil"
+       "k8s.io/client-go/tools/cache"
+
+       "github.com/apache/dubbo-admin/pkg/common/bizerror"
+       "github.com/apache/dubbo-admin/pkg/core/events"
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+       meshresource 
"github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
+       coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
+       "github.com/apache/dubbo-admin/pkg/core/store"
+       "github.com/apache/dubbo-admin/pkg/core/store/index"
+)
+
+type RuntimeInstanceEventSubscriber struct {
+       instanceResourceStore store.ResourceStore
+       eventEmitter          events.Emitter
+}
+
+func (s *RuntimeInstanceEventSubscriber) ResourceKind() coremodel.ResourceKind 
{
+       return meshresource.RuntimeInstanceKind
+}
+
+func (s *RuntimeInstanceEventSubscriber) Name() string {
+       return "Engine-" + s.ResourceKind().ToString()
+}
+
+func (s *RuntimeInstanceEventSubscriber) ProcessEvent(event events.Event) 
error {
+       newObj, ok := event.NewObj().(*meshresource.RuntimeInstanceResource)
+       if !ok && newObj != nil {
+               return 
bizerror.NewAssertionError(meshresource.RuntimeInstanceKind, 
reflect.TypeOf(event.NewObj()).Name())
+       }
+       oldObj, ok := event.OldObj().(*meshresource.RuntimeInstanceResource)
+       if !ok && oldObj != nil {
+               return 
bizerror.NewAssertionError(meshresource.RuntimeInstanceKind, 
reflect.TypeOf(event.OldObj()).Name())
+       }
+       var processErr error
+       switch event.Type() {
+       case cache.Added, cache.Updated, cache.Replaced, cache.Sync:
+               if newObj == nil {
+                       errStr := "process runtime instance upsert event, but 
new obj is nil, skipped processing"
+                       logger.Error(errStr)
+                       return errors.New(errStr)
+               }
+               processErr = s.processUpsert(newObj)
+       case cache.Deleted:
+               if oldObj == nil {
+                       errStr := "process runtime instance delete event, but 
old obj is nil, skipped processing"
+                       logger.Error(errStr)
+                       return errors.New(errStr)
+               }
+               processErr = s.processDelete(oldObj)
+       }
+       eventStr := event.String()
+       if processErr == nil {
+               logger.Infof("process runtime instance event successfully, 
event: %s", eventStr)
+       } else {
+               logger.Errorf("process runtime instance event failed, event: 
%s, err: %s", eventStr, processErr.Error())
+       }
+       return processErr
+}
+
+func (s *RuntimeInstanceEventSubscriber) getRelatedInstanceResource(
+       rtInstance *meshresource.RuntimeInstanceResource) 
(*meshresource.InstanceResource, error) {
+       resources, err := 
s.instanceResourceStore.ListByIndexes(map[string]string{
+               index.ByInstanceIpIndex: rtInstance.Spec.Ip,
+       })
+       if err != nil {
+               return nil, err
+       }
+       if len(resources) == 0 {
+               return nil, nil
+       }
+       instanceResources := make([]*meshresource.InstanceResource, 
len(resources))
+       for i, item := range resources {
+               if res, ok := item.(*meshresource.InstanceResource); ok {
+                       instanceResources[i] = res
+               } else {
+                       return nil, 
bizerror.NewAssertionError("InstanceResource", reflect.TypeOf(item).Name())
+               }
+       }
+       return instanceResources[0], nil
+}
+
+func (s *RuntimeInstanceEventSubscriber) mergeRuntimeInstance(
+       instanceRes *meshresource.InstanceResource,
+       rtInstanceRes *meshresource.RuntimeInstanceResource) {
+       instanceRes.Name = rtInstanceRes.Name
+       instanceRes.Spec.Name = rtInstanceRes.Spec.Name
+       instanceRes.Spec.Ip = rtInstanceRes.Spec.Ip
+       instanceRes.Labels = rtInstanceRes.Labels
+       instanceRes.Spec.Image = rtInstanceRes.Spec.Image
+       instanceRes.Spec.CreateTime = rtInstanceRes.Spec.CreateTime
+       instanceRes.Spec.StartTime = rtInstanceRes.Spec.StartTime
+       instanceRes.Spec.ReadyTime = rtInstanceRes.Spec.ReadyTime
+       instanceRes.Spec.DeployState = rtInstanceRes.Spec.Phase
+       instanceRes.Spec.WorkloadType = rtInstanceRes.Spec.WorkloadType
+       instanceRes.Spec.WorkloadName = rtInstanceRes.Spec.WorkloadName
+       instanceRes.Spec.Node = rtInstanceRes.Spec.Node
+       instanceRes.Spec.Probes = rtInstanceRes.Spec.Probes
+       instanceRes.Spec.Conditions = rtInstanceRes.Spec.Conditions
+}
+
+func (s *RuntimeInstanceEventSubscriber) fromRuntimeInstance(
+       rtInstanceRes *meshresource.RuntimeInstanceResource) 
*meshresource.InstanceResource {
+       instanceRes := 
meshresource.NewInstanceResourceWithAttributes(rtInstanceRes.Name, 
rtInstanceRes.Mesh)
+       s.mergeRuntimeInstance(instanceRes, rtInstanceRes)
+       return instanceRes
+}
+
+// processUpsert when runtime instance added or updated, we should add/update 
the corresponding instance resource
+func (s *RuntimeInstanceEventSubscriber) processUpsert(rtInstanceRes 
*meshresource.RuntimeInstanceResource) error {
+       instanceResource, err := s.getRelatedInstanceResource(rtInstanceRes)
+       if err != nil {
+               return err
+       }
+       // If instance resource exists, the rpc instance resource exists in 
remote registry and has been watched by discovery.
+       // So we should merge the runtime info into it
+       if instanceResource != nil {
+               s.mergeRuntimeInstance(instanceResource, rtInstanceRes)
+               return s.instanceResourceStore.Update(instanceResource)
+       }
+       // If instance resource does not exist, we should create a new instance 
resource by runtime instance
+       // If the app name is empty, we cannot identify it as a dubbo app, so 
we skip it
+       if strutil.IsBlank(rtInstanceRes.Spec.AppName) {
+               logger.Warnf("cannot identify runtime instance %s as a dubbo 
app, skipped updating instance", rtInstanceRes.Name)
+               return nil
+       }
+       // Otherwise we can create a new instance resource by runtime instance
+       instanceRes := s.fromRuntimeInstance(rtInstanceRes)
+       if err = s.instanceResourceStore.Add(instanceRes); err != nil {
+               logger.Errorf("add instance resource failed, instance: %s, err: 
%s", instanceRes.ResourceKey(), err.Error())
+               return err
+       }
+       instanceAddEvent := events.NewResourceChangedEvent(cache.Added, nil, 
instanceRes)
+       s.eventEmitter.Send(instanceAddEvent)
+       logger.Debugf("runtime instance upsert trigger instance add event, 
event: %s", instanceAddEvent.String())
+       return nil
+}
+
+// processDelete when runtime instance deleted, we should delete the 
corresponding instance resource
+func (s *RuntimeInstanceEventSubscriber) processDelete(rtInstanceRes 
*meshresource.RuntimeInstanceResource) error {
+       instanceResource, err := s.getRelatedInstanceResource(rtInstanceRes)
+       if err != nil {
+               return err
+       }
+       if instanceResource == nil {
+               return nil
+       }
+       if err = 
s.instanceResourceStore.Delete(instanceResource.ResourceKey()); err != nil {
+               logger.Errorf("delete instance resource failed, instance: %s, 
err: %s", instanceResource.ResourceKey(), err.Error())
+               return err
+       }
+       instanceDeleteEvent := events.NewResourceChangedEvent(cache.Deleted, 
instanceResource, nil)
+       s.eventEmitter.Send(instanceDeleteEvent)
+       logger.Debugf("runtime instance delete trigger instance delete event, 
event: %s", instanceDeleteEvent.String())
+       return nil
+}
+
+func NewRuntimeInstanceEventSubscriber(instanceResourceStore 
store.ResourceStore) events.Subscriber {
+       return &RuntimeInstanceEventSubscriber{
+               instanceResourceStore: instanceResourceStore,

Review Comment:
   The `eventEmitter` field is not initialized in the constructor but is used 
in `processUpsert` (line 157) and `processDelete` (line 176). This will cause a 
nil pointer dereference. Add an `eventEmitter` parameter to the constructor.
   ```suggestion
   func NewRuntimeInstanceEventSubscriber(instanceResourceStore 
store.ResourceStore, eventEmitter events.Emitter) events.Subscriber {
        return &RuntimeInstanceEventSubscriber{
                instanceResourceStore: instanceResourceStore,
                eventEmitter:          eventEmitter,
   ```



##########
pkg/diagnostics/server.go:
##########
@@ -79,31 +76,30 @@ func (s *diagnosticsServer) Start(_ runtime.Runtime, stop 
<-chan struct{}) error
                Addr:              fmt.Sprintf(":%d", s.config.ServerPort),
                Handler:           mux,
                ReadHeaderTimeout: time.Second,
-               ErrorLog:          adapter.ToStd(diagnosticsServerLog),
        }
 
-       diagnosticsServerLog.Info("starting diagnostic server", "interface", 
"0.0.0.0", "port", s.config.ServerPort)
+       logger.Infof("starting diagnostic server, endpoint is 0.0.0.0: %d", 
s.config.ServerPort)
        errChan := make(chan error)
        go func() {
                defer close(errChan)
                var err error
                err = httpServer.ListenAndServe()
                if err != nil {
-                       switch err {
-                       case http.ErrServerClosed:
-                               diagnosticsServerLog.Info("shutting down 
server")
+                       switch {
+                       case errors.Is(err, http.ErrServerClosed):
+                               logger.Errorf("diagnostics http server closed, 
err: %s", err)

Review Comment:
   The log message says 'diagnostics http server closed' with an error level, 
but `http.ErrServerClosed` is the expected normal shutdown error. This should 
be logged at Info level with message 'shutting down server' (matching the 
original behavior) rather than as an error.
   ```suggestion
                                logger.Info("shutting down diagnostics server")
   ```



##########
pkg/diagnostics/server.go:
##########
@@ -79,31 +76,30 @@ func (s *diagnosticsServer) Start(_ runtime.Runtime, stop 
<-chan struct{}) error
                Addr:              fmt.Sprintf(":%d", s.config.ServerPort),
                Handler:           mux,
                ReadHeaderTimeout: time.Second,
-               ErrorLog:          adapter.ToStd(diagnosticsServerLog),
        }
 
-       diagnosticsServerLog.Info("starting diagnostic server", "interface", 
"0.0.0.0", "port", s.config.ServerPort)
+       logger.Infof("starting diagnostic server, endpoint is 0.0.0.0: %d", 
s.config.ServerPort)
        errChan := make(chan error)
        go func() {
                defer close(errChan)
                var err error
                err = httpServer.ListenAndServe()
                if err != nil {
-                       switch err {
-                       case http.ErrServerClosed:
-                               diagnosticsServerLog.Info("shutting down 
server")
+                       switch {
+                       case errors.Is(err, http.ErrServerClosed):
+                               logger.Errorf("diagnostics http server closed, 
err: %s", err)
                        default:
-                               diagnosticsServerLog.Error(err, "could not 
start HTTP Server")
+                               logger.Error("could not start diagnostics http 
Server, unknown err: %s", err)

Review Comment:
   Corrected capitalization of 'Server' to 'server'.
   ```suggestion
                                logger.Error("could not start diagnostics http 
server, unknown err: %s", err)
   ```



##########
app/dubbo-admin/dubbo-admin.yaml:
##########
@@ -39,17 +39,55 @@ console:
 store:
   type: memory
 discovery:
-  - type: nacos
-    id: nacos-44.33
-    address:
-      registry: nacos://47.76.94.134:8848?username=nacos&password=nacos
-      configCenter: nacos://47.76.94.134:8848?username=nacos&password=nacos
-      metadataReport: nacos://47.76.94.134:8848?username=nacos&password=nacos
-  - type: istio
+#  - type: nacos
+#    id: nacos-44.33
+#    address:
+#      registry: nacos://47.76.94.134:8848?username=nacos&password=nacos
+#      configCenter: nacos://47.76.94.134:8848?username=nacos&password=nacos
+#      metadataReport: nacos://47.76.94.134:8848?username=nacos&password=nacos
+  # mock discovery is only for development
+  - type: mock
 engine:
   name: k8s1.28.6
   type: kubernetes
   properties:
-    apiServerAddress: https://192.168.1.1:6443
-    kubeConfig: /etc/kubernetes/admin.conf
+    # [Kubernetes] Path to kubernetes config file, if not set, will use in 
cluster config
+    kubeConfigPath: /root/.kube/config
+    # [Kubernetes] Watch pods with specified labels, if not set, will watch 
all pods
+    # podWatchSelector: org.apache.dubbo/dubbo-apps=true
+    # [Kubernetes] Identify which Dubbo app the pod belongs to, if not set, 
[type = ByIP] will be used
+    # 1. ByLabels: Use the label value corresponding to the labelKey as the 
dubbo app name
+    # e.g.
+    #    type: ByLabel
+    #    labelKey: org.apache.dubbo/dubbo-app-name
+    # 2. ByAnnotation: Use the annotation value corresponding to the 
annotationKey as the dubbo app name
+    # e.g.
+    #    type: ByAnnotation
+    #    annotationKey: org.apache.dubbo/dubbo-app-name
+    # 3. ByIP(default): Use pod's IP to find if there is a same ip of an 
instance and use the instance's app name as the identifier,
+    # if there is no such association, the pod will not be seen as a pod of 
dubbo application.
+    # e.g.
+    #    type: ByIP
+#    dubboAppIdentifier:
+#      type: ByLabel
+#      labelKey: org.apache.dubbo/dubbo-app-name
+    # [Kubernetes] Strategy of choosing the main container, if not set, [type 
= ByIndex] and [index = 0] will be used
+    # 1. ByLast: choose the last container as the main container
+    # e.g.
+    #    type: ByLast
+    # 2. ByIndex(default): choose the container at the specified index 
location as the main container
+    # e.g.
+    #    type: ByIndex
+    #    index: 0
+    # 3. ByName: choose the container with the specified name
+    # e.g.
+    #    type: ByName
+    #    name: main
+    # 4. chooseByAnnotation: choose the container with the annotation key, 
specified annotation value will be used as the container name

Review Comment:
   Renamed 'chooseByAnnotation' to 'ByAnnotation' to match the constant name 
and the naming of the other strategies in this list (ByLast, ByIndex, ByName).
   ```suggestion
       # 4. ByAnnotation: choose the container with the annotation key, 
specified annotation value will be used as the container name
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to