Copilot commented on code in PR #1422:
URL: https://github.com/apache/dubbo-admin/pull/1422#discussion_r2970687315
##########
pkg/store/dbcommon/gorm_store.go:
##########
@@ -550,35 +637,76 @@ func (gs *GormStore) getKeysByIndexes(indexes
map[string]string) ([]string, erro
return result, nil
}
-// clearIndices clears all in-memory indices
-func (gs *GormStore) clearIndices() {
- gs.indices.Clear()
+// persistIndexEntries writes index entries for a resource to the database
+// If oldResource is not nil, first deletes old entries, then inserts new ones
+func (gs *GormStore) persistIndexEntries(resource model.Resource, oldResource
model.Resource) error {
+ db := gs.pool.GetDB()
+
+ // Delete old entries if updating
+ if oldResource != nil {
+ if err := db.Where("resource_key = ?",
oldResource.ResourceKey()).Delete(&ResourceIndexModel{}).Error; err != nil {
Review Comment:
`persistIndexEntries` deletes old index rows using only `resource_key`.
Since `ResourceKey()` is not globally unique across resource kinds, this can
delete index entries belonging to other kinds that share the same key. The
delete should be scoped by `resource_kind` (and ideally `index_name` too if
needed).
```suggestion
if err := db.Where("resource_kind = ? AND resource_key = ?",
gs.kind.ToString(), oldResource.ResourceKey()).
Delete(&ResourceIndexModel{}).Error; err != nil {
```
##########
pkg/store/dbcommon/gorm_store.go:
##########
@@ -142,8 +145,10 @@ func (gs *GormStore) Add(obj interface{}) error {
return err
}
- // Update indices after successful DB operation
- gs.indices.UpdateResource(resource, nil)
+ // Persist index entries to DB
+ if err := gs.persistIndexEntries(resource, nil); err != nil {
+ logger.Warnf("failed to persist index entries for %s: %v",
resource.ResourceKey(), err)
+ }
return nil
Review Comment:
`Add` persists index entries after inserting the resource, but if index
persistence fails it only logs a warning and still returns success. Since all
indexed reads now depend on `resource_indices`, this can cause permanently
inconsistent query results. Consider writing the resource and its index
entries in a single transaction and returning the persistence error to the
caller. The same applies to `Update` and `Delete`.
##########
pkg/store/memory/store.go:
##########
@@ -204,3 +286,77 @@ func (rs *resourceStore) getKeysByIndexes(indexes
map[string]string) ([]string,
}
return keySet.ToSlice(), nil
}
+
+// addToTrees adds a resource to all relevant RadixTrees for prefix matching
+func (rs *resourceStore) addToTrees(resource coremodel.Resource) {
+ rs.treesMu.Lock()
+ defer rs.treesMu.Unlock()
+
+ // Get indexers from storeProxy, not from global registry
+ // This ensures we include both init-time and dynamically-added indexers
+ indexers := rs.storeProxy.GetIndexers()
+ for indexName, indexFunc := range indexers {
+ values, err := indexFunc(resource)
+ if err != nil {
+ continue
+ }
+ tree, ok := rs.prefixTrees[indexName]
+ if !ok || tree == nil {
+ continue
+ }
+ for _, v := range values {
+ // Key format: "indexValue/resourceKey"
+ key := v + "/" + resource.ResourceKey()
+ tree.Insert(key, struct{}{})
+ }
+ }
+}
+
+// removeFromTrees removes a resource from all relevant RadixTrees
+func (rs *resourceStore) removeFromTrees(resource coremodel.Resource) {
+ rs.treesMu.Lock()
+ defer rs.treesMu.Unlock()
+
+ // Get indexers from storeProxy, not from global registry
+ // This ensures we include both init-time and dynamically-added indexers
+ indexers := rs.storeProxy.GetIndexers()
+ for indexName, indexFunc := range indexers {
+ values, err := indexFunc(resource)
+ if err != nil {
+ continue
+ }
+ tree, ok := rs.prefixTrees[indexName]
+ if !ok || tree == nil {
+ continue
+ }
+ for _, v := range values {
+ // Key format: "indexValue/resourceKey"
+ key := v + "/" + resource.ResourceKey()
+ tree.Delete(key)
+ }
+ }
+}
+
+// getKeysByPrefix retrieves resource keys by prefix match using RadixTree
+func (rs *resourceStore) getKeysByPrefix(indexName, prefix string) ([]string,
error) {
+ rs.treesMu.RLock()
+ defer rs.treesMu.RUnlock()
+
+ tree, ok := rs.prefixTrees[indexName]
+ if !ok {
+ return nil, fmt.Errorf("index %s does not exist", indexName)
+ }
+
+ var keys []string
+ tree.WalkPrefix(prefix, func(k string, v interface{}) bool {
+ // Key format: "indexValue/resourceKey"
+ // Extract the resourceKey part (after the last "/")
+ idx := strings.LastIndex(k, "/")
Review Comment:
Prefix-tree key encoding uses `v + "/" + resource.ResourceKey()` and later
extracts the resource key using `strings.LastIndex(k, "/")`. Since
`ResourceKey()` is typically `mesh/name` and therefore contains `/`, this
parsing keeps only the part after the last `/` (e.g. just `name`), so
`HasPrefix` queries return incorrect or missing resources. Note that splitting
on the first `/` instead is only correct if index values themselves never
contain `/`.
```suggestion
// Extract the resourceKey part (after the first "/")
idx := strings.Index(k, "/")
```
##########
pkg/store/memory/store.go:
##########
@@ -63,14 +74,42 @@ func (rs *resourceStore) Start(_ runtime.Runtime, _ <-chan
struct{}) error {
}
func (rs *resourceStore) Add(obj interface{}) error {
- return rs.storeProxy.Add(obj)
+ if err := rs.storeProxy.Add(obj); err != nil {
+ return err
+ }
+ r, ok := obj.(coremodel.Resource)
+ if ok {
+ rs.addToTrees(r)
+ }
+ return nil
}
func (rs *resourceStore) Update(obj interface{}) error {
- return rs.storeProxy.Update(obj)
+ r, ok := obj.(coremodel.Resource)
+ if ok {
+ // Get the old resource from the store to properly remove it
from trees
+ oldObj, exists, err := rs.storeProxy.Get(r)
+ if exists && err == nil {
+ if oldRes, ok := oldObj.(coremodel.Resource); ok {
+ rs.removeFromTrees(oldRes)
+ }
+ }
+ }
+ if err := rs.storeProxy.Update(obj); err != nil {
+ return err
+ }
+ if ok {
+ // Add new entry with updated values
+ rs.addToTrees(r)
+ }
Review Comment:
`Update` removes the old resource from prefix trees before calling
`storeProxy.Update`. If `storeProxy.Update` returns an error, the underlying
cache remains unchanged but the prefix tree state has already been mutated,
leading to incorrect `HasPrefix` results. Consider mutating the prefix trees
only after `storeProxy.Update` succeeds, or rolling the tree changes back when
it fails.
##########
pkg/store/dbcommon/gorm_store.go:
##########
@@ -35,48 +36,50 @@ import (
)
// GormStore is a GORM-backed store implementation for Dubbo resources
-// It uses GORM for database operations and maintains in-memory indices for
fast lookups
+// It uses GORM for database operations and persists all indices to the
resource_indices table
// This implementation is database-agnostic and works with any GORM-supported
database
type GormStore struct {
- pool *ConnectionPool // Shared connection pool with reference
counting
- kind model.ResourceKind
- address string
- indices *Index // In-memory index with thread-safe operations
- stopCh chan struct{}
+ pool *ConnectionPool // Shared connection pool with reference
counting
+ kind model.ResourceKind
+ address string
+ indexers cache.Indexers // Index functions for creating indices
+ mu sync.RWMutex // Protects indexers
+ stopCh chan struct{}
}
var _ store.ManagedResourceStore = &GormStore{}
// NewGormStore creates a new GORM store for the specified resource kind
func NewGormStore(kind model.ResourceKind, address string, pool
*ConnectionPool) *GormStore {
return &GormStore{
- kind: kind,
- address: address,
- pool: pool,
- indices: NewIndex(),
- stopCh: make(chan struct{}),
+ kind: kind,
+ address: address,
+ pool: pool,
+ indexers: make(cache.Indexers),
+ stopCh: make(chan struct{}),
}
}
-// Init initializes the GORM store by migrating the schema and rebuilding
indices
+// Init initializes the GORM store by migrating the schema and registering
indexers
func (gs *GormStore) Init(_ runtime.BuilderContext) error {
// Perform table migration
db := gs.pool.GetDB()
// Use Scopes to set the table name dynamically for migration
if err :=
db.Scopes(TableScope(gs.kind.ToString())).AutoMigrate(&ResourceModel{}); err !=
nil {
return fmt.Errorf("failed to migrate schema for %s: %w",
gs.kind.ToString(), err)
}
+
+ // Migrate resource_indices table (shared across all resource kinds)
+ if err := db.AutoMigrate(&ResourceIndexModel{}); err != nil {
+ return fmt.Errorf("failed to migrate resource_indices: %w", err)
+ }
+
// Register indexers for the resource kind
indexers := index.IndexersRegistry().Indexers(gs.kind)
if err := gs.AddIndexers(indexers); err != nil {
return err
}
- // Rebuild indices from existing data in the database
- if err := gs.rebuildIndices(); err != nil {
- return fmt.Errorf("failed to rebuild indices for %s: %w",
gs.kind.ToString(), err)
- }
-
logger.Infof("GORM store initialized for resource kind: %s",
gs.kind.ToString())
return nil
}
Review Comment:
`GormStore.Init` no longer rebuilds/backfills the new `resource_indices`
table from existing `ResourceModel` rows. On restart or upgrade from a DB that
already contains resources, `ListByIndexes`/`HasPrefix` will return empty
results until each resource is re-written. Consider adding an init-time
backfill (e.g., delete existing `resource_kind` entries and bulk-insert
computed index entries) after indexers are registered.
##########
pkg/store/dbcommon/gorm_store.go:
##########
@@ -550,35 +637,76 @@ func (gs *GormStore) getKeysByIndexes(indexes
map[string]string) ([]string, erro
return result, nil
}
-// clearIndices clears all in-memory indices
-func (gs *GormStore) clearIndices() {
- gs.indices.Clear()
+// persistIndexEntries writes index entries for a resource to the database
+// If oldResource is not nil, first deletes old entries, then inserts new ones
+func (gs *GormStore) persistIndexEntries(resource model.Resource, oldResource
model.Resource) error {
+ db := gs.pool.GetDB()
+
+ // Delete old entries if updating
+ if oldResource != nil {
+ if err := db.Where("resource_key = ?",
oldResource.ResourceKey()).Delete(&ResourceIndexModel{}).Error; err != nil {
+ return err
+ }
+ }
+
+ // Get all index entries for this resource
+ indexers := gs.GetIndexers()
+ var entries []ResourceIndexModel
+ for indexName, indexFunc := range indexers {
+ values, err := indexFunc(resource)
+ if err != nil {
+ continue
+ }
+ for _, v := range values {
+ entries = append(entries, ResourceIndexModel{
+ ResourceKind: gs.kind.ToString(),
+ IndexName: indexName,
+ IndexValue: v,
+ ResourceKey: resource.ResourceKey(),
+ Operator: string(index.Equals),
+ })
+ }
+ }
+
+ if len(entries) == 0 {
+ return nil
+ }
+
+ return db.Create(&entries).Error
}
-// rebuildIndices rebuilds all in-memory indices from existing database records
-// This is called during initialization to ensure indices are populated with
existing data
-func (gs *GormStore) rebuildIndices() error {
- // Clear existing indices first
- gs.clearIndices()
+// deleteIndexEntries removes all index entries for a resource key
+func (gs *GormStore) deleteIndexEntries(resourceKey string) error {
+ db := gs.pool.GetDB()
+ return db.Where("resource_key = ?",
resourceKey).Delete(&ResourceIndexModel{}).Error
+}
Review Comment:
`deleteIndexEntries` deletes from `resource_indices` using only
`resource_key`, which can remove index rows for other resource kinds that
happen to share the same key. This should be scoped by `resource_kind` (and
possibly `index_name`) to avoid cross-kind data loss.
##########
pkg/store/dbcommon/gorm_store.go:
##########
@@ -338,9 +350,31 @@ func (gs *GormStore) Replace(list []interface{}, _ string)
error {
return err
}
- // Rebuild indices for all resources
+ // Persist all index entries in bulk
+ var indexEntries []ResourceIndexModel
+ indexers := gs.GetIndexers()
for _, resource := range resources {
- gs.indices.UpdateResource(resource, nil)
+ for indexName, indexFunc := range indexers {
+ values, err := indexFunc(resource)
+ if err != nil {
+ continue
+ }
+ for _, v := range values {
+ indexEntries = append(indexEntries,
ResourceIndexModel{
+ ResourceKind:
gs.kind.ToString(),
+ IndexName: indexName,
+ IndexValue: v,
+ ResourceKey:
resource.ResourceKey(),
+ Operator:
string(index.Equals),
+ })
+ }
+ }
+ }
+
+ if len(indexEntries) > 0 {
+ if err := tx.CreateInBatches(&indexEntries, 100).Error;
err != nil {
+ logger.Warnf("failed to persist index entries
during replace: %v", err)
Review Comment:
During `Replace`, failures persisting `resource_indices` are logged but not
returned, allowing the transaction to commit resources without corresponding
index rows. This will break all index-based queries after a replace. Consider
returning the error so `Replace` remains atomic for both resources and indices.
```suggestion
logger.Warnf("failed to persist index entries
during replace: %v", err)
return err
```
##########
pkg/store/memory/store.go:
##########
@@ -63,14 +74,42 @@ func (rs *resourceStore) Start(_ runtime.Runtime, _ <-chan
struct{}) error {
}
func (rs *resourceStore) Add(obj interface{}) error {
- return rs.storeProxy.Add(obj)
+ if err := rs.storeProxy.Add(obj); err != nil {
+ return err
+ }
+ r, ok := obj.(coremodel.Resource)
+ if ok {
+ rs.addToTrees(r)
+ }
+ return nil
}
func (rs *resourceStore) Update(obj interface{}) error {
- return rs.storeProxy.Update(obj)
+ r, ok := obj.(coremodel.Resource)
+ if ok {
+ // Get the old resource from the store to properly remove it
from trees
+ oldObj, exists, err := rs.storeProxy.Get(r)
+ if exists && err == nil {
+ if oldRes, ok := oldObj.(coremodel.Resource); ok {
+ rs.removeFromTrees(oldRes)
+ }
+ }
+ }
+ if err := rs.storeProxy.Update(obj); err != nil {
+ return err
+ }
+ if ok {
+ // Add new entry with updated values
+ rs.addToTrees(r)
+ }
+ return nil
}
func (rs *resourceStore) Delete(obj interface{}) error {
+ r, ok := obj.(coremodel.Resource)
+ if ok {
+ rs.removeFromTrees(r)
+ }
return rs.storeProxy.Delete(obj)
Review Comment:
`Delete` removes the resource from prefix trees before calling
`storeProxy.Delete`. If `storeProxy.Delete` fails, the prefix trees become
inconsistent with the underlying indexer contents. Consider performing the tree
removal only after a successful delete, or re-adding on failure.
```suggestion
if err := rs.storeProxy.Delete(obj); err != nil {
return err
}
if r, ok := obj.(coremodel.Resource); ok {
rs.removeFromTrees(r)
}
return nil
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]