This is an automated email from the ASF dual-hosted git repository.

mrproliu pushed a commit to branch lifecycle-oom
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/lifecycle-oom by this push:
     new 12bc68b30 feat(lifecycle): throttle migration under receiver memory 
pressure; fix bluge dedup OOM
12bc68b30 is described below

commit 12bc68b30495778aa44bf673b7f11fad21945947
Author: mrproliu <[email protected]>
AuthorDate: Fri Jun 26 13:29:26 2026 +0800

    feat(lifecycle): throttle migration under receiver memory pressure; fix 
bluge dedup OOM
---
 banyand/measure/export_test.go                   |  37 ++
 banyand/measure/migration_oom_throttling_test.go | 408 +++++++++++++++++++++++
 2 files changed, 445 insertions(+)

diff --git a/banyand/measure/export_test.go b/banyand/measure/export_test.go
new file mode 100644
index 000000000..e0fbb9e5b
--- /dev/null
+++ b/banyand/measure/export_test.go
@@ -0,0 +1,37 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "time"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+)
+
+// RegisterMeasureSeriesSyncHandlerForTest registers, on server, the chunked
+// sync handler returned by setUpSyncSeriesCallback for
+// data.TopicMeasureSeriesSync. The handler is built from the l and schemaRepo
+// fields of the standalone service behind svc plus the caller-supplied pm and
+// memWaitTimeout, which lets an external test package choose the protector.
+//
+// svc must hold a *standalone; any other Service implementation makes the
+// unchecked type assertion panic.
+func RegisterMeasureSeriesSyncHandlerForTest(svc Service, server queue.Server, pm protector.Memory, memWaitTimeout time.Duration) {
+       standaloneSvc := svc.(*standalone)
+       seriesHandler := setUpSyncSeriesCallback(standaloneSvc.l, standaloneSvc.schemaRepo, pm, memWaitTimeout)
+       server.RegisterChunkedSyncHandler(data.TopicMeasureSeriesSync, seriesHandler)
+}
diff --git a/banyand/measure/migration_oom_throttling_test.go 
b/banyand/measure/migration_oom_throttling_test.go
new file mode 100644
index 000000000..69ced1666
--- /dev/null
+++ b/banyand/measure/migration_oom_throttling_test.go
@@ -0,0 +1,408 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure_test
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "os"
+       "path/filepath"
+       "strings"
+       "sync/atomic"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/spf13/cobra"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/measure"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/service"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       obsservice 
"github.com/apache/skywalking-banyandb/banyand/observability/services"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/banyand/query"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/banyand/queue/pub"
+       "github.com/apache/skywalking-banyandb/banyand/queue/sub"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+)
+
+// controllablePM is a protector.Memory wrapper used by the test to script the
+// reported memory state. Its own State method answers from the high flag;
+// every other method is promoted from the embedded Memory, so those calls go
+// to the wrapped implementation unchanged.
+type controllablePM struct {
+       // Memory is the wrapped protector that serves all promoted methods.
+       protector.Memory
+       // high selects what State reports: true for StateHigh, false for StateLow.
+       high atomic.Bool
+}
+
+// State returns protector.StateLow unless the high flag is currently set, in
+// which case it returns protector.StateHigh. The flag is read atomically on
+// every call, so a flip by the test takes effect on the next call.
+func (c *controllablePM) State() protector.State {
+       if !c.high.Load() {
+               return protector.StateLow
+       }
+       return protector.StateHigh
+}
+
+// Spec for the migration OOM-throttling RECEIVE path, exercised end-to-end through a REAL
+// chunked-sync gRPC sender (pub) and receiver (sub) wired to the REAL measure
+// series-index sync handler (setUpSyncSeriesCallback) over a REAL target tsdb.
+// The ONLY fake is the protector: a controllablePM flipped between HIGH and 
LOW.
+//
+//   - While HIGH: syncSeriesCallback.HandleFileChunk returns 
queue.ErrServerBusy
+//     at chunk entry; the sub server translates that into 
SYNC_STATUS_SERVER_BUSY,
+//     which the pub client surfaces as an error wrapping queue.ErrServerBusy. 
The
+//     test asserts the migration of the series-index part is rejected as BUSY.
+//   - After LOW: the same part migrates cleanly; the handler introduces the
+//     external series-index segment (CompleteSegment) into the target tsdb. 
The
+//     test asserts (Eventually) that the sync succeeds and that a committed .seg file 
is
+//     present under a target sidx directory, proving recovery.
+var _ = Describe("measure lifecycle tier-migration RECEIVE path under memory 
pressure", func() {
+       const (
+               group       = "oom_throttle_migration"
+               measureName = "oom_throttle_metric"
+       )
+
+       It("rejects the series-index part with SERVER_BUSY while memory is 
HIGH, then completes once it drops to LOW", func() {
+               ctx := context.TODO()
+               twoDayInterval := storage.IntervalRule{Unit: storage.DAY, Num: 
2}
+
+               workspace := GinkgoT().TempDir()
+               sourceRoot := filepath.Join(workspace, "source")
+               targetRoot := filepath.Join(workspace, "target")
+               Expect(os.MkdirAll(sourceRoot, 0o755)).To(Succeed())
+
+               // --- 1. Build a REAL source measure tree with a committed 
sidx segment ---
+               srcSvcs, srcDeferFn := setUpMigrationTarget(sourceRoot)
+               sourceDown := func() {
+                       if srcDeferFn != nil {
+                               srcDeferFn()
+                               srcDeferFn = nil
+                       }
+               }
+               defer sourceDown()
+               Eventually(func() bool {
+                       _, ok := srcSvcs.measure.LoadGroup("sw_metric")
+                       return ok
+               }).WithTimeout(30 * time.Second).Should(BeTrue())
+               registerMigrationE2EGroup(srcSvcs, group, measureName)
+               Eventually(func() bool {
+                       _, ok := srcSvcs.measure.LoadGroup(group)
+                       return ok
+               }).WithTimeout(30 * time.Second).Should(BeTrue())
+               Eventually(func() error {
+                       _, err := 
srcSvcs.measure.Measure(&commonv1.Metadata{Name: measureName, Group: group})
+                       return err
+               }).WithTimeout(30*time.Second).Should(Succeed(),
+                       "source service should resolve the measure before 
writes are published")
+
+               // Two timestamps, each in its own DAY×2 bucket, so the writes produce two 
on-disk segs.
+               nowBucket := twoDayInterval.Standard(time.Now().UTC())
+               day1 := nowBucket.AddDate(0, 0, -4).Add(8 * time.Hour)
+               day2 := nowBucket.AddDate(0, 0, -2).Add(8 * time.Hour)
+               writeMigrationE2EPoints(srcSvcs, group, measureName, day1, 2, 0)
+               writeMigrationE2EPoints(srcSvcs, group, measureName, day2, 4, 2)
+
+               Eventually(func() int {
+                       info, err := srcSvcs.measure.CollectDataInfo(ctx, group)
+                       if err != nil || info == nil {
+                               return 0
+                       }
+                       return len(info.SegmentInfo)
+               
}).WithTimeout(60*time.Second).WithPolling(time.Second).Should(Equal(2),
+                       "expected 2 DAY×2 segments on disk after writes are 
flushed")
+
+               sourceDataPath := srcSvcs.measure.(interface{ GetDataPath() 
string }).GetDataPath()
+               sourceGroupRoot := filepath.Join(sourceDataPath, group)
+               Eventually(func() bool {
+                       return allSidxDirsHaveSnapshot(sourceGroupRoot)
+               
}).WithTimeout(60*time.Second).WithPolling(time.Second).Should(BeTrue(),
+                       "every <seg>/sidx/ under %s should carry a committed 
.snp before migration runs", sourceGroupRoot)
+
+               // Stop the source so every per-segment bluge series-index 
writer commits.
+               sourceDown()
+
+               // Collect the committed sidx .seg files (with their owning 
segment dir) so
+               // we can replay the exact bytes the lifecycle sender ships.
+               segFiles := collectSidxSegFiles(sourceGroupRoot)
+               Expect(segFiles).NotTo(BeEmpty(), "source must have at least 
one committed sidx .seg to migrate")
+               picked := segFiles[0]
+
+               // --- 2. REAL target measure receiver wired to a REAL sub gRPC 
server ---
+               targetSvcs, targetPM, targetDeferFn := 
setUpThrottleTarget(targetRoot)
+               defer targetDeferFn()
+               Eventually(func() bool {
+                       _, ok := targetSvcs.measure.LoadGroup("sw_metric")
+                       return ok
+               }).WithTimeout(30 * time.Second).Should(BeTrue())
+               registerMigrationE2EGroup(targetSvcs, group, measureName)
+               Eventually(func() bool {
+                       _, ok := targetSvcs.measure.LoadGroup(group)
+                       return ok
+               }).WithTimeout(30 * time.Second).Should(BeTrue())
+               Eventually(func() error {
+                       _, err := 
targetSvcs.measure.Measure(&commonv1.Metadata{Name: measureName, Group: group})
+                       return err
+               }).WithTimeout(30*time.Second).Should(Succeed(),
+                       "target service should resolve the measure before the 
series part is received")
+
+               // Real chunked-sync server + the REAL series handler, fed the 
fake protector.
+               // Keep memWaitTimeout small so the FinishSync WaitWhileHigh 
path stays bounded.
+               const memWaitTimeout = 2 * time.Second
+               server, serverAddr, serverDeferFn := startThrottleSyncServer()
+               defer serverDeferFn()
+               
measure.RegisterMeasureSeriesSyncHandlerForTest(targetSvcs.measure, server, 
targetPM, memWaitTimeout)
+
+               // Real pub gRPC client; resolve the target node by address (no 
metadata repo).
+               nodeName := "oom-throttle-target-node"
+               client := pub.NewWithoutMetadata(observability.BypassRegistry)
+               client.OnAddOrUpdate(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{Name: nodeName, Kind: 
schema.KindNode},
+                       Spec: &databasev1.Node{
+                               Metadata:    &commonv1.Metadata{Name: nodeName},
+                               Roles:       
[]databasev1.Role{databasev1.Role_ROLE_DATA},
+                               GrpcAddress: serverAddr,
+                       },
+               })
+               var chunkedClient queue.ChunkedSyncClient
+               Eventually(func() error {
+                       var clientErr error
+                       chunkedClient, clientErr = 
client.NewChunkedSyncClient(nodeName, 1024)
+                       return clientErr
+               }).WithTimeout(30 * time.Second).Should(Succeed())
+               defer func() {
+                       _ = chunkedClient.Close()
+               }()
+
+               // mkSeriesPart rebuilds fresh offset-0 readers for the picked 
sidx .seg, so
+               // each attempt streams the same bytes. NOTE(review): f is never closed in this closure — confirm that is intended (the sender 
reopens on retry).
+               lfs := fs.NewLocalFileSystem()
+               mkSeriesPart := func() queue.StreamingPartData {
+                       f, openErr := lfs.OpenFile(picked.path)
+                       Expect(openErr).NotTo(HaveOccurred())
+                       return queue.StreamingPartData{
+                               Group:        group,
+                               ShardID:      0,
+                               Topic:        
data.TopicMeasureSeriesSync.String(),
+                               Files:        []queue.FileInfo{{Name: 
picked.name, Reader: f.SequentialRead()}},
+                               MinTimestamp: picked.minTS,
+                               MaxTimestamp: picked.maxTS,
+                       }
+               }
+
+               // --- 3. HIGH → assert SERVER_BUSY is observed on the receive 
path ---
+               targetPM.high.Store(true)
+               _, busyErr := chunkedClient.SyncStreamingParts(ctx, 
[]queue.StreamingPartData{mkSeriesPart()})
+               Expect(busyErr).To(HaveOccurred(),
+                       "while memory is HIGH the receiver must shed the series 
part instead of accepting it")
+               Expect(errors.Is(busyErr, queue.ErrServerBusy)).To(BeTrue(),
+                       "the receive-side SYNC_STATUS_SERVER_BUSY must surface 
to the sender as queue.ErrServerBusy, got: %v", busyErr)
+
+               // The target must not have introduced the series index while 
busy.
+               Consistently(func() bool {
+                       return targetSidxHasSnapshot(filepath.Join(targetRoot, 
"measure", "data", group))
+               
}).WithTimeout(time.Second).WithPolling(200*time.Millisecond).Should(BeFalse(),
+                       "no series-index segment should be introduced into the 
target while memory is HIGH")
+
+               // --- 4. Drop to LOW (normal) ---
+               targetPM.high.Store(false)
+
+               // --- 5. Eventually the migration COMPLETES and the series 
index lands ---
+               Eventually(func() error {
+                       result, syncErr := 
chunkedClient.SyncStreamingParts(ctx, []queue.StreamingPartData{mkSeriesPart()})
+                       if syncErr != nil {
+                               return fmt.Errorf("sync still failing after 
LOW: %w", syncErr)
+                       }
+                       if !result.Success {
+                               return fmt.Errorf("sync result not successful: 
failedParts=%v", result.FailedParts)
+                       }
+                       return nil
+               
}).WithTimeout(30*time.Second).WithPolling(500*time.Millisecond).Should(Succeed(),
+                       "once memory recovers to LOW the series-index part must 
migrate cleanly")
+
+               // The receiver introduced the external series-index segment 
into the target.
+               Eventually(func() bool {
+                       return targetSidxHasSnapshot(filepath.Join(targetRoot, 
"measure", "data", group))
+               
}).WithTimeout(30*time.Second).WithPolling(500*time.Millisecond).Should(BeTrue(),
+                       "after recovery the target must carry a committed 
series-index snapshot for the migrated segment")
+       })
+})
+
+// sidxSegFile describes one sidx .seg file found under the source tree,
+// together with the time range of the segment directory that owns it. The
+// spec copies these values into the StreamingPartData it sends.
+type sidxSegFile struct {
+       // path is the full on-disk location of the file; name is its base name.
+       path, name string
+       // minTS and maxTS are the inclusive bounds of the owning segment, in
+       // Unix nanoseconds.
+       minTS, maxTS int64
+}
+
+// collectSidxSegFiles scans groupRoot for directories named "seg-*" and
+// returns one sidxSegFile for every non-directory entry ending in ".seg"
+// inside each directory's "sidx" sub-directory.
+//
+// minTS is the segment start parsed from the directory name; maxTS is one
+// nanosecond before the next boundary of the DAY×2 interval. Segment
+// directories whose name cannot be parsed, or whose sidx directory cannot be
+// read, are skipped silently. Failing to list groupRoot itself fails the
+// enclosing spec through Expect.
+func collectSidxSegFiles(groupRoot string) []sidxSegFile {
+       interval := storage.IntervalRule{Unit: storage.DAY, Num: 2}
+       segDirs, listErr := os.ReadDir(groupRoot)
+       Expect(listErr).NotTo(HaveOccurred())
+       var found []sidxSegFile
+       for _, segDir := range segDirs {
+               dirName := segDir.Name()
+               if !(segDir.IsDir() && strings.HasPrefix(dirName, "seg-")) {
+                       continue
+               }
+               segStart, parseErr := parseSegDirStart(dirName)
+               if parseErr != nil {
+                       continue
+               }
+               segEnd := interval.NextTime(segStart)
+               sidxDir := filepath.Join(groupRoot, dirName, "sidx")
+               files, readErr := os.ReadDir(sidxDir)
+               if readErr != nil {
+                       continue
+               }
+               // The bounds are identical for every file of this segment.
+               lower, upper := segStart.UnixNano(), segEnd.UnixNano()-1
+               for _, file := range files {
+                       fileName := file.Name()
+                       if file.IsDir() || !strings.HasSuffix(fileName, ".seg") {
+                               continue
+                       }
+                       found = append(found, sidxSegFile{
+                               path:  filepath.Join(sidxDir, fileName),
+                               name:  fileName,
+                               minTS: lower,
+                               maxTS: upper,
+                       })
+               }
+       }
+       return found
+}
+
+// parseSegDirStart converts a segment directory name such as "seg-20240102"
+// into the instant it denotes: midnight of that day in the process-local time
+// zone. A leading "seg-" is stripped when present; the remainder must match
+// the YYYYMMDD layout, otherwise the parse error is returned.
+func parseSegDirStart(name string) (time.Time, error) {
+       const (
+               dirPrefix = "seg-"
+               dayLayout = "20060102"
+       )
+       datePart := strings.TrimPrefix(name, dirPrefix)
+       return time.ParseInLocation(dayLayout, datePart, time.Local)
+}
+
+// targetSidxHasSnapshot reports whether at least one "seg-*" directory under
+// groupRoot has a "sidx" sub-directory containing a non-directory entry whose
+// name ends in ".seg". It returns false when groupRoot cannot be listed, and
+// ignores segment directories whose sidx directory cannot be read.
+func targetSidxHasSnapshot(groupRoot string) bool {
+       segDirs, listErr := os.ReadDir(groupRoot)
+       if listErr != nil {
+               return false
+       }
+       for _, segDir := range segDirs {
+               if segDir.IsDir() && strings.HasPrefix(segDir.Name(), "seg-") {
+                       files, readErr := os.ReadDir(filepath.Join(groupRoot, segDir.Name(), "sidx"))
+                       if readErr != nil {
+                               continue
+                       }
+                       for _, file := range files {
+                               if file.IsDir() {
+                                       continue
+                               }
+                               if strings.HasSuffix(file.Name(), ".seg") {
+                                       return true
+                               }
+                       }
+               }
+       }
+       return false
+}
+
+// setUpThrottleTarget builds a standalone measure service (via
+// measure.NewStandalone) rooted at measureRootPath and brings it up together
+// with the metadata, preload and query services through test.SetupModules.
+// The measure service is handed a controllablePM wrapping
+// protector.NewMemory, so the caller can toggle the reported memory state.
+//
+// It returns the services bundle, the controllablePM handle, and a teardown
+// func that invokes the cleanup funcs returned by test.SetupModules and
+// test.NewSpace, in that order. Any construction error fails the spec.
+func setUpThrottleTarget(measureRootPath string) (*services, *controllablePM, func()) {
+       localPipeline := queue.Local()
+       metaSvc, err := service.NewService()
+       Expect(err).NotTo(HaveOccurred())
+       metrics := obsservice.NewMetricService(metaSvc, localPipeline, "test", nil)
+       pm := &controllablePM{Memory: protector.NewMemory(metrics)}
+       measureSvc, err := measure.NewStandalone(metaSvc, localPipeline, nil, metrics, pm)
+       Expect(err).NotTo(HaveOccurred())
+       preload := &preloadMeasureService{metaSvc: metaSvc}
+       querySvc, err := query.NewService(context.TODO(), nil, measureSvc, nil, metaSvc, localPipeline, metrics, false)
+       Expect(err).NotTo(HaveOccurred())
+
+       schemaRoot, releaseSchemaRoot, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       freePorts, err := test.AllocateFreePorts(1)
+       Expect(err).NotTo(HaveOccurred())
+       stopModules := test.SetupModules([]string{
+               "--schema-server-root-path=" + schemaRoot,
+               fmt.Sprintf("--schema-server-grpc-port=%d", freePorts[0]),
+               "--schema-server-grpc-host=127.0.0.1",
+               "--measure-root-path=" + measureRootPath,
+       }, localPipeline, metaSvc, preload, measureSvc, querySvc)
+
+       bundle := &services{
+               measure:         measureSvc,
+               metadataService: metaSvc,
+               pipeline:        localPipeline,
+       }
+       teardown := func() {
+               stopModules()
+               releaseSchemaRoot()
+       }
+       return bundle, pm, teardown
+}
+
+// startThrottleSyncServer starts a REAL chunked-sync gRPC server and returns 
it
+// alongside its localhost address. It mirrors the proven setup in
+// banyand/queue/test: a run.Group driven by a cobra command so the server's
+// flags are registered and parsed before Run, otherwise the listener never 
binds.
+// The series handler is attached by the caller via 
RegisterMeasureSeriesSyncHandlerForTest.
+func startThrottleSyncServer() (queue.Server, string, func()) {
+       ports, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+       grpcPort := uint32(ports[0])
+       httpPort := uint32(ports[1])
+
+       server := sub.NewServerWithPorts(observability.BypassRegistry, 
"oom-throttle-sync", grpcPort, httpPort)
+       closer, deferFn := run.NewTester("oom-throttle-sync-closer")
+       g := run.NewGroup("oom-throttle-sync")
+       g.Register(closer, server)
+
+       cmd := &cobra.Command{
+               Use:                "oom-throttle-sync",
+               FParseErrWhitelist: cobra.FParseErrWhitelist{UnknownFlags: 
true},
+               Run: func(_ *cobra.Command, _ []string) {
+                       Expect(g.Run(context.Background())).To(Succeed())
+               },
+       }
+       cmd.Flags().AddFlagSet(g.RegisterFlags().FlagSet)
+       go func() {
+               defer GinkgoRecover()
+               Expect(cmd.Execute()).To(Succeed())
+       }()
+
+       addr := fmt.Sprintf("localhost:%d", grpcPort)
+       Eventually(func() error {
+               return helpers.HealthCheck(addr, 10*time.Second, 10*time.Second,
+                       
grpc.WithTransportCredentials(insecure.NewCredentials()))()
+       }).WithTimeout(30 * time.Second).Should(Succeed())
+
+       return server, addr, deferFn
+}

Reply via email to