This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
     new ef05f8a7  Implement skipping index for stream (#665)
ef05f8a7 is described below

commit ef05f8a7714f4a54b24bbaf76b3df5b528eafccf
Author: Huang Youliang <butterbright0...@gmail.com>
AuthorDate: Tue Jul 1 21:36:34 2025 +0800

    Implement skipping index for stream (#665)

    * Implement skipping index writes

    ---------

    Co-authored-by: Gao Hongtao <hanahm...@gmail.com>
    Co-authored-by: 吴晟 Wu Sheng <wu.sh...@foxmail.com>
---
 CHANGES.md                                         |   1 +
 api/proto/banyandb/database/v1/schema.proto        |   1 +
 banyand/internal/storage/version.go                |   2 +-
 banyand/internal/storage/versions.yml              |   2 +-
 banyand/stream/benchmark_test.go                   |   7 +-
 banyand/stream/block.go                            |  44 +++-
 banyand/stream/block_scanner.go                    |   7 +-
 banyand/stream/block_test.go                       |  16 +-
 banyand/stream/block_writer.go                     |  32 ++-
 banyand/stream/elements.go                         |   2 +
 banyand/stream/part.go                             |  32 ++-
 banyand/stream/part_iter.go                        |  27 ++-
 banyand/stream/part_iter_test.go                   |   2 +-
 banyand/stream/query.go                            |   4 +-
 banyand/stream/query_by_idx.go                     |   2 +-
 banyand/stream/tag.go                              |  17 +-
 banyand/stream/tag_filter.go                       | 222 +++++++++++++++++++++
 banyand/stream/tag_filter_test.go                  | 161 +++++++++++++++
 banyand/stream/tag_metadata.go                     |  23 ++-
 banyand/stream/tag_metadata_test.go                |  84 +++++---
 banyand/stream/tag_test.go                         |   7 +-
 banyand/stream/tstable.go                          |   5 +-
 banyand/stream/tstable_test.go                     |   2 +-
 banyand/stream/write.go                            |  86 ++++++--
 docs/api-reference.md                              |   1 +
 pkg/encoding/bytes.go                              |  10 +-
 pkg/encoding/dictionary.go                         |   2 +-
 pkg/filter/bloom_filter.go                         | 132 ++++++++++++
 pkg/filter/bloom_filter_test.go                    | 136 +++++++++++++
 pkg/index/{inverted => analyzer}/analyzer.go       |  21 +-
 pkg/index/{inverted => analyzer}/analyzer_test.go  |   4 +-
 pkg/index/index.go                                 |   7 +
 pkg/index/inverted/inverted.go                     |  24 +--
 pkg/index/inverted/inverted_series.go              |   3 +-
 pkg/query/logical/stream/index_filter.go           |  65 +++++-
 .../logical/stream/stream_plan_indexscan_local.go  |   8 +-
 pkg/query/logical/stream/stream_plan_tag_filter.go |  13 +-
 pkg/query/logical/tag_filter.go                    |  88 ++++++--
 pkg/query/model/model.go                           |   9 +-
 pkg/test/stream/testdata/index_rules/db.type.json  |   2 +-
 .../stream/testdata/index_rules/endpoint_id.json   |   2 +-
 .../stream/testdata/index_rules/http.method.json   |   2 +-
 .../stream/testdata/index_rules/mq.broker.json     |   2 +-
 pkg/test/stream/testdata/index_rules/mq.queue.json |   2 +-
 pkg/test/stream/testdata/index_rules/mq.topic.json |   2 +-
 .../stream/testdata/index_rules/status_code.json   |   2 +-
 pkg/test/stream/testdata/index_rules/trace_id.json |   2 +-
 47 files changed, 1163 insertions(+), 164 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index aba5d5f6..99804b8a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -16,6 +16,7 @@ Release Notes.
 - Add a wait and retry to write handlers to avoid the local metadata cache being loaded.
 - Implement primary block cache for measure.
 - Implement versioning properties and replace physical deletion with the tombstone mechanism for the property database.
+- Implement skipping index for stream.
 - Add Load Balancer Feature to Liaison.
 - Implement fadvise for large files to prevent page cache pollution.
 - Data Model: Introduce the `Trace` data model to store the trace/span data.
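The core of this change is the new pkg/filter Bloom filter that each block persists per indexed tag. A minimal sketch of how the API introduced further down in this diff behaves; only the main-package scaffolding and the trace-id sample values are made up for illustration:

package main

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/pkg/filter"
)

func main() {
	// Size the filter for the number of items in one block; the
	// implementation below allocates B (15) bits per item and probes
	// k (6) positions per lookup.
	bf := filter.NewBloomFilter(2)
	bf.Add([]byte("trace-id-1"))
	bf.Add([]byte("trace-id-2"))

	// A hit only means "maybe present", so the block must still be read.
	fmt.Println(bf.MightContain([]byte("trace-id-1"))) // true
	// A miss is definitive, so the whole block can be skipped.
	fmt.Println(bf.MightContain([]byte("trace-id-9"))) // false (with high probability)
}

That asymmetry is what makes the filter safe to consult before touching block data: a false positive only costs an extra read, never correctness.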
diff --git a/api/proto/banyandb/database/v1/schema.proto b/api/proto/banyandb/database/v1/schema.proto index a7e61ce8..c6e71ebb 100644 --- a/api/proto/banyandb/database/v1/schema.proto +++ b/api/proto/banyandb/database/v1/schema.proto @@ -164,6 +164,7 @@ message IndexRule { enum Type { TYPE_UNSPECIFIED = 0; TYPE_INVERTED = 1; + TYPE_SKIPPING = 2; // TYPE_TREE is a tree index, which is used for storing hierarchical data. TYPE_TREE = 3; } diff --git a/banyand/internal/storage/version.go b/banyand/internal/storage/version.go index 8225629e..2fdac4ca 100644 --- a/banyand/internal/storage/version.go +++ b/banyand/internal/storage/version.go @@ -28,7 +28,7 @@ import ( const ( metadataFilename = "metadata" - currentVersion = "1.2.0" + currentVersion = "1.3.0" compatibleVersionsKey = "versions" compatibleVersionsFilename = "versions.yml" ) diff --git a/banyand/internal/storage/versions.yml b/banyand/internal/storage/versions.yml index eb98c0e6..7f01e316 100644 --- a/banyand/internal/storage/versions.yml +++ b/banyand/internal/storage/versions.yml @@ -14,4 +14,4 @@ # limitations under the License. versions: -- 1.2.0 +- 1.3.0 diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go index 9c7c68db..273d91b7 100644 --- a/banyand/stream/benchmark_test.go +++ b/banyand/stream/benchmark_test.go @@ -94,6 +94,10 @@ func (mf mockFilter) Execute(_ index.GetSearcher, seriesID common.SeriesID, _ *i return mf.index[mf.value][seriesID], roaring.DummyPostingList, nil } +func (mf mockFilter) ShouldSkip(_ index.FilterOp) (bool, error) { + return false, nil +} + type databaseSupplier struct { database atomic.Value } @@ -311,7 +315,8 @@ func generateStreamQueryOptions(p parameter, midx mockIndex) model.StreamQueryOp Name: "benchmark", TimeRange: &timeRange, Entities: entities, - Filter: filter, + InvertedFilter: filter, + SkippingFilter: nil, Order: order, TagProjection: []model.TagProjection{tagProjection}, MaxElementSize: math.MaxInt32, diff --git a/banyand/stream/block.go b/banyand/stream/block.go index e4a27945..e6e6e5e0 100644 --- a/banyand/stream/block.go +++ b/banyand/stream/block.go @@ -18,6 +18,7 @@ package stream import ( + "bytes" "fmt" "sort" @@ -25,7 +26,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/pkg/bytes" + pkgbytes "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index/posting" @@ -103,6 +104,28 @@ func (b *block) processTags(tf tagValues, tagFamilyIdx, i int, elementsLen int) tags[j].resizeValues(elementsLen) tags[j].valueType = t.valueType tags[j].values[i] = t.marshal() + if !t.indexed { + continue + } + if tags[j].filter == nil { + filter := generateBloomFilter() + tags[j].filter = filter + } + tags[j].filter.SetN(elementsLen) + tags[j].filter.ResizeBits((elementsLen + 63) / 64) + tags[j].filter.Add(t.value) + if t.valueType == pbv1.ValueTypeInt64 { + if len(tags[j].min) == 0 { + tags[j].min = t.value + } else if bytes.Compare(t.value, tags[j].min) == -1 { + tags[j].min = t.value + } + if len(tags[j].max) == 0 { + tags[j].max = t.value + } else if bytes.Compare(t.value, tags[j].max) == 1 { + tags[j].max = t.value + } + } } } @@ -159,12 +182,12 @@ func (b *block) validate() { } func (b *block) marshalTagFamily(tf tagFamily, bm *blockMetadata, ww *writers) { - hw, w := 
ww.getTagMetadataWriterAndTagWriter(tf.name) + hw, w, fw := ww.getWriters(tf.name) cc := tf.tags cfm := generateTagFamilyMetadata() cmm := cfm.resizeTagMetadata(len(cc)) for i := range cc { - cc[i].mustWriteTo(&cmm[i], w) + cc[i].mustWriteTo(&cmm[i], w, fw) } bb := bigValuePool.Generate() defer bigValuePool.Release(bb) @@ -186,7 +209,7 @@ func (b *block) unmarshalTagFamily(decoder *encoding.BytesBlockDecoder, tfIndex return } bb := bigValuePool.Generate() - bb.Buf = bytes.ResizeExact(bb.Buf, int(tagFamilyMetadataBlock.size)) + bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(tagFamilyMetadataBlock.size)) fs.MustReadData(metaReader, int64(tagFamilyMetadataBlock.offset), bb.Buf) tfm := generateTagFamilyMetadata() defer releaseTagFamilyMetadata(tfm) @@ -221,7 +244,7 @@ func (b *block) unmarshalTagFamilyFromSeqReaders(decoder *encoding.BytesBlockDec logger.Panicf("offset %d must be equal to bytesRead %d", columnFamilyMetadataBlock.offset, metaReader.bytesRead) } bb := bigValuePool.Generate() - bb.Buf = bytes.ResizeExact(bb.Buf, int(columnFamilyMetadataBlock.size)) + bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(columnFamilyMetadataBlock.size)) metaReader.mustReadFull(bb.Buf) tfm := generateTagFamilyMetadata() defer releaseTagFamilyMetadata(tfm) @@ -330,7 +353,7 @@ func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps []int64, elementID func mustReadTimestampsFrom(timestamps []int64, elementIDs []uint64, tm *timestampsMetadata, count int, reader fs.Reader) ([]int64, []uint64) { bb := bigValuePool.Generate() defer bigValuePool.Release(bb) - bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size)) + bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(tm.size)) fs.MustReadData(reader, int64(tm.offset), bb.Buf) return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, count, reader.Path(), bb.Buf) } @@ -359,7 +382,7 @@ func mustSeqReadTimestampsFrom(timestamps []int64, elementIDs []uint64, tm *time } bb := bigValuePool.Generate() defer bigValuePool.Release(bb) - bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size)) + bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(tm.size)) reader.mustReadFull(bb.Buf) return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, count, reader.Path(), bb.Buf) } @@ -373,6 +396,13 @@ func generateBlock() *block { } func releaseBlock(b *block) { + for _, tf := range b.tagFamilies { + for _, t := range tf.tags { + if t.filter != nil { + releaseBloomFilter(t.filter) + } + } + } b.reset() blockPool.Put(b) } diff --git a/banyand/stream/block_scanner.go b/banyand/stream/block_scanner.go index 889adb64..601b3af0 100644 --- a/banyand/stream/block_scanner.go +++ b/banyand/stream/block_scanner.go @@ -162,7 +162,7 @@ func getBlockScanner(ctx context.Context, segment storage.Segment[*tsTable, *opt } func search(ctx context.Context, qo queryOptions, seriesList []common.SeriesID, tw *tsTable, tr *index.RangeOpts) (pl posting.List, plTS posting.List, err error) { - if qo.Filter == nil || qo.Filter == logicalstream.ENode { + if qo.InvertedFilter == nil || qo.InvertedFilter == logicalstream.ENode { return nil, nil, nil } tracer := query.GetTracer(ctx) @@ -184,7 +184,7 @@ func search(ctx context.Context, qo queryOptions, seriesList []common.SeriesID, for i := range seriesList { sid[i] = uint64(seriesList[i]) } - pl, plTS, err = tw.Index().Search(ctx, sid, qo.Filter, tr) + pl, plTS, err = tw.Index().Search(ctx, sid, qo.InvertedFilter, tr) if err != nil { return nil, nil, err } @@ -222,7 +222,7 @@ func (bsn *blockScanner) scan(ctx context.Context, blockCh chan *blockScanResult defer 
releaseBlockMetadataArray(bma) ti := generateTstIter() defer releaseTstIter(ti) - ti.init(bma, parts, bsn.qo.sortedSids, bsn.qo.minTimestamp, bsn.qo.maxTimestamp) + ti.init(bma, parts, bsn.qo.sortedSids, bsn.qo.minTimestamp, bsn.qo.maxTimestamp, bsn.qo.SkippingFilter) batch := generateBlockScanResultBatch() if ti.Error() != nil { batch.err = fmt.Errorf("cannot init tstIter: %w", ti.Error()) @@ -285,7 +285,6 @@ func (bsn *blockScanner) scan(ctx context.Context, blockCh chan *blockScanResult select { case blockCh <- batch: case <-ctx.Done(): - releaseBlockScanResultBatch(batch) } return diff --git a/banyand/stream/block_test.go b/banyand/stream/block_test.go index 8d784cd3..8b94e76b 100644 --- a/banyand/stream/block_test.go +++ b/banyand/stream/block_test.go @@ -250,13 +250,14 @@ func Test_mustWriteAndReadTimestamps(t *testing.T) { } func Test_marshalAndUnmarshalTagFamily(t *testing.T) { - metaBuffer, dataBuffer := &bytes.Buffer{}, &bytes.Buffer{} + metaBuffer, dataBuffer, filterBuffer := &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{} ww := &writers{ - mustCreateTagFamilyWriters: func(_ string) (fs.Writer, fs.Writer) { - return metaBuffer, dataBuffer + mustCreateTagFamilyWriters: func(_ string) (fs.Writer, fs.Writer, fs.Writer) { + return metaBuffer, dataBuffer, filterBuffer }, tagFamilyMetadataWriters: make(map[string]*writer), tagFamilyWriters: make(map[string]*writer), + tagFamilyFilterWriters: make(map[string]*writer), } b := &conventionalBlock tagProjection := toTagProjection(*b) @@ -286,7 +287,7 @@ func Test_marshalAndUnmarshalTagFamily(t *testing.T) { unmarshaled.unmarshalTagFamily(decoder, tfIndex, name, bm.getTagFamilyMetadata(name), tagProjection[name], metaBuffer, dataBuffer, 1) if diff := cmp.Diff(unmarshaled.tagFamilies[0], b.tagFamilies[0], - cmp.AllowUnexported(tagFamily{}, tag{}), + cmp.AllowUnexported(tagFamily{}, tag{}, tagFilter{}), ); diff != "" { t.Errorf("block.unmarshalTagFamily() (-got +want):\n%s", diff) } @@ -306,7 +307,7 @@ func Test_marshalAndUnmarshalTagFamily(t *testing.T) { unmarshaled2.unmarshalTagFamilyFromSeqReaders(decoder, tfIndex, name, bm.getTagFamilyMetadata(name), metaReader, valueReader) if diff := cmp.Diff(unmarshaled2.tagFamilies[0], b.tagFamilies[0], - cmp.AllowUnexported(tagFamily{}, tag{}), + cmp.AllowUnexported(tagFamily{}, tag{}, tagFilter{}), ); diff != "" { t.Errorf("block.unmarshalTagFamilyFromSeqReaders() (-got +want):\n%s", diff) } @@ -317,11 +318,12 @@ func Test_marshalAndUnmarshalBlock(t *testing.T) { timestampWriter := &writer{} timestampWriter.init(timestampBuffer) ww := &writers{ - mustCreateTagFamilyWriters: func(_ string) (fs.Writer, fs.Writer) { - return &bytes.Buffer{}, &bytes.Buffer{} + mustCreateTagFamilyWriters: func(_ string) (fs.Writer, fs.Writer, fs.Writer) { + return &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{} }, tagFamilyMetadataWriters: make(map[string]*writer), tagFamilyWriters: make(map[string]*writer), + tagFamilyFilterWriters: make(map[string]*writer), timestampsWriter: *timestampWriter, } p := &part{ diff --git a/banyand/stream/block_writer.go b/banyand/stream/block_writer.go index d20a3ea9..33fc0764 100644 --- a/banyand/stream/block_writer.go +++ b/banyand/stream/block_writer.go @@ -57,7 +57,7 @@ func (w *writer) MustClose() { w.reset() } -type mustCreateTagFamilyWriters func(name string) (fs.Writer, fs.Writer) +type mustCreateTagFamilyWriters func(name string) (fs.Writer, fs.Writer, fs.Writer) type writers struct { mustCreateTagFamilyWriters mustCreateTagFamilyWriters @@ -65,6 +65,7 @@ type writers struct 
{ primaryWriter writer tagFamilyMetadataWriters map[string]*writer tagFamilyWriters map[string]*writer + tagFamilyFilterWriters map[string]*writer timestampsWriter writer } @@ -82,6 +83,10 @@ func (sw *writers) reset() { w.reset() delete(sw.tagFamilyWriters, i) } + for i, w := range sw.tagFamilyFilterWriters { + w.reset() + delete(sw.tagFamilyFilterWriters, i) + } } func (sw *writers) totalBytesWritten() uint64 { @@ -93,6 +98,9 @@ func (sw *writers) totalBytesWritten() uint64 { for _, w := range sw.tagFamilyWriters { n += w.bytesWritten } + for _, w := range sw.tagFamilyFilterWriters { + n += w.bytesWritten + } return n } @@ -107,22 +115,29 @@ func (sw *writers) MustClose() { for _, w := range sw.tagFamilyWriters { w.MustClose() } + for _, w := range sw.tagFamilyFilterWriters { + w.MustClose() + } } -func (sw *writers) getTagMetadataWriterAndTagWriter(tagName string) (*writer, *writer) { +func (sw *writers) getWriters(tagName string) (*writer, *writer, *writer) { thw, ok := sw.tagFamilyMetadataWriters[tagName] tw := sw.tagFamilyWriters[tagName] + tfw := sw.tagFamilyFilterWriters[tagName] if ok { - return thw, tw + return thw, tw, tfw } - hw, w := sw.mustCreateTagFamilyWriters(tagName) + hw, w, fw := sw.mustCreateTagFamilyWriters(tagName) thw = new(writer) thw.init(hw) tw = new(writer) tw.init(w) + tfw = new(writer) + tfw.init(fw) sw.tagFamilyMetadataWriters[tagName] = thw sw.tagFamilyWriters[tagName] = tw - return thw, tw + sw.tagFamilyFilterWriters[tagName] = tfw + return thw, tw, tfw } type blockWriter struct { @@ -172,11 +187,13 @@ func (bw *blockWriter) MustInitForMemPart(mp *memPart) { func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path string, shouldCache bool) { bw.reset() fileSystem.MkdirPanicIfExist(path, storage.DirPerm) - bw.writers.mustCreateTagFamilyWriters = func(name string) (fs.Writer, fs.Writer) { + bw.writers.mustCreateTagFamilyWriters = func(name string) (fs.Writer, fs.Writer, fs.Writer) { metaPath := filepath.Join(path, name+tagFamiliesMetadataFilenameExt) dataPath := filepath.Join(path, name+tagFamiliesFilenameExt) + fitlerPath := filepath.Join(path, name+tagFamiliesFilterFilenameExt) return fs.MustCreateFile(fileSystem, metaPath, storage.FilePerm, shouldCache), - fs.MustCreateFile(fileSystem, dataPath, storage.FilePerm, shouldCache) + fs.MustCreateFile(fileSystem, dataPath, storage.FilePerm, shouldCache), + fs.MustCreateFile(fileSystem, fitlerPath, storage.FilePerm, shouldCache) } bw.writers.metaWriter.init(fs.MustCreateFile(fileSystem, filepath.Join(path, metaFilename), storage.FilePerm, shouldCache)) @@ -280,6 +297,7 @@ func generateBlockWriter() *blockWriter { writers: writers{ tagFamilyMetadataWriters: make(map[string]*writer), tagFamilyWriters: make(map[string]*writer), + tagFamilyFilterWriters: make(map[string]*writer), }, } } diff --git a/banyand/stream/elements.go b/banyand/stream/elements.go index 9d15cbfc..c040e23f 100644 --- a/banyand/stream/elements.go +++ b/banyand/stream/elements.go @@ -35,12 +35,14 @@ type tagValue struct { value []byte valueArr [][]byte valueType pbv1.ValueType + indexed bool } func (t *tagValue) reset() { t.tag = "" t.value = nil t.valueArr = nil + t.indexed = false } func (t *tagValue) size() int { diff --git a/banyand/stream/part.go b/banyand/stream/part.go index 69e36b74..eaacae09 100644 --- a/banyand/stream/part.go +++ b/banyand/stream/part.go @@ -40,6 +40,7 @@ const ( elementIndexFilename = "idx" tagFamiliesMetadataFilenameExt = ".tfm" tagFamiliesFilenameExt = ".tf" + tagFamiliesFilterFilenameExt = ".tff" ) 
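The new `.tff` file sits next to the existing `.tfm` (tag-family metadata) and `.tf` (tag data) files and holds one encoded Bloom filter per indexed tag. A sketch of that framing, restating the unexported encodeBloomFilter/decodeBloomFilter pair from tag_filter.go (shown later in this diff) with the exported encoding helpers; the encode/decode names and the main scaffolding are illustrative, the layout itself comes from the diff:

package main

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/pkg/encoding"
	"github.com/apache/skywalking-banyandb/pkg/filter"
)

// encode writes the item count as an int64, then the bit set as a
// compressed uint64 block, matching the .tff layout in this diff.
func encode(dst []byte, bf *filter.BloomFilter) []byte {
	dst = encoding.Int64ToBytes(dst, int64(bf.N()))
	dst = encoding.EncodeUint64Block(dst, bf.Bits())
	return dst
}

// decode reverses encode: the first 8 bytes carry n, followed by
// n*filter.B bits rounded up to whole uint64 words.
func decode(src []byte, bf *filter.BloomFilter) (*filter.BloomFilter, error) {
	n := encoding.BytesToInt64(src)
	bf.SetN(int(n))
	bits := make([]uint64, 0)
	bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:], uint64((n*filter.B+63)/64))
	if err != nil {
		return nil, err
	}
	bf.SetBits(bits)
	return bf, nil
}

func main() {
	bf := filter.NewBloomFilter(1)
	bf.Add([]byte("endpoint-a"))

	buf := encode(nil, bf)
	restored, err := decode(buf, filter.NewBloomFilter(0))
	if err != nil {
		panic(err)
	}
	fmt.Println(restored.MightContain([]byte("endpoint-a"))) // true
}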
type part struct { @@ -48,6 +49,7 @@ type part struct { fileSystem fs.FileSystem tagFamilyMetadata map[string]fs.Reader tagFamilies map[string]fs.Reader + tagFamilyFilter map[string]fs.Reader path string primaryBlockMetadata []primaryBlockMetadata partMetadata partMetadata @@ -62,6 +64,9 @@ func (p *part) close() { for _, tfh := range p.tagFamilyMetadata { fs.MustClose(tfh) } + for _, tff := range p.tagFamilyFilter { + fs.MustClose(tff) + } } func (p *part) String() string { @@ -80,9 +85,11 @@ func openMemPart(mp *memPart) *part { if mp.tagFamilies != nil { p.tagFamilies = make(map[string]fs.Reader) p.tagFamilyMetadata = make(map[string]fs.Reader) + p.tagFamilyFilter = make(map[string]fs.Reader) for name, tf := range mp.tagFamilies { p.tagFamilies[name] = tf p.tagFamilyMetadata[name] = mp.tagFamilyMetadata[name] + p.tagFamilyFilter[name] = mp.tagFamilyFilter[name] } } return &p @@ -91,27 +98,32 @@ func openMemPart(mp *memPart) *part { type memPart struct { tagFamilyMetadata map[string]*bytes.Buffer tagFamilies map[string]*bytes.Buffer + tagFamilyFilter map[string]*bytes.Buffer meta bytes.Buffer primary bytes.Buffer timestamps bytes.Buffer partMetadata partMetadata } -func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, fs.Writer) { +func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, fs.Writer, fs.Writer) { if mp.tagFamilies == nil { mp.tagFamilies = make(map[string]*bytes.Buffer) mp.tagFamilyMetadata = make(map[string]*bytes.Buffer) + mp.tagFamilyFilter = make(map[string]*bytes.Buffer) } tf, ok := mp.tagFamilies[name] tfh := mp.tagFamilyMetadata[name] + tff := mp.tagFamilyFilter[name] if ok { tf.Reset() tfh.Reset() - return tfh, tf + tff.Reset() + return tfh, tf, tff } mp.tagFamilies[name] = &bytes.Buffer{} mp.tagFamilyMetadata[name] = &bytes.Buffer{} - return mp.tagFamilyMetadata[name], mp.tagFamilies[name] + mp.tagFamilyFilter[name] = &bytes.Buffer{} + return mp.tagFamilyMetadata[name], mp.tagFamilies[name], mp.tagFamilyFilter[name] } func (mp *memPart) reset() { @@ -129,6 +141,11 @@ func (mp *memPart) reset() { tfh.Reset() } } + if mp.tagFamilyFilter != nil { + for _, tff := range mp.tagFamilyFilter { + tff.Reset() + } + } } func (mp *memPart) mustInitFromElements(es *elements) { @@ -176,6 +193,9 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) { for name, tfh := range mp.tagFamilyMetadata { fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path, name+tagFamiliesMetadataFilenameExt), storage.FilePerm) } + for name, tfh := range mp.tagFamilyFilter { + fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path, name+tagFamiliesFilterFilenameExt), storage.FilePerm) + } mp.partMetadata.mustWriteMetadata(fileSystem, path) @@ -287,6 +307,12 @@ func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part { } p.tagFamilies[removeExt(e.Name(), tagFamiliesFilenameExt)] = mustOpenReader(path.Join(partPath, e.Name()), fileSystem) } + if filepath.Ext(e.Name()) == tagFamiliesFilterFilenameExt { + if p.tagFamilyFilter == nil { + p.tagFamilyFilter = make(map[string]fs.Reader) + } + p.tagFamilyFilter[removeExt(e.Name(), tagFamiliesFilterFilenameExt)] = mustOpenReader(path.Join(partPath, e.Name()), fileSystem) + } } return &p } diff --git a/banyand/stream/part_iter.go b/banyand/stream/part_iter.go index f4277032..1e1de90d 100644 --- a/banyand/stream/part_iter.go +++ b/banyand/stream/part_iter.go @@ -28,8 +28,10 @@ import ( "github.com/apache/skywalking-banyandb/pkg/compress/zstd" 
"github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/pool" + logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" ) type partIter struct { @@ -37,6 +39,7 @@ type partIter struct { p *part curBlock *blockMetadata sids []common.SeriesID + blockFilter index.Filter primaryBlockMetadata []primaryBlockMetadata bms []blockMetadata compressedPrimaryBuf []byte @@ -50,6 +53,7 @@ func (pi *partIter) reset() { pi.curBlock = nil pi.p = nil pi.sids = nil + pi.blockFilter = nil pi.sidIdx = 0 pi.primaryBlockMetadata = nil pi.bms = nil @@ -58,13 +62,14 @@ func (pi *partIter) reset() { pi.err = nil } -func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { +func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64, blockFilter index.Filter) { pi.reset() pi.curBlock = &blockMetadata{} pi.p = p pi.bms = bma.arr pi.sids = sids + pi.blockFilter = blockFilter pi.minTimestamp = minTimestamp pi.maxTimestamp = maxTimestamp @@ -221,6 +226,7 @@ func (pi *partIter) findBlock() bool { bhs = bhs[1:] continue } + if bm.timestamps.min > pi.maxTimestamp { if !pi.nextSeriesID() { return false @@ -228,6 +234,25 @@ func (pi *partIter) findBlock() bool { continue } + if pi.blockFilter != nil && pi.blockFilter != logicalstream.ENode { + shouldSkip, err := func() (bool, error) { + tfs := generateTagFamilyFilters() + defer releaseTagFamilyFilters(tfs) + tfs.unmarshal(bm.tagFamilies, pi.p.tagFamilyMetadata, pi.p.tagFamilyFilter) + return pi.blockFilter.ShouldSkip(tfs) + }() + if err != nil { + pi.err = err + return false + } + if shouldSkip { + if !pi.nextSeriesID() { + return false + } + continue + } + } + pi.curBlock = bm pi.bms = bhs[1:] diff --git a/banyand/stream/part_iter_test.go b/banyand/stream/part_iter_test.go index ef42be37..6eb63da0 100644 --- a/banyand/stream/part_iter_test.go +++ b/banyand/stream/part_iter_test.go @@ -115,7 +115,7 @@ func Test_partIter_nextBlock(t *testing.T) { verifyPart := func(p *part) { defer p.close() pi := partIter{} - pi.init(bma, p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp) + pi.init(bma, p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp, nil) var got []blockMetadata for pi.nextBlock() { diff --git a/banyand/stream/query.go b/banyand/stream/query.go index 0bb1a7e9..0f8b787d 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -282,13 +282,13 @@ func (qo *queryOptions) copyFrom(other *queryOptions) { func indexSearch(ctx context.Context, sqo model.StreamQueryOptions, tabs []*tsTable, seriesList []uint64, tr *index.RangeOpts, ) (posting.List, posting.List, error) { - if sqo.Filter == nil || sqo.Filter == logicalstream.ENode { + if sqo.InvertedFilter == nil || sqo.InvertedFilter == logicalstream.ENode { return nil, nil, nil } result, resultTS := roaring.NewPostingList(), roaring.NewPostingList() for _, tw := range tabs { index := tw.Index() - pl, plTS, err := index.Search(ctx, seriesList, sqo.Filter, tr) + pl, plTS, err := index.Search(ctx, seriesList, sqo.InvertedFilter, tr) if err != nil { return nil, nil, err } diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go index ae3e70af..3d5ee61e 100644 --- a/banyand/stream/query_by_idx.go +++ b/banyand/stream/query_by_idx.go @@ -83,7 +83,7 @@ func (qr *idxResult) 
scanParts(ctx context.Context, qo queryOptions) error { ti := generateTstIter() defer releaseTstIter(ti) sids := qo.sortedSids - ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) + ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp, qo.SkippingFilter) if ti.Error() != nil { return fmt.Errorf("cannot init tstIter: %w", ti.Error()) } diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go index cb8a3d7c..7499747f 100644 --- a/banyand/stream/tag.go +++ b/banyand/stream/tag.go @@ -26,6 +26,7 @@ import ( ) type tag struct { + tagFilter name string values [][]byte valueType pbv1.ValueType @@ -39,6 +40,8 @@ func (t *tag) reset() { values[i] = nil } t.values = values[:0] + + t.tagFilter.reset() } func (t *tag) resizeValues(valuesLen int) [][]byte { @@ -51,7 +54,7 @@ func (t *tag) resizeValues(valuesLen int) [][]byte { return values } -func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer) { +func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer, tagFilterWriter *writer) { tm.reset() tm.name = t.name @@ -68,6 +71,18 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer) { } tm.offset = tagWriter.bytesWritten tagWriter.MustWrite(bb.Buf) + + if t.filter != nil { + bb.Reset() + bb.Buf = encodeBloomFilter(bb.Buf[:0], t.filter) + if tm.valueType == pbv1.ValueTypeInt64 { + tm.min = t.min + tm.max = t.max + } + tm.filterBlock.size = uint64(len(bb.Buf)) + tm.filterBlock.offset = tagFilterWriter.bytesWritten + tagFilterWriter.MustWrite(bb.Buf) + } } func (t *tag) mustReadValues(decoder *encoding.BytesBlockDecoder, reader fs.Reader, cm tagMetadata, count uint64) { diff --git a/banyand/stream/tag_filter.go b/banyand/stream/tag_filter.go new file mode 100644 index 00000000..c609e7d8 --- /dev/null +++ b/banyand/stream/tag_filter.go @@ -0,0 +1,222 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package stream + +import ( + "bytes" + "fmt" + + pkgbytes "github.com/apache/skywalking-banyandb/pkg/bytes" + "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/filter" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" +) + +func encodeBloomFilter(dst []byte, bf *filter.BloomFilter) []byte { + dst = encoding.Int64ToBytes(dst, int64(bf.N())) + dst = encoding.EncodeUint64Block(dst, bf.Bits()) + return dst +} + +func decodeBloomFilter(src []byte, bf *filter.BloomFilter) *filter.BloomFilter { + n := encoding.BytesToInt64(src) + bf.SetN(int(n)) + + m := n * filter.B + bits := make([]uint64, 0) + bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:], uint64((m+63)/64)) + if err != nil { + logger.Panicf("failed to decode Bloom filter: %v", err) + } + bf.SetBits(bits) + + return bf +} + +func generateBloomFilter() *filter.BloomFilter { + v := bloomFilterPool.Get() + if v == nil { + return filter.NewBloomFilter(0) + } + return v +} + +func releaseBloomFilter(bf *filter.BloomFilter) { + bf.Reset() + bloomFilterPool.Put(bf) +} + +var bloomFilterPool = pool.Register[*filter.BloomFilter]("stream-bloomFilter") + +type tagFilter struct { + filter *filter.BloomFilter + min []byte + max []byte +} + +func (tf *tagFilter) reset() { + tf.filter = nil + tf.min = tf.min[:0] + tf.max = tf.max[:0] +} + +func generateTagFilter() *tagFilter { + v := tagFilterPool.Get() + if v == nil { + return &tagFilter{} + } + return v +} + +func releaseTagFilter(tf *tagFilter) { + releaseBloomFilter(tf.filter) + tf.reset() + tagFilterPool.Put(tf) +} + +var tagFilterPool = pool.Register[*tagFilter]("stream-tagFilter") + +type tagFamilyFilter map[string]*tagFilter + +func (tff *tagFamilyFilter) reset() { + clear(*tff) +} + +func (tff tagFamilyFilter) unmarshal(tagFamilyMetadataBlock *dataBlock, metaReader, filterReader fs.Reader) { + bb := bigValuePool.Generate() + bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(tagFamilyMetadataBlock.size)) + fs.MustReadData(metaReader, int64(tagFamilyMetadataBlock.offset), bb.Buf) + tfm := generateTagFamilyMetadata() + defer releaseTagFamilyMetadata(tfm) + err := tfm.unmarshal(bb.Buf) + if err != nil { + logger.Panicf("%s: cannot unmarshal tagFamilyMetadata: %v", metaReader.Path(), err) + } + bigValuePool.Release(bb) + for _, tm := range tfm.tagMetadata { + if tm.filterBlock.size == 0 { + continue + } + bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(tm.filterBlock.size)) + fs.MustReadData(filterReader, int64(tm.filterBlock.offset), bb.Buf) + bf := generateBloomFilter() + bf = decodeBloomFilter(bb.Buf, bf) + tf := generateTagFilter() + tf.filter = bf + if tm.valueType == pbv1.ValueTypeInt64 { + tf.min = tm.min + tf.max = tm.max + } + tff[tm.name] = tf + } +} + +func generateTagFamilyFilter() *tagFamilyFilter { + v := tagFamilyFilterPool.Get() + if v == nil { + return &tagFamilyFilter{} + } + return v +} + +func releaseTagFamilyFilter(tff *tagFamilyFilter) { + for _, tf := range *tff { + releaseTagFilter(tf) + } + tff.reset() + tagFamilyFilterPool.Put(tff) +} + +var tagFamilyFilterPool = pool.Register[*tagFamilyFilter]("stream-tagFamilyFilter") + +type tagFamilyFilters struct { + tagFamilyFilters []*tagFamilyFilter +} + +func (tfs *tagFamilyFilters) reset() { + tfs.tagFamilyFilters = tfs.tagFamilyFilters[:0] +} + +func (tfs *tagFamilyFilters) 
unmarshal(tagFamilies map[string]*dataBlock, metaReader, filterReader map[string]fs.Reader) { + for tf := range tagFamilies { + tff := generateTagFamilyFilter() + tff.unmarshal(tagFamilies[tf], metaReader[tf], filterReader[tf]) + tfs.tagFamilyFilters = append(tfs.tagFamilyFilters, tff) + } +} + +func (tfs *tagFamilyFilters) Eq(tagName string, tagValue string) bool { + for _, tff := range tfs.tagFamilyFilters { + if tf, ok := (*tff)[tagName]; ok { + return tf.filter.MightContain([]byte(tagValue)) + } + } + return true +} + +func (tfs *tagFamilyFilters) Range(tagName string, rangeOpts index.RangeOpts) (bool, error) { + for _, tff := range tfs.tagFamilyFilters { + if tf, ok := (*tff)[tagName]; ok { + if rangeOpts.Lower != nil { + lower, ok := rangeOpts.Lower.(*index.FloatTermValue) + if !ok { + return false, fmt.Errorf("lower is not a float value: %v", rangeOpts.Lower) + } + value := make([]byte, 0) + value = encoding.Int64ToBytes(value, int64(lower.Value)) + if bytes.Compare(tf.max, value) == -1 || !rangeOpts.IncludesLower && bytes.Equal(tf.max, value) { + return false, nil + } + } + if rangeOpts.Upper != nil { + upper, ok := rangeOpts.Upper.(*index.FloatTermValue) + if !ok { + return false, fmt.Errorf("upper is not a float value: %v", rangeOpts.Upper) + } + value := make([]byte, 0) + value = encoding.Int64ToBytes(value, int64(upper.Value)) + if bytes.Compare(tf.min, value) == 1 || !rangeOpts.IncludesUpper && bytes.Equal(tf.min, value) { + return false, nil + } + } + } + } + return true, nil +} + +func generateTagFamilyFilters() *tagFamilyFilters { + v := tagFamilyFiltersPool.Get() + if v == nil { + return &tagFamilyFilters{} + } + return v +} + +func releaseTagFamilyFilters(tfs *tagFamilyFilters) { + for _, tff := range tfs.tagFamilyFilters { + releaseTagFamilyFilter(tff) + } + tfs.reset() + tagFamilyFiltersPool.Put(tfs) +} + +var tagFamilyFiltersPool = pool.Register[*tagFamilyFilters]("stream-tagFamilyFilters") diff --git a/banyand/stream/tag_filter_test.go b/banyand/stream/tag_filter_test.go new file mode 100644 index 00000000..b381ed78 --- /dev/null +++ b/banyand/stream/tag_filter_test.go @@ -0,0 +1,161 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package stream + +import ( + "bytes" + "encoding/binary" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/apache/skywalking-banyandb/pkg/filter" + "github.com/apache/skywalking-banyandb/pkg/fs" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" +) + +func TestEncodeAndDecodeBloomFilter(t *testing.T) { + assert := assert.New(t) + + bf := filter.NewBloomFilter(3) + + items := [][]byte{ + []byte("skywalking"), + []byte("banyandb"), + []byte(""), + []byte("hello"), + []byte("world"), + } + + for i := 0; i < 3; i++ { + bf.Add(items[i]) + } + + buf := make([]byte, 0) + buf = encodeBloomFilter(buf, bf) + bf2 := filter.NewBloomFilter(0) + bf2 = decodeBloomFilter(buf, bf2) + + for i := 0; i < 3; i++ { + mightContain := bf2.MightContain(items[i]) + assert.True(mightContain) + } + + for i := 3; i < 5; i++ { + mightContain := bf2.MightContain(items[i]) + assert.False(mightContain) + } +} + +type mockReader struct { + data []byte +} + +func (mr *mockReader) Path() string { + return "mock" +} + +func (mr *mockReader) Read(offset int64, buffer []byte) (int, error) { + if offset >= int64(len(mr.data)) { + return 0, nil + } + n := copy(buffer, mr.data[offset:]) + return n, nil +} + +func (mr *mockReader) SequentialRead() fs.SeqReader { + return nil +} + +func (mr *mockReader) Close() error { + return nil +} + +func generateMetaAndFilter(tagCount int, itemsPerTag int) ([]byte, []byte) { + tfm := generateTagFamilyMetadata() + defer releaseTagFamilyMetadata(tfm) + filterBuf := bytes.Buffer{} + + for i := 0; i < tagCount; i++ { + bf := filter.NewBloomFilter(itemsPerTag) + for j := 0; j < itemsPerTag; j++ { + item := make([]byte, 8) + binary.BigEndian.PutUint64(item, uint64(i*itemsPerTag+j)) + bf.Add(item) + } + buf := make([]byte, 0) + buf = encodeBloomFilter(buf, bf) + + tm := &tagMetadata{ + name: fmt.Sprintf("tag_%d", i), + valueType: pbv1.ValueTypeInt64, + min: make([]byte, 8), + max: make([]byte, 8), + } + binary.BigEndian.PutUint64(tm.min, uint64(i*itemsPerTag)) + binary.BigEndian.PutUint64(tm.max, uint64(i*itemsPerTag+itemsPerTag-1)) + tm.filterBlock.offset = uint64(filterBuf.Len()) + tm.filterBlock.size = uint64(len(buf)) + tfm.tagMetadata = append(tfm.tagMetadata, *tm) + + filterBuf.Write(buf) + } + + metaBuf := make([]byte, 0) + metaBuf = tfm.marshal(metaBuf) + return metaBuf, filterBuf.Bytes() +} + +func BenchmarkTagFamilyFiltersUnmarshal(b *testing.B) { + testCases := []struct { + tagFamilyCount int + tagCount int + itemsPerTag int + }{ + {1, 5, 10}, + {2, 10, 100}, + {3, 15, 1000}, + } + + for _, tc := range testCases { + b.Run(fmt.Sprintf("tagFamilies=%d_tags=%d_items=%d", tc.tagFamilyCount, tc.tagCount, tc.itemsPerTag), func(b *testing.B) { + tagFamilies := make(map[string]*dataBlock) + metaReaders := make(map[string]fs.Reader) + filterReaders := make(map[string]fs.Reader) + for i := 0; i < tc.tagFamilyCount; i++ { + familyName := fmt.Sprintf("tagFamily_%d", i) + metaBuf, filterBuf := generateMetaAndFilter(tc.tagCount, tc.itemsPerTag) + tagFamilies[familyName] = &dataBlock{ + offset: 0, + size: uint64(len(metaBuf)), + } + metaReaders[familyName] = &mockReader{data: metaBuf} + filterReaders[familyName] = &mockReader{data: filterBuf} + } + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + tfs := generateTagFamilyFilters() + tfs.unmarshal(tagFamilies, metaReaders, filterReaders) + releaseTagFamilyFilters(tfs) + } + }) + } +} diff --git a/banyand/stream/tag_metadata.go b/banyand/stream/tag_metadata.go index 8b8629f4..b30aa766 100644 --- 
a/banyand/stream/tag_metadata.go +++ b/banyand/stream/tag_metadata.go @@ -28,26 +28,38 @@ import ( type tagMetadata struct { name string + min []byte + max []byte dataBlock - valueType pbv1.ValueType + valueType pbv1.ValueType + filterBlock dataBlock } func (tm *tagMetadata) reset() { tm.name = "" tm.valueType = 0 tm.dataBlock.reset() + tm.min = nil + tm.max = nil + tm.filterBlock.reset() } func (tm *tagMetadata) copyFrom(src *tagMetadata) { tm.name = src.name tm.valueType = src.valueType tm.dataBlock.copyFrom(&src.dataBlock) + tm.min = append(tm.min[:0], src.min...) + tm.max = append(tm.max[:0], src.max...) + tm.filterBlock.copyFrom(&src.filterBlock) } func (tm *tagMetadata) marshal(dst []byte) []byte { dst = encoding.EncodeBytes(dst, convert.StringToBytes(tm.name)) dst = append(dst, byte(tm.valueType)) dst = tm.dataBlock.marshal(dst) + dst = encoding.EncodeBytes(dst, tm.min) + dst = encoding.EncodeBytes(dst, tm.max) + dst = tm.filterBlock.marshal(dst) return dst } @@ -63,6 +75,15 @@ func (tm *tagMetadata) unmarshal(src []byte) ([]byte, error) { tm.valueType = pbv1.ValueType(src[0]) src = src[1:] src = tm.dataBlock.unmarshal(src) + src, tm.min, err = encoding.DecodeBytes(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal tagMetadata.min: %w", err) + } + src, tm.max, err = encoding.DecodeBytes(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal tagMetadata.max: %w", err) + } + src = tm.filterBlock.unmarshal(src) return src, nil } diff --git a/banyand/stream/tag_metadata_test.go b/banyand/stream/tag_metadata_test.go index fe35057e..c0399771 100644 --- a/banyand/stream/tag_metadata_test.go +++ b/banyand/stream/tag_metadata_test.go @@ -27,9 +27,12 @@ import ( func Test_tagMetadata_reset(t *testing.T) { tm := &tagMetadata{ - name: "test", - valueType: pbv1.ValueTypeStr, - dataBlock: dataBlock{offset: 1, size: 10}, + name: "test", + valueType: pbv1.ValueTypeStr, + dataBlock: dataBlock{offset: 1, size: 10}, + filterBlock: dataBlock{offset: 2, size: 20}, + min: []byte{1, 2, 3}, + max: []byte{4, 5, 6}, } tm.reset() @@ -37,13 +40,19 @@ func Test_tagMetadata_reset(t *testing.T) { assert.Equal(t, "", tm.name) assert.Equal(t, pbv1.ValueType(0), tm.valueType) assert.Equal(t, dataBlock{}, tm.dataBlock) + assert.Equal(t, dataBlock{}, tm.filterBlock) + assert.Nil(t, tm.min) + assert.Nil(t, tm.max) } func Test_tagMetadata_copyFrom(t *testing.T) { src := &tagMetadata{ - name: "test", - valueType: pbv1.ValueTypeStr, - dataBlock: dataBlock{offset: 1, size: 10}, + name: "test", + valueType: pbv1.ValueTypeStr, + dataBlock: dataBlock{offset: 1, size: 10}, + filterBlock: dataBlock{offset: 2, size: 20}, + min: []byte{1, 2, 3}, + max: []byte{4, 5, 6}, } dest := &tagMetadata{} @@ -55,9 +64,12 @@ func Test_tagMetadata_copyFrom(t *testing.T) { func Test_tagMetadata_marshal(t *testing.T) { original := &tagMetadata{ - name: "test", - valueType: pbv1.ValueTypeStr, - dataBlock: dataBlock{offset: 1, size: 10}, + name: "test", + valueType: pbv1.ValueTypeStr, + dataBlock: dataBlock{offset: 1, size: 10}, + filterBlock: dataBlock{offset: 2, size: 20}, + min: []byte{1, 2, 3}, + max: []byte{4, 5, 6}, } marshaled := original.marshal(nil) @@ -74,14 +86,20 @@ func Test_tagFamilyMetadata_reset(t *testing.T) { tfm := &tagFamilyMetadata{ tagMetadata: []tagMetadata{ { - name: "test1", - valueType: pbv1.ValueTypeStr, - dataBlock: dataBlock{offset: 1, size: 10}, + name: "test1", + valueType: pbv1.ValueTypeStr, + dataBlock: dataBlock{offset: 1, size: 10}, + filterBlock: dataBlock{offset: 2, size: 20}, + min: 
[]byte{1, 2, 3}, + max: []byte{4, 5, 6}, }, { - name: "test2", - valueType: pbv1.ValueTypeInt64, - dataBlock: dataBlock{offset: 2, size: 20}, + name: "test2", + valueType: pbv1.ValueTypeInt64, + dataBlock: dataBlock{offset: 3, size: 30}, + filterBlock: dataBlock{offset: 4, size: 40}, + min: []byte{1, 2, 3}, + max: []byte{4, 5, 6}, }, }, } @@ -95,14 +113,20 @@ func Test_tagFamilyMetadata_copyFrom(t *testing.T) { src := &tagFamilyMetadata{ tagMetadata: []tagMetadata{ { - name: "test1", - valueType: pbv1.ValueTypeStr, - dataBlock: dataBlock{offset: 1, size: 10}, + name: "test1", + valueType: pbv1.ValueTypeStr, + dataBlock: dataBlock{offset: 1, size: 10}, + filterBlock: dataBlock{offset: 2, size: 20}, + min: []byte{1, 2, 3}, + max: []byte{4, 5, 6}, }, { - name: "test2", - valueType: pbv1.ValueTypeInt64, - dataBlock: dataBlock{offset: 2, size: 20}, + name: "test2", + valueType: pbv1.ValueTypeInt64, + dataBlock: dataBlock{offset: 3, size: 30}, + filterBlock: dataBlock{offset: 4, size: 40}, + min: []byte{1, 2, 3}, + max: []byte{4, 5, 6}, }, }, } @@ -139,14 +163,20 @@ func Test_tagFamilyMetadata_marshalUnmarshal(t *testing.T) { original: &tagFamilyMetadata{ tagMetadata: []tagMetadata{ { - name: "test1", - valueType: pbv1.ValueTypeStr, - dataBlock: dataBlock{offset: 1, size: 10}, + name: "test1", + valueType: pbv1.ValueTypeStr, + dataBlock: dataBlock{offset: 1, size: 10}, + filterBlock: dataBlock{offset: 2, size: 20}, + min: []byte{1, 2, 3}, + max: []byte{4, 5, 6}, }, { - name: "test2", - valueType: pbv1.ValueTypeInt64, - dataBlock: dataBlock{offset: 2, size: 20}, + name: "test2", + valueType: pbv1.ValueTypeInt64, + dataBlock: dataBlock{offset: 3, size: 30}, + filterBlock: dataBlock{offset: 4, size: 40}, + min: []byte{1, 2, 3}, + max: []byte{4, 5, 6}, }, }, }, diff --git a/banyand/stream/tag_test.go b/banyand/stream/tag_test.go index 070c3679..c6a88be4 100644 --- a/banyand/stream/tag_test.go +++ b/banyand/stream/tag_test.go @@ -63,10 +63,11 @@ func TestTag_mustWriteTo_mustReadValues(t *testing.T) { tm := &tagMetadata{} - buf := &bytes.Buffer{} - w := &writer{} + buf, filterBuf := &bytes.Buffer{}, &bytes.Buffer{} + w, fw := &writer{}, &writer{} w.init(buf) - original.mustWriteTo(tm, w) + fw.init(filterBuf) + original.mustWriteTo(tm, w, fw) assert.Equal(t, w.bytesWritten, tm.size) assert.Equal(t, uint64(len(buf.Buf)), tm.size) assert.Equal(t, uint64(0), tm.offset) diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go index 218f16b2..493444c4 100644 --- a/banyand/stream/tstable.go +++ b/banyand/stream/tstable.go @@ -35,6 +35,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/pool" @@ -331,7 +332,7 @@ func (ti *tstIter) reset() { ti.nextBlockNoop = false } -func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { +func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64, blockFilter index.Filter) { ti.reset() ti.parts = parts @@ -340,7 +341,7 @@ func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.Se } ti.piPool = ti.piPool[:len(ti.parts)] for i, p := range ti.parts { - ti.piPool[i].init(bma, p, sids, 
minTimestamp, maxTimestamp) + ti.piPool[i].init(bma, p, sids, minTimestamp, maxTimestamp, blockFilter) } ti.piHeap = ti.piHeap[:0] diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go index 3945f0bb..a954c260 100644 --- a/banyand/stream/tstable_test.go +++ b/banyand/stream/tstable_test.go @@ -145,7 +145,7 @@ func Test_tstIter(t *testing.T) { pp, n := s.getParts(nil, tt.minTimestamp, tt.maxTimestamp) require.Equal(t, len(s.parts), n) ti := &tstIter{} - ti.init(bma, pp, tt.sids, tt.minTimestamp, tt.maxTimestamp) + ti.init(bma, pp, tt.sids, tt.minTimestamp, tt.maxTimestamp, nil) var got []blockMetadata for ti.nextBlock() { if ti.piHeap[0].curBlock.seriesID == 0 { diff --git a/banyand/stream/write.go b/banyand/stream/write.go index 23503cc9..1b310c18 100644 --- a/banyand/stream/write.go +++ b/banyand/stream/write.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "strings" + "time" "google.golang.org/protobuf/proto" @@ -71,18 +72,33 @@ func (w *writeCallback) CheckHealth() *common.Error { func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *streamv1.InternalWriteRequest, docIDBuilder *strings.Builder, ) (map[string]*elementsInGroup, error) { - req := writeEvent.Request - t := req.Element.Timestamp.AsTime().Local() + t := writeEvent.Request.Element.Timestamp.AsTime().Local() if err := timestamp.Check(t); err != nil { return nil, fmt.Errorf("invalid timestamp: %w", err) } ts := t.UnixNano() + eg, err := w.prepareElementsInGroup(dst, writeEvent, ts) + if err != nil { + return nil, err + } + et, err := w.prepareElementsInTable(eg, writeEvent, ts) + if err != nil { + return nil, err + } + err = w.processElements(et, eg, writeEvent, docIDBuilder, ts) + if err != nil { + return nil, err + } + return dst, nil +} - gn := req.Metadata.Group +func (w *writeCallback) prepareElementsInGroup(dst map[string]*elementsInGroup, writeEvent *streamv1.InternalWriteRequest, ts int64) (*elementsInGroup, error) { + gn := writeEvent.Request.Metadata.Group tsdb, err := w.schemaRepo.loadTSDB(gn) if err != nil { return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, err) } + eg, ok := dst[gn] if !ok { eg = &elementsInGroup{ @@ -96,7 +112,10 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre if eg.latestTS < ts { eg.latestTS = ts } + return eg, nil +} +func (w *writeCallback) prepareElementsInTable(eg *elementsInGroup, writeEvent *streamv1.InternalWriteRequest, ts int64) (*elementsInTable, error) { var et *elementsInTable for i := range eg.tables { if eg.tables[i].timeRange.Contains(ts) { @@ -104,25 +123,30 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre break } } - shardID := common.ShardID(writeEvent.ShardId) + if et == nil { var segment storage.Segment[*tsTable, option] for _, seg := range eg.segments { if seg.GetTimeRange().Contains(ts) { segment = seg + break } } if segment == nil { - segment, err = tsdb.CreateSegmentIfNotExist(t) + var err error + segment, err = eg.tsdb.CreateSegmentIfNotExist(time.Unix(0, ts)) if err != nil { return nil, fmt.Errorf("cannot create segment: %w", err) } eg.segments = append(eg.segments, segment) } + + shardID := common.ShardID(writeEvent.ShardId) tstb, err := segment.CreateTSTableIfNotExist(shardID) if err != nil { return nil, fmt.Errorf("cannot create ts table: %w", err) } + et = &elementsInTable{ timeRange: segment.GetTimeRange(), tsTable: tstb, @@ -131,6 +155,14 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre et.elements.reset() 
eg.tables = append(eg.tables, et) } + return et, nil +} + +func (w *writeCallback) processElements(et *elementsInTable, eg *elementsInGroup, writeEvent *streamv1.InternalWriteRequest, + docIDBuilder *strings.Builder, ts int64, +) error { + req := writeEvent.Request + et.elements.timestamps = append(et.elements.timestamps, ts) docIDBuilder.Reset() docIDBuilder.WriteString(req.Metadata.Name) @@ -138,34 +170,39 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre docIDBuilder.WriteString(req.Element.ElementId) eID := convert.HashStr(docIDBuilder.String()) et.elements.elementIDs = append(et.elements.elementIDs, eID) + stm, ok := w.schemaRepo.loadStream(writeEvent.GetRequest().GetMetadata()) if !ok { - return nil, fmt.Errorf("cannot find stream definition: %s", writeEvent.GetRequest().GetMetadata()) + return fmt.Errorf("cannot find stream definition: %s", writeEvent.GetRequest().GetMetadata()) } + fLen := len(req.Element.GetTagFamilies()) if fLen < 1 { - return nil, fmt.Errorf("%s has no tag family", req) + return fmt.Errorf("%s has no tag family", req) } if fLen > len(stm.schema.GetTagFamilies()) { - return nil, fmt.Errorf("%s has more tag families than %s", req.Metadata, stm.schema) + return fmt.Errorf("%s has more tag families than %s", req.Metadata, stm.schema) } + series := &pbv1.Series{ Subject: req.Metadata.Name, EntityValues: writeEvent.EntityValues, } if err := series.Marshal(); err != nil { - return nil, fmt.Errorf("cannot marshal series: %w", err) + return fmt.Errorf("cannot marshal series: %w", err) } et.elements.seriesIDs = append(et.elements.seriesIDs, series.ID) is := stm.indexSchema.Load().(indexSchema) - tagFamilies := make([]tagValues, 0, len(stm.schema.TagFamilies)) + indexedTags := make(map[string]map[string]struct{}) + var fields []index.Field + if len(is.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) { - logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d", + return fmt.Errorf("metadata crashed, tag family rule length %d, tag family length %d", len(is.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies())) } - var fields []index.Field + for i := range stm.GetSchema().GetTagFamilies() { var tagFamily *modelv1.TagFamilyForWrite if len(req.Element.TagFamilies) <= i { @@ -178,6 +215,7 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre tf := tagValues{ tag: tagFamilySpec.Name, } + indexedTags[tagFamilySpec.Name] = make(map[string]struct{}) for j := range tagFamilySpec.Tags { var tagValue *modelv1.TagValue @@ -188,21 +226,25 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre } t := tagFamilySpec.Tags[j] + indexed := false if r, ok := tfr[t.Name]; ok && tagValue != pbv1.NullTagValue { - fields = appendField(fields, index.FieldKey{ - IndexRuleID: r.GetMetadata().GetId(), - Analyzer: r.Analyzer, - SeriesID: series.ID, - }, t.Type, tagValue, r.GetNoSort()) + if r.GetType() == databasev1.IndexRule_TYPE_INVERTED { + fields = appendField(fields, index.FieldKey{ + IndexRuleID: r.GetMetadata().GetId(), + Analyzer: r.Analyzer, + SeriesID: series.ID, + }, t.Type, tagValue, r.GetNoSort()) + } else if r.GetType() == databasev1.IndexRule_TYPE_SKIPPING { + indexed = true + } } _, isEntity := is.indexRuleLocators.EntitySet[t.Name] if tagFamilySpec.Tags[j].IndexedOnly || isEntity { continue } - tf.values = append(tf.values, encodeTagValue( - t.Name, - t.Type, - tagValue)) + tv := encodeTagValue(t.Name, t.Type, tagValue) + 
tv.indexed = indexed + tf.values = append(tf.values, tv) } if len(tf.values) > 0 { tagFamilies = append(tagFamilies, tf) @@ -225,7 +267,7 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre eg.docIDsAdded[docID] = struct{}{} } - return dst, nil + return nil } func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) { diff --git a/docs/api-reference.md b/docs/api-reference.md index a2c27cca..4a5006d6 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -1495,6 +1495,7 @@ Type determine the index structure under the hood | ---- | ------ | ----------- | | TYPE_UNSPECIFIED | 0 | | | TYPE_INVERTED | 1 | | +| TYPE_SKIPPING | 2 | | | TYPE_TREE | 3 | TYPE_TREE is a tree index, which is used for storing hierarchical data. | diff --git a/pkg/encoding/bytes.go b/pkg/encoding/bytes.go index 14a02c7c..7650bf15 100644 --- a/pkg/encoding/bytes.go +++ b/pkg/encoding/bytes.go @@ -49,7 +49,7 @@ func EncodeBytesBlock(dst []byte, a [][]byte) []byte { aLens = append(aLens, uint64(len(s))) } u64s.L = aLens - dst = encodeUint64Block(dst, u64s.L) + dst = EncodeUint64Block(dst, u64s.L) ReleaseUint64List(u64s) bb := bbPool.Generate() @@ -82,7 +82,7 @@ func (bbd *BytesBlockDecoder) Decode(dst [][]byte, src []byte, itemsCount uint64 var tail []byte var err error - u64List.L, tail, err = decodeUint64Block(u64List.L[:0], src, itemsCount) + u64List.L, tail, err = DecodeUint64Block(u64List.L[:0], src, itemsCount) if err != nil { return dst, fmt.Errorf("cannot decode string lengths: %w", err) } @@ -114,7 +114,8 @@ func (bbd *BytesBlockDecoder) Decode(dst [][]byte, src []byte, itemsCount uint64 return dst, nil } -func encodeUint64Block(dst []byte, a []uint64) []byte { +// EncodeUint64Block encodes a block of uint64 values into dst. +func EncodeUint64Block(dst []byte, a []uint64) []byte { bb := bbPool.Generate() bb.Buf = encodeUint64List(bb.Buf[:0], a) dst = compressBlock(dst, bb.Buf) @@ -122,7 +123,8 @@ func encodeUint64Block(dst []byte, a []uint64) []byte { return dst } -func decodeUint64Block(dst []uint64, src []byte, itemsCount uint64) ([]uint64, []byte, error) { +// DecodeUint64Block decodes a block of uint64 values from src. +func DecodeUint64Block(dst []uint64, src []byte, itemsCount uint64) ([]uint64, []byte, error) { bb := bbPool.Generate() defer bbPool.Release(bb) diff --git a/pkg/encoding/dictionary.go b/pkg/encoding/dictionary.go index c134f86b..a878bfa2 100644 --- a/pkg/encoding/dictionary.go +++ b/pkg/encoding/dictionary.go @@ -99,7 +99,7 @@ func (d *Dictionary) decodeBytesBlockWithTail(src []byte, itemsCount uint64) ([] var tail []byte var err error - u64List.L, tail, err = decodeUint64Block(u64List.L[:0], src, itemsCount) + u64List.L, tail, err = DecodeUint64Block(u64List.L[:0], src, itemsCount) if err != nil { return nil, nil, fmt.Errorf("cannot decode string lengths: %w", err) } diff --git a/pkg/filter/bloom_filter.go b/pkg/filter/bloom_filter.go new file mode 100644 index 00000000..98ff47c4 --- /dev/null +++ b/pkg/filter/bloom_filter.go @@ -0,0 +1,132 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package filter defines and implements data structures and interfaces for skipping index. +package filter + +import ( + "sync/atomic" + "unsafe" + + "github.com/cespare/xxhash/v2" +) + +const ( + k = 6 + // B specifies the number of bits allocated for each item. + B = 15 +) + +// BloomFilter is a probabilistic data structure designed to test whether an element is a member of a set. +type BloomFilter struct { + bits []uint64 + n int +} + +// NewBloomFilter creates a new Bloom filter with the number of items n and false positive rate p. +func NewBloomFilter(n int) *BloomFilter { + m := n * B + bits := make([]uint64, (m+63)/64) + return &BloomFilter{ + bits, + n, + } +} + +// Reset resets the Bloom filter. +func (bf *BloomFilter) Reset() { + bf.bits = bf.bits[:0] + bf.n = 0 +} + +// Add adds an item to the Bloom filter. +func (bf *BloomFilter) Add(item []byte) bool { + h := xxhash.Sum64(item) + bits := bf.bits + maxBits := uint64(len(bits)) * 64 + bp := (*[8]byte)(unsafe.Pointer(&h)) + b := bp[:] + isNew := false + for i := 0; i < k; i++ { + hi := xxhash.Sum64(b) + h++ + idx := hi % maxBits + i, j := idx/64, idx%64 + mask := uint64(1) << j + w := atomic.LoadUint64(&bits[i]) + for (w & mask) == 0 { + wNew := w | mask + if atomic.CompareAndSwapUint64(&bits[i], w, wNew) { + isNew = true + break + } + w = atomic.LoadUint64(&bits[i]) + } + } + return isNew +} + +// MightContain checks if an item might be in the Bloom filter. +func (bf *BloomFilter) MightContain(item []byte) bool { + h := xxhash.Sum64(item) + bits := bf.bits + maxBits := uint64(len(bits)) * 64 + bp := (*[8]byte)(unsafe.Pointer(&h)) + b := bp[:] + for i := 0; i < k; i++ { + hi := xxhash.Sum64(b) + h++ + idx := hi % maxBits + i, j := idx/64, idx%64 + mask := uint64(1) << j + w := atomic.LoadUint64(&bits[i]) + if (w & mask) == 0 { + return false + } + } + return true +} + +// Bits returns the underlying bitset. +func (bf *BloomFilter) Bits() []uint64 { + return bf.bits +} + +// N returns the number of items. +func (bf *BloomFilter) N() int { + return bf.n +} + +// SetBits sets the underlying bitset. +func (bf *BloomFilter) SetBits(bits []uint64) { + bf.bits = bits +} + +// SetN sets the number of items. +func (bf *BloomFilter) SetN(n int) { + bf.n = n +} + +// ResizeBits resizes the underlying bitset. +func (bf *BloomFilter) ResizeBits(n int) { + bits := bf.bits + if m := n - cap(bits); m > 0 { + bits = append(bits[:cap(bits)], make([]uint64, m)...) + } + bits = bits[:n] + bf.bits = bits +} diff --git a/pkg/filter/bloom_filter_test.go b/pkg/filter/bloom_filter_test.go new file mode 100644 index 00000000..e76204b7 --- /dev/null +++ b/pkg/filter/bloom_filter_test.go @@ -0,0 +1,136 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filter + +import ( + "encoding/binary" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBloomFilter(t *testing.T) { + assert := assert.New(t) + + bf := NewBloomFilter(3) + assert.NotNil(bf) + + items := [][]byte{ + []byte("skywalking"), + []byte("banyandb"), + []byte(""), + []byte("hello"), + []byte("world"), + } + + for i := 0; i < 3; i++ { + res := bf.Add(items[i]) + assert.True(res) + } + + for i := 0; i < 3; i++ { + mightContain := bf.MightContain(items[i]) + assert.True(mightContain) + } + + for i := 3; i < 5; i++ { + mightContain := bf.MightContain(items[i]) + assert.False(mightContain) + } +} + +func BenchmarkFilterAdd(b *testing.B) { + for _, n := range []int{1e3, 1e4, 1e5, 1e6, 1e7} { + data := generateTestData(n) + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + benchmarkFilterAdd(b, n, data) + }) + } +} + +func benchmarkFilterAdd(b *testing.B, n int, data [][]byte) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + bf := NewBloomFilter(n) + for pb.Next() { + for i := 0; i < n; i++ { + bf.Add(data[i]) + } + } + }) +} + +func BenchmarkFilterMightContainHit(b *testing.B) { + for _, n := range []int{1e3, 1e4, 1e5, 1e6, 1e7} { + data := generateTestData(n) + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + benchmarkFilterMightContainHit(b, n, data) + }) + } +} + +func benchmarkFilterMightContainHit(b *testing.B, n int, data [][]byte) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + bf := NewBloomFilter(n) + for i := 0; i < n; i++ { + bf.Add(data[i]) + } + for pb.Next() { + for i := 0; i < n; i++ { + if !bf.MightContain(data[i]) { + panic(fmt.Errorf("missing item %d", data[i])) + } + } + } + }) +} + +func BenchmarkFilterMightContainMiss(b *testing.B) { + for _, n := range []int{1e3, 1e4, 1e5, 1e6, 1e7} { + data := generateTestData(n) + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + benchmarkFilterMightContainMiss(b, n, data) + }) + } +} + +func benchmarkFilterMightContainMiss(b *testing.B, n int, data [][]byte) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + bf := NewBloomFilter(n) + for pb.Next() { + for i := 0; i < n; i++ { + if bf.MightContain(data[i]) { + panic(fmt.Errorf("unexpected item %d", data[i])) + } + } + } + }) +} + +func generateTestData(n int) [][]byte { + data := make([][]byte, 0) + for i := 0; i < n; i++ { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(i)) + data = append(data, buf) + } + return data +} diff --git a/pkg/index/inverted/analyzer.go b/pkg/index/analyzer/analyzer.go similarity index 70% rename from pkg/index/inverted/analyzer.go rename to pkg/index/analyzer/analyzer.go index 7c9c4423..d7b4b7cd 100644 --- a/pkg/index/inverted/analyzer.go +++ b/pkg/index/analyzer/analyzer.go @@ -15,17 +15,34 @@ // specific language governing permissions and limitations // under the License. -package inverted +// Package analyzer provides analyzers for indexing and searching. 
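+// It hosts the shared Analyzers registry that was previously private to pkg/index/inverted, so the inverted index and the logical tag filters can both consume it.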
+package analyzer import ( "bytes" "unicode" "github.com/blugelabs/bluge/analysis" + "github.com/blugelabs/bluge/analysis/analyzer" "github.com/blugelabs/bluge/analysis/tokenizer" + + "github.com/apache/skywalking-banyandb/pkg/index" ) -func newURLAnalyzer() *analysis.Analyzer { +// Analyzers is a map that associates each IndexRule_Analyzer type with a corresponding Analyzer. +var Analyzers map[string]*analysis.Analyzer + +func init() { + Analyzers = map[string]*analysis.Analyzer{ + index.AnalyzerKeyword: analyzer.NewKeywordAnalyzer(), + index.AnalyzerSimple: analyzer.NewSimpleAnalyzer(), + index.AnalyzerStandard: analyzer.NewStandardAnalyzer(), + index.AnalyzerURL: NewURLAnalyzer(), + } +} + +// NewURLAnalyzer creates a new URL analyzer. +func NewURLAnalyzer() *analysis.Analyzer { return &analysis.Analyzer{ Tokenizer: tokenizer.NewCharacterTokenizer(func(r rune) bool { return unicode.IsLetter(r) || unicode.IsNumber(r) diff --git a/pkg/index/inverted/analyzer_test.go b/pkg/index/analyzer/analyzer_test.go similarity index 98% rename from pkg/index/inverted/analyzer_test.go rename to pkg/index/analyzer/analyzer_test.go index 0deeb400..ca45b561 100644 --- a/pkg/index/inverted/analyzer_test.go +++ b/pkg/index/analyzer/analyzer_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package inverted +package analyzer import ( "testing" @@ -90,7 +90,7 @@ func TestAlphanumericFilter(t *testing.T) { } func TestNewURLAnalyzer(t *testing.T) { - analyzer := NewURLAnalyzer() tests := []struct { input string diff --git a/pkg/index/index.go b/pkg/index/index.go index b1a4fa1d..9568bba3 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -442,4 +442,11 @@ type GetSearcher func(location databasev1.IndexRule_Type) (Searcher, error) type Filter interface { fmt.Stringer Execute(getSearcher GetSearcher, seriesID common.SeriesID, timeRange *RangeOpts) (posting.List, posting.List, error) + ShouldSkip(tagFamilyFilters FilterOp) (bool, error) +} + +// FilterOp is an interface for filtering operations based on the skipping index. +type FilterOp interface { + // Eq reports whether the tag may contain a value equal to tagValue. + Eq(tagName string, tagValue string) bool + // Range reports whether rangeOpts can be ruled out for the tag, i.e. true means the block can be skipped. + Range(tagName string, rangeOpts RangeOpts) (bool, error) } diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 96dcde0e..fc503811 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -28,7 +28,6 @@ import ( roaringpkg "github.com/RoaringBitmap/roaring" "github.com/blugelabs/bluge" "github.com/blugelabs/bluge/analysis" - "github.com/blugelabs/bluge/analysis/analyzer" blugeIndex "github.com/blugelabs/bluge/index" "github.com/blugelabs/bluge/numeric" "github.com/blugelabs/bluge/search" @@ -40,6 +39,7 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/analyzer" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -62,18 +62,6 @@ var ( defaultProjection = []string{docIDField, timestampField} ) -// Analyzers is a map that associates each IndexRule_Analyzer type with a corresponding Analyzer.
-var Analyzers map[string]*analysis.Analyzer - -func init() { - Analyzers = map[string]*analysis.Analyzer{ - index.AnalyzerKeyword: analyzer.NewKeywordAnalyzer(), - index.AnalyzerSimple: analyzer.NewSimpleAnalyzer(), - index.AnalyzerStandard: analyzer.NewStandardAnalyzer(), - index.AnalyzerURL: newURLAnalyzer(), - } -} - var _ index.Store = (*store)(nil) // StoreOpts wraps options to create an inverted index repository. @@ -135,7 +123,7 @@ func (s *store) Batch(batch index.Batch) error { tf.StoreValue() } if f.Key.Analyzer != index.AnalyzerUnspecified { - tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer]) + tf = tf.WithAnalyzer(analyzer.Analyzers[f.Key.Analyzer]) } doc.AddField(tf) if i == 0 { @@ -163,7 +151,7 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) { } indexConfig.CacheMaxBytes = opts.CacheMaxBytes config := bluge.DefaultConfigWithIndexConfig(indexConfig) - config.DefaultSearchAnalyzer = Analyzers[index.AnalyzerKeyword] + config.DefaultSearchAnalyzer = analyzer.Analyzers[index.AnalyzerKeyword] config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0) config = config.WithPrepareMergeCallback(opts.PrepareMergeCallback) w, err := bluge.OpenWriter(config) @@ -371,11 +359,11 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string, opts *modelv1.C } func getMatchOptions(analyzerOnIndexRule string, opts *modelv1.Condition_MatchOption) (*analysis.Analyzer, bluge.MatchQueryOperator) { - analyzer := Analyzers[analyzerOnIndexRule] + a := analyzer.Analyzers[analyzerOnIndexRule] operator := bluge.MatchQueryOperatorOr if opts != nil { if opts.Analyzer != index.AnalyzerUnspecified { - analyzer = Analyzers[opts.Analyzer] + a = analyzer.Analyzers[opts.Analyzer] } if opts.Operator != modelv1.Condition_MatchOption_OPERATOR_UNSPECIFIED { if opts.Operator == modelv1.Condition_MatchOption_OPERATOR_AND { @@ -383,7 +371,7 @@ func getMatchOptions(analyzerOnIndexRule string, opts *modelv1.Condition_MatchOp } } } - return analyzer, bluge.MatchQueryOperator(operator) + return a, bluge.MatchQueryOperator(operator) } func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, timestamps posting.List, err error) { diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index 1b2335aa..e7c4d013 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -33,6 +33,7 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/analyzer" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -109,7 +110,7 @@ func toDoc(d index.Document, toParseFieldNames bool) (*bluge.Document, []string) tf.Sortable() } if f.Key.Analyzer != index.AnalyzerUnspecified { - tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer]) + tf = tf.WithAnalyzer(analyzer.Analyzers[f.Key.Analyzer]) } } else { tf = bluge.NewStoredOnlyField(k, f.GetBytes()) diff --git a/pkg/query/logical/stream/index_filter.go b/pkg/query/logical/stream/index_filter.go index be3f090b..8cce87ad 100644 --- a/pkg/query/logical/stream/index_filter.go +++ b/pkg/query/logical/stream/index_filter.go @@ -34,6 +34,7 @@ import ( func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema, entityDict map[string]int, entity []*modelv1.TagValue, + indexRuleType databasev1.IndexRule_Type, ) (index.Filter, [][]*modelv1.TagValue, error) { if criteria == nil { return nil, 
[][]*modelv1.TagValue{entity}, nil @@ -48,7 +49,7 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema, if parsedEntity != nil { return nil, parsedEntity, nil } - if ok, indexRule := schema.IndexDefined(cond.Name); ok { + if ok, indexRule := schema.IndexDefined(cond.Name); ok && indexRule.Type == indexRuleType { return parseConditionToFilter(cond, indexRule, expr, entity) } return ENode, [][]*modelv1.TagValue{entity}, nil @@ -58,16 +59,16 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema, return nil, nil, errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) } if le.GetLeft() == nil { - return buildLocalFilter(le.Right, schema, entityDict, entity) + return buildLocalFilter(le.Right, schema, entityDict, entity, indexRuleType) } if le.GetRight() == nil { - return buildLocalFilter(le.Left, schema, entityDict, entity) + return buildLocalFilter(le.Left, schema, entityDict, entity, indexRuleType) } - left, leftEntities, err := buildLocalFilter(le.Left, schema, entityDict, entity) + left, leftEntities, err := buildLocalFilter(le.Left, schema, entityDict, entity, indexRuleType) if err != nil { return nil, nil, err } - right, rightEntities, err := buildLocalFilter(le.Right, schema, entityDict, entity) + right, rightEntities, err := buildLocalFilter(le.Right, schema, entityDict, entity, indexRuleType) if err != nil { return nil, nil, err } @@ -75,8 +76,11 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema, if entities == nil { return nil, nil, nil } - if left == nil && right == nil { - return nil, entities, nil + if left == nil { + return right, entities, nil + } + if right == nil { + return left, entities, nil } if left == ENode && right == ENode { return ENode, entities, nil @@ -113,7 +117,10 @@ func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.Index case modelv1.Condition_BINARY_OP_EQ: return newEq(indexRule, expr), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_MATCH: - return newMatch(indexRule, expr, cond.MatchOption), [][]*modelv1.TagValue{entity}, nil + if indexRule.Type == databasev1.IndexRule_TYPE_INVERTED { + return newMatch(indexRule, expr, cond.MatchOption), [][]*modelv1.TagValue{entity}, nil + } + return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v for skipping index", cond) case modelv1.Condition_BINARY_OP_NE: return newNot(indexRule, newEq(indexRule, expr)), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_HAVING: @@ -264,6 +271,19 @@ func (an *andNode) Execute(searcher index.GetSearcher, seriesID common.SeriesID, return execute(searcher, seriesID, an.node, an, tr) } +func (an *andNode) ShouldSkip(tagFamilyFilters index.FilterOp) (bool, error) { + for _, sn := range an.node.SubNodes { + shouldSkip, err := sn.ShouldSkip(tagFamilyFilters) + if err != nil { + return shouldSkip, err + } + if shouldSkip { + return true, nil + } + } + return false, nil +} + func (an *andNode) MarshalJSON() ([]byte, error) { data := make(map[string]interface{}, 1) data["and"] = an.node.SubNodes @@ -309,6 +329,19 @@ func (on *orNode) Execute(searcher index.GetSearcher, seriesID common.SeriesID, return execute(searcher, seriesID, on.node, on, tr) } +func (on *orNode) ShouldSkip(tagFamilyFilters index.FilterOp) (bool, error) { + for _, sn := range on.node.SubNodes { + shouldSkip, err := sn.ShouldSkip(tagFamilyFilters) + if err != nil { + return shouldSkip, err + } + if 
!shouldSkip { + return false, nil + } + } + return true, nil +} + func (on *orNode) MarshalJSON() ([]byte, error) { data := make(map[string]interface{}, 1) data["or"] = on.node.SubNodes @@ -400,6 +433,10 @@ func (eq *eq) Execute(searcher index.GetSearcher, seriesID common.SeriesID, tr * return s.MatchTerms(eq.Expr.Field(eq.Key.toIndex(seriesID, tr))) } +func (eq *eq) ShouldSkip(tagFamilyFilters index.FilterOp) (bool, error) { + return !tagFamilyFilters.Eq(eq.Key.Tags[0], eq.Expr.String()), nil +} + func (eq *eq) MarshalJSON() ([]byte, error) { data := make(map[string]interface{}, 1) data["eq"] = eq.leaf @@ -442,6 +479,10 @@ func (match *match) Execute(searcher index.GetSearcher, seriesID common.SeriesID ) } +func (match *match) ShouldSkip(_ index.FilterOp) (bool, error) { + return false, nil +} + func (match *match) MarshalJSON() ([]byte, error) { data := make(map[string]interface{}, 1) data["match"] = match.leaf @@ -474,6 +515,10 @@ func (r *rangeOp) Execute(searcher index.GetSearcher, seriesID common.SeriesID, return s.Range(r.Key.toIndex(seriesID, tr), r.Opts) } +func (r *rangeOp) ShouldSkip(tagFamilyFilters index.FilterOp) (bool, error) { + return tagFamilyFilters.Range(r.Key.Tags[0], r.Opts) +} + func (r *rangeOp) MarshalJSON() ([]byte, error) { data := make(map[string]interface{}, 1) var builder strings.Builder @@ -521,6 +566,10 @@ func (an emptyNode) String() string { return "empty" } +func (an emptyNode) ShouldSkip(_ index.FilterOp) (bool, error) { + return false, nil +} + type bypassList struct{} func (bl bypassList) Contains(_ uint64) bool { diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go index 3374d8fe..7e465172 100644 --- a/pkg/query/logical/stream/stream_plan_indexscan_local.go +++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go @@ -46,7 +46,8 @@ var ( type localIndexScan struct { schema logical.Schema - filter index.Filter + invertedFilter index.Filter + skippingFilter index.Filter result model.StreamQueryResult ec executor.StreamExecutionContext order *logical.OrderBy @@ -94,7 +95,8 @@ func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, erro Name: i.metadata.GetName(), TimeRange: &i.timeRange, Entities: i.entities, - Filter: i.filter, + InvertedFilter: i.invertedFilter, + SkippingFilter: i.skippingFilter, Order: orderBy, TagProjection: i.projectionTags, MaxElementSize: i.maxElementSize, @@ -110,7 +112,7 @@ func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, erro func (i *localIndexScan) String() string { return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; orderBy=%s; limit=%d", i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(), - i.filter, logical.FormatTagRefs(", ", i.projectionTagRefs...), i.order, i.maxElementSize) + i.invertedFilter, logical.FormatTagRefs(", ", i.projectionTagRefs...), i.order, i.maxElementSize) } func (i *localIndexScan) Children() []logical.Plan { diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go b/pkg/query/logical/stream/stream_plan_tag_filter.go index b1a100df..69834adb 100644 --- a/pkg/query/logical/stream/stream_plan_tag_filter.go +++ b/pkg/query/logical/stream/stream_plan_tag_filter.go @@ -23,6 +23,7 @@ import ( "time" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/pkg/index" @@ -56,7 +57,11 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) (logical.Plan, error) entity[idx] = pbv1.AnyTagValue } var err error - ctx.filter, ctx.entities, err = buildLocalFilter(uis.criteria, s, entityDict, entity) + ctx.invertedFilter, ctx.entities, err = buildLocalFilter(uis.criteria, s, entityDict, entity, databasev1.IndexRule_TYPE_INVERTED) + if err != nil { + return nil, err + } + ctx.skippingFilter, _, err = buildLocalFilter(uis.criteria, s, entityDict, entity, databasev1.IndexRule_TYPE_SKIPPING) if err != nil { return nil, err } @@ -97,7 +102,8 @@ func (uis *unresolvedTagFilter) selectIndexScanner(ctx *analyzeContext, ec execu projectionTagRefs: ctx.projTagsRefs, projectionTags: ctx.projectionTags, metadata: uis.metadata, - filter: ctx.filter, + invertedFilter: ctx.invertedFilter, + skippingFilter: ctx.skippingFilter, entities: ctx.entities, l: logger.GetLogger("query", "stream", "local-index"), ec: ec, @@ -119,7 +125,8 @@ func tagFilter(startTime, endTime time.Time, metadata *commonv1.Metadata, criter type analyzeContext struct { s logical.Schema - filter index.Filter + invertedFilter index.Filter + skippingFilter index.Filter entities [][]*modelv1.TagValue projectionTags []model.TagProjection globalConditions []interface{} diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go index 445ddfcc..2433fa70 100644 --- a/pkg/query/logical/tag_filter.go +++ b/pkg/query/logical/tag_filter.go @@ -22,10 +22,12 @@ import ( "fmt" "strings" + "github.com/blugelabs/bluge/analysis" "github.com/pkg/errors" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index/analyzer" ) var errUnsupportedLogicalOperation = errors.New("unsupported logical operation") @@ -77,7 +79,7 @@ func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) { return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false) } -// BuildTagFilter returns a TagFilter if predicates doesn't match any indices. +// BuildTagFilter returns a TagFilter. 
func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, indexChecker IndexChecker, hasGlobalIndex bool) (TagFilter, error) { if criteria == nil { return DummyFilter, nil @@ -85,17 +87,17 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, index switch criteria.GetExp().(type) { case *modelv1.Criteria_Condition: cond := criteria.GetCondition() - expr, err := parseExpr(cond.Value) + var expr ComparableExpr + var err error + _, indexRule := indexChecker.IndexRuleDefined(cond.Name) + expr, err = parseExpr(cond.Value, analyzer.Analyzers[indexRule.GetAnalyzer()]) if err != nil { return nil, err } - if ok, _ := indexChecker.IndexDefined(cond.Name); ok { - return DummyFilter, nil - } if _, ok := entityDict[cond.Name]; ok { return DummyFilter, nil } - return parseFilter(cond, expr) + return parseFilter(cond, expr, indexChecker) case *modelv1.Criteria_Le: le := criteria.GetLe() left, err := BuildTagFilter(le.Left, entityDict, indexChecker, hasGlobalIndex) @@ -126,7 +128,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, index return nil, ErrInvalidCriteriaType } -func parseFilter(cond *modelv1.Condition, expr ComparableExpr) (TagFilter, error) { +func parseFilter(cond *modelv1.Condition, expr ComparableExpr, indexChecker IndexChecker) (TagFilter, error) { switch cond.Op { case modelv1.Condition_BINARY_OP_GT: return newRangeTag(cond.Name, rangeOpts{ @@ -148,6 +150,8 @@ func parseFilter(cond *modelv1.Condition, expr ComparableExpr) (TagFilter, error }), nil case modelv1.Condition_BINARY_OP_EQ: return newEqTag(cond.Name, expr), nil + case modelv1.Condition_BINARY_OP_MATCH: + return newMatchTag(cond.Name, expr, indexChecker), nil case modelv1.Condition_BINARY_OP_NE: return newNotTag(newEqTag(cond.Name, expr)), nil case modelv1.Condition_BINARY_OP_HAVING: @@ -163,7 +167,21 @@ func parseFilter(cond *modelv1.Condition, expr ComparableExpr) (TagFilter, error } } -func parseExpr(value *modelv1.TagValue) (ComparableExpr, error) { +func parseExpr(value *modelv1.TagValue, analyzer *analysis.Analyzer) (ComparableExpr, error) { + if analyzer != nil { + if _, ok := value.Value.(*modelv1.TagValue_Str); ok { + tokenStream := analyzer.Analyze([]byte(value.GetStr().Value)) + strArr := make([]string, 0, len(tokenStream)) + for _, token := range tokenStream { + strArr = append(strArr, string(token.Term)) + } + return &strArrLiteral{ + arr: strArr, + }, nil + } + return nil, errors.WithMessagef(ErrUnsupportedConditionValue, "tag filter parses %v", value) + } + switch v := value.Value.(type) { case *modelv1.TagValue_Str: return &strLiteral{v.Str.GetValue()}, nil @@ -213,7 +231,7 @@ func (n *logicalNode) append(sub TagFilter) *logicalNode { return n } -func matchTag(accessor TagValueIndexAccessor, registry TagSpecRegistry, n *logicalNode, lp logicalNodeOP) (bool, error) { +func isMatch(accessor TagValueIndexAccessor, registry TagSpecRegistry, n *logicalNode, lp logicalNodeOP) (bool, error) { var result *bool for _, sn := range n.SubNodes { r, err := sn.Match(accessor, registry) @@ -252,7 +270,7 @@ func (an *andLogicalNode) merge(bb ...bool) bool { } func (an *andLogicalNode) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) { - return matchTag(accessor, registry, an.logicalNode, an) + return isMatch(accessor, registry, an.logicalNode, an) } func (an *andLogicalNode) MarshalJSON() ([]byte, error) { @@ -287,7 +305,7 @@ func (on *orLogicalNode) merge(bb ...bool) bool { } func (on *orLogicalNode) Match(accessor 
TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) { - return matchTag(accessor, registry, on.logicalNode, on) + return isMatch(accessor, registry, on.logicalNode, on) } func (on *orLogicalNode) MarshalJSON() ([]byte, error) { @@ -356,7 +374,7 @@ func newInTag(tagName string, values LiteralExpr) *inTag { } func (h *inTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) { - expr, err := tagExpr(accessor, registry, h.Name) + expr, err := tagExpr(accessor, registry, h.Name, nil) if err != nil { return false, err } @@ -377,7 +395,7 @@ func newEqTag(tagName string, values LiteralExpr) *eqTag { } func (eq *eqTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) { - expr, err := tagExpr(accessor, registry, eq.Name) + expr, err := tagExpr(accessor, registry, eq.Name, nil) if err != nil { return false, err } @@ -416,7 +434,7 @@ func newRangeTag(tagName string, opts rangeOpts) *rangeTag { } func (r *rangeTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) { - expr, err := tagExpr(accessor, registry, r.Name) + expr, err := tagExpr(accessor, registry, r.Name, nil) if err != nil { return false, err } @@ -475,7 +493,7 @@ func (r *rangeTag) MarshalJSON() ([]byte, error) { builder.WriteString(")") } } - data["key"] = r.tagLeaf + data["key"] = r.tagLeaf.Name data["range"] = builder.String() return json.Marshal(data) } @@ -484,15 +502,49 @@ func (r *rangeTag) String() string { return convert.JSONToString(r) } -func tagExpr(accessor TagValueIndexAccessor, registry TagSpecRegistry, tagName string) (ComparableExpr, error) { +func tagExpr(accessor TagValueIndexAccessor, registry TagSpecRegistry, tagName string, analyzer *analysis.Analyzer) (ComparableExpr, error) { if tagSpec := registry.FindTagSpecByName(tagName); tagSpec != nil { if tagVal := accessor.GetTagValue(tagSpec.TagFamilyIdx, tagSpec.TagIdx); tagVal != nil { - return parseExpr(tagVal) + return parseExpr(tagVal, analyzer) } } return nil, errTagNotDefined } +type matchTag struct { + *tagLeaf + indexChecker IndexChecker +} + +func newMatchTag(tagName string, values LiteralExpr, indexChecker IndexChecker) *matchTag { + return &matchTag{ + tagLeaf: &tagLeaf{ + Name: tagName, + Expr: values, + }, + indexChecker: indexChecker, + } +} + +func (m *matchTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) { + _, indexRule := m.indexChecker.IndexRuleDefined(m.Name) + expr, err := tagExpr(accessor, registry, m.Name, analyzer.Analyzers[indexRule.GetAnalyzer()]) + if err != nil { + return false, err + } + return expr.Contains(m.Expr), nil +} + +func (m *matchTag) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["match"] = m.tagLeaf + return json.Marshal(data) +} + +func (m *matchTag) String() string { + return convert.JSONToString(m) +} + type havingTag struct { *tagLeaf } @@ -507,7 +559,7 @@ func newHavingTag(tagName string, values LiteralExpr) *havingTag { } func (h *havingTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) { - expr, err := tagExpr(accessor, registry, h.Name) + expr, err := tagExpr(accessor, registry, h.Name, nil) if err != nil { return false, err } diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index 40b4a1a7..39a7858b 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -87,7 +87,8 @@ type StreamQueryOptions struct { Name string TimeRange *timestamp.TimeRange Entities [][]*modelv1.TagValue - Filter index.Filter + 
InvertedFilter index.Filter + SkippingFilter index.Filter Order *index.OrderBy TagProjection []TagProjection MaxElementSize int @@ -98,7 +99,8 @@ func (s *StreamQueryOptions) Reset() { s.Name = "" s.TimeRange = nil s.Entities = nil - s.Filter = nil + s.InvertedFilter = nil + s.SkippingFilter = nil s.Order = nil s.TagProjection = nil s.MaxElementSize = 0 @@ -117,7 +119,8 @@ func (s *StreamQueryOptions) CopyFrom(other *StreamQueryOptions) { s.Entities = nil } - s.Filter = other.Filter + s.InvertedFilter = other.InvertedFilter + s.SkippingFilter = other.SkippingFilter s.Order = other.Order // Deep copy if TagProjection is a slice diff --git a/pkg/test/stream/testdata/index_rules/db.type.json b/pkg/test/stream/testdata/index_rules/db.type.json index f23fd03f..8e39ca3b 100644 --- a/pkg/test/stream/testdata/index_rules/db.type.json +++ b/pkg/test/stream/testdata/index_rules/db.type.json @@ -7,6 +7,6 @@ "tags": [ "db.type" ], - "type": "TYPE_INVERTED", + "type": "TYPE_SKIPPING", "updated_at": "2021-04-15T01:30:15.01Z" } diff --git a/pkg/test/stream/testdata/index_rules/endpoint_id.json b/pkg/test/stream/testdata/index_rules/endpoint_id.json index 89272177..a82ec86a 100644 --- a/pkg/test/stream/testdata/index_rules/endpoint_id.json +++ b/pkg/test/stream/testdata/index_rules/endpoint_id.json @@ -7,6 +7,6 @@ "tags": [ "endpoint_id" ], - "type": "TYPE_INVERTED", + "type": "TYPE_SKIPPING", "updated_at": "2021-04-15T01:30:15.01Z" } diff --git a/pkg/test/stream/testdata/index_rules/http.method.json b/pkg/test/stream/testdata/index_rules/http.method.json index b6ce4e9b..23bc230e 100644 --- a/pkg/test/stream/testdata/index_rules/http.method.json +++ b/pkg/test/stream/testdata/index_rules/http.method.json @@ -7,6 +7,6 @@ "tags": [ "http.method" ], - "type": "TYPE_INVERTED", + "type": "TYPE_SKIPPING", "updated_at": "2021-04-15T01:30:15.01Z" } diff --git a/pkg/test/stream/testdata/index_rules/mq.broker.json b/pkg/test/stream/testdata/index_rules/mq.broker.json index 95180828..1fd748d6 100644 --- a/pkg/test/stream/testdata/index_rules/mq.broker.json +++ b/pkg/test/stream/testdata/index_rules/mq.broker.json @@ -7,6 +7,6 @@ "tags": [ "mq.broker" ], - "type": "TYPE_INVERTED", + "type": "TYPE_SKIPPING", "updated_at": "2021-04-15T01:30:15.01Z" } diff --git a/pkg/test/stream/testdata/index_rules/mq.queue.json b/pkg/test/stream/testdata/index_rules/mq.queue.json index de4aa2eb..beb77bb4 100644 --- a/pkg/test/stream/testdata/index_rules/mq.queue.json +++ b/pkg/test/stream/testdata/index_rules/mq.queue.json @@ -7,6 +7,6 @@ "tags": [ "mq.queue" ], - "type": "TYPE_INVERTED", + "type": "TYPE_SKIPPING", "updated_at": "2021-04-15T01:30:15.01Z" } diff --git a/pkg/test/stream/testdata/index_rules/mq.topic.json b/pkg/test/stream/testdata/index_rules/mq.topic.json index c292a582..cc5544dd 100644 --- a/pkg/test/stream/testdata/index_rules/mq.topic.json +++ b/pkg/test/stream/testdata/index_rules/mq.topic.json @@ -7,6 +7,6 @@ "tags": [ "mq.topic" ], - "type": "TYPE_INVERTED", + "type": "TYPE_SKIPPING", "updated_at": "2021-04-15T01:30:15.01Z" } diff --git a/pkg/test/stream/testdata/index_rules/status_code.json b/pkg/test/stream/testdata/index_rules/status_code.json index dcf04473..ccaa9f8d 100644 --- a/pkg/test/stream/testdata/index_rules/status_code.json +++ b/pkg/test/stream/testdata/index_rules/status_code.json @@ -7,6 +7,6 @@ "tags": [ "status_code" ], - "type": "TYPE_INVERTED", + "type": "TYPE_SKIPPING", "updated_at": "2021-04-15T01:30:15.01Z" } diff --git a/pkg/test/stream/testdata/index_rules/trace_id.json 
b/pkg/test/stream/testdata/index_rules/trace_id.json index add7fc33..df92f0f9 100644 --- a/pkg/test/stream/testdata/index_rules/trace_id.json +++ b/pkg/test/stream/testdata/index_rules/trace_id.json @@ -7,6 +7,6 @@ "tags": [ "trace_id" ], - "type": "TYPE_INVERTED", + "type": "TYPE_SKIPPING", "updated_at": "2021-04-15T01:30:15.01Z" }
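For readers following the skipping-index changes end to end, below is a minimal usage sketch of the new pkg/filter API. It mirrors the unit test above; the main function and the tag values are illustrative only and not part of the patch:

package main

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/pkg/filter"
)

func main() {
	// Writer side: collect the values of a TYPE_SKIPPING-indexed tag for one
	// block into a Bloom filter, roughly as block.processTags does.
	bf := filter.NewBloomFilter(3) // expected number of elements in the block
	for _, v := range [][]byte{[]byte("GET"), []byte("POST"), []byte("PUT")} {
		bf.Add(v)
	}

	// Query side: an eq predicate can skip the whole block when the filter
	// proves the value absent; a hit only means the block might contain it.
	fmt.Println(bf.MightContain([]byte("GET")))    // true
	fmt.Println(bf.MightContain([]byte("DELETE"))) // false, barring a false positive
}

The ShouldSkip composition in pkg/query/logical/stream/index_filter.go follows from this one-sided guarantee: an andNode skips a block as soon as any sub-filter can skip it, an orNode only when every sub-filter can, and match never skips because analyzed tokens are not recorded in the filter.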