This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new fd64f687 Flush measure memory data to disk. (#360)
fd64f687 is described below
commit fd64f687c5efbb40a60e4b05ebb7ec5ca286a98d
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Dec 25 19:38:23 2023 +0800
Flush measure memory data to disk. (#360)
---
CHANGES.md | 1 +
banyand/internal/storage/segment.go | 7 +-
banyand/internal/storage/shard.go | 2 +-
banyand/internal/storage/storage.go | 2 +-
banyand/internal/storage/tsdb.go | 2 +-
banyand/measure/block_writer.go | 2 -
banyand/measure/flusher.go | 100 +++++++
banyand/measure/gc.go | 86 ++++++
banyand/measure/gc_test.go | 105 ++++++++
banyand/measure/introducer.go | 148 +++++++++++
banyand/measure/part.go | 127 ++++++++-
banyand/measure/part_iter_test.go | 62 +++--
banyand/measure/part_metadata.go | 62 ++++-
banyand/measure/part_test.go | 29 ++-
banyand/measure/primary_metadata.go | 3 +-
banyand/measure/query.go | 38 ++-
banyand/measure/query_test.go | 154 ++++++-----
banyand/measure/snapshot.go | 111 ++++++++
banyand/measure/snapshot_test.go | 287 +++++++++++++++++++++
banyand/measure/tstable.go | 207 +++++++++++++--
banyand/measure/tstable_test.go | 127 ++++++---
go.mod | 2 +-
pkg/fs/error.go | 22 +-
pkg/fs/file_system.go | 19 ++
pkg/fs/local_file_system.go | 71 ++++-
pkg/fs/local_file_system_nix.go | 74 ++++++
pkg/fs/local_file_system_windows.go | 61 +++++
pkg/watcher/watcher.go | 70 +++++
.../query_ondisk/query_ondisk_suite_test.go | 1 -
29 files changed, 1781 insertions(+), 201 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ad38d375..b5e03537 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,7 @@ Release Notes.
- Remove primary index.
- Measure column-based storage:
- Data ingestion and retrieval.
+ - Flush memory data to disk.
### Bugs
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index a74da252..d4b0d7ff 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -292,16 +292,17 @@ func (sc *segmentController[T]) sortLst() {
}
func (sc *segmentController[T]) load(start, end time.Time, root string) (seg
*segment[T], err error) {
+ suffix := sc.Format(start)
+ segPath := path.Join(root, fmt.Sprintf(segTemplate, suffix))
var tsTable T
- if tsTable, err = sc.tsTableCreator(sc.location, sc.position, sc.l,
timestamp.NewSectionTimeRange(start, end)); err != nil {
+ if tsTable, err = sc.tsTableCreator(lfs, segPath, sc.position, sc.l,
timestamp.NewSectionTimeRange(start, end)); err != nil {
return nil, err
}
- suffix := sc.Format(start)
ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l)
seg, err = openSegment[T](common.SetPosition(ctx, func(p
common.Position) common.Position {
p.Segment = suffix
return p
- }), start, end, path.Join(root, fmt.Sprintf(segTemplate, suffix)),
suffix, sc.segmentSize, sc.scheduler, tsTable)
+ }), start, end, segPath, suffix, sc.segmentSize, sc.scheduler, tsTable)
if err != nil {
return nil, err
}
diff --git a/banyand/internal/storage/shard.go b/banyand/internal/storage/shard.go
index cdec81ae..19150f9c 100644
--- a/banyand/internal/storage/shard.go
+++ b/banyand/internal/storage/shard.go
@@ -75,7 +75,7 @@ func (d *database[T]) openShard(ctx context.Context, id common.ShardID) (*shard[
return s, nil
}
-func (s *shard[T]) closer() {
+func (s *shard[T]) close() {
s.closeOnce.Do(func() {
s.scheduler.Close()
s.segmentManageStrategy.Close()
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index cfda39bf..911c5410 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -92,7 +92,7 @@ type TSTableWrapper[T TSTable] interface {
}
// TSTableCreator creates a TSTable.
-type TSTableCreator[T TSTable] func(root string, position common.Position,
+type TSTableCreator[T TSTable] func(fileSystem fs.FileSystem, root string,
position common.Position,
l *logger.Logger, timeRange timestamp.TimeRange) (T, error)
// IntervalUnit denotes the unit of a time point.
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index a425609e..fd5b648a 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -73,7 +73,7 @@ func (d *database[T]) Close() error {
d.Lock()
defer d.Unlock()
for _, s := range d.sLst {
- s.closer()
+ s.close()
}
return d.index.Close()
}
diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go
index 9fcc6e9a..fc840084 100644
--- a/banyand/measure/block_writer.go
+++ b/banyand/measure/block_writer.go
@@ -19,7 +19,6 @@ package measure
import (
"sync"
- "time"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
@@ -242,7 +241,6 @@ func (bw *blockWriter) Flush(ph *partMetadata) {
ph.BlocksCount = bw.totalBlocksCount
ph.MinTimestamp = bw.totalMinTimestamp
ph.MaxTimestamp = bw.totalMaxTimestamp
- ph.Version = time.Now().UnixNano() // TODO: use a global version
bw.mustFlushPrimaryBlock(bw.primaryBlockData)
diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go
new file mode 100644
index 00000000..4fe26408
--- /dev/null
+++ b/banyand/measure/flusher.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "github.com/apache/skywalking-banyandb/pkg/watcher"
+)
+
+func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction,
introducerWatcher watcher.Channel, epoch uint64) {
+ defer tst.loopCloser.Done()
+ epochWatcher := introducerWatcher.Add(0, tst.loopCloser.CloseNotify())
+ if epochWatcher == nil {
+ return
+ }
+
+ for {
+ select {
+ case <-tst.loopCloser.CloseNotify():
+ return
+ case <-epochWatcher.Watch():
+ var curSnapshot *snapshot
+ tst.RLock()
+ if tst.snapshot != nil && tst.snapshot.epoch > epoch {
+ curSnapshot = tst.snapshot
+ curSnapshot.incRef()
+ }
+ tst.RUnlock()
+ if curSnapshot != nil {
+ epoch = tst.flush(curSnapshot, flushCh)
+ if epoch == 0 {
+ return
+ }
+ tst.persistSnapshot(curSnapshot)
+ curSnapshot.decRef()
+ if tst.currentEpoch() != epoch {
+ continue
+ }
+ }
+ epochWatcher = introducerWatcher.Add(epoch,
tst.loopCloser.CloseNotify())
+ if epochWatcher == nil {
+ return
+ }
+ tst.gc.clean()
+ }
+ }
+}
+
+func (tst *tsTable) flush(snapshot *snapshot, flushCh chan
*flusherIntroduction) uint64 {
+ ind := generateFlusherIntroduction()
+ defer releaseFlusherIntroduction(ind)
+ for _, pw := range snapshot.parts {
+ if pw.mp == nil || pw.mp.partMetadata.TotalCount < 1 {
+ continue
+ }
+ partPath := partPath(tst.root, pw.ID())
+ pw.mp.mustFlush(tst.fileSystem, partPath)
+ newPW := newPartWrapper(nil, mustOpenFilePart(partPath,
tst.fileSystem), tst.fileSystem)
+ newPW.p.partMetadata.ID = pw.ID()
+ ind.flushed[newPW.ID()] = newPW
+ }
+ if len(ind.flushed) < 1 {
+ return snapshot.epoch
+ }
+ ind.applied = make(chan struct{})
+ select {
+ case flushCh <- ind:
+ case <-tst.loopCloser.CloseNotify():
+ return 0
+ }
+ select {
+ case <-ind.applied:
+ return snapshot.epoch
+ case <-tst.loopCloser.CloseNotify():
+ return 0
+ }
+}
+
+func (tst *tsTable) persistSnapshot(snapshot *snapshot) {
+ var partNames []string
+ for i := range snapshot.parts {
+ partNames = append(partNames, partName(snapshot.parts[i].ID()))
+ }
+ tst.mustWriteSnapshot(snapshot.epoch, partNames)
+ tst.gc.registerSnapshot(snapshot.epoch)
+}
diff --git a/banyand/measure/gc.go b/banyand/measure/gc.go
new file mode 100644
index 00000000..8fb6f608
--- /dev/null
+++ b/banyand/measure/gc.go
@@ -0,0 +1,86 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "errors"
+ "path"
+ "sort"
+
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+)
+
+type garbageCleaner struct {
+ parent *tsTable
+ snapshots []uint64
+ parts []uint64
+}
+
+func (g *garbageCleaner) registerSnapshot(snapshot uint64) {
+ g.snapshots = append(g.snapshots, snapshot)
+}
+
+func (g *garbageCleaner) submitParts(parts ...uint64) {
+ g.parts = append(g.parts, parts...)
+}
+
+func (g garbageCleaner) clean() {
+ if len(g.snapshots) > 1 {
+ g.cleanSnapshots()
+ }
+ if len(g.parts) > 0 {
+ g.cleanParts()
+ }
+}
+
+func (g *garbageCleaner) cleanSnapshots() {
+ if len(g.snapshots) < 2 {
+ return
+ }
+ if !sort.SliceIsSorted(g.snapshots, func(i, j int) bool {
+ return g.snapshots[i] < g.snapshots[j]
+ }) {
+ sort.Slice(g.snapshots, func(i, j int) bool {
+ return g.snapshots[i] < g.snapshots[j]
+ })
+ }
+ // keep the latest snapshot
+ var remainingSnapshots []uint64
+ for i := 0; i < len(g.snapshots)-1; i++ {
+ filePath := path.Join(g.parent.root,
snapshotName(g.snapshots[i]))
+ if err := g.parent.fileSystem.DeleteFile(filePath); err != nil {
+ var notExistErr *fs.FileSystemError
+ if errors.As(err, &notExistErr) && notExistErr.Code != fs.IsNotExistError {
+ g.parent.l.Warn().Err(err).Str("path", filePath).Msg("failed to delete snapshot, will retry in next round. Please check manually")
+ remainingSnapshots = append(remainingSnapshots,
g.snapshots[i])
+ }
+ }
+ }
+ if remainingSnapshots == nil {
+ g.snapshots = g.snapshots[len(g.snapshots)-1:]
+ return
+ }
+ remained := g.snapshots[len(g.snapshots)-1]
+ g.snapshots = g.snapshots[:0]
+ g.snapshots = append(g.snapshots, remainingSnapshots...)
+ g.snapshots = append(g.snapshots, remained)
+}
+
+func (g garbageCleaner) cleanParts() {
+ panic("implement me")
+}
diff --git a/banyand/measure/gc_test.go b/banyand/measure/gc_test.go
new file mode 100644
index 00000000..2d6d8901
--- /dev/null
+++ b/banyand/measure/gc_test.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "path/filepath"
+ "sort"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestGarbageCleanerCleanSnapshot(t *testing.T) {
+ tests := []struct {
+ name string
+ snapshotsOnDisk []uint64
+ snapshotsInGC []uint64
+ wantOnDisk []uint64
+ wantInGC []uint64
+ }{
+ {
+ name: "Test with no snapshots on disk and no snapshots
in GC",
+ },
+ {
+ name: "Test with some snapshots on disk and
no snapshots in GC",
+ snapshotsInGC: nil,
+ snapshotsOnDisk: []uint64{1, 2, 3},
+ wantInGC: nil,
+ wantOnDisk: []uint64{1, 2, 3},
+ },
+ {
+ name: "Test with no snapshots on disk and
some snapshots in GC",
+ snapshotsOnDisk: nil,
+ snapshotsInGC: []uint64{1, 2, 3},
+ wantOnDisk: nil,
+ // gc won't fix the miss match between inmemory and disk
+ wantInGC: []uint64{3},
+ },
+ {
+ name: "Test with some snapshots on disk and
some snapshots in GC",
+ snapshotsOnDisk: []uint64{1, 2, 3, 4, 5},
+ snapshotsInGC: []uint64{1, 3, 5},
+ wantOnDisk: []uint64{2, 4, 5},
+ wantInGC: []uint64{5},
+ },
+ {
+ name: "Test with unsorted",
+ snapshotsOnDisk: []uint64{1, 3, 2, 5, 4},
+ snapshotsInGC: []uint64{5, 1, 3},
+ wantOnDisk: []uint64{2, 4, 5},
+ wantInGC: []uint64{5},
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ root, defFn := test.Space(require.New(t))
+ defer defFn()
+ parent := &tsTable{
+ root: root,
+ l: logger.GetLogger("test"),
+ fileSystem: fs.NewLocalFileSystem(),
+ }
+ for i := range tt.snapshotsOnDisk {
+ filePath := filepath.Join(parent.root,
snapshotName(tt.snapshotsOnDisk[i]))
+ file, err :=
parent.fileSystem.CreateFile(filePath, filePermission)
+ require.NoError(t, err)
+ require.NoError(t, file.Close())
+ }
+ g := &garbageCleaner{
+ parent: parent,
+ snapshots: tt.snapshotsInGC,
+ }
+ g.cleanSnapshots()
+ var got []uint64
+ for _, de := range
parent.fileSystem.ReadDir(parent.root) {
+ s, err := parseSnapshot(de.Name())
+ require.NoError(t, err, "failed to parse
snapshot name:%s", de.Name())
+ got = append(got, s)
+ }
+ sort.Slice(got, func(i, j int) bool { return got[i] <
got[j] })
+ assert.Equal(t, tt.wantOnDisk, got)
+ assert.Equal(t, tt.wantInGC, g.snapshots)
+ })
+ }
+}
diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go
new file mode 100644
index 00000000..17a4edef
--- /dev/null
+++ b/banyand/measure/introducer.go
@@ -0,0 +1,148 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "sync"
+
+ "github.com/apache/skywalking-banyandb/pkg/watcher"
+)
+
+type introduction struct {
+ memPart *partWrapper
+ applied chan struct{}
+}
+
+func (i *introduction) reset() {
+ i.memPart = nil
+ i.applied = nil
+}
+
+var introductionPool = sync.Pool{}
+
+func generateIntroduction() *introduction {
+ v := introductionPool.Get()
+ if v == nil {
+ return &introduction{}
+ }
+ return v.(*introduction)
+}
+
+func releaseIntroduction(i *introduction) {
+ i.reset()
+ introductionPool.Put(i)
+}
+
+type flusherIntroduction struct {
+ flushed map[uint64]*partWrapper
+ applied chan struct{}
+}
+
+func (i *flusherIntroduction) reset() {
+ for k := range i.flushed {
+ delete(i.flushed, k)
+ }
+ i.applied = nil
+}
+
+var flusherIntroductionPool = sync.Pool{}
+
+func generateFlusherIntroduction() *flusherIntroduction {
+ v := flusherIntroductionPool.Get()
+ if v == nil {
+ return &flusherIntroduction{
+ flushed: make(map[uint64]*partWrapper),
+ }
+ }
+ return v.(*flusherIntroduction)
+}
+
+func releaseFlusherIntroduction(i *flusherIntroduction) {
+ i.reset()
+ flusherIntroductionPool.Put(i)
+}
+
+func (tst *tsTable) introducerLoop(flushCh chan *flusherIntroduction,
watcherCh watcher.Channel, epoch uint64) {
+ var introducerWatchers watcher.Epochs
+ defer tst.loopCloser.Done()
+ for {
+ select {
+ case <-tst.loopCloser.CloseNotify():
+ return
+ case next := <-tst.introductions:
+ tst.introduceMemPart(next, epoch)
+ epoch++
+ case next := <-flushCh:
+ tst.introduceFlushed(next, epoch)
+ epoch++
+ case epochWatcher := <-watcherCh:
+ introducerWatchers.Add(epochWatcher)
+ }
+ curEpoch := tst.currentEpoch()
+ introducerWatchers.Notify(curEpoch)
+ }
+}
+
+func (tst *tsTable) introduceMemPart(nextIntroduction *introduction, epoch
uint64) {
+ cur := tst.currentSnapshot()
+ if cur != nil {
+ defer cur.decRef()
+ } else {
+ cur = new(snapshot)
+ }
+
+ next := nextIntroduction.memPart
+ nextSnp := cur.copyAllTo(epoch)
+ next.p.partMetadata.ID = epoch
+ nextSnp.parts = append(nextSnp.parts, next)
+ tst.replaceSnapshot(&nextSnp)
+ if nextIntroduction.applied != nil {
+ close(nextIntroduction.applied)
+ }
+}
+
+func (tst *tsTable) introduceFlushed(nextIntroduction *flusherIntroduction,
epoch uint64) {
+ cur := tst.currentSnapshot()
+ if cur == nil {
+ tst.l.Panic().Msg("current snapshot is nil")
+ }
+ defer cur.decRef()
+ nextSnp := cur.merge(epoch, nextIntroduction.flushed)
+ tst.replaceSnapshot(&nextSnp)
+ if nextIntroduction.applied != nil {
+ close(nextIntroduction.applied)
+ }
+}
+
+func (tst *tsTable) replaceSnapshot(next *snapshot) {
+ tst.Lock()
+ defer tst.Unlock()
+ if tst.snapshot != nil {
+ tst.snapshot.decRef()
+ }
+ tst.snapshot = next
+}
+
+func (tst *tsTable) currentEpoch() uint64 {
+ s := tst.currentSnapshot()
+ if s == nil {
+ return 0
+ }
+ defer s.decRef()
+ return s.epoch
+}
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 6760f10e..81fcce0e 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -18,6 +18,9 @@
package measure
import (
+ "fmt"
+ "path"
+ "path/filepath"
"sort"
"sync"
"sync/atomic"
@@ -26,9 +29,21 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+ metadataFilename = "metadata.json"
+ primaryFilename = "primary.bin"
+ metaFilename = "meta.bin"
+ timestampsFilename = "timestamps.bin"
+ fieldValuesFilename = "fields.bin"
+ tagFamiliesMetadataFilenameExt = ".tfm"
+ tagFamiliesFilenameExt = ".tf"
)
type part struct {
+ path string
meta fs.Reader
primary fs.Reader
timestamps fs.Reader
@@ -39,6 +54,22 @@ type part struct {
partMetadata partMetadata
}
+func (p *part) close() {
+ fs.MustClose(p.primary)
+ fs.MustClose(p.timestamps)
+ fs.MustClose(p.fieldValues)
+ for _, tf := range p.tagFamilies {
+ fs.MustClose(tf)
+ }
+ for _, tfh := range p.tagFamilyMetadata {
+ fs.MustClose(tfh)
+ }
+}
+
+func (p *part) String() string {
+ return fmt.Sprintf("part %d", p.partMetadata.ID)
+}
+
func openMemPart(mp *memPart) *part {
var p part
p.partMetadata = mp.partMetadata
@@ -140,6 +171,25 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) {
releaseBlockWriter(bsw)
}
+func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) {
+ fileSystem.MkdirPanicIfExist(path, dirPermission)
+
+ fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(path,
metaFilename), filePermission)
+ fs.MustFlush(fileSystem, mp.primary.Buf, filepath.Join(path,
primaryFilename), filePermission)
+ fs.MustFlush(fileSystem, mp.timestamps.Buf, filepath.Join(path,
timestampsFilename), filePermission)
+ fs.MustFlush(fileSystem, mp.fieldValues.Buf, filepath.Join(path,
fieldValuesFilename), filePermission)
+ for name, tf := range mp.tagFamilies {
+ fs.MustFlush(fileSystem, tf.Buf, filepath.Join(path,
name+tagFamiliesFilenameExt), filePermission)
+ }
+ for name, tfh := range mp.tagFamilyMetadata {
+ fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path,
name+tagFamiliesMetadataFilenameExt), filePermission)
+ }
+
+ mp.partMetadata.mustWriteMetadata(fileSystem, path)
+
+ fileSystem.SyncPath(path)
+}
+
func uncompressedDataPointSizeBytes(index int, dps *dataPoints) uint64 {
n := uint64(len(time.RFC3339Nano))
n += uint64(len(dps.fields[index].name))
@@ -171,13 +221,15 @@ func releaseMemPart(mp *memPart) {
var memPartPool sync.Pool
type partWrapper struct {
- mp *memPart
- p *part
- ref int32
+ fileSystem fs.FileSystem
+ mp *memPart
+ p *part
+ ref int32
+ mustBeDeleted uint32
}
-func newMemPartWrapper(mp *memPart, p *part) *partWrapper {
- return &partWrapper{mp: mp, p: p, ref: 1}
+func newPartWrapper(mp *memPart, p *part, fileSystem fs.FileSystem)
*partWrapper {
+ return &partWrapper{mp: mp, p: p, fileSystem: fileSystem, ref: 1}
}
func (pw *partWrapper) incRef() {
@@ -192,5 +244,70 @@ func (pw *partWrapper) decRef() {
if pw.mp != nil {
releaseMemPart(pw.mp)
pw.mp = nil
+ pw.p = nil
+ return
+ }
+ pw.p.close()
+ if atomic.LoadUint32(&pw.mustBeDeleted) == 0 {
+ return
}
+ pw.fileSystem.MustRMAll(pw.p.path)
+}
+
+func (pw *partWrapper) ID() uint64 {
+ return pw.p.partMetadata.ID
+}
+
+func mustOpenFilePart(partPath string, fileSystem fs.FileSystem) *part {
+ var p part
+ p.path = partPath
+ p.partMetadata.mustReadMetadata(fileSystem, partPath)
+
+ metaPath := path.Join(partPath, metaFilename)
+ pr := mustOpenReader(metaPath, fileSystem)
+ p.primaryBlockMetadata =
mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], newReader(pr))
+ fs.MustClose(pr)
+
+ p.primary = mustOpenReader(path.Join(partPath, primaryFilename),
fileSystem)
+ p.timestamps = mustOpenReader(path.Join(partPath, timestampsFilename),
fileSystem)
+ p.fieldValues = mustOpenReader(path.Join(partPath,
fieldValuesFilename), fileSystem)
+ ee := fileSystem.ReadDir(partPath)
+ for _, e := range ee {
+ if e.IsDir() {
+ continue
+ }
+ if filepath.Ext(e.Name()) == tagFamiliesMetadataFilenameExt {
+ if p.tagFamilyMetadata == nil {
+ p.tagFamilyMetadata = make(map[string]fs.Reader)
+ }
+ p.tagFamilyMetadata[removeExt(e.Name(),
tagFamiliesMetadataFilenameExt)] = mustOpenReader(path.Join(partPath,
e.Name()), fileSystem)
+ }
+ if filepath.Ext(e.Name()) == tagFamiliesFilenameExt {
+ if p.tagFamilies == nil {
+ p.tagFamilies = make(map[string]fs.Reader)
+ }
+ p.tagFamilies[removeExt(e.Name(),
tagFamiliesFilenameExt)] = mustOpenReader(path.Join(partPath, e.Name()),
fileSystem)
+ }
+ }
+ return &p
+}
+
+func mustOpenReader(name string, fileSystem fs.FileSystem) fs.Reader {
+ f, err := fileSystem.OpenFile(name)
+ if err != nil {
+ logger.Panicf("cannot open %q: %s", name, err)
+ }
+ return f
+}
+
+func removeExt(nameWithExt, ext string) string {
+ return nameWithExt[:len(nameWithExt)-len(ext)]
+}
+
+func partPath(root string, epoch uint64) string {
+ return filepath.Join(root, partName(epoch))
+}
+
+func partName(epoch uint64) string {
+ return fmt.Sprintf("%016x", epoch)
}
diff --git a/banyand/measure/part_iter_test.go b/banyand/measure/part_iter_test.go
index 37579062..4aa6d7a9 100644
--- a/banyand/measure/part_iter_test.go
+++ b/banyand/measure/part_iter_test.go
@@ -23,8 +23,11 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
+ "github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/test"
)
// TODO: test more scenarios.
@@ -108,36 +111,47 @@ func Test_partIter_nextBlock(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- mp := generateMemPart()
- releaseMemPart(mp)
- mp.mustInitFromDataPoints(dps)
+ verifyPart := func(p *part) {
+ defer p.close()
+ pi := partIter{}
+ pi.init(p, tt.sids, tt.opt.minTimestamp,
tt.opt.maxTimestamp)
- p := openMemPart(mp)
-
- pi := partIter{}
- pi.init(p, tt.sids, tt.opt.minTimestamp,
tt.opt.maxTimestamp)
+ var got []blockMetadata
+ for pi.nextBlock() {
+ if pi.curBlock.seriesID == 0 {
+ t.Errorf("Expected currBlock to
be initialized, but it was nil")
+ }
+ got = append(got, pi.curBlock)
+ }
- var got []blockMetadata
- for pi.nextBlock() {
- if pi.curBlock.seriesID == 0 {
- t.Errorf("Expected currBlock to be
initialized, but it was nil")
+ if !errors.Is(pi.error(), tt.wantErr) {
+ t.Errorf("Unexpected error: got %v,
want %v", pi.err, tt.wantErr)
}
- got = append(got, pi.curBlock)
- }
- if !errors.Is(pi.error(), tt.wantErr) {
- t.Errorf("Unexpected error: got %v, want %v",
pi.err, tt.wantErr)
+ if diff := cmp.Diff(got, tt.want,
+ cmpopts.IgnoreFields(blockMetadata{},
"uncompressedSizeBytes"),
+ cmpopts.IgnoreFields(blockMetadata{},
"timestamps"),
+ cmpopts.IgnoreFields(blockMetadata{},
"field"),
+ cmpopts.IgnoreFields(blockMetadata{},
"tagFamilies"),
+ cmp.AllowUnexported(blockMetadata{}),
+ ); diff != "" {
+ t.Errorf("Unexpected blockMetadata
(-got +want):\n%s", diff)
+ }
}
+ mp := generateMemPart()
+ releaseMemPart(mp)
+ mp.mustInitFromDataPoints(dps)
- if diff := cmp.Diff(got, tt.want,
- cmpopts.IgnoreFields(blockMetadata{},
"uncompressedSizeBytes"),
- cmpopts.IgnoreFields(blockMetadata{},
"timestamps"),
- cmpopts.IgnoreFields(blockMetadata{}, "field"),
- cmpopts.IgnoreFields(blockMetadata{},
"tagFamilies"),
- cmp.AllowUnexported(blockMetadata{}),
- ); diff != "" {
- t.Errorf("Unexpected blockMetadata (-got
+want):\n%s", diff)
- }
+ p := openMemPart(mp)
+ verifyPart(p)
+ tmpDir, defFn := test.Space(require.New(t))
+ defer defFn()
+ epoch := uint64(1)
+ partPath := partPath(tmpDir, epoch)
+ fileSystem := fs.NewLocalFileSystem()
+ mp.mustFlush(fileSystem, partPath)
+ p = mustOpenFilePart(partPath, fileSystem)
+ verifyPart(p)
})
}
}
diff --git a/banyand/measure/part_metadata.go b/banyand/measure/part_metadata.go
index 6c6f6e17..0323e660 100644
--- a/banyand/measure/part_metadata.go
+++ b/banyand/measure/part_metadata.go
@@ -17,6 +17,14 @@
package measure
+import (
+ "encoding/json"
+ "path/filepath"
+
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
type partMetadata struct {
CompressedSizeBytes uint64
UncompressedSizeBytes uint64
@@ -24,15 +32,51 @@ type partMetadata struct {
BlocksCount uint64
MinTimestamp int64
MaxTimestamp int64
- Version int64
+ ID uint64
+}
+
+func (pm *partMetadata) reset() {
+ pm.CompressedSizeBytes = 0
+ pm.UncompressedSizeBytes = 0
+ pm.TotalCount = 0
+ pm.BlocksCount = 0
+ pm.MinTimestamp = 0
+ pm.MaxTimestamp = 0
+ pm.ID = 0
+}
+
+func (pm *partMetadata) mustReadMetadata(fileSystem fs.FileSystem, partPath
string) {
+ pm.reset()
+
+ metadataPath := filepath.Join(partPath, metadataFilename)
+ metadata, err := fileSystem.Read(metadataPath)
+ if err != nil {
+ logger.Panicf("cannot read %s", err)
+ return
+ }
+ if err := json.Unmarshal(metadata, pm); err != nil {
+ logger.Panicf("cannot parse %q: %s", metadataPath, err)
+ return
+ }
+
+ if pm.MinTimestamp > pm.MaxTimestamp {
+ logger.Panicf("MinTimestamp cannot exceed MaxTimestamp; got %d
vs %d", pm.MinTimestamp, pm.MaxTimestamp)
+ }
}
-func (ph *partMetadata) reset() {
- ph.CompressedSizeBytes = 0
- ph.UncompressedSizeBytes = 0
- ph.TotalCount = 0
- ph.BlocksCount = 0
- ph.MinTimestamp = 0
- ph.MaxTimestamp = 0
- ph.Version = 0
+func (pm *partMetadata) mustWriteMetadata(fileSystem fs.FileSystem, partPath
string) {
+ metadata, err := json.Marshal(pm)
+ if err != nil {
+ logger.Panicf("cannot marshal metadata: %s", err)
+ return
+ }
+ metadataPath := filepath.Join(partPath, metadataFilename)
+ n, err := fileSystem.Write(metadata, metadataPath, filePermission)
+ if err != nil {
+ logger.Panicf("cannot write metadata: %s", err)
+ return
+ }
+ if n != len(metadata) {
+ logger.Panicf("unexpected number of bytes written to %s; got
%d; want %d", metadataPath, n, len(metadata))
+ }
}
diff --git a/banyand/measure/part_test.go b/banyand/measure/part_test.go
index 29b5fdfe..5473fd76 100644
--- a/banyand/measure/part_test.go
+++ b/banyand/measure/part_test.go
@@ -21,13 +21,16 @@ import (
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/test"
)
-func Test_memPart_mustInitFromDataPoints(t *testing.T) {
+func TestMustInitFromDataPoints(t *testing.T) {
tests := []struct {
dps *dataPoints
name string
@@ -92,6 +95,30 @@ func Test_memPart_mustInitFromDataPoints(t *testing.T) {
assert.Equal(t, tt.want.MinTimestamp,
mp.partMetadata.MinTimestamp)
assert.Equal(t, tt.want.MaxTimestamp,
mp.partMetadata.MaxTimestamp)
assert.Equal(t, tt.want.TotalCount,
mp.partMetadata.TotalCount)
+ assert.Equal(t, len(mp.tagFamilies),
len(mp.tagFamilyMetadata))
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+ epoch := uint64(1)
+ path := partPath(tmpPath, epoch)
+
+ fileSystem := fs.NewLocalFileSystem()
+ mp.mustFlush(fileSystem, path)
+ p := mustOpenFilePart(path, fileSystem)
+ defer p.close()
+ assert.Equal(t, tt.want.BlocksCount,
p.partMetadata.BlocksCount)
+ assert.Equal(t, tt.want.MinTimestamp,
p.partMetadata.MinTimestamp)
+ assert.Equal(t, tt.want.MaxTimestamp,
p.partMetadata.MaxTimestamp)
+ assert.Equal(t, tt.want.TotalCount,
p.partMetadata.TotalCount)
+ if len(mp.tagFamilies) > 0 {
+ for k := range mp.tagFamilies {
+ _, ok := mp.tagFamilyMetadata[k]
+ require.True(t, ok,
"mp.tagFamilyMetadata %s not found", k)
+ _, ok = p.tagFamilies[k]
+ require.True(t, ok, "p.tagFamilies %s
not found", k)
+ _, ok = p.tagFamilyMetadata[k]
+ require.True(t, ok,
"p.tagFamilyMetadata %s not found", k)
+ }
+ }
})
}
}
diff --git a/banyand/measure/primary_metadata.go b/banyand/measure/primary_metadata.go
index 19374283..7b09503e 100644
--- a/banyand/measure/primary_metadata.go
+++ b/banyand/measure/primary_metadata.go
@@ -100,7 +100,6 @@ func mustReadPrimaryBlockMetadata(dst []primaryBlockMetadata, r *reader) []prima
return dst
}
-// unmarshalPrimaryBlockMetadata appends unmarshaled from src indexBlockHeader
entries to dst and returns the result.
func unmarshalPrimaryBlockMetadata(dst []primaryBlockMetadata, src []byte)
([]primaryBlockMetadata, error) {
dstOrig := dst
for len(src) > 0 {
@@ -112,7 +111,7 @@ func unmarshalPrimaryBlockMetadata(dst []primaryBlockMetadata, src []byte) ([]pr
ih := &dst[len(dst)-1]
tail, err := ih.unmarshal(src)
if err != nil {
- return dstOrig, fmt.Errorf("cannot unmarshal
indexBlockHeader %d: %w", len(dst)-len(dstOrig), err)
+ return dstOrig, fmt.Errorf("cannot unmarshal
primaryBlockHeader %d: %w", len(dst)-len(dstOrig), err)
}
src = tail
}
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index d4ebadab..d0b17d7f 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -73,6 +73,11 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1
}
tsdb := s.databaseSupplier.SupplyTSDB().(storage.TSDB[*tsTable])
tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange)
+ defer func() {
+ for i := range tabWrappers {
+ tabWrappers[i].DecRef()
+ }
+ }()
sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name,
EntityValues: mqo.Entity}, mqo.Filter, mqo.Order)
if err != nil {
return nil, err
@@ -86,15 +91,24 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1
for i := range sl {
sids = append(sids, sl[i].ID)
}
- var pws []*partWrapper
var parts []*part
qo := queryOptions{
MeasureQueryOptions: mqo,
minTimestamp: mqo.TimeRange.Start.UnixNano(),
maxTimestamp: mqo.TimeRange.End.UnixNano(),
}
- for _, tw := range tabWrappers {
- pws, parts = tw.Table().getParts(pws, parts, qo)
+ var n int
+ for i := range tabWrappers {
+ s := tabWrappers[i].Table().currentSnapshot()
+ if s == nil {
+ continue
+ }
+ parts, n = s.getParts(parts, qo)
+ if n < 1 {
+ s.decRef()
+ continue
+ }
+ result.snapshots = append(result.snapshots, s)
}
// TODO: cache tstIter
var tstIter tstIter
@@ -303,7 +317,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue {
type queryResult struct {
sidToIndex map[common.SeriesID]int
data []*blockCursor
- pws []*partWrapper
+ snapshots []*snapshot
loaded bool
orderByTS bool
ascTS bool
@@ -348,10 +362,10 @@ func (qr *queryResult) Release() {
qr.data[i] = nil
}
qr.data = qr.data[:0]
- for i := range qr.pws {
- qr.pws[i].decRef()
+ for i := range qr.snapshots {
+ qr.snapshots[i].decRef()
}
- qr.pws = qr.pws[:0]
+ qr.snapshots = qr.snapshots[:0]
}
func (qr queryResult) Len() int {
@@ -361,8 +375,8 @@ func (qr queryResult) Len() int {
func (qr queryResult) Less(i, j int) bool {
leftTS := qr.data[i].timestamps[qr.data[i].idx]
rightTS := qr.data[j].timestamps[qr.data[j].idx]
- leftVersion := qr.data[i].p.partMetadata.Version
- rightVersion := qr.data[j].p.partMetadata.Version
+ leftVersion := qr.data[i].p.partMetadata.ID
+ rightVersion := qr.data[j].p.partMetadata.ID
if qr.orderByTS {
if leftTS == rightTS {
if qr.data[i].bm.seriesID == qr.data[j].bm.seriesID {
@@ -416,7 +430,7 @@ func (qr *queryResult) merge() *pbv1.Result {
step = -1
}
result := &pbv1.Result{}
- var lastPartVersion int64
+ var lastPartVersion uint64
var lastSid common.SeriesID
for qr.Len() > 0 {
@@ -428,12 +442,12 @@ func (qr *queryResult) merge() *pbv1.Result {
if len(result.Timestamps) > 0 &&
topBC.timestamps[topBC.idx] ==
result.Timestamps[len(result.Timestamps)-1] {
- if topBC.p.partMetadata.Version > lastPartVersion {
+ if topBC.p.partMetadata.ID > lastPartVersion {
logger.Panicf("following parts version should
be less or equal to the previous one")
}
} else {
topBC.copyTo(result)
- lastPartVersion = topBC.p.partMetadata.Version
+ lastPartVersion = topBC.p.partMetadata.ID
}
topBC.idx += step
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index d69f77ad..4226dfc7 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -24,11 +24,19 @@ import (
"time"
"github.com/google/go-cmp/cmp"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"github.com/apache/skywalking-banyandb/api/common"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ "github.com/apache/skywalking-banyandb/pkg/watcher"
)
func TestQueryResult(t *testing.T) {
@@ -363,71 +371,101 @@ func TestQueryResult(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- // Initialize a tstIter object.
- tst := &tsTable{}
- defer tst.Close()
- for _, dps := range tt.dpsList {
- tst.mustAddDataPoints(dps)
- time.Sleep(100 * time.Millisecond)
- }
- queryOpts := queryOptions{
- minTimestamp: tt.minTimestamp,
- maxTimestamp: tt.maxTimestamp,
- }
- pws, pp := tst.getParts(nil, nil, queryOpts)
- defer func() {
- for _, pw := range pws {
- pw.decRef()
+ verify := func(tst *tsTable) uint64 {
+ defer tst.Close()
+ queryOpts := queryOptions{
+ minTimestamp: tt.minTimestamp,
+ maxTimestamp: tt.maxTimestamp,
}
- }()
- sids := make([]common.SeriesID, len(tt.sids))
- copy(sids, tt.sids)
- sort.Slice(sids, func(i, j int) bool {
- return sids[i] < tt.sids[j]
- })
- ti := &tstIter{}
- ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp)
+ s := tst.currentSnapshot()
+ require.NotNil(t, s)
+ defer s.decRef()
+ pp, n := s.getParts(nil, queryOpts)
+ require.Equal(t, len(tt.dpsList), n, "parts:
%+v", pp)
+ sids := make([]common.SeriesID, len(tt.sids))
+ copy(sids, tt.sids)
+ sort.Slice(sids, func(i, j int) bool {
+ return sids[i] < tt.sids[j]
+ })
+ ti := &tstIter{}
+ ti.init(pp, sids, tt.minTimestamp,
tt.maxTimestamp)
- var result queryResult
- for ti.nextBlock() {
- bc := generateBlockCursor()
- p := ti.piHeap[0]
- opts := queryOpts
- opts.TagProjection =
tagProjections[int(p.curBlock.seriesID)]
- opts.FieldProjection =
fieldProjections[int(p.curBlock.seriesID)]
- bc.init(p.p, p.curBlock, opts)
- result.data = append(result.data, bc)
- }
- defer result.Release()
- if tt.orderBySeries {
- result.sidToIndex =
make(map[common.SeriesID]int)
- for i, si := range tt.sids {
- result.sidToIndex[si] = i
+ var result queryResult
+ for ti.nextBlock() {
+ bc := generateBlockCursor()
+ p := ti.piHeap[0]
+ opts := queryOpts
+ opts.TagProjection =
tagProjections[int(p.curBlock.seriesID)]
+ opts.FieldProjection =
fieldProjections[int(p.curBlock.seriesID)]
+ bc.init(p.p, p.curBlock, opts)
+ result.data = append(result.data, bc)
}
- } else {
- result.orderByTS = true
- result.ascTS = tt.ascTS
- }
- var got []pbv1.Result
- for {
- r := result.Pull()
- if r == nil {
- break
+ defer result.Release()
+ if tt.orderBySeries {
+ result.sidToIndex =
make(map[common.SeriesID]int)
+ for i, si := range tt.sids {
+ result.sidToIndex[si] = i
+ }
+ } else {
+ result.orderByTS = true
+ result.ascTS = tt.ascTS
+ }
+ var got []pbv1.Result
+ for {
+ r := result.Pull()
+ if r == nil {
+ break
+ }
+ sort.Slice(r.TagFamilies, func(i, j
int) bool {
+ return r.TagFamilies[i].Name <
r.TagFamilies[j].Name
+ })
+ got = append(got, *r)
}
- sort.Slice(r.TagFamilies, func(i, j int) bool {
- return r.TagFamilies[i].Name <
r.TagFamilies[j].Name
- })
- got = append(got, *r)
- }
- if !errors.Is(ti.Error(), tt.wantErr) {
- t.Errorf("Unexpected error: got %v, want %v",
ti.err, tt.wantErr)
- }
+ if !errors.Is(ti.Error(), tt.wantErr) {
+ t.Errorf("Unexpected error: got %v,
want %v", ti.err, tt.wantErr)
+ }
- if diff := cmp.Diff(got, tt.want,
- protocmp.IgnoreUnknown(),
protocmp.Transform()); diff != "" {
- t.Errorf("Unexpected []pbv1.Result (-got
+want):\n%s", diff)
+ if diff := cmp.Diff(got, tt.want,
+ protocmp.IgnoreUnknown(),
protocmp.Transform()); diff != "" {
+ t.Errorf("Unexpected []pbv1.Result
(-got +want):\n%s", diff)
+ }
+ return s.epoch
}
+
+ t.Run("memory snapshot", func(t *testing.T) {
+ tst := &tsTable{
+ loopCloser: run.NewCloser(2),
+ introductions: make(chan *introduction),
+ }
+ flushCh := make(chan *flusherIntroduction)
+ introducerWatcher := make(watcher.Channel, 1)
+ go tst.introducerLoop(flushCh,
introducerWatcher, 1)
+ for _, dps := range tt.dpsList {
+ tst.mustAddDataPoints(dps)
+ time.Sleep(100 * time.Millisecond)
+ }
+ _ = verify(tst)
+ })
+
+ t.Run("file snapshot", func(t *testing.T) {
+ // Initialize a tstIter object.
+ tmpPath, defFn := test.Space(require.New(t))
+ fileSystem := fs.NewLocalFileSystem()
+ defer defFn()
+ tst, err := newTSTable(fileSystem, tmpPath,
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+ require.NoError(t, err)
+ for _, dps := range tt.dpsList {
+ tst.mustAddDataPoints(dps)
+ time.Sleep(100 * time.Millisecond)
+ }
+ epoch := verify(tst)
+
+ // reopen the table
+ tst, err = newTSTable(fileSystem, tmpPath,
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+ require.NoError(t, err)
+ assert.Equal(t, epoch, verify(tst))
+ })
})
}
}
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
new file mode 100644
index 00000000..ddc995dc
--- /dev/null
+++ b/banyand/measure/snapshot.go
@@ -0,0 +1,111 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "errors"
+ "fmt"
+ "path/filepath"
+ "sync/atomic"
+)
+
+func (tst *tsTable) currentSnapshot() *snapshot {
+ tst.RLock()
+ defer tst.RUnlock()
+ if tst.snapshot == nil {
+ return nil
+ }
+ s := tst.snapshot
+ s.incRef()
+ return s
+}
+
+type snapshot struct {
+ parts []*partWrapper
+ epoch uint64
+
+ ref int32
+}
+
+func (s *snapshot) getParts(dst []*part, opts queryOptions) ([]*part, int) {
+ var count int
+ for _, p := range s.parts {
+ pm := p.p.partMetadata
+ if opts.maxTimestamp < pm.MinTimestamp || opts.minTimestamp >
pm.MaxTimestamp {
+ continue
+ }
+ dst = append(dst, p.p)
+ count++
+ }
+ return dst, count
+}
+
+func (s *snapshot) incRef() {
+ atomic.AddInt32(&s.ref, 1)
+}
+
+func (s *snapshot) decRef() {
+ n := atomic.AddInt32(&s.ref, -1)
+ if n > 0 {
+ return
+ }
+ for i := range s.parts {
+ s.parts[i].decRef()
+ }
+ s.parts = s.parts[:0]
+}
+
+func (s *snapshot) copyAllTo(nextEpoch uint64) snapshot {
+ var result snapshot
+ result.epoch = nextEpoch
+ result.ref = 1
+ for i := range s.parts {
+ s.parts[i].incRef()
+ result.parts = append(result.parts, s.parts[i])
+ }
+ return result
+}
+
+func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper)
snapshot {
+ var result snapshot
+ result.epoch = nextEpoch
+ result.ref = 1
+ for i := 0; i < len(s.parts); i++ {
+ if n, ok := nextParts[s.parts[i].ID()]; ok {
+ result.parts = append(result.parts, n)
+ continue
+ }
+ s.parts[i].incRef()
+ result.parts = append(result.parts, s.parts[i])
+ }
+ return result
+}
+
+func snapshotName(snapshot uint64) string {
+ return fmt.Sprintf("%016x%s", snapshot, snapshotSuffix)
+}
+
+func parseSnapshot(name string) (uint64, error) {
+ if filepath.Ext(name) != snapshotSuffix {
+ return 0, errors.New("invalid snapshot file ext")
+ }
+ if len(name) < 16 {
+ return 0, errors.New("invalid snapshot file name")
+ }
+ return parseEpoch(name[:16])
+}
diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
new file mode 100644
index 00000000..f571d4c9
--- /dev/null
+++ b/banyand/measure/snapshot_test.go
@@ -0,0 +1,287 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestSnapshotGetParts(t *testing.T) {
+ tests := []struct {
+ snapshot *snapshot
+ name string
+ dst []*part
+ expected []*part
+ opts queryOptions
+ count int
+ }{
+ {
+ name: "Test with empty snapshot",
+ snapshot: &snapshot{
+ parts: []*partWrapper{},
+ },
+ dst: []*part{},
+ opts: queryOptions{
+ minTimestamp: 0,
+ maxTimestamp: 10,
+ },
+ expected: []*part{},
+ count: 0,
+ },
+ {
+ name: "Test with non-empty snapshot and no matching
parts",
+ snapshot: &snapshot{
+ parts: []*partWrapper{
+ {
+ p: &part{partMetadata:
partMetadata{
+ MinTimestamp: 0,
+ MaxTimestamp: 5,
+ }},
+ },
+ {
+ p: &part{partMetadata:
partMetadata{
+ MinTimestamp: 6,
+ MaxTimestamp: 10,
+ }},
+ },
+ },
+ },
+ dst: []*part{},
+ opts: queryOptions{
+ minTimestamp: 11,
+ maxTimestamp: 15,
+ },
+ expected: []*part{},
+ count: 0,
+ },
+ {
+ name: "Test with non-empty snapshot and some matching
parts",
+ snapshot: &snapshot{
+ parts: []*partWrapper{
+ {
+ p: &part{partMetadata:
partMetadata{
+ MinTimestamp: 0,
+ MaxTimestamp: 5,
+ }},
+ },
+ {
+ p: &part{partMetadata:
partMetadata{
+ MinTimestamp: 6,
+ MaxTimestamp: 10,
+ }},
+ },
+ },
+ },
+ dst: []*part{},
+ opts: queryOptions{
+ minTimestamp: 5,
+ maxTimestamp: 10,
+ },
+ expected: []*part{
+ {partMetadata: partMetadata{
+ MinTimestamp: 0,
+ MaxTimestamp: 5,
+ }},
+ {partMetadata: partMetadata{
+ MinTimestamp: 6,
+ MaxTimestamp: 10,
+ }},
+ },
+ count: 2,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result, count := tt.snapshot.getParts(tt.dst, tt.opts)
+ assert.Equal(t, tt.expected, result)
+ assert.Equal(t, tt.count, count)
+ })
+ }
+}
+
+func TestSnapshotCopyAllTo(t *testing.T) {
+ tests := []struct {
+ name string
+ snapshot snapshot
+ expected snapshot
+ nextEpoch uint64
+ closePrev bool
+ }{
+ {
+ name: "Test with empty snapshot",
+ snapshot: snapshot{
+ parts: []*partWrapper{},
+ },
+ nextEpoch: 1,
+ expected: snapshot{
+ epoch: 1,
+ ref: 1,
+ parts: nil,
+ },
+ },
+ {
+ name: "Test with non-empty snapshot",
+ snapshot: snapshot{
+ parts: []*partWrapper{
+ {ref: 1},
+ {ref: 2},
+ },
+ },
+ nextEpoch: 1,
+ expected: snapshot{
+ epoch: 1,
+ ref: 1,
+ parts: []*partWrapper{
+ {ref: 2},
+ {ref: 3},
+ },
+ },
+ },
+ {
+ name: "Test with closed previous snapshot",
+ snapshot: snapshot{
+ parts: []*partWrapper{
+ {ref: 1},
+ {ref: 2},
+ },
+ },
+ nextEpoch: 1,
+ closePrev: true,
+ expected: snapshot{
+ epoch: 1,
+ ref: 1,
+ parts: []*partWrapper{
+ {ref: 1},
+ {ref: 2},
+ },
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := tt.snapshot.copyAllTo(tt.nextEpoch)
+ if tt.closePrev {
+ tt.snapshot.decRef()
+ }
+ assert.Equal(t, tt.expected, result)
+ })
+ }
+}
+
+func TestSnapshotMerge(t *testing.T) {
+ tests := []struct {
+ snapshot *snapshot
+ nextParts map[uint64]*partWrapper
+ name string
+ expected snapshot
+ nextEpoch uint64
+ closePrev bool
+ }{
+ {
+ name: "Test with empty snapshot and empty next parts",
+ snapshot: &snapshot{
+ parts: []*partWrapper{},
+ },
+ nextEpoch: 1,
+ nextParts: map[uint64]*partWrapper{},
+ expected: snapshot{
+ epoch: 1,
+ ref: 1,
+ parts: nil,
+ },
+ },
+ {
+ name: "Test with non-empty snapshot and empty next
parts",
+ snapshot: &snapshot{
+ parts: []*partWrapper{
+ {p: &part{partMetadata:
partMetadata{ID: 1}}, ref: 1},
+ {p: &part{partMetadata:
partMetadata{ID: 2}}, ref: 2},
+ },
+ },
+ nextEpoch: 1,
+ nextParts: map[uint64]*partWrapper{},
+ expected: snapshot{
+ epoch: 1,
+ ref: 1,
+ parts: []*partWrapper{
+ {p: &part{partMetadata:
partMetadata{ID: 1}}, ref: 2},
+ {p: &part{partMetadata:
partMetadata{ID: 2}}, ref: 3},
+ },
+ },
+ },
+ {
+ name: "Test with non-empty snapshot and non-empty next
parts",
+ snapshot: &snapshot{
+ parts: []*partWrapper{
+ {p: &part{partMetadata:
partMetadata{ID: 1}}, ref: 1},
+ {p: &part{partMetadata:
partMetadata{ID: 2}}, ref: 2},
+ },
+ },
+ nextEpoch: 1,
+ nextParts: map[uint64]*partWrapper{
+ 2: {p: &part{partMetadata: partMetadata{ID:
2}}, ref: 1},
+ 3: {p: &part{partMetadata: partMetadata{ID:
3}}, ref: 1},
+ },
+ expected: snapshot{
+ epoch: 1,
+ ref: 1,
+ parts: []*partWrapper{
+ {p: &part{partMetadata:
partMetadata{ID: 1}}, ref: 2},
+ {p: &part{partMetadata:
partMetadata{ID: 2}}, ref: 1},
+ },
+ },
+ },
+ {
+ name: "Test with closed previous snapshot",
+ snapshot: &snapshot{
+ parts: []*partWrapper{
+ {p: &part{partMetadata:
partMetadata{ID: 1}}, ref: 1},
+ {p: &part{partMetadata:
partMetadata{ID: 2}}, ref: 2},
+ },
+ },
+ closePrev: true,
+ nextEpoch: 1,
+ nextParts: map[uint64]*partWrapper{
+ 2: {p: &part{partMetadata: partMetadata{ID:
2}}, ref: 1},
+ 3: {p: &part{partMetadata: partMetadata{ID:
3}}, ref: 1},
+ },
+ expected: snapshot{
+ epoch: 1,
+ ref: 1,
+ parts: []*partWrapper{
+ {p: &part{partMetadata:
partMetadata{ID: 1}}, ref: 1},
+ {p: &part{partMetadata:
partMetadata{ID: 2}}, ref: 1},
+ },
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := tt.snapshot.merge(tt.nextEpoch, tt.nextParts)
+ if tt.closePrev {
+ tt.snapshot.decRef()
+ }
+ assert.Equal(t, tt.expected, result)
+ })
+ }
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 6ce4adbe..f3c445dc 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -19,30 +19,193 @@ package measure
import (
"container/heap"
+ "encoding/json"
"errors"
"fmt"
"io"
+ "path/filepath"
+ "sort"
+ "strconv"
"sync"
+ "time"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
+ "github.com/apache/skywalking-banyandb/pkg/watcher"
)
-func newTSTable(_ string, _ common.Position, _ *logger.Logger, _
timestamp.TimeRange) (*tsTable, error) {
- return &tsTable{}, nil
+const (
+ snapshotSuffix = ".snp"
+ filePermission = 0o600
+ dirPermission = 0o700
+)
+
+func newTSTable(fileSystem fs.FileSystem, rootPath string, _ common.Position,
l *logger.Logger, _ timestamp.TimeRange) (*tsTable, error) {
+ var tst tsTable
+ tst.fileSystem = fileSystem
+ tst.root = rootPath
+ tst.l = l
+ tst.gc.parent = &tst
+ ee := fileSystem.ReadDir(rootPath)
+ if len(ee) == 0 {
+ t := &tst
+ t.startLoop(uint64(time.Now().UnixNano()))
+ return t, nil
+ }
+ var loadedParts []uint64
+ var loadedSnapshots []uint64
+ var needToDelete []string
+ for i := range ee {
+ if ee[i].IsDir() {
+ p, err := parseEpoch(ee[i].Name())
+ if err != nil {
+ l.Info().Err(err).Msg("cannot parse part file
name. skip and delete it")
+ needToDelete = append(needToDelete,
ee[i].Name())
+ continue
+ }
+ loadedParts = append(loadedParts, p)
+ continue
+ }
+ if filepath.Ext(ee[i].Name()) != snapshotSuffix {
+ continue
+ }
+ snapshot, err := parseSnapshot(ee[i].Name())
+ if err != nil {
+ l.Info().Err(err).Msg("cannot parse snapshot file name.
skip and delete it")
+ needToDelete = append(needToDelete, ee[i].Name())
+ continue
+ }
+ loadedSnapshots = append(loadedSnapshots, snapshot)
+ tst.gc.registerSnapshot(snapshot)
+ }
+ for i := range needToDelete {
+ if err := fileSystem.DeleteFile(filepath.Join(rootPath,
needToDelete[i])); err != nil {
+ l.Warn().Err(err).Str("path", filepath.Join(rootPath,
needToDelete[i])).Msg("failed to delete part. Please check manually")
+ }
+ }
+ if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+ t := &tst
+ t.startLoop(uint64(time.Now().UnixNano()))
+ return t, nil
+ }
+ sort.Slice(loadedSnapshots, func(i, j int) bool {
+ return loadedSnapshots[i] > loadedSnapshots[j]
+ })
+ epoch := loadedSnapshots[0]
+ t := &tst
+ t.loadSnapshot(epoch, loadedParts)
+ t.startLoop(epoch)
+ return t, nil
}
type tsTable struct {
- memParts []*partWrapper
+ fileSystem fs.FileSystem
+ l *logger.Logger
+ snapshot *snapshot
+ introductions chan *introduction
+ loopCloser *run.Closer
+ root string
+ gc garbageCleaner
sync.RWMutex
}
+func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
+ parts := tst.mustReadSnapshot(epoch)
+ snp := snapshot{
+ epoch: epoch,
+ }
+ for _, partName := range loadedParts {
+ var find bool
+ for j := range parts {
+ if partName == parts[j] {
+ find = true
+ break
+ }
+ }
+ if !find {
+ tst.gc.submitParts(partName)
+ }
+ p := mustOpenFilePart(partPath(tst.root, partName),
tst.fileSystem)
+ p.partMetadata.ID = partName
+ snp.parts = append(snp.parts, newPartWrapper(nil, p,
tst.fileSystem))
+ }
+ tst.gc.clean()
+ if len(snp.parts) < 1 {
+ return
+ }
+ snp.incRef()
+ tst.snapshot = &snp
+}
+
+func (tst *tsTable) startLoop(cur uint64) {
+ tst.loopCloser = run.NewCloser(3)
+ tst.introductions = make(chan *introduction)
+ flushCh := make(chan *flusherIntroduction)
+ introducerWatcher := make(watcher.Channel, 1)
+ go tst.introducerLoop(flushCh, introducerWatcher, cur+1)
+ go tst.flusherLoop(flushCh, introducerWatcher, cur)
+}
+
+func parseEpoch(epochStr string) (uint64, error) {
+ p, err := strconv.ParseUint(epochStr, 16, 64)
+ if err != nil {
+ return 0, fmt.Errorf("cannot parse path %s: %w", epochStr, err)
+ }
+ return p, nil
+}
+
+func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) {
+ data, err := json.Marshal(partNames)
+ if err != nil {
+ logger.Panicf("cannot marshal partNames to JSON: %s", err)
+ }
+ snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
+ lf, err := tst.fileSystem.CreateLockFile(snapshotPath, filePermission)
+ if err != nil {
+ logger.Panicf("cannot create lock file %s: %s", snapshotPath,
err)
+ }
+ n, err := lf.Write(data)
+ if err != nil {
+ logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err)
+ }
+ if n != len(data) {
+ logger.Panicf("unexpected number of bytes written to %s; got
%d; want %d", snapshotPath, n, len(data))
+ }
+}
+
+func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+ snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
+ data, err := tst.fileSystem.Read(snapshotPath)
+ if err != nil {
+ logger.Panicf("cannot read %s: %s", snapshotPath, err)
+ }
+ var partNames []string
+ if err := json.Unmarshal(data, &partNames); err != nil {
+ logger.Panicf("cannot parse %s: %s", snapshotPath, err)
+ }
+ var result []uint64
+ for i := range partNames {
+ e, err := parseEpoch(partNames[i])
+ if err != nil {
+ logger.Panicf("cannot parse %s: %s", partNames[i], err)
+ }
+ result = append(result, e)
+ }
+ return result
+}
+
func (tst *tsTable) Close() error {
- tst.Lock()
- defer tst.Unlock()
- for _, p := range tst.memParts {
- p.decRef()
+ if tst.loopCloser != nil {
+ tst.loopCloser.Done()
+ tst.loopCloser.CloseThenWait()
+ }
+ tst.RLock()
+ defer tst.RUnlock()
+ if tst.snapshot != nil {
+ tst.snapshot.decRef()
}
return nil
}
@@ -56,26 +219,20 @@ func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
mp.mustInitFromDataPoints(dps)
p := openMemPart(mp)
- pw := newMemPartWrapper(mp, p)
-
- tst.Lock()
- defer tst.Unlock()
- tst.memParts = append(tst.memParts, pw)
-}
+ ind := generateIntroduction()
+ defer releaseIntroduction(ind)
+ ind.applied = make(chan struct{})
+ ind.memPart = newPartWrapper(mp, p, tst.fileSystem)
-func (tst *tsTable) getParts(dst []*partWrapper, dstPart []*part, opts
queryOptions) ([]*partWrapper, []*part) {
- tst.RLock()
- defer tst.RUnlock()
- for _, p := range tst.memParts {
- pm := p.mp.partMetadata
- if opts.maxTimestamp < pm.MinTimestamp || opts.minTimestamp >
pm.MaxTimestamp {
- continue
- }
- p.incRef()
- dst = append(dst, p)
- dstPart = append(dstPart, p.p)
+ select {
+ case tst.introductions <- ind:
+ case <-tst.loopCloser.CloseNotify():
+ return
+ }
+ select {
+ case <-ind.applied:
+ case <-tst.loopCloser.CloseNotify():
}
- return dst, dstPart
}
type tstIter struct {
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index fdfb9879..ca231e4f 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -29,7 +29,13 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ "github.com/apache/skywalking-banyandb/pkg/watcher"
)
func Test_tsTable_mustAddDataPoints(t *testing.T) {
@@ -88,19 +94,31 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- tst := &tsTable{}
+ tst := &tsTable{
+ loopCloser: run.NewCloser(2),
+ introductions: make(chan *introduction),
+ }
+ flushCh := make(chan *flusherIntroduction)
+ introducerWatcher := make(watcher.Channel, 1)
+ go tst.introducerLoop(flushCh, introducerWatcher, 1)
defer tst.Close()
for _, dps := range tt.dpsList {
tst.mustAddDataPoints(dps)
time.Sleep(100 * time.Millisecond)
}
- assert.Equal(t, tt.want, len(tst.memParts))
- var lastVersion int64
- for _, pw := range tst.memParts {
+ s := tst.currentSnapshot()
+ if s == nil {
+ s = new(snapshot)
+ }
+ defer s.decRef()
+ assert.Equal(t, tt.want, len(s.parts))
+ var lastVersion uint64
+ for _, pw := range s.parts {
+ require.Greater(t, pw.ID(), uint64(0))
if lastVersion == 0 {
- lastVersion = pw.p.partMetadata.Version
+ lastVersion = pw.ID()
} else {
- require.Less(t, lastVersion,
pw.p.partMetadata.Version)
+ require.Less(t, lastVersion, pw.ID())
}
}
})
@@ -169,43 +187,76 @@ func Test_tstIter(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- tst := &tsTable{}
- defer tst.Close()
- for _, dps := range tt.dpsList {
- tst.mustAddDataPoints(dps)
- time.Sleep(100 * time.Millisecond)
- }
- pws, pp := tst.getParts(nil, nil, queryOptions{
- minTimestamp: tt.minTimestamp,
- maxTimestamp: tt.maxTimestamp,
- })
- defer func() {
- for _, pw := range pws {
- pw.decRef()
+ verify := func(tst *tsTable) uint64 {
+ defer tst.Close()
+ s := tst.currentSnapshot()
+ if s == nil {
+ s = new(snapshot)
}
- }()
- ti := &tstIter{}
- ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
- var got []blockMetadata
- for ti.nextBlock() {
- if ti.piHeap[0].curBlock.seriesID == 0 {
- t.Errorf("Expected curBlock to be
initialized, but it was nil")
+ defer s.decRef()
+ pp, n := s.getParts(nil, queryOptions{
+ minTimestamp: tt.minTimestamp,
+ maxTimestamp: tt.maxTimestamp,
+ })
+ require.Equal(t, len(s.parts), n)
+ ti := &tstIter{}
+ ti.init(pp, tt.sids, tt.minTimestamp,
tt.maxTimestamp)
+ var got []blockMetadata
+ for ti.nextBlock() {
+ if ti.piHeap[0].curBlock.seriesID == 0 {
+ t.Errorf("Expected curBlock to
be initialized, but it was nil")
+ }
+ got = append(got, ti.piHeap[0].curBlock)
}
- got = append(got, ti.piHeap[0].curBlock)
- }
- if !errors.Is(ti.Error(), tt.wantErr) {
- t.Errorf("Unexpected error: got %v, want %v",
ti.err, tt.wantErr)
- }
+ if !errors.Is(ti.Error(), tt.wantErr) {
+ t.Errorf("Unexpected error: got %v,
want %v", ti.err, tt.wantErr)
+ }
- if diff := cmp.Diff(got, tt.want,
- cmpopts.IgnoreFields(blockMetadata{},
"timestamps"),
- cmpopts.IgnoreFields(blockMetadata{}, "field"),
- cmpopts.IgnoreFields(blockMetadata{},
"tagFamilies"),
- cmp.AllowUnexported(blockMetadata{}),
- ); diff != "" {
- t.Errorf("Unexpected blockMetadata (-got
+want):\n%s", diff)
+ if diff := cmp.Diff(got, tt.want,
+ cmpopts.IgnoreFields(blockMetadata{},
"timestamps"),
+ cmpopts.IgnoreFields(blockMetadata{},
"field"),
+ cmpopts.IgnoreFields(blockMetadata{},
"tagFamilies"),
+ cmp.AllowUnexported(blockMetadata{}),
+ ); diff != "" {
+ t.Errorf("Unexpected blockMetadata
(-got +want):\n%s", diff)
+ }
+ return s.epoch
}
+
+ t.Run("memory snapshot", func(t *testing.T) {
+ tst := &tsTable{
+ loopCloser: run.NewCloser(2),
+ introductions: make(chan *introduction),
+ }
+ flushCh := make(chan *flusherIntroduction)
+ introducerWatcher := make(watcher.Channel, 1)
+ go tst.introducerLoop(flushCh,
introducerWatcher, 1)
+ for _, dps := range tt.dpsList {
+ tst.mustAddDataPoints(dps)
+ time.Sleep(100 * time.Millisecond)
+ }
+ verify(tst)
+ })
+
+ t.Run("file snapshot", func(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ fileSystem := fs.NewLocalFileSystem()
+ defer defFn()
+
+ tst, err := newTSTable(fileSystem, tmpPath,
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+ require.NoError(t, err)
+ for _, dps := range tt.dpsList {
+ tst.mustAddDataPoints(dps)
+ time.Sleep(100 * time.Millisecond)
+ }
+ epoch := verify(tst)
+
+ // reopen the table
+ tst, err = newTSTable(fileSystem, tmpPath,
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+ require.NoError(t, err)
+ assert.Equal(t, epoch, verify(tst))
+ })
})
}
}
diff --git a/go.mod b/go.mod
index 3d57a1a9..4a9f0d19 100644
--- a/go.mod
+++ b/go.mod
@@ -139,7 +139,7 @@ require (
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.19.0 // indirect
- golang.org/x/sys v0.15.0 // indirect
+ golang.org/x/sys v0.15.0
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.16.1 // indirect
diff --git a/pkg/fs/error.go b/pkg/fs/error.go
index a54c513a..8247d9e3 100644
--- a/pkg/fs/error.go
+++ b/pkg/fs/error.go
@@ -20,17 +20,19 @@ package fs
import "fmt"
+// FileSystemError code.
const (
- isExistError = 0
- isNotExistError = 1
- permissionError = 2
- openError = 3
- deleteError = 4
- writeError = 5
- readError = 6
- flushError = 7
- closeError = 8
- otherError = 9
+ isExistError = iota
+ IsNotExistError
+ permissionError
+ openError
+ deleteError
+ writeError
+ readError
+ flushError
+ closeError
+ lockError
+ otherError
)
// FileSystemError implements the Error interface.
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 098287e2..da483846 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -94,12 +94,20 @@ type FileSystem interface {
ReadDir(dirname string) []DirEntry
// Create and open the file by specified name and mode.
CreateFile(name string, permission Mode) (File, error)
+ // Create and open lock file by specified name and mode.
+ CreateLockFile(name string, permission Mode) (File, error)
+ // Open the file by specified name and mode.
+ OpenFile(name string) (File, error)
// Flush mode, which flushes all data to one file.
Write(buffer []byte, name string, permission Mode) (int, error)
+ // Read the entire file using streaming read.
+ Read(name string) ([]byte, error)
// Delete the file.
DeleteFile(name string) error
// Delete the directory.
MustRMAll(path string)
+ // SyncPath the directory of file.
+ SyncPath(path string)
}
// DirEntry is the interface that wraps the basic information about a file or
directory.
@@ -111,6 +119,17 @@ type DirEntry interface {
IsDir() bool
}
+// MustFlush flushes all data to one file and panics if it cannot write all
data.
+func MustFlush(fs FileSystem, buffer []byte, name string, permission Mode) {
+ n, err := fs.Write(buffer, name, permission)
+ if err != nil {
+ logger.GetLogger().Panic().Err(err).Str("path",
name).Msg("cannot write data")
+ }
+ if n != len(buffer) {
+ logger.GetLogger().Panic().Int("written", n).Int("expected",
len(buffer)).Str("path", name).Msg("BUG: writer wrote wrong number of bytes")
+ }
+}
+
// MustWriteData writes data to w and panics if it cannot write all data.
func MustWriteData(w Writer, data []byte) {
if len(data) == 0 {
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 4de839ff..006d7245 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -25,6 +25,7 @@ import (
"io"
"os"
"path/filepath"
+ "time"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -59,7 +60,7 @@ func readErrorHandle(operation string, err error, name string, size int) (int, e
return size, err
case os.IsNotExist(err):
return size, &FileSystemError{
- Code: isNotExistError,
+ Code: IsNotExistError,
Message: fmt.Sprintf("%s failed, file is not exist,
file name: %s, error message: %s", operation, name, err),
}
case os.IsPermission(err):
@@ -159,6 +160,31 @@ func (fs *localFileSystem) CreateFile(name string,
permission Mode) (File, error
}
}
+// OpenFile opens the existing file with the given name via os.Open and wraps
+// it in a LocalFile. On failure it returns a *FileSystemError whose Code
+// distinguishes a missing file and a permission problem from any other error.
+func (fs *localFileSystem) OpenFile(name string) (File, error) {
+	file, err := os.Open(name)
+	switch {
+	case err == nil:
+		return &LocalFile{
+			file: file,
+		}, nil
+	case os.IsNotExist(err):
+		return nil, &FileSystemError{
+			Code:    IsNotExistError,
+			Message: fmt.Sprintf("File is not exist, file name: %s,error message: %s", name, err),
+		}
+	case os.IsPermission(err):
+		return nil, &FileSystemError{
+			Code:    permissionError,
+			Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", name, err),
+		}
+	default:
+		// This method opens a file; it never creates one, so report it as an open failure.
+		return nil, &FileSystemError{
+			Code:    otherError,
+			Message: fmt.Sprintf("Open file return error, file name: %s,error message: %s", name, err),
+		}
+	}
+}
+
// Write flushes all data to one file.
func (fs *localFileSystem) Write(buffer []byte, name string, permission Mode)
(int, error) {
file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC,
os.FileMode(permission))
@@ -194,6 +220,30 @@ func (fs *localFileSystem) Write(buffer []byte, name
string, permission Mode) (i
return size, nil
}
+// Read returns the entire content of the file with the given name, loading it
+// with a single os.ReadFile call. When reading fails the returned data is nil
+// and the error is a *FileSystemError whose Code distinguishes a missing file
+// and a permission problem from any other error.
+func (fs *localFileSystem) Read(name string) ([]byte, error) {
+	data, err := os.ReadFile(name)
+	switch {
+	case err == nil:
+		return data, nil
+	case os.IsNotExist(err):
+		return nil, &FileSystemError{
+			Code:    IsNotExistError,
+			Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", name, err),
+		}
+	case os.IsPermission(err):
+		return nil, &FileSystemError{
+			Code:    permissionError,
+			Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", name, err),
+		}
+	default:
+		// Never hand back partially read bytes next to an error; callers must not use them.
+		return nil, &FileSystemError{
+			Code:    otherError,
+			Message: fmt.Sprintf("Read file error, file name: %s, error message: %s", name, err),
+		}
+	}
+}
+
// DeleteFile is used to delete the file.
func (fs *localFileSystem) DeleteFile(name string) error {
err := os.Remove(name)
@@ -202,7 +252,7 @@ func (fs *localFileSystem) DeleteFile(name string) error {
return nil
case os.IsNotExist(err):
return &FileSystemError{
- Code: isNotExistError,
+ Code: IsNotExistError,
Message: fmt.Sprintf("File is not exist, file name: %s,
error message: %s", name, err),
}
case os.IsPermission(err):
@@ -219,9 +269,16 @@ func (fs *localFileSystem) DeleteFile(name string) error {
}
+// MustRMAll removes path and everything below it with os.RemoveAll.
+// A failed removal is retried up to 5 more times, one second apart; if the
+// last attempt still fails, a panic-level entry carrying that error is logged.
func (fs *localFileSystem) MustRMAll(path string) {
- if err := os.RemoveAll(path); err != nil {
- logger.Panicf("failed to remove all files under %s", path)
+	err := os.RemoveAll(path)
+	for i := 0; err != nil && i < 5; i++ {
+		time.Sleep(time.Second)
+		err = os.RemoveAll(path)
+	}
+	if err != nil {
+		// Attach the last error so the panic explains why the removal failed.
+		fs.logger.Panic().Err(err).Str("path", path).Msg("failed to remove all files under path")
}
}
// Write adds new data to the end of a file.
@@ -232,7 +289,7 @@ func (file *LocalFile) Write(buffer []byte) (int, error) {
return size, nil
case os.IsNotExist(err):
return size, &FileSystemError{
- Code: isNotExistError,
+ Code: IsNotExistError,
Message: fmt.Sprintf("File is not exist, file name: %s,
error message: %s", file.file.Name(), err),
}
case os.IsPermission(err):
@@ -260,7 +317,7 @@ func (file *LocalFile) Writev(iov *[][]byte) (int, error) {
size += wsize
case os.IsNotExist(err):
return size, &FileSystemError{
- Code: isNotExistError,
+ Code: IsNotExistError,
Message: fmt.Sprintf("File is not exist, file
name: %s, error message: %s", file.file.Name(), err),
}
case os.IsPermission(err):
@@ -317,7 +374,7 @@ func (file *LocalFile) Size() (int64, error) {
if err != nil {
if os.IsNotExist(err) {
return -1, &FileSystemError{
- Code: isNotExistError,
+ Code: IsNotExistError,
Message: fmt.Sprintf("File is not exist, file
name: %s, error message: %s", file.file.Name(), err),
}
} else if os.IsPermission(err) {
diff --git a/pkg/fs/local_file_system_nix.go b/pkg/fs/local_file_system_nix.go
new file mode 100644
index 00000000..75db2649
--- /dev/null
+++ b/pkg/fs/local_file_system_nix.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd
+// +build darwin dragonfly freebsd linux netbsd openbsd
+
+package fs
+
+import (
+ "fmt"
+ "os"
+
+ "golang.org/x/sys/unix"
+)
+
+// CreateLockFile creates (or truncates) the file with the given name and
+// permission, then takes an exclusive, non-blocking flock on it.
+// It returns the locked file, or a *FileSystemError when the file cannot be
+// opened or the lock cannot be acquired.
+func (*localFileSystem) CreateLockFile(name string, permission Mode) (File, error) {
+	file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission))
+	switch {
+	case err == nil:
+		if err = unix.Flock(int(file.Fd()), unix.LOCK_EX|unix.LOCK_NB); err != nil {
+			// The caller never receives the file on this path, so close it here
+			// to avoid leaking the descriptor.
+			_ = file.Close()
+			return nil, &FileSystemError{
+				Code:    lockError,
+				Message: fmt.Sprintf("Cannot lock file, file name: %s, error message: %s", name, err),
+			}
+		}
+		return &LocalFile{
+			file: file,
+		}, nil
+	case os.IsExist(err):
+		return nil, &FileSystemError{
+			Code:    isExistError,
+			Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err),
+		}
+	case os.IsPermission(err):
+		return nil, &FileSystemError{
+			Code:    permissionError,
+			Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err),
+		}
+	default:
+		return nil, &FileSystemError{
+			Code:    otherError,
+			Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err),
+		}
+	}
+}
+
+// SyncPath opens the file or directory at name, calls Sync on it and closes it
+// again. A failure in any of the three steps is logged at panic level.
+func (fs *localFileSystem) SyncPath(name string) {
+	target, openErr := os.Open(name)
+	if openErr != nil {
+		fs.logger.Panic().Str("name", name).Err(openErr).Msg("failed to open file")
+	}
+	syncErr := target.Sync()
+	if syncErr != nil {
+		// Best-effort close; the sync failure is the error that gets reported.
+		_ = target.Close()
+		fs.logger.Panic().Str("name", name).Err(syncErr).Msg("failed to sync file")
+	}
+	if closeErr := target.Close(); closeErr != nil {
+		fs.logger.Panic().Str("name", name).Err(closeErr).Msg("failed to close file")
+	}
+}
diff --git a/pkg/fs/local_file_system_windows.go
b/pkg/fs/local_file_system_windows.go
new file mode 100644
index 00000000..ab6d5c91
--- /dev/null
+++ b/pkg/fs/local_file_system_windows.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fs
+
+import (
+ "fmt"
+ "os"
+
+ "golang.org/x/sys/windows"
+)
+
+// CreateLockFile creates (or truncates) the file with the given name and
+// permission, then takes an exclusive, fail-immediately lock on it through
+// LockFileEx. It returns the locked file, or a *FileSystemError when the file
+// cannot be opened or the lock cannot be acquired.
+func (*localFileSystem) CreateLockFile(name string, permission Mode) (File, error) {
+	file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission))
+	switch {
+	case err == nil:
+		lockFlags := uint32(windows.LOCKFILE_FAIL_IMMEDIATELY)
+		lockFlags |= uint32(windows.LOCKFILE_EXCLUSIVE_LOCK)
+		if err = windows.LockFileEx(windows.Handle(file.Fd()), lockFlags, 0, 1, 0, &windows.Overlapped{}); err != nil {
+			// The caller never receives the file on this path, so close it here
+			// to avoid leaking the handle.
+			_ = file.Close()
+			return nil, &FileSystemError{
+				Code:    lockError,
+				Message: fmt.Sprintf("Cannot lock file, file name: %s, error message: %s", name, err),
+			}
+		}
+		return &LocalFile{
+			file: file,
+		}, nil
+	case os.IsExist(err):
+		return nil, &FileSystemError{
+			Code:    isExistError,
+			Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err),
+		}
+	case os.IsPermission(err):
+		return nil, &FileSystemError{
+			Code:    permissionError,
+			Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err),
+		}
+	default:
+		return nil, &FileSystemError{
+			Code:    otherError,
+			Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err),
+		}
+	}
+}
+
+// SyncPath does nothing in the Windows implementation.
+func (*localFileSystem) SyncPath(string) {}
diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go
new file mode 100644
index 00000000..eaedaa38
--- /dev/null
+++ b/pkg/watcher/watcher.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package watcher provides a watcher to watch the epoch.
+package watcher
+
+// Epoch is an epoch watcher.
+// Its channel is closed by Epochs.Notify once a later epoch is announced.
+type Epoch struct {
+ ch chan struct{}
+ epoch uint64
+}
+
+// Watch returns the receive-only channel that is closed when the watcher is notified.
+func (e *Epoch) Watch() <-chan struct{} {
+ return e.ch
+}
+
+// Epochs is a list of epoch watchers.
+type Epochs []*Epoch
+
+// Add appends an epoch watcher to the list.
+func (e *Epochs) Add(epoch *Epoch) {
+	*e = append(*e, epoch)
+}
+
+// Notify closes the channel of every watcher whose epoch is lower than the
+// given epoch and keeps all other watchers in the list.
+func (e *Epochs) Notify(epoch uint64) {
+	var pending Epochs
+	for _, watcher := range *e {
+		if watcher.epoch >= epoch {
+			// Not passed yet: keep watching.
+			pending = append(pending, watcher)
+			continue
+		}
+		close(watcher.ch)
+	}
+	*e = pending
+}
+
+// Channel is a channel of epoch watchers.
+type Channel chan *Epoch
+
+// Add creates a watcher for the given epoch and sends it on the channel.
+// It returns the watcher once the send succeeds, or nil if closeCh becomes
+// readable before the watcher could be sent.
+func (w Channel) Add(epoch uint64, closeCh <-chan struct{}) *Epoch {
+	watcher := &Epoch{
+		ch:    make(chan struct{}, 1),
+		epoch: epoch,
+	}
+	select {
+	case <-closeCh:
+		return nil
+	case w <- watcher:
+		return watcher
+	}
+}
diff --git
a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
index d6846ee3..ac89d108 100644
--- a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
+++ b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
@@ -42,7 +42,6 @@ import (
)
func TestIntegrationQueryOnDisk(t *testing.T) {
- t.Skip("skip on-disk integration test until measure parts flushing is
supported")
RegisterFailHandler(Fail)
RunSpecs(t, "Integration Query OnDisk Suite",
Label(integration_standalone.Labels...))
}