Copilot commented on code in PR #1423:
URL: https://github.com/apache/dubbo-admin/pull/1423#discussion_r2970668655
##########
pkg/core/discovery/component.go:
##########
@@ -123,11 +192,12 @@ func (d *discoveryComponent) Start(_ runtime.Runtime, ch
<-chan struct{}) error
fmt.Sprintf("subscriber %s can not subscribe
resource changed events", sub.Name()))
}
}
+ // 2. start informers
for name, informers := range d.informers {
for _, informer := range informers {
- go informer.Run(ch)
+ go informer.Run(stopCh)
}
- logger.Infof("resource discvoery %s has started succesfully",
name)
+ logger.Infof("resource discovery %s has started successfully",
name)
}
Review Comment:
`startBusinessLogic` is called on every leadership acquisition, but it
re-subscribes all subscribers without unsubscribing on leadership loss (the
EventBus enforces unique subscriber names, so the repeated Subscribe calls will
be rejected), and it tries to restart the same informer instances. The informer
implementation does not allow restarting once stopped (the `HasStarted()` guard
in `pkg/core/controller/informer.go`), so after a leader failover the new
leader term will not restart list-watch and business logic will be partially or
fully disabled. Unsubscribe subscribers in `onStopLeading` and recreate
informers for each leadership term (or redesign to keep informers running but
prevent follower-side writes).
##########
pkg/core/engine/component.go:
##########
@@ -146,6 +179,43 @@ func (e *engineComponent) initSubscribers(eventbus
events.EventBus) error {
}
func (e *engineComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error {
+ if !e.needsLeaderElection {
+ return e.startBusinessLogic(ch)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go func() {
+ <-ch
+ cancel()
+ }()
+
+ var leaderStopCh chan struct{}
+
+ e.leaderElection.RunLeaderElection(ctx, ch,
+ func() { // onStartLeading: create a fresh stopCh for this
leadership term
+ leaderStopCh = make(chan struct{})
+ logger.Infof("engine: became leader, starting business
logic")
+ if err := e.startBusinessLogic(leaderStopCh); err !=
nil {
+ logger.Errorf("engine: failed to start business
logic: %v", err)
+ }
+ },
+ func() { // onStopLeading: stop informers from the current term
+ logger.Warnf("engine: lost leadership, stopping
business logic")
+ if leaderStopCh != nil {
+ close(leaderStopCh)
+ leaderStopCh = nil
+ }
+ },
+ )
+
+ return nil
Review Comment:
With leader election enabled, `Start` blocks inside `RunLeaderElection` and
only returns when the process is stopping. The runtime logs "component started
successfully" only after `Start` returns (see `pkg/core/runtime/runtime.go`),
so this component will never be reported as started during normal operation,
and startup errors from `startBusinessLogic` can no longer be surfaced via the
`Start` return value. Consider running the leader-election loop in a goroutine
and returning from `Start` after successful initialization, while still
respecting `ch` for shutdown.
##########
pkg/core/discovery/component.go:
##########
@@ -111,10 +116,74 @@ func (d *discoveryComponent) Init(ctx
runtime.BuilderContext) error {
if err != nil {
return err
}
+
+ // Memory store runs single-replica; leader election is not needed.
+ if ctx.Config().Store.Type == storecfg.Memory {
+ return nil
+ }
+
+ dbSrc, ok := storeComponent.(leader.DBSource)
+ if !ok {
+ return nil
+ }
+ db, hasDB := dbSrc.GetDB()
+ if !hasDB {
+ return nil
+ }
+ holderID, err := leader.GenerateHolderID()
+ if err != nil {
+ logger.Warnf("discovery: failed to generate holder ID, skipping
leader election: %v", err)
+ return nil
+ }
+ le := leader.NewLeaderElection(db, runtime.ResourceDiscovery, holderID)
+ if err := le.EnsureTable(); err != nil {
+ logger.Warnf("discovery: failed to ensure leader lease table:
%v", err)
+ return nil
+ }
+ d.leaderElection = le
+ d.needsLeaderElection = true
+ logger.Infof("discovery: leader election initialized (holder: %s)",
holderID)
return nil
}
func (d *discoveryComponent) Start(_ runtime.Runtime, ch <-chan struct{})
error {
+ if !d.needsLeaderElection {
+ return d.startBusinessLogic(ch)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go func() {
+ <-ch
+ cancel()
+ }()
+
+ var leaderStopCh chan struct{}
+
+ d.leaderElection.RunLeaderElection(ctx, ch,
+ func() { // onStartLeading: create a fresh stopCh for this
leadership term
+ leaderStopCh = make(chan struct{})
+ logger.Infof("discovery: became leader, starting
business logic")
+ if err := d.startBusinessLogic(leaderStopCh); err !=
nil {
+ logger.Errorf("discovery: failed to start
business logic: %v", err)
+ }
+ },
+ func() { // onStopLeading: stop informers from the current term
+ logger.Warnf("discovery: lost leadership, stopping
business logic")
+ if leaderStopCh != nil {
+ close(leaderStopCh)
+ leaderStopCh = nil
+ }
+ },
+ )
Review Comment:
With leader election enabled, `Start` blocks inside `RunLeaderElection` and
only returns when the process is stopping. The runtime reports components as
started only after `Start` returns (see `pkg/core/runtime/runtime.go`), so this
component won't be marked started during normal operation, and failures in
`startBusinessLogic` are only logged (not returned). Consider running the
election loop asynchronously and returning from `Start` once setup is complete,
while still stopping the loop when `ch` is closed.
```suggestion
// Tie the leader election context to the component's stop channel.
go func() {
<-ch
cancel()
}()
// Run the leader election loop asynchronously so that Start can return
// and the runtime can mark this component as started.
go func() {
var leaderStopCh chan struct{}
d.leaderElection.RunLeaderElection(ctx, ch,
func() { // onStartLeading: create a fresh stopCh for
this leadership term
leaderStopCh = make(chan struct{})
logger.Infof("discovery: became leader,
starting business logic")
if err := d.startBusinessLogic(leaderStopCh);
err != nil {
logger.Errorf("discovery: failed to
start business logic: %v", err)
}
},
func() { // onStopLeading: stop informers from the
current term
logger.Warnf("discovery: lost leadership,
stopping business logic")
if leaderStopCh != nil {
close(leaderStopCh)
leaderStopCh = nil
}
},
)
}()
```
##########
pkg/core/engine/component.go:
##########
@@ -154,7 +224,7 @@ func (e *engineComponent) Start(_ runtime.Runtime, ch
<-chan struct{}) error {
}
// 2. start informers
for _, informer := range e.informers {
- go informer.Run(ch)
+ go informer.Run(stopCh)
}
logger.Infof("resource engine %s has started successfully", e.name)
return nil
Review Comment:
`startBusinessLogic` is invoked every time leadership is acquired, but it
(1) calls `Subscribe` again without unsubscribing on leadership loss (EventBus
rejects duplicate subscriber names), and (2) attempts to `Run` the same
informer instances again. The informer implementation explicitly disallows
restarting an informer once it has been stopped (`HasStarted()` check in
`pkg/core/controller/informer.go`), so after the first leadership loss/failover
the new leader term will not actually restart list-watch. To support leader
failover, unsubscribe subscribers on `onStopLeading` and recreate new informer
instances (and list-watchers) for each leadership term, or refactor to keep a
single informer instance but gate event processing without stopping it.
##########
pkg/core/leader/leader.go:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package leader
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "sync/atomic"
+ "time"
+
+ "github.com/google/uuid"
+ "gorm.io/gorm"
+
+ "github.com/apache/dubbo-admin/pkg/core/logger"
+)
+
+const (
+ // DefaultLeaseDuration is the default duration for a leader lease
+ DefaultLeaseDuration = 30 * time.Second
+ // DefaultRenewInterval is the default interval for renewing the lease
+ DefaultRenewInterval = 10 * time.Second
+ // DefaultAcquireRetryInterval is the default retry interval for
acquiring leadership
+ DefaultAcquireRetryInterval = 5 * time.Second
+)
+
+// LeaderElection manages leader election for distributed components
+// It uses database-based optimistic locking to ensure only one replica holds
the lease at any time
+type LeaderElection struct {
+ db *gorm.DB
+ component string
+ holderID string
+ leaseDuration time.Duration
+ renewInterval time.Duration
+ acquireRetry time.Duration
+ isLeader atomic.Bool
+ currentVersion int64
+ stopCh chan struct{}
+}
+
+// Option is a functional option for configuring LeaderElection
+type Option func(*LeaderElection)
+
+// WithLeaseDuration sets the lease duration
+func WithLeaseDuration(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.leaseDuration = d
+ }
+}
+
+// WithRenewInterval sets the renewal interval
+func WithRenewInterval(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.renewInterval = d
+ }
+}
+
+// WithAcquireRetryInterval sets the acquisition retry interval
+func WithAcquireRetryInterval(d time.Duration) Option {
+ return func(le *LeaderElection) {
+ le.acquireRetry = d
+ }
+}
+
+// NewLeaderElection creates a new LeaderElection instance
+func NewLeaderElection(db *gorm.DB, component, holderID string, opts
...Option) *LeaderElection {
+ le := &LeaderElection{
+ db: db,
+ component: component,
+ holderID: holderID,
+ leaseDuration: DefaultLeaseDuration,
+ renewInterval: DefaultRenewInterval,
+ acquireRetry: DefaultAcquireRetryInterval,
+ stopCh: make(chan struct{}),
+ }
+
+ for _, opt := range opts {
+ opt(le)
+ }
+
+ return le
+}
+
+// EnsureTable creates the leader_leases table if it doesn't exist
+// This is idempotent and can be called multiple times
+func (le *LeaderElection) EnsureTable() error {
+ return le.db.AutoMigrate(&LeaderLease{})
+}
+
+// TryAcquire attempts to acquire the leader lease from an expired holder.
+// It only competes for leases that have already expired and does NOT renew an
+// existing self-held lease — use Renew for that.
+// Returns true if the current holder successfully acquired the lease.
+func (le *LeaderElection) TryAcquire(ctx context.Context) bool {
+ now := time.Now()
+ expiresAt := now.Add(le.leaseDuration)
+
+ // Only take over an expired lease; never pre-empt an active holder.
+ result := le.db.WithContext(ctx).Model(&LeaderLease{}).
+ Where("component = ? AND expires_at < ?", le.component, now).
+ Updates(map[string]interface{}{
+ "holder_id": le.holderID,
+ "acquired_at": now,
+ "expires_at": expiresAt,
+ "version": gorm.Expr("version + 1"),
+ })
+
+ if result.Error != nil {
+ logger.Warnf("leader election: failed to update lease for
component %s: %v", le.component, result.Error)
+ le.isLeader.Store(false)
+ return false
+ }
+
+ // If the update succeeded (found a row to update)
+ if result.RowsAffected > 0 {
+ // Fetch the updated version
+ var lease LeaderLease
+ err := le.db.WithContext(ctx).
+ Where("component = ?", le.component).
+ First(&lease).Error
+ if err == nil {
+ le.currentVersion = lease.Version
+ }
+ le.isLeader.Store(true)
+ return true
+ }
+
+ // No row was updated, try to insert a new record (lease doesn't exist)
+ result = le.db.WithContext(ctx).Create(&LeaderLease{
+ Component: le.component,
+ HolderID: le.holderID,
+ AcquiredAt: now,
+ ExpiresAt: expiresAt,
+ Version: 1,
+ })
+
+ if result.Error != nil {
+ // If insertion fails, it means another replica just created it
+ // This is expected in concurrent scenarios
+ logger.Debugf("leader election: failed to insert lease for
component %s (probably created by another replica): %v", le.component,
result.Error)
+ le.isLeader.Store(false)
+ return false
+ }
Review Comment:
`TryAcquire` falls back to `Create` whenever the expired-lease `UPDATE`
affects 0 rows. That includes the common case where a non-expired lease already
exists, so every retry interval triggers a unique-constraint violation,
producing avoidable DB work and log noise. Consider first checking whether an
unexpired lease row already exists and returning false early, or use an
upsert / "insert ... on conflict do nothing" pattern and treat the conflict as
a normal non-leader outcome without logging it as an error.
##########
pkg/store/dbcommon/gorm_store.go:
##########
@@ -582,3 +582,9 @@ func (gs *GormStore) rebuildIndices() error {
logger.Infof("Rebuilt indices for %s: loaded %d resources",
gs.kind.ToString(), len(models))
return nil
}
+
+// Pool returns the connection pool for this store
+// Used by other components (e.g., leader election) that need direct DB access
+func (gs *GormStore) Pool() *ConnectionPool {
+ return gs.pool
Review Comment:
This new `Pool()` accessor returns a concrete `*ConnectionPool`, but the
consumer in `pkg/core/store` currently uses an interface-based type assertion
to avoid importing `dbcommon`. As implemented, the signatures do not match, so
DB discovery fails. Consider returning a minimal interface type (e.g., one
exposing only `GetDB() *gorm.DB`) so that `core/store` can depend on the
interface without importing `dbcommon`, avoiding reflection there.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]