This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch measure-flusher in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 117f6171dbe5638c6b1726f4d894ae5253d71d65 Author: Gao Hongtao <[email protected]> AuthorDate: Sat Dec 23 08:31:10 2023 +0800 Add introducer and flusher Signed-off-by: Gao Hongtao <[email protected]> --- banyand/internal/storage/segment.go | 2 +- banyand/internal/storage/storage.go | 2 +- banyand/measure/block_writer.go | 2 - banyand/measure/flusher.go | 88 +++++++++++ banyand/measure/gc.go | 67 +++++++++ banyand/measure/gc_test.go | 105 +++++++++++++ banyand/measure/introducer.go | 152 +++++++++++++++++++ banyand/measure/part.go | 120 ++++++++++++++- banyand/measure/part_metadata.go | 62 ++++++-- banyand/measure/part_test.go | 29 +++- banyand/measure/primary_metadata.go | 3 +- banyand/measure/query.go | 33 +++-- banyand/measure/query_test.go | 12 +- banyand/measure/snapshot.go | 109 ++++++++++++++ banyand/measure/snapshot_test.go | 287 ++++++++++++++++++++++++++++++++++++ banyand/measure/tstable.go | 201 ++++++++++++++++++++++--- banyand/measure/tstable_test.go | 28 ++-- go.mod | 2 +- pkg/fs/error.go | 21 +-- pkg/fs/file_system.go | 8 + pkg/fs/local_file_system.go | 61 +++++++- pkg/fs/local_file_system_nix.go | 74 ++++++++++ pkg/watcher/watcher.go | 70 +++++++++ 23 files changed, 1452 insertions(+), 86 deletions(-) diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index a74da252..85decd8b 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -293,7 +293,7 @@ func (sc *segmentController[T]) sortLst() { func (sc *segmentController[T]) load(start, end time.Time, root string) (seg *segment[T], err error) { var tsTable T - if tsTable, err = sc.tsTableCreator(sc.location, sc.position, sc.l, timestamp.NewSectionTimeRange(start, end)); err != nil { + if tsTable, err = sc.tsTableCreator(lfs, sc.location, sc.position, sc.l, timestamp.NewSectionTimeRange(start, end)); err != nil { return nil, err } suffix := sc.Format(start) diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go index cfda39bf..911c5410 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -92,7 +92,7 @@ type TSTableWrapper[T TSTable] interface { } // TSTableCreator creates a TSTable. -type TSTableCreator[T TSTable] func(root string, position common.Position, +type TSTableCreator[T TSTable] func(fileSystem fs.FileSystem, root string, position common.Position, l *logger.Logger, timeRange timestamp.TimeRange) (T, error) // IntervalUnit denotes the unit of a time point. diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go index 9fcc6e9a..fc840084 100644 --- a/banyand/measure/block_writer.go +++ b/banyand/measure/block_writer.go @@ -19,7 +19,6 @@ package measure import ( "sync" - "time" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" @@ -242,7 +241,6 @@ func (bw *blockWriter) Flush(ph *partMetadata) { ph.BlocksCount = bw.totalBlocksCount ph.MinTimestamp = bw.totalMinTimestamp ph.MaxTimestamp = bw.totalMaxTimestamp - ph.Version = time.Now().UnixNano() // TODO: use a global version bw.mustFlushPrimaryBlock(bw.primaryBlockData) diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go new file mode 100644 index 00000000..2609f09e --- /dev/null +++ b/banyand/measure/flusher.go @@ -0,0 +1,88 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction, introducerWatcher watcher.Channel, epoch uint64) { + defer tst.loopCloser.Done() + epochWatcher := introducerWatcher.Add(0, tst.loopCloser.CloseNotify()) + + for { + select { + case <-tst.loopCloser.CloseNotify(): + return + case <-epochWatcher.Watch(): + var curSnapshot *snapshot + tst.Lock() + if tst.snapshot != nil && tst.snapshot.epoch > epoch { + curSnapshot = tst.snapshot + curSnapshot.incRef() + } + tst.Unlock() + if curSnapshot != nil { + epoch = tst.flush(curSnapshot, flushCh) + if epoch == 0 { + return + } + tst.persistSnapshot(curSnapshot) + if tst.currentEpoch() != epoch { + continue + } + } + epochWatcher = introducerWatcher.Add(epoch, tst.loopCloser.CloseNotify()) + tst.gc.clean() + } + } +} + +func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) uint64 { + ind := generateFlusherIntroduction() + for _, pw := range snapshot.parts { + if pw.mp == nil || pw.mp.partMetadata.TotalCount < 1 { + continue + } + partPath := partPath(tst.root, pw.ID()) + pw.mp.mustFlush(tst.fileSystem, partPath) + newPW := newPartWrapper(nil, mustOpenFilePart(partPath, tst.fileSystem), tst.fileSystem) + ind.flushed[newPW.ID()] = newPW + } + ind.applied = make(chan struct{}) + select { + case flushCh <- ind: + case <-tst.loopCloser.CloseNotify(): + return 0 + } + select { + case <-ind.applied: + return snapshot.epoch + case <-tst.loopCloser.CloseNotify(): + return 0 + } +} + 
+func (tst *tsTable) persistSnapshot(snapshot *snapshot) { + var partNames []string + for i := range snapshot.parts { + partNames = append(partNames, snapshotName(snapshot.parts[i].ID())) + } + tst.mustWriteSnapshot(snapshot.epoch, partNames) + tst.gc.registerSnapshot(snapshot.epoch) +} diff --git a/banyand/measure/gc.go b/banyand/measure/gc.go new file mode 100644 index 00000000..dbdde01d --- /dev/null +++ b/banyand/measure/gc.go @@ -0,0 +1,67 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "path" + "sort" +) + +type garbageCleaner struct { + parent *tsTable + snapshots []uint64 + parts []uint64 +} + +func (g *garbageCleaner) registerSnapshot(snapshot uint64) { + g.snapshots = append(g.snapshots, snapshot) +} + +func (g *garbageCleaner) submitParts(parts ...uint64) { + g.parts = append(g.parts, parts...) 
+} + +func (g garbageCleaner) clean() { + if len(g.snapshots) > 1 { + g.cleanSnapshots() + } + if len(g.parts) > 0 { + g.cleanParts() + } +} + +func (g *garbageCleaner) cleanSnapshots() { + if len(g.snapshots) < 2 { + return + } + sort.Slice(g.snapshots, func(i, j int) bool { + return g.snapshots[i] < g.snapshots[j] + }) + // keep the latest snapshot + for i := 0; i < len(g.snapshots)-1; i++ { + filePath := path.Join(g.parent.root, snapshotName(g.snapshots[i])) + if err := g.parent.fileSystem.DeleteFile(filePath); err != nil { + g.parent.l.Warn().Err(err).Str("path", filePath).Msg("failed to delete snapshot. Please check manually") + } + } + g.snapshots = g.snapshots[len(g.snapshots)-1:] +} + +func (g garbageCleaner) cleanParts() { + panic("implement me") +} diff --git a/banyand/measure/gc_test.go b/banyand/measure/gc_test.go new file mode 100644 index 00000000..2d6d8901 --- /dev/null +++ b/banyand/measure/gc_test.go @@ -0,0 +1,105 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package measure + +import ( + "path/filepath" + "sort" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" +) + +func TestGarbageCleanerCleanSnapshot(t *testing.T) { + tests := []struct { + name string + snapshotsOnDisk []uint64 + snapshotsInGC []uint64 + wantOnDisk []uint64 + wantInGC []uint64 + }{ + { + name: "Test with no snapshots on disk and no snapshots in GC", + }, + { + name: "Test with some snapshots on disk and no snapshots in GC", + snapshotsInGC: nil, + snapshotsOnDisk: []uint64{1, 2, 3}, + wantInGC: nil, + wantOnDisk: []uint64{1, 2, 3}, + }, + { + name: "Test with no snapshots on disk and some snapshots in GC", + snapshotsOnDisk: nil, + snapshotsInGC: []uint64{1, 2, 3}, + wantOnDisk: nil, + // gc won't fix the miss match between inmemory and disk + wantInGC: []uint64{3}, + }, + { + name: "Test with some snapshots on disk and some snapshots in GC", + snapshotsOnDisk: []uint64{1, 2, 3, 4, 5}, + snapshotsInGC: []uint64{1, 3, 5}, + wantOnDisk: []uint64{2, 4, 5}, + wantInGC: []uint64{5}, + }, + { + name: "Test with unsorted", + snapshotsOnDisk: []uint64{1, 3, 2, 5, 4}, + snapshotsInGC: []uint64{5, 1, 3}, + wantOnDisk: []uint64{2, 4, 5}, + wantInGC: []uint64{5}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + root, defFn := test.Space(require.New(t)) + defer defFn() + parent := &tsTable{ + root: root, + l: logger.GetLogger("test"), + fileSystem: fs.NewLocalFileSystem(), + } + for i := range tt.snapshotsOnDisk { + filePath := filepath.Join(parent.root, snapshotName(tt.snapshotsOnDisk[i])) + file, err := parent.fileSystem.CreateFile(filePath, filePermission) + require.NoError(t, err) + require.NoError(t, file.Close()) + } + g := &garbageCleaner{ + parent: parent, + snapshots: tt.snapshotsInGC, + } + g.cleanSnapshots() + var got 
[]uint64 + for _, de := range parent.fileSystem.ReadDir(parent.root) { + s, err := parseSnapshot(de.Name()) + require.NoError(t, err, "failed to parse snapshot name:%s", de.Name()) + got = append(got, s) + } + sort.Slice(got, func(i, j int) bool { return got[i] < got[j] }) + assert.Equal(t, tt.wantOnDisk, got) + assert.Equal(t, tt.wantInGC, g.snapshots) + }) + } +} diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go new file mode 100644 index 00000000..3c124d2a --- /dev/null +++ b/banyand/measure/introducer.go @@ -0,0 +1,152 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package measure + +import ( + "sync" + + "github.com/apache/skywalking-banyandb/pkg/watcher" +) + +type introduction struct { + memPart *partWrapper + applied chan struct{} +} + +func (i *introduction) reset() { + i.memPart = nil + i.applied = nil +} + +var introductionPool = sync.Pool{} + +func generateIntroduction() *introduction { + v := introductionPool.Get() + if v == nil { + return &introduction{} + } + return v.(*introduction) +} + +func releaseIntroduction(i *introduction) { + i.reset() + introductionPool.Put(i) +} + +type flusherIntroduction struct { + flushed map[uint64]*partWrapper + applied chan struct{} +} + +func (i *flusherIntroduction) reset() { + for k := range i.flushed { + delete(i.flushed, k) + } + i.applied = nil +} + +var flusherIntroductionPool = sync.Pool{} + +func generateFlusherIntroduction() *flusherIntroduction { + v := flusherIntroductionPool.Get() + if v == nil { + return &flusherIntroduction{ + flushed: make(map[uint64]*partWrapper), + } + } + return v.(*flusherIntroduction) +} + +func releaseFlusherIntroduction(i *flusherIntroduction) { + i.reset() + flusherIntroductionPool.Put(i) +} + +func (tst *tsTable) introducerLoop(flushCh chan *flusherIntroduction, watcherCh watcher.Channel, epoch uint64) { + var introducerWatchers watcher.Epochs + defer tst.loopCloser.Done() + for { + select { + case <-tst.loopCloser.CloseNotify(): + return + case next := <-tst.introductions: + tst.introduceMemPart(next, epoch) + epoch++ + case next := <-flushCh: + tst.introduceFlushed(next, epoch) + epoch++ + case epochWatcher := <-watcherCh: + introducerWatchers.Add(epochWatcher) + } + curEpoch := tst.currentEpoch() + introducerWatchers.Notify(curEpoch) + } +} + +func (tst *tsTable) introduceMemPart(nextIntroduction *introduction, epoch uint64) { + cur := tst.currentSnapshot() + if cur != nil { + defer cur.decRef() + } else { + cur = new(snapshot) + } + defer releaseIntroduction(nextIntroduction) + + next := nextIntroduction.memPart + nextSnp := 
cur.copyAllTo(epoch) + next.p.partMetadata.ID = epoch + nextSnp.parts = append(nextSnp.parts, next) + tst.replaceSnapshot(&nextSnp) + if nextIntroduction.applied != nil { + close(nextIntroduction.applied) + } +} + +func (tst *tsTable) introduceFlushed(nextIntroduction *flusherIntroduction, epoch uint64) { + cur := tst.currentSnapshot() + if cur == nil { + tst.l.Panic().Msg("current snapshot is nil") + } + defer func() { + cur.decRef() + releaseFlusherIntroduction(nextIntroduction) + }() + nextSnp := cur.merge(epoch, nextIntroduction.flushed) + tst.replaceSnapshot(&nextSnp) + if nextIntroduction.applied != nil { + close(nextIntroduction.applied) + } +} + +func (tst *tsTable) replaceSnapshot(next *snapshot) { + tst.Lock() + defer tst.Unlock() + if tst.snapshot != nil { + tst.snapshot.decRef() + } + tst.snapshot = next +} + +func (tst *tsTable) currentEpoch() uint64 { + s := tst.currentSnapshot() + if s == nil { + return 0 + } + defer s.decRef() + return s.epoch +} diff --git a/banyand/measure/part.go b/banyand/measure/part.go index 6760f10e..11e635b8 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -18,6 +18,9 @@ package measure import ( + "fmt" + "path" + "path/filepath" "sort" "sync" "sync/atomic" @@ -26,9 +29,21 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +const ( + metadataFilename = "metadata.json" + primaryFilename = "primary.bin" + metaFilename = "meta.bin" + timestampsFilename = "timestamps.bin" + fieldValuesFilename = "fields.bin" + tagFamiliesMetadataFilenameExt = ".tfm" + tagFamiliesFilenameExt = ".tf" ) type part struct { + path string meta fs.Reader primary fs.Reader timestamps fs.Reader @@ -39,6 +54,18 @@ type part struct { partMetadata partMetadata } +func (p *part) close() { + fs.MustClose(p.primary) + fs.MustClose(p.timestamps) + fs.MustClose(p.fieldValues) + 
for _, tf := range p.tagFamilies { + fs.MustClose(tf) + } + for _, tfh := range p.tagFamilyMetadata { + fs.MustClose(tfh) + } +} + func openMemPart(mp *memPart) *part { var p part p.partMetadata = mp.partMetadata @@ -140,6 +167,25 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) { releaseBlockWriter(bsw) } +func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) { + fileSystem.MkdirPanicIfExist(path, dirPermission) + + fileSystem.Write(mp.meta.Buf, filepath.Join(path, metaFilename), filePermission) + fileSystem.Write(mp.primary.Buf, filepath.Join(path, primaryFilename), filePermission) + fileSystem.Write(mp.timestamps.Buf, filepath.Join(path, timestampsFilename), filePermission) + fileSystem.Write(mp.fieldValues.Buf, filepath.Join(path, fieldValuesFilename), filePermission) + for name, tf := range mp.tagFamilies { + fileSystem.Write(tf.Buf, filepath.Join(path, name+tagFamiliesFilenameExt), filePermission) + } + for name, tfh := range mp.tagFamilyMetadata { + fileSystem.Write(tfh.Buf, filepath.Join(path, name+tagFamiliesMetadataFilenameExt), filePermission) + } + + mp.partMetadata.mustWriteMetadata(fileSystem, path) + + fileSystem.SyncPath(path) +} + func uncompressedDataPointSizeBytes(index int, dps *dataPoints) uint64 { n := uint64(len(time.RFC3339Nano)) n += uint64(len(dps.fields[index].name)) @@ -171,13 +217,15 @@ func releaseMemPart(mp *memPart) { var memPartPool sync.Pool type partWrapper struct { - mp *memPart - p *part - ref int32 + mp *memPart + p *part + ref int32 + mustBeDeleted uint32 + fileSystem fs.FileSystem } -func newMemPartWrapper(mp *memPart, p *part) *partWrapper { - return &partWrapper{mp: mp, p: p, ref: 1} +func newPartWrapper(mp *memPart, p *part, fileSystem fs.FileSystem) *partWrapper { + return &partWrapper{mp: mp, p: p, fileSystem: fileSystem, ref: 1} } func (pw *partWrapper) incRef() { @@ -192,5 +240,67 @@ func (pw *partWrapper) decRef() { if pw.mp != nil { releaseMemPart(pw.mp) pw.mp = nil + pw.p = nil + return + 
} + pw.p.close() + if atomic.LoadUint32(&pw.mustBeDeleted) == 0 { + return + } + pw.fileSystem.MustRMAll(pw.p.path) +} + +func (pw *partWrapper) ID() uint64 { + return pw.p.partMetadata.ID +} + +func mustOpenFilePart(partPath string, fileSystem fs.FileSystem) *part { + var p part + p.path = partPath + p.partMetadata.mustReadMetadata(fileSystem, partPath) + + metaPath := path.Join(partPath, metaFilename) + pr := mustOpenReader(metaPath, fileSystem) + p.primaryBlockMetadata = mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], newReader(pr)) + fs.MustClose(pr) + + p.primary = mustOpenReader(path.Join(partPath, primaryFilename), fileSystem) + p.timestamps = mustOpenReader(path.Join(partPath, timestampsFilename), fileSystem) + p.fieldValues = mustOpenReader(path.Join(partPath, fieldValuesFilename), fileSystem) + ee := fileSystem.ReadDir(partPath) + for _, e := range ee { + if e.IsDir() { + continue + } + if filepath.Ext(e.Name()) == tagFamiliesMetadataFilenameExt { + if p.tagFamilyMetadata == nil { + p.tagFamilyMetadata = make(map[string]fs.Reader) + } + p.tagFamilyMetadata[removeExt(e.Name(), tagFamiliesMetadataFilenameExt)] = mustOpenReader(path.Join(partPath, e.Name()), fileSystem) + } + if filepath.Ext(e.Name()) == tagFamiliesFilenameExt { + if p.tagFamilies == nil { + p.tagFamilies = make(map[string]fs.Reader) + } + p.tagFamilies[removeExt(e.Name(), tagFamiliesFilenameExt)] = mustOpenReader(path.Join(partPath, e.Name()), fileSystem) + } + + } + return &p +} + +func mustOpenReader(name string, fileSystem fs.FileSystem) fs.Reader { + f, err := fileSystem.OpenFile(name) + if err != nil { + logger.Panicf("cannot open %q: %s", name, err) } + return f +} + +func removeExt(nameWithExt, ext string) string { + return nameWithExt[:len(nameWithExt)-len(ext)] +} + +func partPath(root string, epoch uint64) string { + return filepath.Join(root, fmt.Sprintf("%016x", epoch)) } diff --git a/banyand/measure/part_metadata.go b/banyand/measure/part_metadata.go index 
6c6f6e17..0323e660 100644 --- a/banyand/measure/part_metadata.go +++ b/banyand/measure/part_metadata.go @@ -17,6 +17,14 @@ package measure +import ( + "encoding/json" + "path/filepath" + + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + type partMetadata struct { CompressedSizeBytes uint64 UncompressedSizeBytes uint64 @@ -24,15 +32,51 @@ type partMetadata struct { BlocksCount uint64 MinTimestamp int64 MaxTimestamp int64 - Version int64 + ID uint64 +} + +func (pm *partMetadata) reset() { + pm.CompressedSizeBytes = 0 + pm.UncompressedSizeBytes = 0 + pm.TotalCount = 0 + pm.BlocksCount = 0 + pm.MinTimestamp = 0 + pm.MaxTimestamp = 0 + pm.ID = 0 +} + +func (pm *partMetadata) mustReadMetadata(fileSystem fs.FileSystem, partPath string) { + pm.reset() + + metadataPath := filepath.Join(partPath, metadataFilename) + metadata, err := fileSystem.Read(metadataPath) + if err != nil { + logger.Panicf("cannot read %s", err) + return + } + if err := json.Unmarshal(metadata, pm); err != nil { + logger.Panicf("cannot parse %q: %s", metadataPath, err) + return + } + + if pm.MinTimestamp > pm.MaxTimestamp { + logger.Panicf("MinTimestamp cannot exceed MaxTimestamp; got %d vs %d", pm.MinTimestamp, pm.MaxTimestamp) + } } -func (ph *partMetadata) reset() { - ph.CompressedSizeBytes = 0 - ph.UncompressedSizeBytes = 0 - ph.TotalCount = 0 - ph.BlocksCount = 0 - ph.MinTimestamp = 0 - ph.MaxTimestamp = 0 - ph.Version = 0 +func (pm *partMetadata) mustWriteMetadata(fileSystem fs.FileSystem, partPath string) { + metadata, err := json.Marshal(pm) + if err != nil { + logger.Panicf("cannot marshal metadata: %s", err) + return + } + metadataPath := filepath.Join(partPath, metadataFilename) + n, err := fileSystem.Write(metadata, metadataPath, filePermission) + if err != nil { + logger.Panicf("cannot write metadata: %s", err) + return + } + if n != len(metadata) { + logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", 
metadataPath, n, len(metadata)) + } } diff --git a/banyand/measure/part_test.go b/banyand/measure/part_test.go index 29b5fdfe..5473fd76 100644 --- a/banyand/measure/part_test.go +++ b/banyand/measure/part_test.go @@ -21,13 +21,16 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/fs" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/test" ) -func Test_memPart_mustInitFromDataPoints(t *testing.T) { +func TestMustInitFromDataPoints(t *testing.T) { tests := []struct { dps *dataPoints name string @@ -92,6 +95,30 @@ func Test_memPart_mustInitFromDataPoints(t *testing.T) { assert.Equal(t, tt.want.MinTimestamp, mp.partMetadata.MinTimestamp) assert.Equal(t, tt.want.MaxTimestamp, mp.partMetadata.MaxTimestamp) assert.Equal(t, tt.want.TotalCount, mp.partMetadata.TotalCount) + assert.Equal(t, len(mp.tagFamilies), len(mp.tagFamilyMetadata)) + tmpPath, defFn := test.Space(require.New(t)) + defer defFn() + epoch := uint64(1) + path := partPath(tmpPath, epoch) + + fileSystem := fs.NewLocalFileSystem() + mp.mustFlush(fileSystem, path) + p := mustOpenFilePart(path, fileSystem) + defer p.close() + assert.Equal(t, tt.want.BlocksCount, p.partMetadata.BlocksCount) + assert.Equal(t, tt.want.MinTimestamp, p.partMetadata.MinTimestamp) + assert.Equal(t, tt.want.MaxTimestamp, p.partMetadata.MaxTimestamp) + assert.Equal(t, tt.want.TotalCount, p.partMetadata.TotalCount) + if len(mp.tagFamilies) > 0 { + for k := range mp.tagFamilies { + _, ok := mp.tagFamilyMetadata[k] + require.True(t, ok, "mp.tagFamilyMetadata %s not found", k) + _, ok = p.tagFamilies[k] + require.True(t, ok, "p.tagFamilies %s not found", k) + _, ok = p.tagFamilyMetadata[k] + require.True(t, ok, "p.tagFamilyMetadata %s not found", k) + } + } }) } } diff --git 
a/banyand/measure/primary_metadata.go b/banyand/measure/primary_metadata.go index 19374283..7b09503e 100644 --- a/banyand/measure/primary_metadata.go +++ b/banyand/measure/primary_metadata.go @@ -100,7 +100,6 @@ func mustReadPrimaryBlockMetadata(dst []primaryBlockMetadata, r *reader) []prima return dst } -// unmarshalPrimaryBlockMetadata appends unmarshaled from src indexBlockHeader entries to dst and returns the result. func unmarshalPrimaryBlockMetadata(dst []primaryBlockMetadata, src []byte) ([]primaryBlockMetadata, error) { dstOrig := dst for len(src) > 0 { @@ -112,7 +111,7 @@ func unmarshalPrimaryBlockMetadata(dst []primaryBlockMetadata, src []byte) ([]pr ih := &dst[len(dst)-1] tail, err := ih.unmarshal(src) if err != nil { - return dstOrig, fmt.Errorf("cannot unmarshal indexBlockHeader %d: %w", len(dst)-len(dstOrig), err) + return dstOrig, fmt.Errorf("cannot unmarshal primaryBlockHeader %d: %w", len(dst)-len(dstOrig), err) } src = tail } diff --git a/banyand/measure/query.go b/banyand/measure/query.go index d4ebadab..4451a253 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -86,15 +86,24 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 for i := range sl { sids = append(sids, sl[i].ID) } - var pws []*partWrapper var parts []*part qo := queryOptions{ MeasureQueryOptions: mqo, minTimestamp: mqo.TimeRange.Start.UnixNano(), maxTimestamp: mqo.TimeRange.End.UnixNano(), } - for _, tw := range tabWrappers { - pws, parts = tw.Table().getParts(pws, parts, qo) + var n int + for i := range tabWrappers { + s := tabWrappers[i].Table().currentSnapshot() + if s == nil { + continue + } + parts, n = s.getParts(parts, qo) + if n < 1 { + s.decRef() + continue + } + result.snapshots = append(result.snapshots, s) } // TODO: cache tstIter var tstIter tstIter @@ -303,7 +312,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue { type queryResult struct { sidToIndex map[common.SeriesID]int data []*blockCursor - pws 
[]*partWrapper + snapshots []*snapshot loaded bool orderByTS bool ascTS bool @@ -348,10 +357,10 @@ func (qr *queryResult) Release() { qr.data[i] = nil } qr.data = qr.data[:0] - for i := range qr.pws { - qr.pws[i].decRef() + for i := range qr.snapshots { + qr.snapshots[i].decRef() } - qr.pws = qr.pws[:0] + qr.snapshots = qr.snapshots[:0] } func (qr queryResult) Len() int { @@ -361,8 +370,8 @@ func (qr queryResult) Len() int { func (qr queryResult) Less(i, j int) bool { leftTS := qr.data[i].timestamps[qr.data[i].idx] rightTS := qr.data[j].timestamps[qr.data[j].idx] - leftVersion := qr.data[i].p.partMetadata.Version - rightVersion := qr.data[j].p.partMetadata.Version + leftVersion := qr.data[i].p.partMetadata.ID + rightVersion := qr.data[j].p.partMetadata.ID if qr.orderByTS { if leftTS == rightTS { if qr.data[i].bm.seriesID == qr.data[j].bm.seriesID { @@ -416,7 +425,7 @@ func (qr *queryResult) merge() *pbv1.Result { step = -1 } result := &pbv1.Result{} - var lastPartVersion int64 + var lastPartVersion uint64 var lastSid common.SeriesID for qr.Len() > 0 { @@ -428,12 +437,12 @@ func (qr *queryResult) merge() *pbv1.Result { if len(result.Timestamps) > 0 && topBC.timestamps[topBC.idx] == result.Timestamps[len(result.Timestamps)-1] { - if topBC.p.partMetadata.Version > lastPartVersion { + if topBC.p.partMetadata.ID > lastPartVersion { logger.Panicf("following parts version should be less or equal to the previous one") } } else { topBC.copyTo(result) - lastPartVersion = topBC.p.partMetadata.Version + lastPartVersion = topBC.p.partMetadata.ID } topBC.idx += step diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go index d69f77ad..44ad9c52 100644 --- a/banyand/measure/query_test.go +++ b/banyand/measure/query_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" "github.com/apache/skywalking-banyandb/api/common" @@ -374,12 +375,11 @@ func 
TestQueryResult(t *testing.T) { minTimestamp: tt.minTimestamp, maxTimestamp: tt.maxTimestamp, } - pws, pp := tst.getParts(nil, nil, queryOpts) - defer func() { - for _, pw := range pws { - pw.decRef() - } - }() + s := tst.currentSnapshot() + require.NotNil(t, s) + defer s.decRef() + pp, n := s.getParts(nil, queryOpts) + require.Equal(t, len(tt.dpsList), n) sids := make([]common.SeriesID, len(tt.sids)) copy(sids, tt.sids) sort.Slice(sids, func(i, j int) bool { diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go new file mode 100644 index 00000000..63339392 --- /dev/null +++ b/banyand/measure/snapshot.go @@ -0,0 +1,109 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package measure + +import ( + "errors" + "fmt" + "path/filepath" + "sync/atomic" +) + +func (tst *tsTable) currentSnapshot() *snapshot { + tst.RLock() + defer tst.RUnlock() + if tst.snapshot == nil { + return nil + } + s := tst.snapshot + s.incRef() + return s +} + +type snapshot struct { + parts []*partWrapper + epoch uint64 + + ref int32 +} + +func (s *snapshot) getParts(dst []*part, opts queryOptions) ([]*part, int) { + var count int + for _, p := range s.parts { + pm := p.p.partMetadata + if opts.maxTimestamp < pm.MinTimestamp || opts.minTimestamp > pm.MaxTimestamp { + continue + } + dst = append(dst, p.p) + count++ + } + return dst, count +} + +func (s *snapshot) incRef() { + atomic.AddInt32(&s.ref, 1) +} + +func (s *snapshot) decRef() { + n := atomic.AddInt32(&s.ref, -1) + if n > 0 { + return + } + for i := range s.parts { + s.parts[i].decRef() + } + s.parts = s.parts[:0] +} + +func (s snapshot) copyAllTo(nextEpoch uint64) snapshot { + s.epoch = nextEpoch + s.ref = 1 + for i := range s.parts { + s.parts[i].incRef() + } + return s +} + +func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper) snapshot { + var result snapshot + result.epoch = nextEpoch + result.ref = 1 + for i := 0; i < len(s.parts); i++ { + if n, ok := nextParts[s.parts[i].ID()]; ok { + result.parts = append(result.parts, n) + continue + } + s.parts[i].incRef() + result.parts = append(result.parts, s.parts[i]) + } + return result +} + +func snapshotName(snapshot uint64) string { + return fmt.Sprintf("%016x%s", snapshot, snapshotSuffix) +} + +func parseSnapshot(name string) (uint64, error) { + if filepath.Ext(name) != snapshotSuffix { + return 0, errors.New("invalid snapshot file ext") + } + if len(name) < 16 { + return 0, errors.New("invalid snapshot file name") + } + return parseEpoch(name[:16]) +} diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go new file mode 100644 index 00000000..1b471745 --- /dev/null +++ 
b/banyand/measure/snapshot_test.go
@@ -0,0 +1,287 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestSnapshotGetParts(t *testing.T) {
+	tests := []struct {
+		name     string
+		snapshot *snapshot
+		dst      []*part
+		opts     queryOptions
+		expected []*part
+		count    int
+	}{
+		{
+			name: "Test with empty snapshot",
+			snapshot: &snapshot{
+				parts: []*partWrapper{},
+			},
+			dst: []*part{},
+			opts: queryOptions{
+				minTimestamp: 0,
+				maxTimestamp: 10,
+			},
+			expected: []*part{},
+			count:    0,
+		},
+		{
+			name: "Test with non-empty snapshot and no matching parts",
+			snapshot: &snapshot{
+				parts: []*partWrapper{
+					{
+						p: &part{partMetadata: partMetadata{
+							MinTimestamp: 0,
+							MaxTimestamp: 5,
+						}},
+					},
+					{
+						p: &part{partMetadata: partMetadata{
+							MinTimestamp: 6,
+							MaxTimestamp: 10,
+						}},
+					},
+				},
+			},
+			dst: []*part{},
+			opts: queryOptions{
+				minTimestamp: 11,
+				maxTimestamp: 15,
+			},
+			expected: []*part{},
+			count:    0,
+		},
+		{
+			name: "Test with non-empty snapshot and some matching parts",
+			snapshot: &snapshot{
+				parts: []*partWrapper{
+					{
+						p: &part{partMetadata: partMetadata{
+							MinTimestamp: 0,
+							MaxTimestamp: 5,
+						}},
+					},
+					{
+						p: &part{partMetadata: partMetadata{
+							MinTimestamp: 6,
+							MaxTimestamp: 10,
+						}},
+					},
+				},
+			},
+			dst: []*part{},
+			opts: queryOptions{
+				minTimestamp: 5,
+				maxTimestamp: 10,
+			},
+			expected: []*part{
+				{partMetadata: partMetadata{
+					MinTimestamp: 0,
+					MaxTimestamp: 5,
+				}},
+				{partMetadata: partMetadata{
+					MinTimestamp: 6,
+					MaxTimestamp: 10,
+				}},
+			},
+			count: 2,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result, count := tt.snapshot.getParts(tt.dst, tt.opts)
+			assert.Equal(t, tt.expected, result)
+			assert.Equal(t, tt.count, count)
+		})
+	}
+}
+
+func TestSnapshotCopyAllTo(t *testing.T) {
+	tests := []struct {
+		name      string
+		snapshot  snapshot
+		nextEpoch uint64
+		closePrev bool
+		expected  snapshot
+	}{
+		{
+			name: "Test with empty snapshot",
+			snapshot: snapshot{
+				parts: []*partWrapper{},
+			},
+			nextEpoch: 1,
+			expected: snapshot{
+				epoch: 1,
+				ref:   1,
+				parts: []*partWrapper{},
+			},
+		},
+		{
+			name: "Test with non-empty snapshot",
+			snapshot: snapshot{
+				parts: []*partWrapper{
+					{ref: 1},
+					{ref: 2},
+				},
+			},
+			nextEpoch: 1,
+			expected: snapshot{
+				epoch: 1,
+				ref:   1,
+				parts: []*partWrapper{
+					{ref: 2},
+					{ref: 3},
+				},
+			},
+		},
+		{
+			name: "Test with closed previous snapshot",
+			snapshot: snapshot{
+				parts: []*partWrapper{
+					{ref: 1},
+					{ref: 2},
+				},
+			},
+			nextEpoch: 1,
+			closePrev: true,
+			expected: snapshot{
+				epoch: 1,
+				ref:   1,
+				parts: []*partWrapper{
+					{ref: 1},
+					{ref: 2},
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := tt.snapshot.copyAllTo(tt.nextEpoch)
+			if tt.closePrev {
+				tt.snapshot.decRef()
+			}
+			assert.Equal(t, tt.expected, result)
+		})
+	}
+}
+
+func TestSnapshotMerge(t *testing.T) {
+	tests := []struct {
+		name      string
+		snapshot  *snapshot
+		closePrev bool
+		nextEpoch uint64
+		nextParts map[uint64]*partWrapper
+		expected  snapshot
+	}{
+		{
+			name: "Test with empty snapshot and empty next parts",
+			snapshot: &snapshot{
+				parts: []*partWrapper{},
+			},
+			nextEpoch: 1,
+			nextParts: map[uint64]*partWrapper{},
+			expected: snapshot{
+				epoch: 1,
+				ref:   1,
+				parts: nil,
+			},
+		},
+		{
+			name: "Test with non-empty snapshot and empty next parts",
+			snapshot: &snapshot{
+				parts: []*partWrapper{
+					{p: &part{partMetadata: partMetadata{ID: 1}}, ref: 1},
+					{p: &part{partMetadata: partMetadata{ID: 2}}, ref: 2},
+				},
+			},
+			nextEpoch: 1,
+			nextParts: map[uint64]*partWrapper{},
+			expected: snapshot{
+				epoch: 1,
+				ref:   1,
+				parts: []*partWrapper{
+					{p: &part{partMetadata: partMetadata{ID: 1}}, ref: 2},
+					{p: &part{partMetadata: partMetadata{ID: 2}}, ref: 3},
+				},
+			},
+		},
+		{
+			name: "Test with non-empty snapshot and non-empty next parts",
+			snapshot: &snapshot{
+				parts: []*partWrapper{
+					{p: &part{partMetadata: partMetadata{ID: 1}}, ref: 1},
+					{p: &part{partMetadata: partMetadata{ID: 2}}, ref: 2},
+				},
+			},
+			nextEpoch: 1,
+			nextParts: map[uint64]*partWrapper{
+				2: {p: &part{partMetadata: partMetadata{ID: 2}}, ref: 1},
+				3: {p: &part{partMetadata: partMetadata{ID: 3}}, ref: 1},
+			},
+			expected: snapshot{
+				epoch: 1,
+				ref:   1,
+				parts: []*partWrapper{
+					{p: &part{partMetadata: partMetadata{ID: 1}}, ref: 2},
+					{p: &part{partMetadata: partMetadata{ID: 2}}, ref: 1},
+				},
+			},
+		},
+		{
+			name: "Test with closed previous snapshot",
+			snapshot: &snapshot{
+				parts: []*partWrapper{
+					{p: &part{partMetadata: partMetadata{ID: 1}}, ref: 1},
+					{p: &part{partMetadata: partMetadata{ID: 2}}, ref: 2},
+				},
+			},
+			closePrev: true,
+			nextEpoch: 1,
+			nextParts: map[uint64]*partWrapper{
+				2: {p: &part{partMetadata: partMetadata{ID: 2}}, ref: 1},
+				3: {p: &part{partMetadata: partMetadata{ID: 3}}, ref: 1},
+			},
+			expected: snapshot{
+				epoch: 1,
+				ref:   1,
+				parts: []*partWrapper{
+					{p: &part{partMetadata: partMetadata{ID: 1}}, ref: 1},
+					{p: &part{partMetadata: partMetadata{ID: 2}}, ref: 1},
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := tt.snapshot.merge(tt.nextEpoch, tt.nextParts)
+			if tt.closePrev {
+				tt.snapshot.decRef()
+			}
+			assert.Equal(t, tt.expected, result)
+		})
+	}
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 6ce4adbe..a51d51e8 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -19,30 +19,210 @@ package measure
 
 import (
 	"container/heap"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
+	"path/filepath"
+	"sort"
+	"strconv"
 	"sync"
+	"time"
 
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/fs"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+	"github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
-func newTSTable(_ string, _ common.Position, _ *logger.Logger, _ timestamp.TimeRange) (*tsTable, error) {
-	return &tsTable{}, nil
+const (
+	snapshotSuffix = ".snp"
+	filePermission = 0o600
+	dirPermission  = 0o700
+)
+
+// newTSTable opens the table stored under rootPath. Directory entries are read
+// as parts and ".snp" files as snapshots; entries whose names cannot be parsed
+// are deleted. When both parts and snapshots exist the newest snapshot is
+// loaded. The background loops are started before the table is returned.
+func newTSTable(fileSystem fs.FileSystem, rootPath string, _ common.Position, l *logger.Logger, _ timestamp.TimeRange) (*tsTable, error) {
+	var tsTable tsTable
+	tsTable.fileSystem = fileSystem
+	tsTable.root = rootPath
+	tsTable.l = l
+	tsTable.gc.parent = &tsTable
+	ee := fileSystem.ReadDir(rootPath)
+	if len(ee) == 0 {
+		t := &tsTable
+		t.startLoop(uint64(time.Now().UnixNano()))
+		return t, nil
+	}
+	var loadedParts []uint64
+	var loadedSnapshots []uint64
+	var needToDelete []string
+	for i := range ee {
+		if ee[i].IsDir() {
+			p, err := parseEpoch(ee[i].Name())
+			if err != nil {
+				l.Info().Err(err).Msg("cannot parse part file name. skip and delete it")
+				needToDelete = append(needToDelete, ee[i].Name())
+				continue
+			}
+			loadedParts = append(loadedParts, p)
+			continue
+		}
+		if filepath.Ext(ee[i].Name()) != snapshotSuffix {
+			continue
+		}
+		snapshot, err := parseSnapshot(ee[i].Name())
+		if err != nil {
+			l.Info().Err(err).Msg("cannot parse snapshot file name. skip and delete it")
+			needToDelete = append(needToDelete, ee[i].Name())
+			continue
+		}
+		loadedSnapshots = append(loadedSnapshots, snapshot)
+		tsTable.gc.registerSnapshot(snapshot)
+	}
+	for i := range needToDelete {
+		if err := fileSystem.DeleteFile(filepath.Join(rootPath, needToDelete[i])); err != nil {
+			l.Warn().Err(err).Str("path", filepath.Join(rootPath, needToDelete[i])).Msg("failed to delete part. Please check manually")
+		}
+	}
+	if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+		t := &tsTable
+		t.startLoop(uint64(time.Now().UnixNano()))
+		return t, nil
+	}
+	sort.Slice(loadedSnapshots, func(i, j int) bool {
+		return loadedSnapshots[i] > loadedSnapshots[j]
+	})
+	epoch := loadedSnapshots[0]
+	t := &tsTable
+	t.loadSnapshot(epoch, loadedParts)
+	t.startLoop(epoch)
+	return t, nil
 }
 
 type tsTable struct {
-	memParts []*partWrapper
+	l          *logger.Logger
+	fileSystem fs.FileSystem
+	gc         garbageCleaner
+	root       string
+	snapshot   *snapshot
 	sync.RWMutex
+	introductions chan *introduction
+	loopCloser    *run.Closer
+}
+
+// loadSnapshot rebuilds the in-memory snapshot of the given epoch from the
+// part directories found on disk. Only parts listed in the snapshot file are
+// opened; every other part is handed to the garbage cleaner.
+func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
+	parts := tst.mustReadSnapshot(epoch)
+	snp := snapshot{epoch: epoch}
+	for _, partName := range loadedParts {
+		var find bool
+		for j := range parts {
+			if partName == parts[j] {
+				find = true
+				break
+			}
+		}
+		if !find {
+			// The snapshot does not reference this part: submit it to the
+			// garbage cleaner and keep it out of the loaded snapshot.
+			tst.gc.submitParts(partName)
+			continue
+		}
+		p := mustOpenFilePart(partPath(tst.root, partName), tst.fileSystem)
+		snp.parts = append(snp.parts, newPartWrapper(nil, p, tst.fileSystem))
+	}
+	tst.gc.clean()
+	if len(snp.parts) < 1 {
+		return
+	}
+	snp.incRef()
+	tst.snapshot = &snp
+}
+
+// startLoop creates the introduction channel and launches the introducer and
+// flusher goroutines; the flusher receives cur and the introducer cur+1.
+func (tst *tsTable) startLoop(cur uint64) {
+	next := cur + 1
+	tst.loopCloser = run.NewCloser(3)
+	tst.introductions = make(chan *introduction)
+	flushCh := make(chan *flusherIntroduction)
+	introducerWatcher := make(watcher.Channel, 1)
+	go tst.introducerLoop(flushCh, introducerWatcher, next)
+	go tst.flusherLoop(flushCh, introducerWatcher, cur)
+}
+
+// parseEpoch parses a hexadecimal string into a 64-bit epoch or part ID.
+func parseEpoch(epochStr string) (uint64, error) {
+	p, err := strconv.ParseUint(epochStr, 16, 64)
+	if err != nil {
+		return 0, fmt.Errorf("cannot parse path %s: %w", epochStr, err)
+	}
+	return p, nil
+}
+
+// mustWriteSnapshot stores partNames as a JSON array in the snapshot file of
+// the given epoch and panics on any failure.
+// NOTE(review): the file returned by CreateLockFile is not closed here; confirm.
+func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) {
+	data, err := json.Marshal(partNames)
+	if err != nil {
+		logger.Panicf("cannot marshal partNames to JSON: %s", err)
+	}
+	snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
+	lf, err := tst.fileSystem.CreateLockFile(snapshotPath, filePermission)
+	if err != nil {
+		logger.Panicf("cannot create lock file %s: %s", snapshotPath, err)
+	}
+	n, err := lf.Write(data)
+	if err != nil {
+		logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err)
+	}
+	if n != len(data) {
+		logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotPath, n, len(data))
+	}
+}
+
+// mustReadSnapshot reads the snapshot file of the given epoch and returns the
+// part IDs it lists, parsed from hexadecimal. It panics on any failure.
+func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+	snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
+	data, err := tst.fileSystem.Read(snapshotPath)
+	if err != nil {
+		logger.Panicf("cannot read %s: %s", snapshotPath, err)
+	}
+	var partNames []string
+	if err := json.Unmarshal(data, &partNames); err != nil {
+		logger.Panicf("cannot parse %s: %s", snapshotPath, err)
+	}
+	var result []uint64
+	for i := range partNames {
+		e, err := parseEpoch(partNames[i])
+		if err != nil {
+			logger.Panicf("cannot parse %s: %s", partNames[i], err)
+		}
+		result = append(result, e)
+	}
+	return result
 }
 
 func (tst *tsTable) Close() error {
 	tst.Lock()
 	defer tst.Unlock()
-	for _, p := range tst.memParts {
-		p.decRef()
+	if tst.loopCloser != nil {
+
+		tst.loopCloser.Done()
+		tst.loopCloser.CloseThenWait()
+	}
+	if tst.snapshot != nil {
+		tst.snapshot.decRef()
 	}
 	return nil
 }
@@ -56,26 +236,19 @@ func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
 	mp.mustInitFromDataPoints(dps)
 	p := openMemPart(mp)
 
-	pw := newMemPartWrapper(mp, p)
-
-	tst.Lock()
-	defer tst.Unlock()
-	tst.memParts = append(tst.memParts, pw)
-}
+	ind :=
generateIntroduction() + ind.applied = make(chan struct{}) + ind.memPart = newPartWrapper(mp, p, tst.fileSystem) -func (tst *tsTable) getParts(dst []*partWrapper, dstPart []*part, opts queryOptions) ([]*partWrapper, []*part) { - tst.RLock() - defer tst.RUnlock() - for _, p := range tst.memParts { - pm := p.mp.partMetadata - if opts.maxTimestamp < pm.MinTimestamp || opts.minTimestamp > pm.MaxTimestamp { - continue - } - p.incRef() - dst = append(dst, p) - dstPart = append(dstPart, p.p) + select { + case tst.introductions <- ind: + case <-tst.loopCloser.CloseNotify(): + return + } + select { + case <-ind.applied: + case <-tst.loopCloser.CloseNotify(): } - return dst, dstPart } type tstIter struct { diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go index fdfb9879..e4599596 100644 --- a/banyand/measure/tstable_test.go +++ b/banyand/measure/tstable_test.go @@ -94,13 +94,18 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) { tst.mustAddDataPoints(dps) time.Sleep(100 * time.Millisecond) } - assert.Equal(t, tt.want, len(tst.memParts)) - var lastVersion int64 - for _, pw := range tst.memParts { + s := tst.currentSnapshot() + if s == nil { + s = new(snapshot) + } + defer s.decRef() + assert.Equal(t, tt.want, len(s.parts)) + var lastVersion uint64 + for _, pw := range s.parts { if lastVersion == 0 { - lastVersion = pw.p.partMetadata.Version + lastVersion = pw.p.partMetadata.ID } else { - require.Less(t, lastVersion, pw.p.partMetadata.Version) + require.Less(t, lastVersion, pw.p.partMetadata.ID) } } }) @@ -175,15 +180,16 @@ func Test_tstIter(t *testing.T) { tst.mustAddDataPoints(dps) time.Sleep(100 * time.Millisecond) } - pws, pp := tst.getParts(nil, nil, queryOptions{ + s := tst.currentSnapshot() + if s == nil { + s = new(snapshot) + } + defer s.decRef() + pp, n := s.getParts(nil, queryOptions{ minTimestamp: tt.minTimestamp, maxTimestamp: tt.maxTimestamp, }) - defer func() { - for _, pw := range pws { - pw.decRef() - } - }() + require.Equal(t, 
len(s.parts), n)
 			ti := &tstIter{}
 			ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
 			var got []blockMetadata
diff --git a/go.mod b/go.mod
index 3d57a1a9..4a9f0d19 100644
--- a/go.mod
+++ b/go.mod
@@ -139,7 +139,7 @@ require (
 	go.uber.org/zap v1.26.0
 	golang.org/x/crypto v0.17.0 // indirect
 	golang.org/x/net v0.19.0 // indirect
-	golang.org/x/sys v0.15.0 // indirect
+	golang.org/x/sys v0.15.0
 	golang.org/x/text v0.14.0 // indirect
 	golang.org/x/time v0.5.0 // indirect
 	golang.org/x/tools v0.16.1 // indirect
diff --git a/pkg/fs/error.go b/pkg/fs/error.go
index a54c513a..6db456a5 100644
--- a/pkg/fs/error.go
+++ b/pkg/fs/error.go
@@ -21,16 +21,17 @@ package fs
 import "fmt"
 
 const (
-	isExistError    = 0
-	isNotExistError = 1
-	permissionError = 2
-	openError       = 3
-	deleteError     = 4
-	writeError      = 5
-	readError       = 6
-	flushError      = 7
-	closeError      = 8
-	otherError      = 9
+	isExistError = iota // error codes are numbered consecutively starting at 0
+	isNotExistError
+	permissionError
+	openError
+	deleteError
+	writeError
+	readError
+	flushError
+	closeError
+	lockError  // set by CreateLockFile when the file cannot be locked
+	otherError // now 10: inserting lockError moved it up from 9
 )
 
 // FileSystemError implements the Error interface.
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 098287e2..b5f8aa10 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -94,12 +94,20 @@ type FileSystem interface {
 	ReadDir(dirname string) []DirEntry
 	// Create and open the file by specified name and mode.
 	CreateFile(name string, permission Mode) (File, error)
+	// Create and open lock file by specified name and mode.
+	CreateLockFile(name string, permission Mode) (File, error)
+	// Open the file by specified name and mode.
+	OpenFile(name string) (File, error)
 	// Flush mode, which flushes all data to one file.
 	Write(buffer []byte, name string, permission Mode) (int, error)
+	// Read the entire file using streaming read.
+	Read(name string) ([]byte, error)
 	// Delete the file.
 	DeleteFile(name string) error
 	// Delete the directory.
 	MustRMAll(path string)
+	// SyncPath the directory of file.
+	SyncPath(path string)
 }
 
 // DirEntry is the interface that wraps the basic information about a file or directory.
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 4de839ff..563a2aa2 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -25,6 +25,7 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+	"time"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
@@ -159,6 +160,33 @@ func (fs *localFileSystem) CreateFile(name string, permission Mode) (File, error
 	}
 }
 
+// OpenFile opens the named file read-only and wraps it in a LocalFile.
+// Every failure is reported as a *FileSystemError carrying a matching code.
+func (fs *localFileSystem) OpenFile(name string) (File, error) {
+	file, err := os.Open(name)
+	switch {
+	case err == nil:
+		return &LocalFile{
+			file: file,
+		}, nil
+	case os.IsNotExist(err):
+		return nil, &FileSystemError{
+			Code:    isNotExistError,
+			Message: fmt.Sprintf("File is not exist, file name: %s,error message: %s", name, err),
+		}
+	case os.IsPermission(err):
+		return nil, &FileSystemError{
+			Code:    permissionError,
+			Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", name, err),
+		}
+	default:
+		return nil, &FileSystemError{
+			Code:    otherError,
+			Message: fmt.Sprintf("Open file return error, file name: %s,error message: %s", name, err),
+		}
+	}
+}
+
 // Write flushes all data to one file.
 func (fs *localFileSystem) Write(buffer []byte, name string, permission Mode) (int, error) {
 	file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission))
@@ -194,6 +222,30 @@ func (fs *localFileSystem) Write(buffer []byte, name string, permission Mode) (i
 	return size, nil
 }
 
+// Read is used to read the entire file using streaming read.
+func (fs *localFileSystem) Read(name string) ([]byte, error) {
+	data, err := os.ReadFile(name)
+	switch {
+	case err == nil:
+		return data, nil
+	case os.IsNotExist(err):
+		return nil, &FileSystemError{ // never hand back partial data with an error
+			Code:    isNotExistError,
+			Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", name, err),
+		}
+	case os.IsPermission(err):
+		return nil, &FileSystemError{
+			Code:    permissionError,
+			Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", name, err),
+		}
+	default:
+		return nil, &FileSystemError{
+			Code:    otherError,
+			Message: fmt.Sprintf("Read file error, file name: %s, error message: %s", name, err),
+		}
+	}
+}
+
 // DeleteFile is used to delete the file.
 func (fs *localFileSystem) DeleteFile(name string) error {
 	err := os.Remove(name)
@@ -219,9 +269,16 @@ func (fs *localFileSystem) DeleteFile(name string) error {
 }
 
+// MustRMAll removes path and everything under it. A failed removal is retried
+// up to five times, one second apart; the last error is logged with the panic.
 func (fs *localFileSystem) MustRMAll(path string) {
-	if err := os.RemoveAll(path); err != nil {
-		logger.Panicf("failed to remove all files under %s", path)
+	err := os.RemoveAll(path)
+	for i := 0; err != nil && i < 5; i++ {
+		time.Sleep(time.Second)
+		err = os.RemoveAll(path)
+	}
+	if err != nil {
+		fs.logger.Panic().Str("path", path).Err(err).Msg("failed to remove all files under path")
 	}
 }
 
 // Write adds new data to the end of a file.
diff --git a/pkg/fs/local_file_system_nix.go b/pkg/fs/local_file_system_nix.go
new file mode 100644
index 00000000..05f71fef
--- /dev/null
+++ b/pkg/fs/local_file_system_nix.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd
+// +build darwin dragonfly freebsd linux netbsd openbsd
+
+package fs
+
+import (
+	"fmt"
+	"os"
+
+	"golang.org/x/sys/unix"
+)
+
+// CreateLockFile creates or truncates name and takes an exclusive, non-blocking flock on it.
+func (*localFileSystem) CreateLockFile(name string, permission Mode) (File, error) {
+	file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission))
+	switch {
+	case err == nil:
+		if err := unix.Flock(int(file.Fd()), unix.LOCK_EX|unix.LOCK_NB); err != nil {
+			_ = file.Close() // best effort: do not leak the descriptor when locking fails
+			return nil, &FileSystemError{
+				Code:    lockError,
+				Message: fmt.Sprintf("Cannot lock file, file name: %s, error message: %s", name, err),
+			}
+		}
+		return &LocalFile{file: file}, nil
+	case os.IsExist(err):
+		return nil, &FileSystemError{
+			Code:    isExistError,
+			Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err),
+		}
+	case os.IsPermission(err):
+		return nil, &FileSystemError{
+			Code:    permissionError,
+			Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err),
+		}
+	default:
+		return nil, &FileSystemError{
+			Code:    otherError,
+			Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err),
+		}
+	}
+}
+
+// SyncPath opens name, calls Sync on it and closes it again; any failure panics.
+func (fs *localFileSystem) SyncPath(name string) {
+	file, err := os.Open(name)
+	if err != nil {
+		fs.logger.Panic().Str("name", name).Err(err).Msg("failed to open file")
+	}
+	if err := file.Sync(); err != nil {
+		_ = file.Close()
+		fs.logger.Panic().Str("name", name).Err(err).Msg("failed to sync file")
+	}
+	if err := file.Close(); err != nil {
+		fs.logger.Panic().Str("name", name).Err(err).Msg("failed to close file")
+	}
+}
diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go
new file mode 100644
index 00000000..6d5d63d4
--- /dev/null
+++ b/pkg/watcher/watcher.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package watcher provides a watcher to watch the epoch.
+package watcher
+
+// Epoch is an epoch watcher.
+// It will be notified when the epoch is reached.
+type Epoch struct {
+	epoch uint64
+	ch    chan struct{}
+}
+
+// Watch returns a channel that will be notified when the epoch is reached.
+func (e *Epoch) Watch() <-chan struct{} {
+	return e.ch
+}
+
+// Epochs is a list of epoch watchers.
+type Epochs []*Epoch
+
+// Add adds an epoch watcher.
+func (e *Epochs) Add(epoch *Epoch) {
+	*e = append(*e, epoch)
+}
+
+// Notify notifies all epoch watchers that the epoch is reached.
+func (e *Epochs) Notify(epoch uint64) {
+	var pending Epochs
+	for _, w := range *e {
+		if w.epoch > epoch {
+			pending = append(pending, w)
+			continue
+		}
+		close(w.ch)
+	}
+	*e = pending
+}
+
+// Channel is a channel of epoch watchers.
+type Channel chan *Epoch
+
+// Add creates a watcher for epoch and sends it on the channel.
+// It returns nil if closeCh fires before the watcher is accepted.
+func (w Channel) Add(epoch uint64, closeCh <-chan struct{}) *Epoch {
+	// The watcher's own channel is buffered with capacity one.
+	ep := &Epoch{
+		epoch: epoch,
+		ch:    make(chan struct{}, 1),
+	}
+	select {
+	case <-closeCh:
+		return nil
+	case w <- ep:
+		return ep
+	}
+}
