Copilot commented on code in PR #1423:
URL: https://github.com/apache/dubbo-admin/pull/1423#discussion_r2970668655


##########
pkg/core/discovery/component.go:
##########
@@ -123,11 +192,12 @@ func (d *discoveryComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error
                                fmt.Sprintf("subscriber %s can not subscribe 
resource changed events", sub.Name()))
                }
        }
+       // 2. start informers
        for name, informers := range d.informers {
                for _, informer := range informers {
-                       go informer.Run(ch)
+                       go informer.Run(stopCh)
                }
-               logger.Infof("resource discvoery %s has started succesfully", 
name)
+               logger.Infof("resource discovery %s has started successfully", 
name)
        }

Review Comment:
   `startBusinessLogic` is called on every leadership acquisition, but it re-subscribes all subscribers without unsubscribing them on leadership loss (the EventBus enforces unique subscriber names), and it tries to restart the same informer instances. The informer implementation does not allow restarting once stopped (the `HasStarted()` guard in `pkg/core/controller/informer.go`), so after a leader failover the new leader term will not restart list-watch and business logic will be partially or fully disabled. Unsubscribe subscribers in `onStopLeading` and recreate informers per leadership term (or redesign to keep informers running while preventing follower-side writes). A rough sketch of the recreation approach follows.
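   A minimal sketch of the per-term recreation idea; `Informer`, `termRunner`, and the `newInformers` factory below are illustrative stand-ins, not confirmed APIs of this codebase:
   ```go
// Recreate informer instances for each leadership term so a stopped informer
// is never re-run past its HasStarted() guard.
type Informer interface {
        Run(stopCh <-chan struct{})
}

type termRunner struct {
        newInformers func() []Informer // factory: fresh instances per term
        informers    []Informer
}

func (t *termRunner) onStartLeading(stopCh <-chan struct{}) {
        t.informers = t.newInformers() // discard last term's stopped instances
        for _, inf := range t.informers {
                go inf.Run(stopCh)
        }
}

// onStopLeading would close stopCh and also release subscriber registrations
// (an Unsubscribe counterpart to Subscribe, if the EventBus gains one) so the
// next term's Subscribe calls do not collide on unique names.
   ```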



##########
pkg/core/engine/component.go:
##########
@@ -146,6 +179,43 @@ func (e *engineComponent) initSubscribers(eventbus events.EventBus) error {
 }
 
 func (e *engineComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error {
+       if !e.needsLeaderElection {
+               return e.startBusinessLogic(ch)
+       }
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       go func() {
+               <-ch
+               cancel()
+       }()
+
+       var leaderStopCh chan struct{}
+
+       e.leaderElection.RunLeaderElection(ctx, ch,
+               func() { // onStartLeading: create a fresh stopCh for this leadership term
+                       leaderStopCh = make(chan struct{})
+                       logger.Infof("engine: became leader, starting business logic")
+                       if err := e.startBusinessLogic(leaderStopCh); err != nil {
+                               logger.Errorf("engine: failed to start business logic: %v", err)
+                       }
+               },
+               func() { // onStopLeading: stop informers from the current term
+                       logger.Warnf("engine: lost leadership, stopping business logic")
+                       if leaderStopCh != nil {
+                               close(leaderStopCh)
+                               leaderStopCh = nil
+                       }
+               },
+       )
+
+       return nil

Review Comment:
   With leader election enabled, `Start` blocks inside `RunLeaderElection` and only returns when the process is stopping. The runtime logs "component started successfully" only after `Start` returns (see `pkg/core/runtime/runtime.go`), so this component will never be reported as started during normal operation, and startup errors from `startBusinessLogic` can no longer be surfaced via the `Start` return value. Consider running the leader-election loop in a goroutine and returning from `Start` after successful initialization, while still respecting `ch` for shutdown.
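   A sketch of the async shape this would take, reusing names already in this diff; note the `defer cancel()` above would need to be removed, since it would otherwise fire as soon as `Start` returns and cancel the election loop immediately:
   ```go
        // Cancel the election context when the component's stop channel closes.
        go func() {
                <-ch
                cancel()
        }()

        // Run the election loop asynchronously so Start can return and the
        // runtime can report the component as started.
        go func() {
                var leaderStopCh chan struct{}
                e.leaderElection.RunLeaderElection(ctx, ch,
                        func() { // onStartLeading: fresh stopCh per leadership term
                                leaderStopCh = make(chan struct{})
                                if err := e.startBusinessLogic(leaderStopCh); err != nil {
                                        logger.Errorf("engine: failed to start business logic: %v", err)
                                }
                        },
                        func() { // onStopLeading: stop this term's informers
                                if leaderStopCh != nil {
                                        close(leaderStopCh)
                                        leaderStopCh = nil
                                }
                        },
                )
        }()
        return nil
   ```
   Caveat: returning early means `startBusinessLogic` failures are still only logged; surfacing the first term's startup error through `Start` would need an extra handshake (for example an error channel awaited with a timeout).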



##########
pkg/core/discovery/component.go:
##########
@@ -111,10 +116,74 @@ func (d *discoveryComponent) Init(ctx runtime.BuilderContext) error {
        if err != nil {
                return err
        }
+
+       // Memory store runs single-replica; leader election is not needed.
+       if ctx.Config().Store.Type == storecfg.Memory {
+               return nil
+       }
+
+       dbSrc, ok := storeComponent.(leader.DBSource)
+       if !ok {
+               return nil
+       }
+       db, hasDB := dbSrc.GetDB()
+       if !hasDB {
+               return nil
+       }
+       holderID, err := leader.GenerateHolderID()
+       if err != nil {
+               logger.Warnf("discovery: failed to generate holder ID, skipping 
leader election: %v", err)
+               return nil
+       }
+       le := leader.NewLeaderElection(db, runtime.ResourceDiscovery, holderID)
+       if err := le.EnsureTable(); err != nil {
+               logger.Warnf("discovery: failed to ensure leader lease table: 
%v", err)
+               return nil
+       }
+       d.leaderElection = le
+       d.needsLeaderElection = true
+       logger.Infof("discovery: leader election initialized (holder: %s)", 
holderID)
        return nil
 }
 
func (d *discoveryComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error {
+       if !d.needsLeaderElection {
+               return d.startBusinessLogic(ch)
+       }
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       go func() {
+               <-ch
+               cancel()
+       }()
+
+       var leaderStopCh chan struct{}
+
+       d.leaderElection.RunLeaderElection(ctx, ch,
+               func() { // onStartLeading: create a fresh stopCh for this leadership term
+                       leaderStopCh = make(chan struct{})
+                       logger.Infof("discovery: became leader, starting business logic")
+                       if err := d.startBusinessLogic(leaderStopCh); err != nil {
+                               logger.Errorf("discovery: failed to start business logic: %v", err)
+                       }
+               },
+               func() { // onStopLeading: stop informers from the current term
+                       logger.Warnf("discovery: lost leadership, stopping business logic")
+                       if leaderStopCh != nil {
+                               close(leaderStopCh)
+                               leaderStopCh = nil
+                       }
+               },
+       )

Review Comment:
   With leader election enabled, `Start` blocks inside `RunLeaderElection` and only returns when the process is stopping. The runtime reports components as started only after `Start` returns (see `pkg/core/runtime/runtime.go`), so this component won't be marked started during normal operation, and failures in `startBusinessLogic` are only logged (not returned). Consider running the election loop asynchronously and returning from `Start` once setup is complete, while still stopping the loop when `ch` is closed.
   ```suggestion
   
        // Tie the leader election context to the component's stop channel.
        go func() {
                <-ch
                cancel()
        }()
   
        // Run the leader election loop asynchronously so that Start can return
        // and the runtime can mark this component as started.
        go func() {
                var leaderStopCh chan struct{}
   
                d.leaderElection.RunLeaderElection(ctx, ch,
                        func() { // onStartLeading: create a fresh stopCh for this leadership term
                                leaderStopCh = make(chan struct{})
                                logger.Infof("discovery: became leader, starting business logic")
                                if err := d.startBusinessLogic(leaderStopCh); err != nil {
                                        logger.Errorf("discovery: failed to start business logic: %v", err)
                                }
                        },
                        func() { // onStopLeading: stop informers from the current term
                                logger.Warnf("discovery: lost leadership, stopping business logic")
                                if leaderStopCh != nil {
                                        close(leaderStopCh)
                                        leaderStopCh = nil
                                }
                        },
                )
        }()
   ```
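   Note: if this suggestion is adopted, the `defer cancel()` earlier in `Start` would also need to be removed or reworked; once `Start` returns immediately, the deferred cancel fires right away and stops the freshly started election loop.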



##########
pkg/core/engine/component.go:
##########
@@ -154,7 +224,7 @@ func (e *engineComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error {
        }
        // 2. start informers
        for _, informer := range e.informers {
-               go informer.Run(ch)
+               go informer.Run(stopCh)
        }
        logger.Infof("resource engine %s has started successfully", e.name)
        return nil

Review Comment:
   `startBusinessLogic` is invoked every time leadership is acquired, but it (1) calls `Subscribe` again without unsubscribing on leadership loss (EventBus rejects duplicate subscriber names), and (2) attempts to `Run` the same informer instances again. The informer implementation explicitly disallows restarting an informer once it has been stopped (`HasStarted()` check in `pkg/core/controller/informer.go`), so after the first leadership loss/failover the new leader term will not actually restart list-watch. To support leader failover, unsubscribe subscribers on `onStopLeading` and recreate new informer instances (and list-watchers) for each leadership term, or refactor to keep a single informer instance but gate event processing without stopping it, as sketched below.
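   If the single-informer route is taken, a rough sketch of the gating idea (the `leaderGate` type and its wiring are illustrative, not existing code):
   ```go
import "sync/atomic"

// leaderGate lets informers run for the whole process lifetime while event
// handlers consult an atomic leadership flag before performing writes.
type leaderGate struct {
        isLeader atomic.Bool // set true in onStartLeading, false in onStopLeading
}

// wrap decorates an event handler so it only acts while this replica leads.
func (g *leaderGate) wrap(handler func(ev interface{})) func(ev interface{}) {
        return func(ev interface{}) {
                if !g.isLeader.Load() {
                        return // follower: keep caches warm, perform no writes
                }
                handler(ev)
        }
}
   ```
   This sidesteps both failure modes above: nothing is re-subscribed and no informer is ever stopped or restarted across leadership changes.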



##########
pkg/core/leader/leader.go:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "sync/atomic"
+       "time"
+
+       "github.com/google/uuid"
+       "gorm.io/gorm"
+
+       "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+const (
+       // DefaultLeaseDuration is the default duration for a leader lease
+       DefaultLeaseDuration = 30 * time.Second
+       // DefaultRenewInterval is the default interval for renewing the lease
+       DefaultRenewInterval = 10 * time.Second
+       // DefaultAcquireRetryInterval is the default retry interval for acquiring leadership
+       DefaultAcquireRetryInterval = 5 * time.Second
+)
+
+// LeaderElection manages leader election for distributed components
+// It uses database-based optimistic locking to ensure only one replica holds the lease at any time
+type LeaderElection struct {
+       db             *gorm.DB
+       component      string
+       holderID       string
+       leaseDuration  time.Duration
+       renewInterval  time.Duration
+       acquireRetry   time.Duration
+       isLeader       atomic.Bool
+       currentVersion int64
+       stopCh         chan struct{}
+}
+
+// Option is a functional option for configuring LeaderElection
+type Option func(*LeaderElection)
+
+// WithLeaseDuration sets the lease duration
+func WithLeaseDuration(d time.Duration) Option {
+       return func(le *LeaderElection) {
+               le.leaseDuration = d
+       }
+}
+
+// WithRenewInterval sets the renewal interval
+func WithRenewInterval(d time.Duration) Option {
+       return func(le *LeaderElection) {
+               le.renewInterval = d
+       }
+}
+
+// WithAcquireRetryInterval sets the acquisition retry interval
+func WithAcquireRetryInterval(d time.Duration) Option {
+       return func(le *LeaderElection) {
+               le.acquireRetry = d
+       }
+}
+
+// NewLeaderElection creates a new LeaderElection instance
+func NewLeaderElection(db *gorm.DB, component, holderID string, opts ...Option) *LeaderElection {
+       le := &LeaderElection{
+               db:            db,
+               component:     component,
+               holderID:      holderID,
+               leaseDuration: DefaultLeaseDuration,
+               renewInterval: DefaultRenewInterval,
+               acquireRetry:  DefaultAcquireRetryInterval,
+               stopCh:        make(chan struct{}),
+       }
+
+       for _, opt := range opts {
+               opt(le)
+       }
+
+       return le
+}
+
+// EnsureTable creates the leader_leases table if it doesn't exist
+// This is idempotent and can be called multiple times
+func (le *LeaderElection) EnsureTable() error {
+       return le.db.AutoMigrate(&LeaderLease{})
+}
+
+// TryAcquire attempts to acquire the leader lease from an expired holder.
+// It only competes for leases that have already expired and does NOT renew an
+// existing self-held lease — use Renew for that.
+// Returns true if the current holder successfully acquired the lease.
+func (le *LeaderElection) TryAcquire(ctx context.Context) bool {
+       now := time.Now()
+       expiresAt := now.Add(le.leaseDuration)
+
+       // Only take over an expired lease; never pre-empt an active holder.
+       result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+               Where("component = ? AND expires_at < ?", le.component, now).
+               Updates(map[string]interface{}{
+                       "holder_id":   le.holderID,
+                       "acquired_at": now,
+                       "expires_at":  expiresAt,
+                       "version":     gorm.Expr("version + 1"),
+               })
+
+       if result.Error != nil {
+               logger.Warnf("leader election: failed to update lease for 
component %s: %v", le.component, result.Error)
+               le.isLeader.Store(false)
+               return false
+       }
+
+       // If the update succeeded (found a row to update)
+       if result.RowsAffected > 0 {
+               // Fetch the updated version
+               var lease LeaderLease
+               err := le.db.WithContext(ctx).
+                       Where("component = ?", le.component).
+                       First(&lease).Error
+               if err == nil {
+                       le.currentVersion = lease.Version
+               }
+               le.isLeader.Store(true)
+               return true
+       }
+
+       // No row was updated, try to insert a new record (lease doesn't exist)
+       result = le.db.WithContext(ctx).Create(&LeaderLease{
+               Component:  le.component,
+               HolderID:   le.holderID,
+               AcquiredAt: now,
+               ExpiresAt:  expiresAt,
+               Version:    1,
+       })
+
+       if result.Error != nil {
+               // If insertion fails, it means another replica just created it
+               // This is expected in concurrent scenarios
+               logger.Debugf("leader election: failed to insert lease for 
component %s (probably created by another replica): %v", le.component, 
result.Error)
+               le.isLeader.Store(false)
+               return false
+       }

Review Comment:
   `TryAcquire` falls back to `Create` whenever the expired-lease `UPDATE` affects 0 rows. That includes the common case where a non-expired lease already exists, which will trigger a unique-constraint error on every retry interval and produce avoidable DB work and log noise. Consider first checking whether a lease row exists (and is unexpired) and returning false early, or use an upsert/"insert ... on conflict do nothing" pattern and treat a conflict as a normal non-leader outcome without logging it as an error.
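   For the upsert route, a sketch using GORM's `clause.OnConflict` (from `gorm.io/gorm/clause`). This assumes `component` carries a unique constraint, which the unique-constraint errors described above imply; dialect translation varies (PostgreSQL emits `ON CONFLICT DO NOTHING`), so the `RowsAffected` semantics are worth verifying for the configured backend:
   ```go
        // Treat a conflicting insert as the normal "another replica leads"
        // outcome instead of logging it on every retry interval.
        res := le.db.WithContext(ctx).
                Clauses(clause.OnConflict{DoNothing: true}).
                Create(&LeaderLease{
                        Component:  le.component,
                        HolderID:   le.holderID,
                        AcquiredAt: now,
                        ExpiresAt:  expiresAt,
                        Version:    1,
                })
        if res.Error != nil {
                // A real DB error, not a conflict.
                logger.Warnf("leader election: lease insert failed for component %s: %v", le.component, res.Error)
                le.isLeader.Store(false)
                return false
        }
        won := res.RowsAffected > 0 // 0 rows affected: the lease already exists
        le.isLeader.Store(won)
        return won
   ```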



##########
pkg/store/dbcommon/gorm_store.go:
##########
@@ -582,3 +582,9 @@ func (gs *GormStore) rebuildIndices() error {
        logger.Infof("Rebuilt indices for %s: loaded %d resources", 
gs.kind.ToString(), len(models))
        return nil
 }
+
+// Pool returns the connection pool for this store
+// Used by other components (e.g., leader election) that need direct DB access
+func (gs *GormStore) Pool() *ConnectionPool {
+       return gs.pool

Review Comment:
   This new `Pool()` accessor returns a concrete `*ConnectionPool`, but the consumer in `pkg/core/store` currently uses an interface-based type assertion to avoid importing `dbcommon`. As implemented, the signatures don't match, so DB discovery fails. Consider returning a minimal interface type (e.g., an interface with only `GetDB() (*gorm.DB, bool)`, matching the `leader.DBSource` usage above) so `core/store` can depend on that interface without importing `dbcommon`, and avoid reflection there.
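   A sketch of the interface-based shape; the two-value `GetDB()` matches the `leader.DBSource` usage shown in `pkg/core/discovery` above, while `gs.pool.DB()` is an assumed accessor on `ConnectionPool`, to be substituted with whatever it actually exposes:
   ```go
// Consumer side (e.g. pkg/core/leader): declare only what is needed,
// with no import of dbcommon. Requires gorm.io/gorm for *gorm.DB.
type DBSource interface {
        GetDB() (*gorm.DB, bool)
}

// Provider side (pkg/store/dbcommon): satisfy the interface on GormStore
// directly instead of exposing the concrete *ConnectionPool.
func (gs *GormStore) GetDB() (*gorm.DB, bool) {
        if gs.pool == nil {
                return nil, false
        }
        return gs.pool.DB(), true // assumed accessor; use ConnectionPool's real one
}
   ```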




