Copilot commented on code in PR #1423: URL: https://github.com/apache/dubbo-admin/pull/1423#discussion_r2901555006
########## pkg/core/leader/model.go: ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package leader + +import "time" + +// LeaderLease is the GORM model for the leader_leases table +// It uses optimistic locking via the Version field to ensure atomic leader elections +type LeaderLease struct { + ID uint `gorm:"primaryKey;autoIncrement"` + Component string `gorm:"uniqueIndex;size:64;not null"` + HolderID string `gorm:"size:255;not null"` + AcquiredAt time.Time `gorm:"not null"` + ExpiresAt time.Time `gorm:"not null"` + Version int64 `gorm:"not null;default:0"` Review Comment: The struct/doc comment says the `Version` field is used for optimistic locking to ensure atomic leader elections, but `TryAcquire` does not include `version` in its acquisition `UPDATE` predicate (it only uses it for `Renew`). Either update the documentation to match the actual behavior, or extend acquisition to use `Version` as part of the atomicity guarantee. 
########## pkg/core/store/component.go: ########## @@ -109,3 +122,29 @@ func (sc *storeComponent) ResourceKindRoute(k coremodel.ResourceKind) (ResourceS return nil, fmt.Errorf("%s is not supported by store yet", k) } + +// GetDB returns the shared DB connection if the underlying store is DB-backed +// Implements the leader.DBSource interface +func (sc *storeComponent) GetDB() (*gorm.DB, bool) { + // Try to get DB from any store that has a Pool() method (all GormStores share the same ConnectionPool) + for _, store := range sc.stores { + if pp, ok := store.(poolProvider); ok { + pool := pp.Pool() + if pool == nil { + continue + } + // Use reflection to call GetDB() on the pool to avoid importing dbcommon + poolVal := reflect.ValueOf(pool) + getDBMethod := poolVal.MethodByName("GetDB") + if getDBMethod.IsValid() { + result := getDBMethod.Call(nil) + if len(result) > 0 { + if db, ok := result[0].Interface().(*gorm.DB); ok { + return db, true + } + } Review Comment: `GetDB()` relies on the `poolProvider` type assertion + reflection. Given the current `poolProvider` signature mismatch, this loop will never find a DB and leader election will never be initialized. Even after fixing the signature, prefer a direct type assertion to a minimal interface (e.g. `interface{ GetDB() *gorm.DB }`) instead of reflection to avoid runtime surprises and make this easier to test. 
########## pkg/core/discovery/component.go: ########## @@ -111,10 +116,65 @@ func (d *discoveryComponent) Init(ctx runtime.BuilderContext) error { if err != nil { return err } + + // Initialize leader election if using DB store + if ctx.Config().Store.Type != storecfg.Memory { + if dbSrc, ok := storeComponent.(leader.DBSource); ok { + if db, hasDB := dbSrc.GetDB(); hasDB { + holderID, err := leader.GenerateHolderID() + if err != nil { + logger.Warnf("discovery: failed to generate holder ID, skipping leader election: %v", err) + } else { + le := leader.NewLeaderElection(db, runtime.ResourceDiscovery, holderID) + if err := le.EnsureTable(); err != nil { + logger.Warnf("discovery: failed to ensure leader lease table: %v", err) + } else { + d.leaderElection = le + d.needsLeaderElection = true + logger.Infof("discovery: leader election initialized (holder: %s)", holderID) + } + } + } + } + } + return nil } func (d *discoveryComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error { + // If no leader election is needed (Memory store or DB initialization failed), run business logic directly + if !d.needsLeaderElection { + return d.startBusinessLogic(ch) + } + + // Create a context that can be cancelled when the stop channel is closed + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Monitor the stop channel and cancel the context when it's closed + go func() { + <-ch + cancel() + }() + + // Run leader election with callbacks for starting/stopping leadership + d.leaderElection.RunLeaderElection(ctx, ch, + func() { // onStartLeading callback + logger.Infof("discovery: became leader, starting business logic") + if err := d.startBusinessLogic(ch); err != nil { + logger.Errorf("discovery: failed to start business logic: %v", err) + } + }, + func() { // onStopLeading callback + logger.Warnf("discovery: lost leadership, stopping business logic") + }, + ) Review Comment: Same issue as engine: `startBusinessLogic(ch)` launches informers with 
the global stop channel, but `onStopLeading` doesn’t stop them or unsubscribe, so a node that loses leadership will keep running leader-only behavior and repeated elections can start duplicate goroutines / duplicate subscription attempts. Use a dedicated stop channel/context per leadership term and stop/unsubscribe in `onStopLeading` to ensure followers do not execute list-watch or writes. ########## pkg/core/engine/component.go: ########## @@ -146,6 +173,39 @@ func (e *engineComponent) initSubscribers(eventbus events.EventBus) error { } func (e *engineComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error { + // If no leader election is needed (Memory store or DB initialization failed), run business logic directly + if !e.needsLeaderElection { + return e.startBusinessLogic(ch) + } + + // Create a context that can be cancelled when the stop channel is closed + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Monitor the stop channel and cancel the context when it's closed + go func() { + <-ch + cancel() + }() + + // Run leader election with callbacks for starting/stopping leadership + e.leaderElection.RunLeaderElection(ctx, ch, + func() { // onStartLeading callback + logger.Infof("engine: became leader, starting business logic") + if err := e.startBusinessLogic(ch); err != nil { + logger.Errorf("engine: failed to start business logic: %v", err) + } + }, + func() { // onStopLeading callback + logger.Warnf("engine: lost leadership, stopping business logic") + }, + ) Review Comment: When leadership is acquired, `startBusinessLogic(ch)` starts informers using the global stop channel `ch`, but on leadership loss the `onStopLeading` callback only logs. This means informers/subscribers will continue running (and can be started multiple times on leadership flaps), violating the follower contract and potentially duplicating list-watch + DB writes. 
Use a per-leadership stop channel/context for informers, and in `onStopLeading` close it and `Unsubscribe` any subscribers (or otherwise ensure business logic is idempotent and fully stops on leadership loss). ########## pkg/core/leader/leader.go: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package leader + +import ( + "context" + "fmt" + "os" + "sync/atomic" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +const ( + // DefaultLeaseDuration is the default duration for a leader lease + DefaultLeaseDuration = 30 * time.Second + // DefaultRenewInterval is the default interval for renewing the lease + DefaultRenewInterval = 10 * time.Second + // DefaultAcquireRetryInterval is the default retry interval for acquiring leadership + DefaultAcquireRetryInterval = 5 * time.Second +) + +// LeaderElection manages leader election for distributed components +// It uses database-based optimistic locking to ensure only one replica holds the lease at any time +type LeaderElection struct { + db *gorm.DB + component string + holderID string + leaseDuration time.Duration + renewInterval time.Duration + acquireRetry time.Duration + isLeader atomic.Bool + currentVersion int64 + stopCh chan struct{} +} + +// Option is a functional option for configuring LeaderElection +type Option func(*LeaderElection) + +// WithLeaseDuration sets the lease duration +func WithLeaseDuration(d time.Duration) Option { + return func(le *LeaderElection) { + le.leaseDuration = d + } +} + +// WithRenewInterval sets the renewal interval +func WithRenewInterval(d time.Duration) Option { + return func(le *LeaderElection) { + le.renewInterval = d + } +} + +// WithAcquireRetryInterval sets the acquisition retry interval +func WithAcquireRetryInterval(d time.Duration) Option { + return func(le *LeaderElection) { + le.acquireRetry = d + } +} + +// NewLeaderElection creates a new LeaderElection instance +func NewLeaderElection(db *gorm.DB, component, holderID string, opts ...Option) *LeaderElection { + le := &LeaderElection{ + db: db, + component: component, + holderID: holderID, + leaseDuration: DefaultLeaseDuration, + renewInterval: DefaultRenewInterval, + acquireRetry: DefaultAcquireRetryInterval, + stopCh: make(chan struct{}), + } 
+ + for _, opt := range opts { + opt(le) + } + + return le +} + +// EnsureTable creates the leader_leases table if it doesn't exist +// This is idempotent and can be called multiple times +func (le *LeaderElection) EnsureTable() error { + return le.db.AutoMigrate(&LeaderLease{}) +} + +// TryAcquire attempts to acquire or renew the leader lease +// Returns true if the current holder successfully acquired/renewed the lease +func (le *LeaderElection) TryAcquire(ctx context.Context) bool { + now := time.Now() + expiresAt := now.Add(le.leaseDuration) + + // First, try to update an existing lease (either expired or held by us) + result := le.db.WithContext(ctx).Model(&LeaderLease{}). + Where("component = ? AND (expires_at < ? OR holder_id = ?)", le.component, now, le.holderID). + Updates(map[string]interface{}{ + "holder_id": le.holderID, + "acquired_at": now, + "expires_at": expiresAt, + "version": gorm.Expr("version + 1"), + }) + + if result.Error != nil { + logger.Warnf("leader election: failed to update lease for component %s: %v", le.component, result.Error) + le.isLeader.Store(false) + return false + } + + // If the update succeeded (found a row to update) + if result.RowsAffected > 0 { + // Fetch the updated version + var lease LeaderLease + err := le.db.WithContext(ctx). + Where("component = ?", le.component). 
+ First(&lease).Error + if err == nil { + le.currentVersion = lease.Version + } + le.isLeader.Store(true) + return true + } + + // No row was updated, try to insert a new record (lease doesn't exist) + result = le.db.WithContext(ctx).Create(&LeaderLease{ + Component: le.component, + HolderID: le.holderID, + AcquiredAt: now, + ExpiresAt: expiresAt, + Version: 1, + }) + + if result.Error != nil { + // If insertion fails, it means another replica just created it + // This is expected in concurrent scenarios + logger.Debugf("leader election: failed to insert lease for component %s (probably created by another replica): %v", le.component, result.Error) + le.isLeader.Store(false) + return false + } + + le.currentVersion = 1 + le.isLeader.Store(true) + return true +} + +// Renew attempts to renew the current leader lease +// Returns true if the renewal was successful +func (le *LeaderElection) Renew(ctx context.Context) bool { + if !le.isLeader.Load() { + return false + } + + now := time.Now() + expiresAt := now.Add(le.leaseDuration) + + result := le.db.WithContext(ctx).Model(&LeaderLease{}). + Where("component = ? AND holder_id = ? AND version = ?", le.component, le.holderID, le.currentVersion). 
+ Updates(map[string]interface{}{ + "acquired_at": now, + "expires_at": expiresAt, + "version": gorm.Expr("version + 1"), + }) + + if result.Error != nil { + logger.Warnf("leader election: failed to renew lease for component %s: %v", le.component, result.Error) + le.isLeader.Store(false) + return false + } + + if result.RowsAffected > 0 { + le.currentVersion++ + return true + } + + // Lease was lost (likely held by another replica now) + logger.Warnf("leader election: lost leader lease for component %s (renewal failed, version mismatch)", le.component) + le.isLeader.Store(false) + return false +} + +// Release releases the leader lease for this holder +// This should be called when the holder voluntarily gives up leadership +func (le *LeaderElection) Release(ctx context.Context) { + le.isLeader.Store(false) + + expiresAt := time.Now().Add(-1 * time.Second) // Immediately expire the lease + + result := le.db.WithContext(ctx).Model(&LeaderLease{}). + Where("component = ? AND holder_id = ?", le.component, le.holderID). 
+ Update("expires_at", expiresAt) + + if result.Error != nil { + logger.Warnf("leader election: failed to release lease for component %s: %v", le.component, result.Error) + } +} + +// IsLeader returns true if this holder currently holds the leader lease +func (le *LeaderElection) IsLeader() bool { + return le.isLeader.Load() +} + +// RunLeaderElection runs the leader election loop +// It blocks and runs onStartLeading/onStopLeading callbacks as leadership changes +// This is designed to be run in a separate goroutine +func (le *LeaderElection) RunLeaderElection(ctx context.Context, stopCh <-chan struct{}, + onStartLeading func(), onStopLeading func()) { + + ticker := time.NewTicker(le.acquireRetry) + defer ticker.Stop() + + renewTicker := time.NewTicker(le.renewInterval) + renewTicker.Stop() // Don't start renewal ticker yet + + isLeader := false + + for { + select { + case <-ctx.Done(): + if isLeader { + le.Release(context.Background()) + onStopLeading() + } + return + case <-stopCh: + if isLeader { + le.Release(context.Background()) + onStopLeading() + } + return + case <-ticker.C: + // Try to acquire leadership if not already leader + if !isLeader { + if le.TryAcquire(ctx) { + logger.Infof("leader election: component %s acquired leadership (holder: %s)", le.component, le.holderID) + isLeader = true + renewTicker.Reset(le.renewInterval) + onStartLeading() + } Review Comment: `RunLeaderElection` waits for the first `ticker.C` before the initial `TryAcquire`, which can delay leader startup by up to `acquireRetry` (default 5s). Consider attempting `TryAcquire` once before starting the ticker loop so a leader can start work immediately on boot. 
########## pkg/core/store/component.go: ########## @@ -34,6 +38,12 @@ type Router interface { ResourceKindRoute(k coremodel.ResourceKind) (ResourceStore, error) } +// poolProvider is an internal interface for stores that provide DB access +// This avoids circular imports by not referencing dbcommon directly +type poolProvider interface { + Pool() interface{} // Returns *ConnectionPool, but we don't type it to avoid import +} Review Comment: `poolProvider` is defined as `Pool() interface{}`, but `GormStore.Pool()` returns `*ConnectionPool`. Go method return types are not covariant, so `store.(poolProvider)` will never succeed and `GetDB()` will always return `(nil, false)`, effectively disabling leader election for DB stores. Consider exposing a small, import-cycle-safe interface on DB-backed stores (e.g. a `GetDB() *gorm.DB` method on the store itself) or adjust the `Pool()` signature to exactly match the interface and then type-assert to a `GetDB() *gorm.DB` interface instead of using reflection. ########## pkg/core/leader/leader.go: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package leader + +import ( + "context" + "fmt" + "os" + "sync/atomic" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +const ( + // DefaultLeaseDuration is the default duration for a leader lease + DefaultLeaseDuration = 30 * time.Second + // DefaultRenewInterval is the default interval for renewing the lease + DefaultRenewInterval = 10 * time.Second + // DefaultAcquireRetryInterval is the default retry interval for acquiring leadership + DefaultAcquireRetryInterval = 5 * time.Second +) + +// LeaderElection manages leader election for distributed components +// It uses database-based optimistic locking to ensure only one replica holds the lease at any time +type LeaderElection struct { + db *gorm.DB + component string + holderID string + leaseDuration time.Duration + renewInterval time.Duration + acquireRetry time.Duration + isLeader atomic.Bool + currentVersion int64 + stopCh chan struct{} Review Comment: `LeaderElection` has a `stopCh` field that is never used (the stop channel is passed into `RunLeaderElection` instead). Removing the unused field will simplify the struct and avoid confusion about which stop mechanism is authoritative. ```suggestion ``` ########## pkg/core/leader/leader.go: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package leader + +import ( + "context" + "fmt" + "os" + "sync/atomic" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +const ( + // DefaultLeaseDuration is the default duration for a leader lease + DefaultLeaseDuration = 30 * time.Second + // DefaultRenewInterval is the default interval for renewing the lease + DefaultRenewInterval = 10 * time.Second + // DefaultAcquireRetryInterval is the default retry interval for acquiring leadership + DefaultAcquireRetryInterval = 5 * time.Second +) + +// LeaderElection manages leader election for distributed components +// It uses database-based optimistic locking to ensure only one replica holds the lease at any time +type LeaderElection struct { + db *gorm.DB + component string + holderID string + leaseDuration time.Duration + renewInterval time.Duration + acquireRetry time.Duration + isLeader atomic.Bool + currentVersion int64 + stopCh chan struct{} +} + +// Option is a functional option for configuring LeaderElection +type Option func(*LeaderElection) + +// WithLeaseDuration sets the lease duration +func WithLeaseDuration(d time.Duration) Option { + return func(le *LeaderElection) { + le.leaseDuration = d + } +} + +// WithRenewInterval sets the renewal interval +func WithRenewInterval(d time.Duration) Option { + return func(le *LeaderElection) { + le.renewInterval = d + } +} + +// WithAcquireRetryInterval sets the acquisition retry interval +func WithAcquireRetryInterval(d time.Duration) Option { + return func(le *LeaderElection) { + 
le.acquireRetry = d + } +} + +// NewLeaderElection creates a new LeaderElection instance +func NewLeaderElection(db *gorm.DB, component, holderID string, opts ...Option) *LeaderElection { + le := &LeaderElection{ + db: db, + component: component, + holderID: holderID, + leaseDuration: DefaultLeaseDuration, + renewInterval: DefaultRenewInterval, + acquireRetry: DefaultAcquireRetryInterval, + stopCh: make(chan struct{}), + } + + for _, opt := range opts { + opt(le) + } + + return le +} + +// EnsureTable creates the leader_leases table if it doesn't exist +// This is idempotent and can be called multiple times +func (le *LeaderElection) EnsureTable() error { + return le.db.AutoMigrate(&LeaderLease{}) +} + +// TryAcquire attempts to acquire or renew the leader lease +// Returns true if the current holder successfully acquired/renewed the lease +func (le *LeaderElection) TryAcquire(ctx context.Context) bool { + now := time.Now() + expiresAt := now.Add(le.leaseDuration) + + // First, try to update an existing lease (either expired or held by us) + result := le.db.WithContext(ctx).Model(&LeaderLease{}). + Where("component = ? AND (expires_at < ? OR holder_id = ?)", le.component, now, le.holderID). + Updates(map[string]interface{}{ + "holder_id": le.holderID, + "acquired_at": now, + "expires_at": expiresAt, + "version": gorm.Expr("version + 1"), + }) + + if result.Error != nil { + logger.Warnf("leader election: failed to update lease for component %s: %v", le.component, result.Error) + le.isLeader.Store(false) + return false + } + + // If the update succeeded (found a row to update) + if result.RowsAffected > 0 { + // Fetch the updated version + var lease LeaderLease + err := le.db.WithContext(ctx). + Where("component = ?", le.component). 
+ First(&lease).Error + if err == nil { + le.currentVersion = lease.Version + } + le.isLeader.Store(true) + return true + } + + // No row was updated, try to insert a new record (lease doesn't exist) + result = le.db.WithContext(ctx).Create(&LeaderLease{ + Component: le.component, + HolderID: le.holderID, + AcquiredAt: now, + ExpiresAt: expiresAt, + Version: 1, + }) + + if result.Error != nil { + // If insertion fails, it means another replica just created it + // This is expected in concurrent scenarios + logger.Debugf("leader election: failed to insert lease for component %s (probably created by another replica): %v", le.component, result.Error) + le.isLeader.Store(false) + return false Review Comment: `TryAcquire` treats any `Create` error as a benign race ("probably created by another replica") and logs it at debug level. That will also hide real DB failures (connectivity, permission, schema issues) and can leave the component stuck as follower without a visible signal. Consider checking specifically for unique-constraint violations and logging/handling other errors as warnings (or returning an error). ########## pkg/store/dbcommon/gorm_store.go: ########## @@ -582,3 +582,9 @@ func (gs *GormStore) rebuildIndices() error { logger.Infof("Rebuilt indices for %s: loaded %d resources", gs.kind.ToString(), len(models)) return nil } + +// Pool returns the connection pool for this store +// Used by other components (e.g., leader election) that need direct DB access +func (gs *GormStore) Pool() *ConnectionPool { Review Comment: This new `Pool()` accessor returns `*ConnectionPool`, but the only consumer (`storeComponent.GetDB`) currently looks for a `Pool() interface{}` method, so `GormStore` will not satisfy that interface and the accessor won’t be used. Align the method signature with the consumer approach (or replace this with a `GetDB() *gorm.DB` method on `GormStore` to avoid exposing the pool at all). 
```suggestion func (gs *GormStore) Pool() interface{} { ``` ########## pkg/core/leader/leader.go: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package leader + +import ( + "context" + "fmt" + "os" + "sync/atomic" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +const ( + // DefaultLeaseDuration is the default duration for a leader lease + DefaultLeaseDuration = 30 * time.Second + // DefaultRenewInterval is the default interval for renewing the lease + DefaultRenewInterval = 10 * time.Second + // DefaultAcquireRetryInterval is the default retry interval for acquiring leadership + DefaultAcquireRetryInterval = 5 * time.Second +) + +// LeaderElection manages leader election for distributed components +// It uses database-based optimistic locking to ensure only one replica holds the lease at any time +type LeaderElection struct { + db *gorm.DB + component string + holderID string + leaseDuration time.Duration + renewInterval time.Duration + acquireRetry time.Duration + isLeader atomic.Bool + currentVersion int64 + stopCh chan struct{} +} + +// Option is a functional option for configuring LeaderElection +type Option 
func(*LeaderElection) + +// WithLeaseDuration sets the lease duration +func WithLeaseDuration(d time.Duration) Option { + return func(le *LeaderElection) { + le.leaseDuration = d + } +} + +// WithRenewInterval sets the renewal interval +func WithRenewInterval(d time.Duration) Option { + return func(le *LeaderElection) { + le.renewInterval = d + } +} + +// WithAcquireRetryInterval sets the acquisition retry interval +func WithAcquireRetryInterval(d time.Duration) Option { + return func(le *LeaderElection) { + le.acquireRetry = d + } +} + +// NewLeaderElection creates a new LeaderElection instance +func NewLeaderElection(db *gorm.DB, component, holderID string, opts ...Option) *LeaderElection { + le := &LeaderElection{ + db: db, + component: component, + holderID: holderID, + leaseDuration: DefaultLeaseDuration, + renewInterval: DefaultRenewInterval, + acquireRetry: DefaultAcquireRetryInterval, + stopCh: make(chan struct{}), + } + + for _, opt := range opts { + opt(le) + } + + return le +} + +// EnsureTable creates the leader_leases table if it doesn't exist +// This is idempotent and can be called multiple times +func (le *LeaderElection) EnsureTable() error { + return le.db.AutoMigrate(&LeaderLease{}) +} + +// TryAcquire attempts to acquire or renew the leader lease +// Returns true if the current holder successfully acquired/renewed the lease +func (le *LeaderElection) TryAcquire(ctx context.Context) bool { + now := time.Now() + expiresAt := now.Add(le.leaseDuration) + + // First, try to update an existing lease (either expired or held by us) + result := le.db.WithContext(ctx).Model(&LeaderLease{}). + Where("component = ? AND (expires_at < ? OR holder_id = ?)", le.component, now, le.holderID). 
+ Updates(map[string]interface{}{ + "holder_id": le.holderID, + "acquired_at": now, + "expires_at": expiresAt, + "version": gorm.Expr("version + 1"), + }) + + if result.Error != nil { + logger.Warnf("leader election: failed to update lease for component %s: %v", le.component, result.Error) + le.isLeader.Store(false) + return false + } + + // If the update succeeded (found a row to update) + if result.RowsAffected > 0 { + // Fetch the updated version + var lease LeaderLease + err := le.db.WithContext(ctx). + Where("component = ?", le.component). + First(&lease).Error + if err == nil { + le.currentVersion = lease.Version + } + le.isLeader.Store(true) + return true + } + + // No row was updated, try to insert a new record (lease doesn't exist) + result = le.db.WithContext(ctx).Create(&LeaderLease{ + Component: le.component, + HolderID: le.holderID, + AcquiredAt: now, + ExpiresAt: expiresAt, + Version: 1, + }) + + if result.Error != nil { + // If insertion fails, it means another replica just created it + // This is expected in concurrent scenarios + logger.Debugf("leader election: failed to insert lease for component %s (probably created by another replica): %v", le.component, result.Error) + le.isLeader.Store(false) + return false + } + + le.currentVersion = 1 + le.isLeader.Store(true) + return true +} + +// Renew attempts to renew the current leader lease +// Returns true if the renewal was successful +func (le *LeaderElection) Renew(ctx context.Context) bool { + if !le.isLeader.Load() { + return false + } + + now := time.Now() + expiresAt := now.Add(le.leaseDuration) + + result := le.db.WithContext(ctx).Model(&LeaderLease{}). + Where("component = ? AND holder_id = ? AND version = ?", le.component, le.holderID, le.currentVersion). 
+ Updates(map[string]interface{}{ + "acquired_at": now, + "expires_at": expiresAt, + "version": gorm.Expr("version + 1"), + }) + + if result.Error != nil { + logger.Warnf("leader election: failed to renew lease for component %s: %v", le.component, result.Error) + le.isLeader.Store(false) + return false + } + + if result.RowsAffected > 0 { + le.currentVersion++ + return true + } + + // Lease was lost (likely held by another replica now) + logger.Warnf("leader election: lost leader lease for component %s (renewal failed, version mismatch)", le.component) + le.isLeader.Store(false) + return false +} + +// Release releases the leader lease for this holder +// This should be called when the holder voluntarily gives up leadership +func (le *LeaderElection) Release(ctx context.Context) { + le.isLeader.Store(false) + + expiresAt := time.Now().Add(-1 * time.Second) // Immediately expire the lease + + result := le.db.WithContext(ctx).Model(&LeaderLease{}). + Where("component = ? AND holder_id = ?", le.component, le.holderID). + Update("expires_at", expiresAt) + + if result.Error != nil { + logger.Warnf("leader election: failed to release lease for component %s: %v", le.component, result.Error) + } +} + +// IsLeader returns true if this holder currently holds the leader lease +func (le *LeaderElection) IsLeader() bool { + return le.isLeader.Load() +} + +// RunLeaderElection runs the leader election loop +// It blocks and runs onStartLeading/onStopLeading callbacks as leadership changes +// This is designed to be run in a separate goroutine +func (le *LeaderElection) RunLeaderElection(ctx context.Context, stopCh <-chan struct{}, + onStartLeading func(), onStopLeading func()) { + + ticker := time.NewTicker(le.acquireRetry) + defer ticker.Stop() + + renewTicker := time.NewTicker(le.renewInterval) Review Comment: `renewTicker` is created but not `defer`-stopped on all exit paths (only stopped in some state transitions). 
If the function returns while `renewTicker` is active, the ticker is never stopped and its resources can leak (before Go 1.23 an unstopped ticker is not garbage-collected). Add `defer renewTicker.Stop()` right after creation (similar to `ticker`). ```suggestion renewTicker := time.NewTicker(le.renewInterval) defer renewTicker.Stop() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
