mrproliu commented on code in PR #1138:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1138#discussion_r3278199974


##########
banyand/measure/migration_schema.go:
##########
@@ -0,0 +1,420 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "path/filepath"
+       "strings"
+
+       "github.com/blugelabs/bluge"
+       "github.com/pkg/errors"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       backupsnapshot 
"github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
+// errSchemaPropertyMissing signals "no schema-property/_schema under
+// backupDir". Returned by findSchemaPropertyRoot when a caller asks for
+// a backup-tree discovery but the snapshot omitted the catalog.
+var errSchemaPropertyMissing = errors.New("schema-property catalog not found in backup")
+
+// schemaPropDoc is one decoded doc emitted by walkSchemaPropertyShard;
+// the caller decides whether the kind / group matches what it wants.
+type schemaPropDoc struct {
+       propID     string
+       kindName   string // "measure" / "group" / "stream" / ...
+       group      string // empty for non-measure docs
+       sourceJSON string // embedded protobuf JSON ready for kind-specific Unmarshal
+       modRev     int64
+       deleted    bool
+}
+
+// walkSchemaPropertyShard opens one shard of the backup's
+// schema-property bluge index and invokes visit() for each doc.
+// Visitors filter on kindName and fold into their own dedup tables.
+func walkSchemaPropertyShard(shardPath string, visit func(schemaPropDoc) 
error) error {
+       reader, err := bluge.OpenReader(bluge.DefaultConfig(shardPath))
+       if err != nil {
+               return fmt.Errorf("open bluge reader: %w", err)
+       }
+       defer func() { _ = reader.Close() }()
+       dmi, err := reader.Search(context.Background(),
+               bluge.NewTopNSearch(schemaSearchSize, bluge.NewMatchAllQuery()))
+       if err != nil {
+               return fmt.Errorf("search schema docs: %w", err)
+       }
+       for {
+               next, err := dmi.Next()
+               if err != nil {
+                       return fmt.Errorf("iterate schema docs: %w", err)
+               }
+               if next == nil {
+                       return nil
+               }
+               var sourceBytes []byte
+               var deleted bool
+               if err := next.VisitStoredFields(func(field string, value 
[]byte) bool {
+                       switch field {
+                       case schemaSourceField:
+                               sourceBytes = append([]byte(nil), value...)
+                       case schemaDeletedTag:
+                               if len(value) > 0 {
+                                       deleted = true
+                               }
+                       }
+                       return true
+               }); err != nil {
+                       return fmt.Errorf("visit schema doc: %w", err)
+               }
+               if len(sourceBytes) == 0 {
+                       continue
+               }
+               var prop propertyv1.Property
+               if err := protojson.Unmarshal(sourceBytes, &prop); err != nil {
+                       continue
+               }
+               var group, srcJSON string
+               for _, t := range prop.GetTags() {
+                       sv := t.GetValue().GetStr()
+                       if sv == nil {
+                               continue
+                       }
+                       switch t.GetKey() {
+                       case "group":
+                               group = sv.GetValue()
+                       case "source":
+                               srcJSON = sv.GetValue()
+                       }
+               }
+               if vErr := visit(schemaPropDoc{
+                       propID:     prop.GetId(),
+                       kindName:   prop.GetMetadata().GetName(),
+                       group:      group,
+                       sourceJSON: srcJSON,
+                       modRev:     prop.GetMetadata().GetModRevision(),
+                       deleted:    deleted,
+               }); vErr != nil {
+                       return vErr
+               }
+       }
+}
+
+const (
+       schemaSourceField = "_source"
+       schemaDeletedTag  = "_deleted"
+       schemaShardPrefix = "shard-"
+       // schemaSearchSize sets the per-page document budget for the bluge
+       // scan; the schema-property index in production tops out at ~10k docs so
+       // a single page is plenty.
+       schemaSearchSize = 200000
+)
+
+// resolveSchemaRoot picks the `_schema` bluge directory to read schemas
+// from. If schemaPropertyPath is non-empty it is honored directly (used
+// by callers that mount a live cluster's schema-property PVC and want
+// to skip the backup-style `<node>/<date>/` discovery). Otherwise the
+// caller falls back to walking the backup snapshot via
+// findSchemaPropertyRoot.
+func resolveSchemaRoot(backupDir, schemaPropertyPath string) (string, error) {
+       if schemaPropertyPath != "" {
+               return schemaPropertyPath, nil
+       }
+       return findSchemaPropertyRoot(backupDir)
+}
+
+// loadMeasureSchemasFromSchemaCatalog reads every measure schema under the
+// given groups directly from the schema-property bluge index. Returns
+// (group -> measure-name -> schema). When schemaPropertyPath is set, the
+// catalog is read straight from that bluge dir (the live PVC mount path);
+// otherwise the function walks the backup-style `<node>/<date>/` layout
+// rooted at backupDir.
+//
+// The bluge index retains historical revisions across segments (a schema
+// updated N times leaves N docs sharing the same propID with distinct
+// mod_revisions). This loader mirrors the live cluster's
+// SchemaRegistry.listSchemas dedup: propID is the dedup key, the entry
+// with the highest mod_revision wins, and the winning entry is dropped
+// entirely if it carries a non-zero _deleted marker (the schema was
+// tombstoned at that revision).
+func loadMeasureSchemasFromSchemaCatalog(backupDir, schemaPropertyPath string, 
groups []string) (map[string]map[string]*measureSchemaInfo, error) {
+       byGroup, err := fetchMeasureSchemasFromSchema(backupDir, 
schemaPropertyPath, groups)
+       if err != nil {
+               return nil, err
+       }
+       out := make(map[string]map[string]*measureSchemaInfo, len(groups))
+       for _, group := range groups {
+               list := byGroup[group]
+               byName := make(map[string]*measureSchemaInfo, len(list))
+               for _, s := range list {
+                       byName[s.Name] = s
+               }
+               out[group] = byName
+       }
+       return out, nil
+}
+
+func fetchMeasureSchemasFromSchema(backupDir, schemaPropertyPath string, 
groups []string) (map[string][]*measureSchemaInfo, error) {
+       if backupDir == "" && schemaPropertyPath == "" {
+               return nil, errors.New("either backup-dir or schema-property-path is required")
+       }
+       wanted := make(map[string]bool, len(groups))
+       for _, g := range groups {
+               wanted[g] = true
+       }
+       schemaRoot, err := resolveSchemaRoot(backupDir, schemaPropertyPath)
+       if err != nil {
+               return nil, err
+       }
+       shards, err := os.ReadDir(schemaRoot)
+       if err != nil {
+               return nil, fmt.Errorf("read schema-property root %q: %w", 
schemaRoot, err)
+       }
+       candidates := make(map[string]*schemaCandidate)
+       for _, sh := range shards {
+               if !sh.IsDir() || !strings.HasPrefix(sh.Name(), 
schemaShardPrefix) {
+                       continue
+               }
+               shardPath := filepath.Join(schemaRoot, sh.Name())
+               if loadErr := loadMeasureSchemasFromShard(shardPath, wanted, 
candidates); loadErr != nil {
+                       return nil, fmt.Errorf("load schemas from %s: %w", 
shardPath, loadErr)
+               }
+       }
+       result := make(map[string][]*measureSchemaInfo)
+       for _, c := range candidates {
+               if c.deleted || c.info == nil {
+                       continue
+               }
+               result[c.info.Group] = append(result[c.info.Group], c.info)
+       }
+       return result, nil
+}
+
+// schemaCandidate captures the best (highest mod_revision) doc seen for
+// one propID while iterating the schema-property bluge index. deleted is
+// true when the winning revision is a tombstone, in which case the entry
+// must be skipped entirely (an older live revision must not resurrect a
+// deleted schema).
+type schemaCandidate struct {
+       info    *measureSchemaInfo
+       modRev  int64
+       deleted bool
+}
+
+// findSchemaPropertyRoot walks <backup>/<node>/<date>/ and returns the first
+// schema-property/_schema directory it finds. Hot/schema-server pods are the
+// only ones whose backup carries this catalog.
+func findSchemaPropertyRoot(backupDir string) (string, error) {
+       nodes, err := os.ReadDir(backupDir)
+       if err != nil {
+               return "", fmt.Errorf("read backup dir %q: %w", backupDir, err)
+       }
+       for _, node := range nodes {
+               if !node.IsDir() {
+                       continue
+               }
+               nodeRoot := filepath.Join(backupDir, node.Name())
+               dates, readErr := os.ReadDir(nodeRoot)
+               if readErr != nil {
+                       continue
+               }
+               for _, date := range dates {
+                       if !date.IsDir() {
+                               continue
+                       }
+                       cand := filepath.Join(nodeRoot, date.Name(), 
backupsnapshot.SchemaPropertyCatalogName, schema.SchemaGroup)
+                       if info, statErr := os.Stat(cand); statErr == nil && 
info.IsDir() {
+                               return cand, nil
+                       }
+               }
+       }
+       return "", errors.WithMessagef(errSchemaPropertyMissing,
+               "no %s/%s directory found under %q (only hot/schema-server node backups carry schemas)",
+               backupsnapshot.SchemaPropertyCatalogName, schema.SchemaGroup, backupDir)
+}

Review Comment:
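   Add `date` as a parameter of `findSchemaPropertyRoot`, rather than returning the first `<date>` directory that contains the catalog.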



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to