This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new ef05f8a7 Implement skipping index for stream (#665)
ef05f8a7 is described below

commit ef05f8a7714f4a54b24bbaf76b3df5b528eafccf
Author: Huang Youliang <butterbright0...@gmail.com>
AuthorDate: Tue Jul 1 21:36:34 2025 +0800

    Implement skipping index for stream (#665)
    
    * Implement skipping index writes
    
    ---------
    
    Co-authored-by: Gao Hongtao <hanahm...@gmail.com>
    Co-authored-by: 吴晟 Wu Sheng <wu.sh...@foxmail.com>
---
 CHANGES.md                                         |   1 +
 api/proto/banyandb/database/v1/schema.proto        |   1 +
 banyand/internal/storage/version.go                |   2 +-
 banyand/internal/storage/versions.yml              |   2 +-
 banyand/stream/benchmark_test.go                   |   7 +-
 banyand/stream/block.go                            |  44 +++-
 banyand/stream/block_scanner.go                    |   7 +-
 banyand/stream/block_test.go                       |  16 +-
 banyand/stream/block_writer.go                     |  32 ++-
 banyand/stream/elements.go                         |   2 +
 banyand/stream/part.go                             |  32 ++-
 banyand/stream/part_iter.go                        |  27 ++-
 banyand/stream/part_iter_test.go                   |   2 +-
 banyand/stream/query.go                            |   4 +-
 banyand/stream/query_by_idx.go                     |   2 +-
 banyand/stream/tag.go                              |  17 +-
 banyand/stream/tag_filter.go                       | 222 +++++++++++++++++++++
 banyand/stream/tag_filter_test.go                  | 161 +++++++++++++++
 banyand/stream/tag_metadata.go                     |  23 ++-
 banyand/stream/tag_metadata_test.go                |  84 +++++---
 banyand/stream/tag_test.go                         |   7 +-
 banyand/stream/tstable.go                          |   5 +-
 banyand/stream/tstable_test.go                     |   2 +-
 banyand/stream/write.go                            |  86 ++++++--
 docs/api-reference.md                              |   1 +
 pkg/encoding/bytes.go                              |  10 +-
 pkg/encoding/dictionary.go                         |   2 +-
 pkg/filter/bloom_filter.go                         | 132 ++++++++++++
 pkg/filter/bloom_filter_test.go                    | 136 +++++++++++++
 pkg/index/{inverted => analyzer}/analyzer.go       |  21 +-
 pkg/index/{inverted => analyzer}/analyzer_test.go  |   4 +-
 pkg/index/index.go                                 |   7 +
 pkg/index/inverted/inverted.go                     |  24 +--
 pkg/index/inverted/inverted_series.go              |   3 +-
 pkg/query/logical/stream/index_filter.go           |  65 +++++-
 .../logical/stream/stream_plan_indexscan_local.go  |   8 +-
 pkg/query/logical/stream/stream_plan_tag_filter.go |  13 +-
 pkg/query/logical/tag_filter.go                    |  88 ++++++--
 pkg/query/model/model.go                           |   9 +-
 pkg/test/stream/testdata/index_rules/db.type.json  |   2 +-
 .../stream/testdata/index_rules/endpoint_id.json   |   2 +-
 .../stream/testdata/index_rules/http.method.json   |   2 +-
 .../stream/testdata/index_rules/mq.broker.json     |   2 +-
 pkg/test/stream/testdata/index_rules/mq.queue.json |   2 +-
 pkg/test/stream/testdata/index_rules/mq.topic.json |   2 +-
 .../stream/testdata/index_rules/status_code.json   |   2 +-
 pkg/test/stream/testdata/index_rules/trace_id.json |   2 +-
 47 files changed, 1163 insertions(+), 164 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index aba5d5f6..99804b8a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -16,6 +16,7 @@ Release Notes.
 - Add a wait and retry to write handlers to avoid the local metadata cache 
being loaded.
 - Implement primary block cache for measure.
 - Implement versioning properties and replace physical deletion with the 
tombstone mechanism for the property database.
+- Implement skipping index for stream.
 - Add Load Balancer Feature to Liaison. 
 - Implement fadvise for large files to prevent page cache pollution.
 - Data Model: Introduce the `Trace` data model to store the trace/span data.
diff --git a/api/proto/banyandb/database/v1/schema.proto 
b/api/proto/banyandb/database/v1/schema.proto
index a7e61ce8..c6e71ebb 100644
--- a/api/proto/banyandb/database/v1/schema.proto
+++ b/api/proto/banyandb/database/v1/schema.proto
@@ -164,6 +164,7 @@ message IndexRule {
   enum Type {
     TYPE_UNSPECIFIED = 0;
     TYPE_INVERTED = 1;
+    TYPE_SKIPPING = 2;
     // TYPE_TREE is a tree index, which is used for storing hierarchical data.
     TYPE_TREE = 3;
   }
diff --git a/banyand/internal/storage/version.go 
b/banyand/internal/storage/version.go
index 8225629e..2fdac4ca 100644
--- a/banyand/internal/storage/version.go
+++ b/banyand/internal/storage/version.go
@@ -28,7 +28,7 @@ import (
 
 const (
        metadataFilename           = "metadata"
-       currentVersion             = "1.2.0"
+       currentVersion             = "1.3.0"
        compatibleVersionsKey      = "versions"
        compatibleVersionsFilename = "versions.yml"
 )
diff --git a/banyand/internal/storage/versions.yml 
b/banyand/internal/storage/versions.yml
index eb98c0e6..7f01e316 100644
--- a/banyand/internal/storage/versions.yml
+++ b/banyand/internal/storage/versions.yml
@@ -14,4 +14,4 @@
 # limitations under the License.
 
 versions:
-- 1.2.0
+- 1.3.0
diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go
index 9c7c68db..273d91b7 100644
--- a/banyand/stream/benchmark_test.go
+++ b/banyand/stream/benchmark_test.go
@@ -94,6 +94,10 @@ func (mf mockFilter) Execute(_ index.GetSearcher, seriesID 
common.SeriesID, _ *i
        return mf.index[mf.value][seriesID], roaring.DummyPostingList, nil
 }
 
+func (mf mockFilter) ShouldSkip(_ index.FilterOp) (bool, error) {
+       return false, nil
+}
+
 type databaseSupplier struct {
        database atomic.Value
 }
@@ -311,7 +315,8 @@ func generateStreamQueryOptions(p parameter, midx 
mockIndex) model.StreamQueryOp
                Name:           "benchmark",
                TimeRange:      &timeRange,
                Entities:       entities,
-               Filter:         filter,
+               InvertedFilter: filter,
+               SkippingFilter: nil,
                Order:          order,
                TagProjection:  []model.TagProjection{tagProjection},
                MaxElementSize: math.MaxInt32,
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index e4a27945..e6e6e5e0 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -18,6 +18,7 @@
 package stream
 
 import (
+       "bytes"
        "fmt"
        "sort"
 
@@ -25,7 +26,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       "github.com/apache/skywalking-banyandb/pkg/bytes"
+       pkgbytes "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -103,6 +104,28 @@ func (b *block) processTags(tf tagValues, tagFamilyIdx, i 
int, elementsLen int)
                tags[j].resizeValues(elementsLen)
                tags[j].valueType = t.valueType
                tags[j].values[i] = t.marshal()
+               if !t.indexed {
+                       continue
+               }
+               if tags[j].filter == nil {
+                       filter := generateBloomFilter()
+                       tags[j].filter = filter
+               }
+               tags[j].filter.SetN(elementsLen)
+               tags[j].filter.ResizeBits((elementsLen + 63) / 64)
+               tags[j].filter.Add(t.value)
+               if t.valueType == pbv1.ValueTypeInt64 {
+                       if len(tags[j].min) == 0 {
+                               tags[j].min = t.value
+                       } else if bytes.Compare(t.value, tags[j].min) == -1 {
+                               tags[j].min = t.value
+                       }
+                       if len(tags[j].max) == 0 {
+                               tags[j].max = t.value
+                       } else if bytes.Compare(t.value, tags[j].max) == 1 {
+                               tags[j].max = t.value
+                       }
+               }
        }
 }
 
@@ -159,12 +182,12 @@ func (b *block) validate() {
 }
 
 func (b *block) marshalTagFamily(tf tagFamily, bm *blockMetadata, ww *writers) 
{
-       hw, w := ww.getTagMetadataWriterAndTagWriter(tf.name)
+       hw, w, fw := ww.getWriters(tf.name)
        cc := tf.tags
        cfm := generateTagFamilyMetadata()
        cmm := cfm.resizeTagMetadata(len(cc))
        for i := range cc {
-               cc[i].mustWriteTo(&cmm[i], w)
+               cc[i].mustWriteTo(&cmm[i], w, fw)
        }
        bb := bigValuePool.Generate()
        defer bigValuePool.Release(bb)
@@ -186,7 +209,7 @@ func (b *block) unmarshalTagFamily(decoder 
*encoding.BytesBlockDecoder, tfIndex
                return
        }
        bb := bigValuePool.Generate()
-       bb.Buf = bytes.ResizeExact(bb.Buf, int(tagFamilyMetadataBlock.size))
+       bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(tagFamilyMetadataBlock.size))
        fs.MustReadData(metaReader, int64(tagFamilyMetadataBlock.offset), 
bb.Buf)
        tfm := generateTagFamilyMetadata()
        defer releaseTagFamilyMetadata(tfm)
@@ -221,7 +244,7 @@ func (b *block) unmarshalTagFamilyFromSeqReaders(decoder 
*encoding.BytesBlockDec
                logger.Panicf("offset %d must be equal to bytesRead %d", 
columnFamilyMetadataBlock.offset, metaReader.bytesRead)
        }
        bb := bigValuePool.Generate()
-       bb.Buf = bytes.ResizeExact(bb.Buf, int(columnFamilyMetadataBlock.size))
+       bb.Buf = pkgbytes.ResizeExact(bb.Buf, 
int(columnFamilyMetadataBlock.size))
        metaReader.mustReadFull(bb.Buf)
        tfm := generateTagFamilyMetadata()
        defer releaseTagFamilyMetadata(tfm)
@@ -330,7 +353,7 @@ func mustWriteTimestampsTo(tm *timestampsMetadata, 
timestamps []int64, elementID
 func mustReadTimestampsFrom(timestamps []int64, elementIDs []uint64, tm 
*timestampsMetadata, count int, reader fs.Reader) ([]int64, []uint64) {
        bb := bigValuePool.Generate()
        defer bigValuePool.Release(bb)
-       bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size))
+       bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(tm.size))
        fs.MustReadData(reader, int64(tm.offset), bb.Buf)
        return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, 
count, reader.Path(), bb.Buf)
 }
@@ -359,7 +382,7 @@ func mustSeqReadTimestampsFrom(timestamps []int64, 
elementIDs []uint64, tm *time
        }
        bb := bigValuePool.Generate()
        defer bigValuePool.Release(bb)
-       bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size))
+       bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(tm.size))
        reader.mustReadFull(bb.Buf)
        return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, 
count, reader.Path(), bb.Buf)
 }
@@ -373,6 +396,13 @@ func generateBlock() *block {
 }
 
 func releaseBlock(b *block) {
+       for _, tf := range b.tagFamilies {
+               for _, t := range tf.tags {
+                       if t.filter != nil {
+                               releaseBloomFilter(t.filter)
+                       }
+               }
+       }
        b.reset()
        blockPool.Put(b)
 }
diff --git a/banyand/stream/block_scanner.go b/banyand/stream/block_scanner.go
index 889adb64..601b3af0 100644
--- a/banyand/stream/block_scanner.go
+++ b/banyand/stream/block_scanner.go
@@ -162,7 +162,7 @@ func getBlockScanner(ctx context.Context, segment 
storage.Segment[*tsTable, *opt
 }
 
 func search(ctx context.Context, qo queryOptions, seriesList 
[]common.SeriesID, tw *tsTable, tr *index.RangeOpts) (pl posting.List, plTS 
posting.List, err error) {
-       if qo.Filter == nil || qo.Filter == logicalstream.ENode {
+       if qo.InvertedFilter == nil || qo.InvertedFilter == logicalstream.ENode 
{
                return nil, nil, nil
        }
        tracer := query.GetTracer(ctx)
@@ -184,7 +184,7 @@ func search(ctx context.Context, qo queryOptions, 
seriesList []common.SeriesID,
        for i := range seriesList {
                sid[i] = uint64(seriesList[i])
        }
-       pl, plTS, err = tw.Index().Search(ctx, sid, qo.Filter, tr)
+       pl, plTS, err = tw.Index().Search(ctx, sid, qo.InvertedFilter, tr)
        if err != nil {
                return nil, nil, err
        }
@@ -222,7 +222,7 @@ func (bsn *blockScanner) scan(ctx context.Context, blockCh 
chan *blockScanResult
        defer releaseBlockMetadataArray(bma)
        ti := generateTstIter()
        defer releaseTstIter(ti)
-       ti.init(bma, parts, bsn.qo.sortedSids, bsn.qo.minTimestamp, 
bsn.qo.maxTimestamp)
+       ti.init(bma, parts, bsn.qo.sortedSids, bsn.qo.minTimestamp, 
bsn.qo.maxTimestamp, bsn.qo.SkippingFilter)
        batch := generateBlockScanResultBatch()
        if ti.Error() != nil {
                batch.err = fmt.Errorf("cannot init tstIter: %w", ti.Error())
@@ -285,7 +285,6 @@ func (bsn *blockScanner) scan(ctx context.Context, blockCh 
chan *blockScanResult
                select {
                case blockCh <- batch:
                case <-ctx.Done():
-
                        releaseBlockScanResultBatch(batch)
                }
                return
diff --git a/banyand/stream/block_test.go b/banyand/stream/block_test.go
index 8d784cd3..8b94e76b 100644
--- a/banyand/stream/block_test.go
+++ b/banyand/stream/block_test.go
@@ -250,13 +250,14 @@ func Test_mustWriteAndReadTimestamps(t *testing.T) {
 }
 
 func Test_marshalAndUnmarshalTagFamily(t *testing.T) {
-       metaBuffer, dataBuffer := &bytes.Buffer{}, &bytes.Buffer{}
+       metaBuffer, dataBuffer, filterBuffer := &bytes.Buffer{}, 
&bytes.Buffer{}, &bytes.Buffer{}
        ww := &writers{
-               mustCreateTagFamilyWriters: func(_ string) (fs.Writer, 
fs.Writer) {
-                       return metaBuffer, dataBuffer
+               mustCreateTagFamilyWriters: func(_ string) (fs.Writer, 
fs.Writer, fs.Writer) {
+                       return metaBuffer, dataBuffer, filterBuffer
                },
                tagFamilyMetadataWriters: make(map[string]*writer),
                tagFamilyWriters:         make(map[string]*writer),
+               tagFamilyFilterWriters:   make(map[string]*writer),
        }
        b := &conventionalBlock
        tagProjection := toTagProjection(*b)
@@ -286,7 +287,7 @@ func Test_marshalAndUnmarshalTagFamily(t *testing.T) {
        unmarshaled.unmarshalTagFamily(decoder, tfIndex, name, 
bm.getTagFamilyMetadata(name), tagProjection[name], metaBuffer, dataBuffer, 1)
 
        if diff := cmp.Diff(unmarshaled.tagFamilies[0], b.tagFamilies[0],
-               cmp.AllowUnexported(tagFamily{}, tag{}),
+               cmp.AllowUnexported(tagFamily{}, tag{}, tagFilter{}),
        ); diff != "" {
                t.Errorf("block.unmarshalTagFamily() (-got +want):\n%s", diff)
        }
@@ -306,7 +307,7 @@ func Test_marshalAndUnmarshalTagFamily(t *testing.T) {
        unmarshaled2.unmarshalTagFamilyFromSeqReaders(decoder, tfIndex, name, 
bm.getTagFamilyMetadata(name), metaReader, valueReader)
 
        if diff := cmp.Diff(unmarshaled2.tagFamilies[0], b.tagFamilies[0],
-               cmp.AllowUnexported(tagFamily{}, tag{}),
+               cmp.AllowUnexported(tagFamily{}, tag{}, tagFilter{}),
        ); diff != "" {
                t.Errorf("block.unmarshalTagFamilyFromSeqReaders() (-got 
+want):\n%s", diff)
        }
@@ -317,11 +318,12 @@ func Test_marshalAndUnmarshalBlock(t *testing.T) {
        timestampWriter := &writer{}
        timestampWriter.init(timestampBuffer)
        ww := &writers{
-               mustCreateTagFamilyWriters: func(_ string) (fs.Writer, 
fs.Writer) {
-                       return &bytes.Buffer{}, &bytes.Buffer{}
+               mustCreateTagFamilyWriters: func(_ string) (fs.Writer, 
fs.Writer, fs.Writer) {
+                       return &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}
                },
                tagFamilyMetadataWriters: make(map[string]*writer),
                tagFamilyWriters:         make(map[string]*writer),
+               tagFamilyFilterWriters:   make(map[string]*writer),
                timestampsWriter:         *timestampWriter,
        }
        p := &part{
diff --git a/banyand/stream/block_writer.go b/banyand/stream/block_writer.go
index d20a3ea9..33fc0764 100644
--- a/banyand/stream/block_writer.go
+++ b/banyand/stream/block_writer.go
@@ -57,7 +57,7 @@ func (w *writer) MustClose() {
        w.reset()
 }
 
-type mustCreateTagFamilyWriters func(name string) (fs.Writer, fs.Writer)
+type mustCreateTagFamilyWriters func(name string) (fs.Writer, fs.Writer, 
fs.Writer)
 
 type writers struct {
        mustCreateTagFamilyWriters mustCreateTagFamilyWriters
@@ -65,6 +65,7 @@ type writers struct {
        primaryWriter              writer
        tagFamilyMetadataWriters   map[string]*writer
        tagFamilyWriters           map[string]*writer
+       tagFamilyFilterWriters     map[string]*writer
        timestampsWriter           writer
 }
 
@@ -82,6 +83,10 @@ func (sw *writers) reset() {
                w.reset()
                delete(sw.tagFamilyWriters, i)
        }
+       for i, w := range sw.tagFamilyFilterWriters {
+               w.reset()
+               delete(sw.tagFamilyFilterWriters, i)
+       }
 }
 
 func (sw *writers) totalBytesWritten() uint64 {
@@ -93,6 +98,9 @@ func (sw *writers) totalBytesWritten() uint64 {
        for _, w := range sw.tagFamilyWriters {
                n += w.bytesWritten
        }
+       for _, w := range sw.tagFamilyFilterWriters {
+               n += w.bytesWritten
+       }
        return n
 }
 
@@ -107,22 +115,29 @@ func (sw *writers) MustClose() {
        for _, w := range sw.tagFamilyWriters {
                w.MustClose()
        }
+       for _, w := range sw.tagFamilyFilterWriters {
+               w.MustClose()
+       }
 }
 
-func (sw *writers) getTagMetadataWriterAndTagWriter(tagName string) (*writer, 
*writer) {
+func (sw *writers) getWriters(tagName string) (*writer, *writer, *writer) {
        thw, ok := sw.tagFamilyMetadataWriters[tagName]
        tw := sw.tagFamilyWriters[tagName]
+       tfw := sw.tagFamilyFilterWriters[tagName]
        if ok {
-               return thw, tw
+               return thw, tw, tfw
        }
-       hw, w := sw.mustCreateTagFamilyWriters(tagName)
+       hw, w, fw := sw.mustCreateTagFamilyWriters(tagName)
        thw = new(writer)
        thw.init(hw)
        tw = new(writer)
        tw.init(w)
+       tfw = new(writer)
+       tfw.init(fw)
        sw.tagFamilyMetadataWriters[tagName] = thw
        sw.tagFamilyWriters[tagName] = tw
-       return thw, tw
+       sw.tagFamilyFilterWriters[tagName] = tfw
+       return thw, tw, tfw
 }
 
 type blockWriter struct {
@@ -172,11 +187,13 @@ func (bw *blockWriter) MustInitForMemPart(mp *memPart) {
 func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path 
string, shouldCache bool) {
        bw.reset()
        fileSystem.MkdirPanicIfExist(path, storage.DirPerm)
-       bw.writers.mustCreateTagFamilyWriters = func(name string) (fs.Writer, 
fs.Writer) {
+       bw.writers.mustCreateTagFamilyWriters = func(name string) (fs.Writer, 
fs.Writer, fs.Writer) {
                metaPath := filepath.Join(path, 
name+tagFamiliesMetadataFilenameExt)
                dataPath := filepath.Join(path, name+tagFamiliesFilenameExt)
+               filterPath := filepath.Join(path, 
name+tagFamiliesFilterFilenameExt)
                return fs.MustCreateFile(fileSystem, metaPath, 
storage.FilePerm, shouldCache),
-                       fs.MustCreateFile(fileSystem, dataPath, 
storage.FilePerm, shouldCache)
+                       fs.MustCreateFile(fileSystem, dataPath, 
storage.FilePerm, shouldCache),
+                       fs.MustCreateFile(fileSystem, filterPath, 
storage.FilePerm, shouldCache)
        }
 
        bw.writers.metaWriter.init(fs.MustCreateFile(fileSystem, 
filepath.Join(path, metaFilename), storage.FilePerm, shouldCache))
@@ -280,6 +297,7 @@ func generateBlockWriter() *blockWriter {
                        writers: writers{
                                tagFamilyMetadataWriters: 
make(map[string]*writer),
                                tagFamilyWriters:         
make(map[string]*writer),
+                               tagFamilyFilterWriters:   
make(map[string]*writer),
                        },
                }
        }
diff --git a/banyand/stream/elements.go b/banyand/stream/elements.go
index 9d15cbfc..c040e23f 100644
--- a/banyand/stream/elements.go
+++ b/banyand/stream/elements.go
@@ -35,12 +35,14 @@ type tagValue struct {
        value     []byte
        valueArr  [][]byte
        valueType pbv1.ValueType
+       indexed   bool
 }
 
 func (t *tagValue) reset() {
        t.tag = ""
        t.value = nil
        t.valueArr = nil
+       t.indexed = false
 }
 
 func (t *tagValue) size() int {
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index 69e36b74..eaacae09 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -40,6 +40,7 @@ const (
        elementIndexFilename           = "idx"
        tagFamiliesMetadataFilenameExt = ".tfm"
        tagFamiliesFilenameExt         = ".tf"
+       tagFamiliesFilterFilenameExt   = ".tff"
 )
 
 type part struct {
@@ -48,6 +49,7 @@ type part struct {
        fileSystem           fs.FileSystem
        tagFamilyMetadata    map[string]fs.Reader
        tagFamilies          map[string]fs.Reader
+       tagFamilyFilter      map[string]fs.Reader
        path                 string
        primaryBlockMetadata []primaryBlockMetadata
        partMetadata         partMetadata
@@ -62,6 +64,9 @@ func (p *part) close() {
        for _, tfh := range p.tagFamilyMetadata {
                fs.MustClose(tfh)
        }
+       for _, tff := range p.tagFamilyFilter {
+               fs.MustClose(tff)
+       }
 }
 
 func (p *part) String() string {
@@ -80,9 +85,11 @@ func openMemPart(mp *memPart) *part {
        if mp.tagFamilies != nil {
                p.tagFamilies = make(map[string]fs.Reader)
                p.tagFamilyMetadata = make(map[string]fs.Reader)
+               p.tagFamilyFilter = make(map[string]fs.Reader)
                for name, tf := range mp.tagFamilies {
                        p.tagFamilies[name] = tf
                        p.tagFamilyMetadata[name] = mp.tagFamilyMetadata[name]
+                       p.tagFamilyFilter[name] = mp.tagFamilyFilter[name]
                }
        }
        return &p
@@ -91,27 +98,32 @@ func openMemPart(mp *memPart) *part {
 type memPart struct {
        tagFamilyMetadata map[string]*bytes.Buffer
        tagFamilies       map[string]*bytes.Buffer
+       tagFamilyFilter   map[string]*bytes.Buffer
        meta              bytes.Buffer
        primary           bytes.Buffer
        timestamps        bytes.Buffer
        partMetadata      partMetadata
 }
 
-func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, 
fs.Writer) {
+func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, 
fs.Writer, fs.Writer) {
        if mp.tagFamilies == nil {
                mp.tagFamilies = make(map[string]*bytes.Buffer)
                mp.tagFamilyMetadata = make(map[string]*bytes.Buffer)
+               mp.tagFamilyFilter = make(map[string]*bytes.Buffer)
        }
        tf, ok := mp.tagFamilies[name]
        tfh := mp.tagFamilyMetadata[name]
+       tff := mp.tagFamilyFilter[name]
        if ok {
                tf.Reset()
                tfh.Reset()
-               return tfh, tf
+               tff.Reset()
+               return tfh, tf, tff
        }
        mp.tagFamilies[name] = &bytes.Buffer{}
        mp.tagFamilyMetadata[name] = &bytes.Buffer{}
-       return mp.tagFamilyMetadata[name], mp.tagFamilies[name]
+       mp.tagFamilyFilter[name] = &bytes.Buffer{}
+       return mp.tagFamilyMetadata[name], mp.tagFamilies[name], 
mp.tagFamilyFilter[name]
 }
 
 func (mp *memPart) reset() {
@@ -129,6 +141,11 @@ func (mp *memPart) reset() {
                        tfh.Reset()
                }
        }
+       if mp.tagFamilyFilter != nil {
+               for _, tff := range mp.tagFamilyFilter {
+                       tff.Reset()
+               }
+       }
 }
 
 func (mp *memPart) mustInitFromElements(es *elements) {
@@ -176,6 +193,9 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path 
string) {
        for name, tfh := range mp.tagFamilyMetadata {
                fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path, 
name+tagFamiliesMetadataFilenameExt), storage.FilePerm)
        }
+       for name, tfh := range mp.tagFamilyFilter {
+               fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path, 
name+tagFamiliesFilterFilenameExt), storage.FilePerm)
+       }
 
        mp.partMetadata.mustWriteMetadata(fileSystem, path)
 
@@ -287,6 +307,12 @@ func mustOpenFilePart(id uint64, root string, fileSystem 
fs.FileSystem) *part {
                        }
                        p.tagFamilies[removeExt(e.Name(), 
tagFamiliesFilenameExt)] = mustOpenReader(path.Join(partPath, e.Name()), 
fileSystem)
                }
+               if filepath.Ext(e.Name()) == tagFamiliesFilterFilenameExt {
+                       if p.tagFamilyFilter == nil {
+                               p.tagFamilyFilter = make(map[string]fs.Reader)
+                       }
+                       p.tagFamilyFilter[removeExt(e.Name(), 
tagFamiliesFilterFilenameExt)] = mustOpenReader(path.Join(partPath, e.Name()), 
fileSystem)
+               }
        }
        return &p
 }
diff --git a/banyand/stream/part_iter.go b/banyand/stream/part_iter.go
index f4277032..1e1de90d 100644
--- a/banyand/stream/part_iter.go
+++ b/banyand/stream/part_iter.go
@@ -28,8 +28,10 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/pool"
+       logicalstream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
 )
 
 type partIter struct {
@@ -37,6 +39,7 @@ type partIter struct {
        p                    *part
        curBlock             *blockMetadata
        sids                 []common.SeriesID
+       blockFilter          index.Filter
        primaryBlockMetadata []primaryBlockMetadata
        bms                  []blockMetadata
        compressedPrimaryBuf []byte
@@ -50,6 +53,7 @@ func (pi *partIter) reset() {
        pi.curBlock = nil
        pi.p = nil
        pi.sids = nil
+       pi.blockFilter = nil
        pi.sidIdx = 0
        pi.primaryBlockMetadata = nil
        pi.bms = nil
@@ -58,13 +62,14 @@ func (pi *partIter) reset() {
        pi.err = nil
 }
 
-func (pi *partIter) init(bma *blockMetadataArray, p *part, sids 
[]common.SeriesID, minTimestamp, maxTimestamp int64) {
+func (pi *partIter) init(bma *blockMetadataArray, p *part, sids 
[]common.SeriesID, minTimestamp, maxTimestamp int64, blockFilter index.Filter) {
        pi.reset()
        pi.curBlock = &blockMetadata{}
        pi.p = p
 
        pi.bms = bma.arr
        pi.sids = sids
+       pi.blockFilter = blockFilter
        pi.minTimestamp = minTimestamp
        pi.maxTimestamp = maxTimestamp
 
@@ -221,6 +226,7 @@ func (pi *partIter) findBlock() bool {
                        bhs = bhs[1:]
                        continue
                }
+
                if bm.timestamps.min > pi.maxTimestamp {
                        if !pi.nextSeriesID() {
                                return false
@@ -228,6 +234,25 @@ func (pi *partIter) findBlock() bool {
                        continue
                }
 
+               if pi.blockFilter != nil && pi.blockFilter != 
logicalstream.ENode {
+                       shouldSkip, err := func() (bool, error) {
+                               tfs := generateTagFamilyFilters()
+                               defer releaseTagFamilyFilters(tfs)
+                               tfs.unmarshal(bm.tagFamilies, 
pi.p.tagFamilyMetadata, pi.p.tagFamilyFilter)
+                               return pi.blockFilter.ShouldSkip(tfs)
+                       }()
+                       if err != nil {
+                               pi.err = err
+                               return false
+                       }
+                       if shouldSkip {
+                               if !pi.nextSeriesID() {
+                                       return false
+                               }
+                               continue
+                       }
+               }
+
                pi.curBlock = bm
 
                pi.bms = bhs[1:]
diff --git a/banyand/stream/part_iter_test.go b/banyand/stream/part_iter_test.go
index ef42be37..6eb63da0 100644
--- a/banyand/stream/part_iter_test.go
+++ b/banyand/stream/part_iter_test.go
@@ -115,7 +115,7 @@ func Test_partIter_nextBlock(t *testing.T) {
                        verifyPart := func(p *part) {
                                defer p.close()
                                pi := partIter{}
-                               pi.init(bma, p, tt.sids, tt.opt.minTimestamp, 
tt.opt.maxTimestamp)
+                               pi.init(bma, p, tt.sids, tt.opt.minTimestamp, 
tt.opt.maxTimestamp, nil)
 
                                var got []blockMetadata
                                for pi.nextBlock() {
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 0bb1a7e9..0f8b787d 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -282,13 +282,13 @@ func (qo *queryOptions) copyFrom(other *queryOptions) {
 func indexSearch(ctx context.Context, sqo model.StreamQueryOptions,
        tabs []*tsTable, seriesList []uint64, tr *index.RangeOpts,
 ) (posting.List, posting.List, error) {
-       if sqo.Filter == nil || sqo.Filter == logicalstream.ENode {
+       if sqo.InvertedFilter == nil || sqo.InvertedFilter == 
logicalstream.ENode {
                return nil, nil, nil
        }
        result, resultTS := roaring.NewPostingList(), roaring.NewPostingList()
        for _, tw := range tabs {
                index := tw.Index()
-               pl, plTS, err := index.Search(ctx, seriesList, sqo.Filter, tr)
+               pl, plTS, err := index.Search(ctx, seriesList, 
sqo.InvertedFilter, tr)
                if err != nil {
                        return nil, nil, err
                }
diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go
index ae3e70af..3d5ee61e 100644
--- a/banyand/stream/query_by_idx.go
+++ b/banyand/stream/query_by_idx.go
@@ -83,7 +83,7 @@ func (qr *idxResult) scanParts(ctx context.Context, qo 
queryOptions) error {
        ti := generateTstIter()
        defer releaseTstIter(ti)
        sids := qo.sortedSids
-       ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
+       ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp, 
qo.SkippingFilter)
        if ti.Error() != nil {
                return fmt.Errorf("cannot init tstIter: %w", ti.Error())
        }
diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go
index cb8a3d7c..7499747f 100644
--- a/banyand/stream/tag.go
+++ b/banyand/stream/tag.go
@@ -26,6 +26,7 @@ import (
 )
 
 type tag struct {
+       tagFilter
        name      string
        values    [][]byte
        valueType pbv1.ValueType
@@ -39,6 +40,8 @@ func (t *tag) reset() {
                values[i] = nil
        }
        t.values = values[:0]
+
+       t.tagFilter.reset()
 }
 
 func (t *tag) resizeValues(valuesLen int) [][]byte {
@@ -51,7 +54,7 @@ func (t *tag) resizeValues(valuesLen int) [][]byte {
        return values
 }
 
-func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer) {
+func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer, tagFilterWriter 
*writer) {
        tm.reset()
 
        tm.name = t.name
@@ -68,6 +71,18 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter 
*writer) {
        }
        tm.offset = tagWriter.bytesWritten
        tagWriter.MustWrite(bb.Buf)
+
+       if t.filter != nil {
+               bb.Reset()
+               bb.Buf = encodeBloomFilter(bb.Buf[:0], t.filter)
+               if tm.valueType == pbv1.ValueTypeInt64 {
+                       tm.min = t.min
+                       tm.max = t.max
+               }
+               tm.filterBlock.size = uint64(len(bb.Buf))
+               tm.filterBlock.offset = tagFilterWriter.bytesWritten
+               tagFilterWriter.MustWrite(bb.Buf)
+       }
 }
 
 func (t *tag) mustReadValues(decoder *encoding.BytesBlockDecoder, reader 
fs.Reader, cm tagMetadata, count uint64) {
diff --git a/banyand/stream/tag_filter.go b/banyand/stream/tag_filter.go
new file mode 100644
index 00000000..c609e7d8
--- /dev/null
+++ b/banyand/stream/tag_filter.go
@@ -0,0 +1,222 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "bytes"
+       "fmt"
+
+       pkgbytes "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/filter"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+)
+
+func encodeBloomFilter(dst []byte, bf *filter.BloomFilter) []byte {
+       dst = encoding.Int64ToBytes(dst, int64(bf.N()))
+       dst = encoding.EncodeUint64Block(dst, bf.Bits())
+       return dst
+}
+
+func decodeBloomFilter(src []byte, bf *filter.BloomFilter) *filter.BloomFilter 
{
+       n := encoding.BytesToInt64(src)
+       bf.SetN(int(n))
+
+       m := n * filter.B
+       bits := make([]uint64, 0)
+       bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:], 
uint64((m+63)/64))
+       if err != nil {
+               logger.Panicf("failed to decode Bloom filter: %v", err)
+       }
+       bf.SetBits(bits)
+
+       return bf
+}
+
+func generateBloomFilter() *filter.BloomFilter {
+       v := bloomFilterPool.Get()
+       if v == nil {
+               return filter.NewBloomFilter(0)
+       }
+       return v
+}
+
+func releaseBloomFilter(bf *filter.BloomFilter) {
+       bf.Reset()
+       bloomFilterPool.Put(bf)
+}
+
+var bloomFilterPool = pool.Register[*filter.BloomFilter]("stream-bloomFilter")
+
+type tagFilter struct {
+       filter *filter.BloomFilter
+       min    []byte
+       max    []byte
+}
+
+func (tf *tagFilter) reset() {
+       tf.filter = nil
+       tf.min = tf.min[:0]
+       tf.max = tf.max[:0]
+}
+
+func generateTagFilter() *tagFilter {
+       v := tagFilterPool.Get()
+       if v == nil {
+               return &tagFilter{}
+       }
+       return v
+}
+
+func releaseTagFilter(tf *tagFilter) {
+       releaseBloomFilter(tf.filter)
+       tf.reset()
+       tagFilterPool.Put(tf)
+}
+
+var tagFilterPool = pool.Register[*tagFilter]("stream-tagFilter")
+
+type tagFamilyFilter map[string]*tagFilter
+
+func (tff *tagFamilyFilter) reset() {
+       clear(*tff)
+}
+
+func (tff tagFamilyFilter) unmarshal(tagFamilyMetadataBlock *dataBlock, 
metaReader, filterReader fs.Reader) {
+       bb := bigValuePool.Generate()
+       bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(tagFamilyMetadataBlock.size))
+       fs.MustReadData(metaReader, int64(tagFamilyMetadataBlock.offset), 
bb.Buf)
+       tfm := generateTagFamilyMetadata()
+       defer releaseTagFamilyMetadata(tfm)
+       err := tfm.unmarshal(bb.Buf)
+       if err != nil {
+               logger.Panicf("%s: cannot unmarshal tagFamilyMetadata: %v", 
metaReader.Path(), err)
+       }
+       bigValuePool.Release(bb)
+       for _, tm := range tfm.tagMetadata {
+               if tm.filterBlock.size == 0 {
+                       continue
+               }
+               bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(tm.filterBlock.size))
+               fs.MustReadData(filterReader, int64(tm.filterBlock.offset), 
bb.Buf)
+               bf := generateBloomFilter()
+               bf = decodeBloomFilter(bb.Buf, bf)
+               tf := generateTagFilter()
+               tf.filter = bf
+               if tm.valueType == pbv1.ValueTypeInt64 {
+                       tf.min = tm.min
+                       tf.max = tm.max
+               }
+               tff[tm.name] = tf
+       }
+}
+
+func generateTagFamilyFilter() *tagFamilyFilter {
+       v := tagFamilyFilterPool.Get()
+       if v == nil {
+               return &tagFamilyFilter{}
+       }
+       return v
+}
+
+func releaseTagFamilyFilter(tff *tagFamilyFilter) {
+       for _, tf := range *tff {
+               releaseTagFilter(tf)
+       }
+       tff.reset()
+       tagFamilyFilterPool.Put(tff)
+}
+
+var tagFamilyFilterPool = 
pool.Register[*tagFamilyFilter]("stream-tagFamilyFilter")
+
+type tagFamilyFilters struct {
+       tagFamilyFilters []*tagFamilyFilter
+}
+
+func (tfs *tagFamilyFilters) reset() {
+       tfs.tagFamilyFilters = tfs.tagFamilyFilters[:0]
+}
+
+func (tfs *tagFamilyFilters) unmarshal(tagFamilies map[string]*dataBlock, 
metaReader, filterReader map[string]fs.Reader) {
+       for tf := range tagFamilies {
+               tff := generateTagFamilyFilter()
+               tff.unmarshal(tagFamilies[tf], metaReader[tf], filterReader[tf])
+               tfs.tagFamilyFilters = append(tfs.tagFamilyFilters, tff)
+       }
+}
+
+func (tfs *tagFamilyFilters) Eq(tagName string, tagValue string) bool {
+       for _, tff := range tfs.tagFamilyFilters {
+               if tf, ok := (*tff)[tagName]; ok {
+                       return tf.filter.MightContain([]byte(tagValue))
+               }
+       }
+       return true
+}
+
+func (tfs *tagFamilyFilters) Range(tagName string, rangeOpts index.RangeOpts) 
(bool, error) {
+       for _, tff := range tfs.tagFamilyFilters {
+               if tf, ok := (*tff)[tagName]; ok {
+                       if rangeOpts.Lower != nil {
+                               lower, ok := 
rangeOpts.Lower.(*index.FloatTermValue)
+                               if !ok {
+                                       return false, fmt.Errorf("lower is not 
a float value: %v", rangeOpts.Lower)
+                               }
+                               value := make([]byte, 0)
+                               value = encoding.Int64ToBytes(value, 
int64(lower.Value))
+                               if bytes.Compare(tf.max, value) == -1 || 
!rangeOpts.IncludesLower && bytes.Equal(tf.max, value) {
+                                       return false, nil
+                               }
+                       }
+                       if rangeOpts.Upper != nil {
+                               upper, ok := 
rangeOpts.Upper.(*index.FloatTermValue)
+                               if !ok {
+                                       return false, fmt.Errorf("upper is not 
a float value: %v", rangeOpts.Upper)
+                               }
+                               value := make([]byte, 0)
+                               value = encoding.Int64ToBytes(value, 
int64(upper.Value))
+                               if bytes.Compare(tf.min, value) == 1 || 
!rangeOpts.IncludesUpper && bytes.Equal(tf.min, value) {
+                                       return false, nil
+                               }
+                       }
+               }
+       }
+       return true, nil
+}
+
+func generateTagFamilyFilters() *tagFamilyFilters {
+       v := tagFamilyFiltersPool.Get()
+       if v == nil {
+               return &tagFamilyFilters{}
+       }
+       return v
+}
+
+func releaseTagFamilyFilters(tfs *tagFamilyFilters) {
+       for _, tff := range tfs.tagFamilyFilters {
+               releaseTagFamilyFilter(tff)
+       }
+       tfs.reset()
+       tagFamilyFiltersPool.Put(tfs)
+}
+
+var tagFamilyFiltersPool = 
pool.Register[*tagFamilyFilters]("stream-tagFamilyFilters")
diff --git a/banyand/stream/tag_filter_test.go 
b/banyand/stream/tag_filter_test.go
new file mode 100644
index 00000000..b381ed78
--- /dev/null
+++ b/banyand/stream/tag_filter_test.go
@@ -0,0 +1,161 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/skywalking-banyandb/pkg/filter"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+func TestEncodeAndDecodeBloomFilter(t *testing.T) {
+       assert := assert.New(t)
+
+       bf := filter.NewBloomFilter(3)
+
+       items := [][]byte{
+               []byte("skywalking"),
+               []byte("banyandb"),
+               []byte(""),
+               []byte("hello"),
+               []byte("world"),
+       }
+
+       for i := 0; i < 3; i++ {
+               bf.Add(items[i])
+       }
+
+       buf := make([]byte, 0)
+       buf = encodeBloomFilter(buf, bf)
+       bf2 := filter.NewBloomFilter(0)
+       bf2 = decodeBloomFilter(buf, bf2)
+
+       for i := 0; i < 3; i++ {
+               mightContain := bf2.MightContain(items[i])
+               assert.True(mightContain)
+       }
+
+       for i := 3; i < 5; i++ {
+               mightContain := bf2.MightContain(items[i])
+               assert.False(mightContain)
+       }
+}
+
+type mockReader struct {
+       data []byte
+}
+
+func (mr *mockReader) Path() string {
+       return "mock"
+}
+
+func (mr *mockReader) Read(offset int64, buffer []byte) (int, error) {
+       if offset >= int64(len(mr.data)) {
+               return 0, nil
+       }
+       n := copy(buffer, mr.data[offset:])
+       return n, nil
+}
+
+func (mr *mockReader) SequentialRead() fs.SeqReader {
+       return nil
+}
+
+func (mr *mockReader) Close() error {
+       return nil
+}
+
+func generateMetaAndFilter(tagCount int, itemsPerTag int) ([]byte, []byte) {
+       tfm := generateTagFamilyMetadata()
+       defer releaseTagFamilyMetadata(tfm)
+       filterBuf := bytes.Buffer{}
+
+       for i := 0; i < tagCount; i++ {
+               bf := filter.NewBloomFilter(itemsPerTag)
+               for j := 0; j < itemsPerTag; j++ {
+                       item := make([]byte, 8)
+                       binary.BigEndian.PutUint64(item, 
uint64(i*itemsPerTag+j))
+                       bf.Add(item)
+               }
+               buf := make([]byte, 0)
+               buf = encodeBloomFilter(buf, bf)
+
+               tm := &tagMetadata{
+                       name:      fmt.Sprintf("tag_%d", i),
+                       valueType: pbv1.ValueTypeInt64,
+                       min:       make([]byte, 8),
+                       max:       make([]byte, 8),
+               }
+               binary.BigEndian.PutUint64(tm.min, uint64(i*itemsPerTag))
+               binary.BigEndian.PutUint64(tm.max, 
uint64(i*itemsPerTag+itemsPerTag-1))
+               tm.filterBlock.offset = uint64(filterBuf.Len())
+               tm.filterBlock.size = uint64(len(buf))
+               tfm.tagMetadata = append(tfm.tagMetadata, *tm)
+
+               filterBuf.Write(buf)
+       }
+
+       metaBuf := make([]byte, 0)
+       metaBuf = tfm.marshal(metaBuf)
+       return metaBuf, filterBuf.Bytes()
+}
+
+func BenchmarkTagFamilyFiltersUnmarshal(b *testing.B) {
+       testCases := []struct {
+               tagFamilyCount int
+               tagCount       int
+               itemsPerTag    int
+       }{
+               {1, 5, 10},
+               {2, 10, 100},
+               {3, 15, 1000},
+       }
+
+       for _, tc := range testCases {
+               b.Run(fmt.Sprintf("tagFamilies=%d_tags=%d_items=%d", 
tc.tagFamilyCount, tc.tagCount, tc.itemsPerTag), func(b *testing.B) {
+                       tagFamilies := make(map[string]*dataBlock)
+                       metaReaders := make(map[string]fs.Reader)
+                       filterReaders := make(map[string]fs.Reader)
+                       for i := 0; i < tc.tagFamilyCount; i++ {
+                               familyName := fmt.Sprintf("tagFamily_%d", i)
+                               metaBuf, filterBuf := 
generateMetaAndFilter(tc.tagCount, tc.itemsPerTag)
+                               tagFamilies[familyName] = &dataBlock{
+                                       offset: 0,
+                                       size:   uint64(len(metaBuf)),
+                               }
+                               metaReaders[familyName] = &mockReader{data: 
metaBuf}
+                               filterReaders[familyName] = &mockReader{data: 
filterBuf}
+                       }
+
+                       b.ResetTimer()
+                       b.ReportAllocs()
+                       for i := 0; i < b.N; i++ {
+                               tfs := generateTagFamilyFilters()
+                               tfs.unmarshal(tagFamilies, metaReaders, 
filterReaders)
+                               releaseTagFamilyFilters(tfs)
+                       }
+               })
+       }
+}
diff --git a/banyand/stream/tag_metadata.go b/banyand/stream/tag_metadata.go
index 8b8629f4..b30aa766 100644
--- a/banyand/stream/tag_metadata.go
+++ b/banyand/stream/tag_metadata.go
@@ -28,26 +28,38 @@ import (
 
 type tagMetadata struct {
        name string
+       min  []byte
+       max  []byte
        dataBlock
-       valueType pbv1.ValueType
+       valueType   pbv1.ValueType
+       filterBlock dataBlock
 }
 
 func (tm *tagMetadata) reset() {
        tm.name = ""
        tm.valueType = 0
        tm.dataBlock.reset()
+       tm.min = nil
+       tm.max = nil
+       tm.filterBlock.reset()
 }
 
 func (tm *tagMetadata) copyFrom(src *tagMetadata) {
        tm.name = src.name
        tm.valueType = src.valueType
        tm.dataBlock.copyFrom(&src.dataBlock)
+       tm.min = append(tm.min[:0], src.min...)
+       tm.max = append(tm.max[:0], src.max...)
+       tm.filterBlock.copyFrom(&src.filterBlock)
 }
 
 func (tm *tagMetadata) marshal(dst []byte) []byte {
        dst = encoding.EncodeBytes(dst, convert.StringToBytes(tm.name))
        dst = append(dst, byte(tm.valueType))
        dst = tm.dataBlock.marshal(dst)
+       dst = encoding.EncodeBytes(dst, tm.min)
+       dst = encoding.EncodeBytes(dst, tm.max)
+       dst = tm.filterBlock.marshal(dst)
        return dst
 }
 
@@ -63,6 +75,15 @@ func (tm *tagMetadata) unmarshal(src []byte) ([]byte, error) 
{
        tm.valueType = pbv1.ValueType(src[0])
        src = src[1:]
        src = tm.dataBlock.unmarshal(src)
+       src, tm.min, err = encoding.DecodeBytes(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal tagMetadata.min: %w", 
err)
+       }
+       src, tm.max, err = encoding.DecodeBytes(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal tagMetadata.max: %w", 
err)
+       }
+       src = tm.filterBlock.unmarshal(src)
        return src, nil
 }
 
diff --git a/banyand/stream/tag_metadata_test.go 
b/banyand/stream/tag_metadata_test.go
index fe35057e..c0399771 100644
--- a/banyand/stream/tag_metadata_test.go
+++ b/banyand/stream/tag_metadata_test.go
@@ -27,9 +27,12 @@ import (
 
 func Test_tagMetadata_reset(t *testing.T) {
        tm := &tagMetadata{
-               name:      "test",
-               valueType: pbv1.ValueTypeStr,
-               dataBlock: dataBlock{offset: 1, size: 10},
+               name:        "test",
+               valueType:   pbv1.ValueTypeStr,
+               dataBlock:   dataBlock{offset: 1, size: 10},
+               filterBlock: dataBlock{offset: 2, size: 20},
+               min:         []byte{1, 2, 3},
+               max:         []byte{4, 5, 6},
        }
 
        tm.reset()
@@ -37,13 +40,19 @@ func Test_tagMetadata_reset(t *testing.T) {
        assert.Equal(t, "", tm.name)
        assert.Equal(t, pbv1.ValueType(0), tm.valueType)
        assert.Equal(t, dataBlock{}, tm.dataBlock)
+       assert.Equal(t, dataBlock{}, tm.filterBlock)
+       assert.Nil(t, tm.min)
+       assert.Nil(t, tm.max)
 }
 
 func Test_tagMetadata_copyFrom(t *testing.T) {
        src := &tagMetadata{
-               name:      "test",
-               valueType: pbv1.ValueTypeStr,
-               dataBlock: dataBlock{offset: 1, size: 10},
+               name:        "test",
+               valueType:   pbv1.ValueTypeStr,
+               dataBlock:   dataBlock{offset: 1, size: 10},
+               filterBlock: dataBlock{offset: 2, size: 20},
+               min:         []byte{1, 2, 3},
+               max:         []byte{4, 5, 6},
        }
 
        dest := &tagMetadata{}
@@ -55,9 +64,12 @@ func Test_tagMetadata_copyFrom(t *testing.T) {
 
 func Test_tagMetadata_marshal(t *testing.T) {
        original := &tagMetadata{
-               name:      "test",
-               valueType: pbv1.ValueTypeStr,
-               dataBlock: dataBlock{offset: 1, size: 10},
+               name:        "test",
+               valueType:   pbv1.ValueTypeStr,
+               dataBlock:   dataBlock{offset: 1, size: 10},
+               filterBlock: dataBlock{offset: 2, size: 20},
+               min:         []byte{1, 2, 3},
+               max:         []byte{4, 5, 6},
        }
 
        marshaled := original.marshal(nil)
@@ -74,14 +86,20 @@ func Test_tagFamilyMetadata_reset(t *testing.T) {
        tfm := &tagFamilyMetadata{
                tagMetadata: []tagMetadata{
                        {
-                               name:      "test1",
-                               valueType: pbv1.ValueTypeStr,
-                               dataBlock: dataBlock{offset: 1, size: 10},
+                               name:        "test1",
+                               valueType:   pbv1.ValueTypeStr,
+                               dataBlock:   dataBlock{offset: 1, size: 10},
+                               filterBlock: dataBlock{offset: 2, size: 20},
+                               min:         []byte{1, 2, 3},
+                               max:         []byte{4, 5, 6},
                        },
                        {
-                               name:      "test2",
-                               valueType: pbv1.ValueTypeInt64,
-                               dataBlock: dataBlock{offset: 2, size: 20},
+                               name:        "test2",
+                               valueType:   pbv1.ValueTypeInt64,
+                               dataBlock:   dataBlock{offset: 3, size: 30},
+                               filterBlock: dataBlock{offset: 4, size: 40},
+                               min:         []byte{1, 2, 3},
+                               max:         []byte{4, 5, 6},
                        },
                },
        }
@@ -95,14 +113,20 @@ func Test_tagFamilyMetadata_copyFrom(t *testing.T) {
        src := &tagFamilyMetadata{
                tagMetadata: []tagMetadata{
                        {
-                               name:      "test1",
-                               valueType: pbv1.ValueTypeStr,
-                               dataBlock: dataBlock{offset: 1, size: 10},
+                               name:        "test1",
+                               valueType:   pbv1.ValueTypeStr,
+                               dataBlock:   dataBlock{offset: 1, size: 10},
+                               filterBlock: dataBlock{offset: 2, size: 20},
+                               min:         []byte{1, 2, 3},
+                               max:         []byte{4, 5, 6},
                        },
                        {
-                               name:      "test2",
-                               valueType: pbv1.ValueTypeInt64,
-                               dataBlock: dataBlock{offset: 2, size: 20},
+                               name:        "test2",
+                               valueType:   pbv1.ValueTypeInt64,
+                               dataBlock:   dataBlock{offset: 3, size: 30},
+                               filterBlock: dataBlock{offset: 4, size: 40},
+                               min:         []byte{1, 2, 3},
+                               max:         []byte{4, 5, 6},
                        },
                },
        }
@@ -139,14 +163,20 @@ func Test_tagFamilyMetadata_marshalUnmarshal(t 
*testing.T) {
                        original: &tagFamilyMetadata{
                                tagMetadata: []tagMetadata{
                                        {
-                                               name:      "test1",
-                                               valueType: pbv1.ValueTypeStr,
-                                               dataBlock: dataBlock{offset: 1, 
size: 10},
+                                               name:        "test1",
+                                               valueType:   pbv1.ValueTypeStr,
+                                               dataBlock:   dataBlock{offset: 
1, size: 10},
+                                               filterBlock: dataBlock{offset: 
2, size: 20},
+                                               min:         []byte{1, 2, 3},
+                                               max:         []byte{4, 5, 6},
                                        },
                                        {
-                                               name:      "test2",
-                                               valueType: pbv1.ValueTypeInt64,
-                                               dataBlock: dataBlock{offset: 2, 
size: 20},
+                                               name:        "test2",
+                                               valueType:   
pbv1.ValueTypeInt64,
+                                               dataBlock:   dataBlock{offset: 
3, size: 30},
+                                               filterBlock: dataBlock{offset: 
4, size: 40},
+                                               min:         []byte{1, 2, 3},
+                                               max:         []byte{4, 5, 6},
                                        },
                                },
                        },
diff --git a/banyand/stream/tag_test.go b/banyand/stream/tag_test.go
index 070c3679..c6a88be4 100644
--- a/banyand/stream/tag_test.go
+++ b/banyand/stream/tag_test.go
@@ -63,10 +63,11 @@ func TestTag_mustWriteTo_mustReadValues(t *testing.T) {
 
        tm := &tagMetadata{}
 
-       buf := &bytes.Buffer{}
-       w := &writer{}
+       buf, filterBuf := &bytes.Buffer{}, &bytes.Buffer{}
+       w, fw := &writer{}, &writer{}
        w.init(buf)
-       original.mustWriteTo(tm, w)
+       fw.init(filterBuf)
+       original.mustWriteTo(tm, w, fw)
        assert.Equal(t, w.bytesWritten, tm.size)
        assert.Equal(t, uint64(len(buf.Buf)), tm.size)
        assert.Equal(t, uint64(0), tm.offset)
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 218f16b2..493444c4 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -35,6 +35,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/pool"
@@ -331,7 +332,7 @@ func (ti *tstIter) reset() {
        ti.nextBlockNoop = false
 }
 
-func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids 
[]common.SeriesID, minTimestamp, maxTimestamp int64) {
+func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids 
[]common.SeriesID, minTimestamp, maxTimestamp int64, blockFilter index.Filter) {
        ti.reset()
        ti.parts = parts
 
@@ -340,7 +341,7 @@ func (ti *tstIter) init(bma *blockMetadataArray, parts 
[]*part, sids []common.Se
        }
        ti.piPool = ti.piPool[:len(ti.parts)]
        for i, p := range ti.parts {
-               ti.piPool[i].init(bma, p, sids, minTimestamp, maxTimestamp)
+               ti.piPool[i].init(bma, p, sids, minTimestamp, maxTimestamp, 
blockFilter)
        }
 
        ti.piHeap = ti.piHeap[:0]
diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go
index 3945f0bb..a954c260 100644
--- a/banyand/stream/tstable_test.go
+++ b/banyand/stream/tstable_test.go
@@ -145,7 +145,7 @@ func Test_tstIter(t *testing.T) {
                pp, n := s.getParts(nil, tt.minTimestamp, tt.maxTimestamp)
                require.Equal(t, len(s.parts), n)
                ti := &tstIter{}
-               ti.init(bma, pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
+               ti.init(bma, pp, tt.sids, tt.minTimestamp, tt.maxTimestamp, nil)
                var got []blockMetadata
                for ti.nextBlock() {
                        if ti.piHeap[0].curBlock.seriesID == 0 {
diff --git a/banyand/stream/write.go b/banyand/stream/write.go
index 23503cc9..1b310c18 100644
--- a/banyand/stream/write.go
+++ b/banyand/stream/write.go
@@ -22,6 +22,7 @@ import (
        "context"
        "fmt"
        "strings"
+       "time"
 
        "google.golang.org/protobuf/proto"
 
@@ -71,18 +72,33 @@ func (w *writeCallback) CheckHealth() *common.Error {
 func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent 
*streamv1.InternalWriteRequest,
        docIDBuilder *strings.Builder,
 ) (map[string]*elementsInGroup, error) {
-       req := writeEvent.Request
-       t := req.Element.Timestamp.AsTime().Local()
+       t := writeEvent.Request.Element.Timestamp.AsTime().Local()
        if err := timestamp.Check(t); err != nil {
                return nil, fmt.Errorf("invalid timestamp: %w", err)
        }
        ts := t.UnixNano()
+       eg, err := w.prepareElementsInGroup(dst, writeEvent, ts)
+       if err != nil {
+               return nil, err
+       }
+       et, err := w.prepareElementsInTable(eg, writeEvent, ts)
+       if err != nil {
+               return nil, err
+       }
+       err = w.processElements(et, eg, writeEvent, docIDBuilder, ts)
+       if err != nil {
+               return nil, err
+       }
+       return dst, nil
+}
 
-       gn := req.Metadata.Group
+func (w *writeCallback) prepareElementsInGroup(dst 
map[string]*elementsInGroup, writeEvent *streamv1.InternalWriteRequest, ts 
int64) (*elementsInGroup, error) {
+       gn := writeEvent.Request.Metadata.Group
        tsdb, err := w.schemaRepo.loadTSDB(gn)
        if err != nil {
                return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, 
err)
        }
+
        eg, ok := dst[gn]
        if !ok {
                eg = &elementsInGroup{
@@ -96,7 +112,10 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
        if eg.latestTS < ts {
                eg.latestTS = ts
        }
+       return eg, nil
+}
 
+func (w *writeCallback) prepareElementsInTable(eg *elementsInGroup, writeEvent 
*streamv1.InternalWriteRequest, ts int64) (*elementsInTable, error) {
        var et *elementsInTable
        for i := range eg.tables {
                if eg.tables[i].timeRange.Contains(ts) {
@@ -104,25 +123,30 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
                        break
                }
        }
-       shardID := common.ShardID(writeEvent.ShardId)
+
        if et == nil {
                var segment storage.Segment[*tsTable, option]
                for _, seg := range eg.segments {
                        if seg.GetTimeRange().Contains(ts) {
                                segment = seg
+                               break
                        }
                }
                if segment == nil {
-                       segment, err = tsdb.CreateSegmentIfNotExist(t)
+                       var err error
+                       segment, err = 
eg.tsdb.CreateSegmentIfNotExist(time.Unix(0, ts))
                        if err != nil {
                                return nil, fmt.Errorf("cannot create segment: 
%w", err)
                        }
                        eg.segments = append(eg.segments, segment)
                }
+
+               shardID := common.ShardID(writeEvent.ShardId)
                tstb, err := segment.CreateTSTableIfNotExist(shardID)
                if err != nil {
                        return nil, fmt.Errorf("cannot create ts table: %w", 
err)
                }
+
                et = &elementsInTable{
                        timeRange: segment.GetTimeRange(),
                        tsTable:   tstb,
@@ -131,6 +155,14 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
                et.elements.reset()
                eg.tables = append(eg.tables, et)
        }
+       return et, nil
+}
+
+func (w *writeCallback) processElements(et *elementsInTable, eg 
*elementsInGroup, writeEvent *streamv1.InternalWriteRequest,
+       docIDBuilder *strings.Builder, ts int64,
+) error {
+       req := writeEvent.Request
+
        et.elements.timestamps = append(et.elements.timestamps, ts)
        docIDBuilder.Reset()
        docIDBuilder.WriteString(req.Metadata.Name)
@@ -138,34 +170,39 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
        docIDBuilder.WriteString(req.Element.ElementId)
        eID := convert.HashStr(docIDBuilder.String())
        et.elements.elementIDs = append(et.elements.elementIDs, eID)
+
        stm, ok := 
w.schemaRepo.loadStream(writeEvent.GetRequest().GetMetadata())
        if !ok {
-               return nil, fmt.Errorf("cannot find stream definition: %s", 
writeEvent.GetRequest().GetMetadata())
+               return fmt.Errorf("cannot find stream definition: %s", 
writeEvent.GetRequest().GetMetadata())
        }
+
        fLen := len(req.Element.GetTagFamilies())
        if fLen < 1 {
-               return nil, fmt.Errorf("%s has no tag family", req)
+               return fmt.Errorf("%s has no tag family", req)
        }
        if fLen > len(stm.schema.GetTagFamilies()) {
-               return nil, fmt.Errorf("%s has more tag families than %s", 
req.Metadata, stm.schema)
+               return fmt.Errorf("%s has more tag families than %s", 
req.Metadata, stm.schema)
        }
+
        series := &pbv1.Series{
                Subject:      req.Metadata.Name,
                EntityValues: writeEvent.EntityValues,
        }
        if err := series.Marshal(); err != nil {
-               return nil, fmt.Errorf("cannot marshal series: %w", err)
+               return fmt.Errorf("cannot marshal series: %w", err)
        }
        et.elements.seriesIDs = append(et.elements.seriesIDs, series.ID)
 
        is := stm.indexSchema.Load().(indexSchema)
-
        tagFamilies := make([]tagValues, 0, len(stm.schema.TagFamilies))
+       indexedTags := make(map[string]map[string]struct{})
+       var fields []index.Field
+
        if len(is.indexRuleLocators.TagFamilyTRule) != 
len(stm.GetSchema().GetTagFamilies()) {
-               logger.Panicf("metadata crashed, tag family rule length %d, tag 
family length %d",
+               return fmt.Errorf("metadata crashed, tag family rule length %d, 
tag family length %d",
                        len(is.indexRuleLocators.TagFamilyTRule), 
len(stm.GetSchema().GetTagFamilies()))
        }
-       var fields []index.Field
+
        for i := range stm.GetSchema().GetTagFamilies() {
                var tagFamily *modelv1.TagFamilyForWrite
                if len(req.Element.TagFamilies) <= i {
@@ -178,6 +215,7 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
                tf := tagValues{
                        tag: tagFamilySpec.Name,
                }
+               indexedTags[tagFamilySpec.Name] = make(map[string]struct{})
 
                for j := range tagFamilySpec.Tags {
                        var tagValue *modelv1.TagValue
@@ -188,21 +226,25 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
                        }
 
                        t := tagFamilySpec.Tags[j]
+                       indexed := false
                        if r, ok := tfr[t.Name]; ok && tagValue != 
pbv1.NullTagValue {
-                               fields = appendField(fields, index.FieldKey{
-                                       IndexRuleID: r.GetMetadata().GetId(),
-                                       Analyzer:    r.Analyzer,
-                                       SeriesID:    series.ID,
-                               }, t.Type, tagValue, r.GetNoSort())
+                               if r.GetType() == 
databasev1.IndexRule_TYPE_INVERTED {
+                                       fields = appendField(fields, 
index.FieldKey{
+                                               IndexRuleID: 
r.GetMetadata().GetId(),
+                                               Analyzer:    r.Analyzer,
+                                               SeriesID:    series.ID,
+                                       }, t.Type, tagValue, r.GetNoSort())
+                               } else if r.GetType() == 
databasev1.IndexRule_TYPE_SKIPPING {
+                                       indexed = true
+                               }
                        }
                        _, isEntity := is.indexRuleLocators.EntitySet[t.Name]
                        if tagFamilySpec.Tags[j].IndexedOnly || isEntity {
                                continue
                        }
-                       tf.values = append(tf.values, encodeTagValue(
-                               t.Name,
-                               t.Type,
-                               tagValue))
+                       tv := encodeTagValue(t.Name, t.Type, tagValue)
+                       tv.indexed = indexed
+                       tf.values = append(tf.values, tv)
                }
                if len(tf.values) > 0 {
                        tagFamilies = append(tagFamilies, tf)
@@ -225,7 +267,7 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
                eg.docIDsAdded[docID] = struct{}{}
        }
 
-       return dst, nil
+       return nil
 }
 
 func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp 
bus.Message) {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index a2c27cca..4a5006d6 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -1495,6 +1495,7 @@ Type determine the index structure under the hood
 | ---- | ------ | ----------- |
 | TYPE_UNSPECIFIED | 0 |  |
 | TYPE_INVERTED | 1 |  |
+| TYPE_SKIPPING | 2 |  |
 | TYPE_TREE | 3 | TYPE_TREE is a tree index, which is used for storing 
hierarchical data. |
 
 
diff --git a/pkg/encoding/bytes.go b/pkg/encoding/bytes.go
index 14a02c7c..7650bf15 100644
--- a/pkg/encoding/bytes.go
+++ b/pkg/encoding/bytes.go
@@ -49,7 +49,7 @@ func EncodeBytesBlock(dst []byte, a [][]byte) []byte {
                aLens = append(aLens, uint64(len(s)))
        }
        u64s.L = aLens
-       dst = encodeUint64Block(dst, u64s.L)
+       dst = EncodeUint64Block(dst, u64s.L)
        ReleaseUint64List(u64s)
 
        bb := bbPool.Generate()
@@ -82,7 +82,7 @@ func (bbd *BytesBlockDecoder) Decode(dst [][]byte, src 
[]byte, itemsCount uint64
 
        var tail []byte
        var err error
-       u64List.L, tail, err = decodeUint64Block(u64List.L[:0], src, itemsCount)
+       u64List.L, tail, err = DecodeUint64Block(u64List.L[:0], src, itemsCount)
        if err != nil {
                return dst, fmt.Errorf("cannot decode string lengths: %w", err)
        }
@@ -114,7 +114,8 @@ func (bbd *BytesBlockDecoder) Decode(dst [][]byte, src 
[]byte, itemsCount uint64
        return dst, nil
 }
 
-func encodeUint64Block(dst []byte, a []uint64) []byte {
+// EncodeUint64Block encodes a block of uint64 values into dst.
+func EncodeUint64Block(dst []byte, a []uint64) []byte {
        bb := bbPool.Generate()
        bb.Buf = encodeUint64List(bb.Buf[:0], a)
        dst = compressBlock(dst, bb.Buf)
@@ -122,7 +123,8 @@ func encodeUint64Block(dst []byte, a []uint64) []byte {
        return dst
 }
 
-func decodeUint64Block(dst []uint64, src []byte, itemsCount uint64) ([]uint64, 
[]byte, error) {
+// DecodeUint64Block decodes a block of uint64 values from src.
+func DecodeUint64Block(dst []uint64, src []byte, itemsCount uint64) ([]uint64, 
[]byte, error) {
        bb := bbPool.Generate()
        defer bbPool.Release(bb)
 
diff --git a/pkg/encoding/dictionary.go b/pkg/encoding/dictionary.go
index c134f86b..a878bfa2 100644
--- a/pkg/encoding/dictionary.go
+++ b/pkg/encoding/dictionary.go
@@ -99,7 +99,7 @@ func (d *Dictionary) decodeBytesBlockWithTail(src []byte, 
itemsCount uint64) ([]
 
        var tail []byte
        var err error
-       u64List.L, tail, err = decodeUint64Block(u64List.L[:0], src, itemsCount)
+       u64List.L, tail, err = DecodeUint64Block(u64List.L[:0], src, itemsCount)
        if err != nil {
                return nil, nil, fmt.Errorf("cannot decode string lengths: %w", 
err)
        }
diff --git a/pkg/filter/bloom_filter.go b/pkg/filter/bloom_filter.go
new file mode 100644
index 00000000..98ff47c4
--- /dev/null
+++ b/pkg/filter/bloom_filter.go
@@ -0,0 +1,132 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package filter defines and implements data structures and interfaces for 
skipping index.
+package filter
+
+import (
+       "sync/atomic"
+       "unsafe"
+
+       "github.com/cespare/xxhash/v2"
+)
+
+const (
+       // k is the number of hash probes performed per item.
+       k = 6
+       // B specifies the number of bits allocated for each item.
+       B = 15
+)
+
+// BloomFilter is a probabilistic data structure designed to test whether an element is a member of a set.
+type BloomFilter struct {
+       bits []uint64 // backing bitset; individual words are accessed atomically
+       n    int      // number of items this filter was sized for
+}
+
+// NewBloomFilter creates a new Bloom filter sized for n items, allocating
+// B bits per item (the false-positive rate is fixed by the k/B constants).
+// NOTE(review): n == 0 yields an empty bitset, and Add/MightContain would
+// then divide by a zero maxBits — callers must pass n > 0.
+func NewBloomFilter(n int) *BloomFilter {
+       m := n * B
+       bits := make([]uint64, (m+63)/64)
+       return &BloomFilter{
+               bits,
+               n,
+       }
+}
+
+// Reset resets the Bloom filter: the bitset is truncated to length zero
+// (retaining its capacity for reuse) and the item count is cleared.
+func (bf *BloomFilter) Reset() {
+       bf.bits = bf.bits[:0]
+       bf.n = 0
+}
+
+// Add adds an item to the Bloom filter. It reports whether at least one
+// probed bit was newly set, i.e. whether the item was probably absent
+// before this call. Bits are set with compare-and-swap, so concurrent
+// Adds are safe. The filter must have a non-empty bitset (n > 0).
+func (bf *BloomFilter) Add(item []byte) bool {
+       h := xxhash.Sum64(item)
+       bits := bf.bits
+       maxBits := uint64(len(bits)) * 64
+       // b aliases h's bytes via unsafe; bumping h below derives a fresh
+       // input for each of the k probe hashes without any allocation.
+       bp := (*[8]byte)(unsafe.Pointer(&h))
+       b := bp[:]
+       isNew := false
+       for i := 0; i < k; i++ {
+               hi := xxhash.Sum64(b)
+               h++
+               idx := hi % maxBits
+               // Word index and bit offset; named w/j so the loop counter i
+               // is not shadowed.
+               w, j := idx/64, idx%64
+               mask := uint64(1) << j
+               cur := atomic.LoadUint64(&bits[w])
+               // CAS loop: retry until the bit is observed set or we set it.
+               for (cur & mask) == 0 {
+                       if atomic.CompareAndSwapUint64(&bits[w], cur, cur|mask) {
+                               isNew = true
+                               break
+                       }
+                       cur = atomic.LoadUint64(&bits[w])
+               }
+       }
+       return isNew
+}
+
+// MightContain checks if an item might be in the Bloom filter. A false
+// result is definitive (the item was never added); a true result may be
+// a false positive. The filter must have a non-empty bitset (n > 0).
+func (bf *BloomFilter) MightContain(item []byte) bool {
+       h := xxhash.Sum64(item)
+       bits := bf.bits
+       maxBits := uint64(len(bits)) * 64
+       // b aliases h's bytes; h++ varies the probe input per round,
+       // mirroring the probe sequence used by Add.
+       bp := (*[8]byte)(unsafe.Pointer(&h))
+       b := bp[:]
+       for i := 0; i < k; i++ {
+               hi := xxhash.Sum64(b)
+               h++
+               idx := hi % maxBits
+               // Word index and bit offset; named w/j so the loop counter i
+               // is not shadowed.
+               w, j := idx/64, idx%64
+               mask := uint64(1) << j
+               if atomic.LoadUint64(&bits[w])&mask == 0 {
+                       return false
+               }
+       }
+       return true
+}
+
+// Bits returns the underlying bitset. The slice is shared, not copied.
+func (bf *BloomFilter) Bits() []uint64 {
+       return bf.bits
+}
+
+// N returns the number of items the filter was sized for.
+func (bf *BloomFilter) N() int {
+       return bf.n
+}
+
+// SetBits sets the underlying bitset, replacing the current slice.
+func (bf *BloomFilter) SetBits(bits []uint64) {
+       bf.bits = bits
+}
+
+// SetN sets the number of items.
+func (bf *BloomFilter) SetN(n int) {
+       bf.n = n
+}
+
+// ResizeBits resizes the underlying bitset to n words, growing the
+// backing array only when its current capacity is insufficient.
+func (bf *BloomFilter) ResizeBits(n int) {
+       bits := bf.bits
+       if m := n - cap(bits); m > 0 {
+               bits = append(bits[:cap(bits)], make([]uint64, m)...)
+       }
+       bits = bits[:n]
+       bf.bits = bits
+}
diff --git a/pkg/filter/bloom_filter_test.go b/pkg/filter/bloom_filter_test.go
new file mode 100644
index 00000000..e76204b7
--- /dev/null
+++ b/pkg/filter/bloom_filter_test.go
@@ -0,0 +1,136 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package filter
+
+import (
+       "encoding/binary"
+       "fmt"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// TestBloomFilter verifies basic behavior: the three added items are
+// always reported as possibly present, and the two held-out items are
+// reported absent. NOTE(review): the absence assertions rely on these
+// specific keys not colliding at this filter size; that is deterministic
+// here (xxhash is seedless) but would not hold for arbitrary inputs.
+func TestBloomFilter(t *testing.T) {
+       assert := assert.New(t)
+
+       bf := NewBloomFilter(3)
+       assert.NotNil(bf)
+
+       items := [][]byte{
+               []byte("skywalking"),
+               []byte("banyandb"),
+               []byte(""),
+               []byte("hello"),
+               []byte("world"),
+       }
+
+       // First three items go into the filter; each should be new.
+       for i := 0; i < 3; i++ {
+               res := bf.Add(items[i])
+               assert.True(res)
+       }
+
+       // Added items must always be reported as possibly present.
+       for i := 0; i < 3; i++ {
+               mightContain := bf.MightContain(items[i])
+               assert.True(mightContain)
+       }
+
+       // Held-out items are expected to miss (no false positive here).
+       for i := 3; i < 5; i++ {
+               mightContain := bf.MightContain(items[i])
+               assert.False(mightContain)
+       }
+}
+
+// BenchmarkFilterAdd measures Add throughput across filter sizes.
+func BenchmarkFilterAdd(b *testing.B) {
+       for _, n := range []int{1e3, 1e4, 1e5, 1e6, 1e7} {
+               data := generateTestData(n)
+               b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) {
+                       benchmarkFilterAdd(b, n, data)
+               })
+       }
+}
+
+// benchmarkFilterAdd inserts the n prepared keys into a fresh
+// per-goroutine filter on every parallel iteration.
+func benchmarkFilterAdd(b *testing.B, n int, data [][]byte) {
+       b.ReportAllocs()
+       b.RunParallel(func(pb *testing.PB) {
+               bf := NewBloomFilter(n)
+               for pb.Next() {
+                       for i := 0; i < n; i++ {
+                               bf.Add(data[i])
+                       }
+               }
+       })
+}
+
+// BenchmarkFilterMightContainHit measures lookup throughput when every
+// probed key is present in the filter.
+func BenchmarkFilterMightContainHit(b *testing.B) {
+       for _, n := range []int{1e3, 1e4, 1e5, 1e6, 1e7} {
+               data := generateTestData(n)
+               b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) {
+                       benchmarkFilterMightContainHit(b, n, data)
+               })
+       }
+}
+
+// benchmarkFilterMightContainHit pre-populates a per-goroutine filter
+// with all n keys, then times lookups; a miss on an inserted key is a
+// correctness violation (bloom filters have no false negatives), so it
+// panics rather than silently skewing the benchmark.
+func benchmarkFilterMightContainHit(b *testing.B, n int, data [][]byte) {
+       b.ReportAllocs()
+       b.RunParallel(func(pb *testing.PB) {
+               bf := NewBloomFilter(n)
+               for i := 0; i < n; i++ {
+                       bf.Add(data[i])
+               }
+               for pb.Next() {
+                       for i := 0; i < n; i++ {
+                               if !bf.MightContain(data[i]) {
+                                       panic(fmt.Errorf("missing item %d", 
data[i]))
+                               }
+                       }
+               }
+       })
+}
+
+// BenchmarkFilterMightContainMiss measures lookup throughput against an
+// empty filter, so every probe is a guaranteed miss.
+func BenchmarkFilterMightContainMiss(b *testing.B) {
+       for _, n := range []int{1e3, 1e4, 1e5, 1e6, 1e7} {
+               data := generateTestData(n)
+               b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) {
+                       benchmarkFilterMightContainMiss(b, n, data)
+               })
+       }
+}
+
+// benchmarkFilterMightContainMiss times lookups against a filter that
+// was never populated; all bits are zero, so any positive result is a
+// bug and triggers a panic.
+func benchmarkFilterMightContainMiss(b *testing.B, n int, data [][]byte) {
+       b.ReportAllocs()
+       b.RunParallel(func(pb *testing.PB) {
+               bf := NewBloomFilter(n)
+               for pb.Next() {
+                       for i := 0; i < n; i++ {
+                               if bf.MightContain(data[i]) {
+                                       panic(fmt.Errorf("unexpected item %d", 
data[i]))
+                               }
+                       }
+               }
+       })
+}
+
+// generateTestData returns n distinct keys: the 8-byte big-endian
+// encodings of 0..n-1. The slice is pre-sized to capacity n to avoid
+// repeated growth copies when n reaches 1e7 in the benchmarks.
+func generateTestData(n int) [][]byte {
+       data := make([][]byte, 0, n)
+       for i := 0; i < n; i++ {
+               buf := make([]byte, 8)
+               binary.BigEndian.PutUint64(buf, uint64(i))
+               data = append(data, buf)
+       }
+       return data
+}
diff --git a/pkg/index/inverted/analyzer.go b/pkg/index/analyzer/analyzer.go
similarity index 70%
rename from pkg/index/inverted/analyzer.go
rename to pkg/index/analyzer/analyzer.go
index 7c9c4423..d7b4b7cd 100644
--- a/pkg/index/inverted/analyzer.go
+++ b/pkg/index/analyzer/analyzer.go
@@ -15,17 +15,34 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package inverted
+// Package analyzer provides analyzers for indexing and searching.
+package analyzer
 
 import (
        "bytes"
        "unicode"
 
        "github.com/blugelabs/bluge/analysis"
+       "github.com/blugelabs/bluge/analysis/analyzer"
        "github.com/blugelabs/bluge/analysis/tokenizer"
+
+       "github.com/apache/skywalking-banyandb/pkg/index"
 )
 
-func newURLAnalyzer() *analysis.Analyzer {
+// Analyzers is a map that associates each IndexRule_Analyzer type with a 
corresponding Analyzer.
+var Analyzers map[string]*analysis.Analyzer
+
+func init() {
+       Analyzers = map[string]*analysis.Analyzer{
+               index.AnalyzerKeyword:  analyzer.NewKeywordAnalyzer(),
+               index.AnalyzerSimple:   analyzer.NewSimpleAnalyzer(),
+               index.AnalyzerStandard: analyzer.NewStandardAnalyzer(),
+               index.AnalyzerURL:      NewURLAnalyzer(),
+       }
+}
+
+// NewURLAnalyzer creates a new URL analyzer.
+func NewURLAnalyzer() *analysis.Analyzer {
        return &analysis.Analyzer{
                Tokenizer: tokenizer.NewCharacterTokenizer(func(r rune) bool {
                        return unicode.IsLetter(r) || unicode.IsNumber(r)
diff --git a/pkg/index/inverted/analyzer_test.go 
b/pkg/index/analyzer/analyzer_test.go
similarity index 98%
rename from pkg/index/inverted/analyzer_test.go
rename to pkg/index/analyzer/analyzer_test.go
index 0deeb400..ca45b561 100644
--- a/pkg/index/inverted/analyzer_test.go
+++ b/pkg/index/analyzer/analyzer_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package inverted
+package analyzer
 
 import (
        "testing"
@@ -90,7 +90,7 @@ func TestAlphanumericFilter(t *testing.T) {
 }
 
 func TestNewURLAnalyzer(t *testing.T) {
-       analyzer := newURLAnalyzer()
+       analyzer := NewURLAnalyzer()
 
        tests := []struct {
                input    string
diff --git a/pkg/index/index.go b/pkg/index/index.go
index b1a4fa1d..9568bba3 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -442,4 +442,11 @@ type GetSearcher func(location databasev1.IndexRule_Type) 
(Searcher, error)
 type Filter interface {
        fmt.Stringer
        Execute(getSearcher GetSearcher, seriesID common.SeriesID, timeRange 
*RangeOpts) (posting.List, posting.List, error)
+       ShouldSkip(tagFamilyFilters FilterOp) (bool, error)
+}
+
+// FilterOp is an interface for filtering operations based on skipping index.
+// NOTE(review): the two methods have opposite polarities as consumed by
+// ShouldSkip implementations — Eq reports "may match" (its negation
+// permits skipping), while Range directly reports "should skip".
+type FilterOp interface {
+       // Eq reports whether tagName may hold tagValue in the block.
+       Eq(tagName string, tagValue string) bool
+       // Range reports whether the block can be skipped for the given
+       // range condition on tagName.
+       Range(tagName string, rangeOpts RangeOpts) (bool, error)
+}
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 96dcde0e..fc503811 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -28,7 +28,6 @@ import (
        roaringpkg "github.com/RoaringBitmap/roaring"
        "github.com/blugelabs/bluge"
        "github.com/blugelabs/bluge/analysis"
-       "github.com/blugelabs/bluge/analysis/analyzer"
        blugeIndex "github.com/blugelabs/bluge/index"
        "github.com/blugelabs/bluge/numeric"
        "github.com/blugelabs/bluge/search"
@@ -40,6 +39,7 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/analyzer"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -62,18 +62,6 @@ var (
        defaultProjection       = []string{docIDField, timestampField}
 )
 
-// Analyzers is a map that associates each IndexRule_Analyzer type with a 
corresponding Analyzer.
-var Analyzers map[string]*analysis.Analyzer
-
-func init() {
-       Analyzers = map[string]*analysis.Analyzer{
-               index.AnalyzerKeyword:  analyzer.NewKeywordAnalyzer(),
-               index.AnalyzerSimple:   analyzer.NewSimpleAnalyzer(),
-               index.AnalyzerStandard: analyzer.NewStandardAnalyzer(),
-               index.AnalyzerURL:      newURLAnalyzer(),
-       }
-}
-
 var _ index.Store = (*store)(nil)
 
 // StoreOpts wraps options to create an inverted index repository.
@@ -135,7 +123,7 @@ func (s *store) Batch(batch index.Batch) error {
                                tf.StoreValue()
                        }
                        if f.Key.Analyzer != index.AnalyzerUnspecified {
-                               tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer])
+                               tf = 
tf.WithAnalyzer(analyzer.Analyzers[f.Key.Analyzer])
                        }
                        doc.AddField(tf)
                        if i == 0 {
@@ -163,7 +151,7 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) {
        }
        indexConfig.CacheMaxBytes = opts.CacheMaxBytes
        config := bluge.DefaultConfigWithIndexConfig(indexConfig)
-       config.DefaultSearchAnalyzer = Analyzers[index.AnalyzerKeyword]
+       config.DefaultSearchAnalyzer = analyzer.Analyzers[index.AnalyzerKeyword]
        config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0)
        config = config.WithPrepareMergeCallback(opts.PrepareMergeCallback)
        w, err := bluge.OpenWriter(config)
@@ -371,11 +359,11 @@ func (s *store) Match(fieldKey index.FieldKey, matches 
[]string, opts *modelv1.C
 }
 
 func getMatchOptions(analyzerOnIndexRule string, opts 
*modelv1.Condition_MatchOption) (*analysis.Analyzer, bluge.MatchQueryOperator) {
-       analyzer := Analyzers[analyzerOnIndexRule]
+       a := analyzer.Analyzers[analyzerOnIndexRule]
        operator := bluge.MatchQueryOperatorOr
        if opts != nil {
                if opts.Analyzer != index.AnalyzerUnspecified {
-                       analyzer = Analyzers[opts.Analyzer]
+                       a = analyzer.Analyzers[opts.Analyzer]
                }
                if opts.Operator != 
modelv1.Condition_MatchOption_OPERATOR_UNSPECIFIED {
                        if opts.Operator == 
modelv1.Condition_MatchOption_OPERATOR_AND {
@@ -383,7 +371,7 @@ func getMatchOptions(analyzerOnIndexRule string, opts 
*modelv1.Condition_MatchOp
                        }
                }
        }
-       return analyzer, bluge.MatchQueryOperator(operator)
+       return a, bluge.MatchQueryOperator(operator)
 }
 
 func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list 
posting.List, timestamps posting.List, err error) {
diff --git a/pkg/index/inverted/inverted_series.go 
b/pkg/index/inverted/inverted_series.go
index 1b2335aa..e7c4d013 100644
--- a/pkg/index/inverted/inverted_series.go
+++ b/pkg/index/inverted/inverted_series.go
@@ -33,6 +33,7 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/analyzer"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -109,7 +110,7 @@ func toDoc(d index.Document, toParseFieldNames bool) 
(*bluge.Document, []string)
                                tf.Sortable()
                        }
                        if f.Key.Analyzer != index.AnalyzerUnspecified {
-                               tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer])
+                               tf = 
tf.WithAnalyzer(analyzer.Analyzers[f.Key.Analyzer])
                        }
                } else {
                        tf = bluge.NewStoredOnlyField(k, f.GetBytes())
diff --git a/pkg/query/logical/stream/index_filter.go 
b/pkg/query/logical/stream/index_filter.go
index be3f090b..8cce87ad 100644
--- a/pkg/query/logical/stream/index_filter.go
+++ b/pkg/query/logical/stream/index_filter.go
@@ -34,6 +34,7 @@ import (
 
 func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema,
        entityDict map[string]int, entity []*modelv1.TagValue,
+       indexRuleType databasev1.IndexRule_Type,
 ) (index.Filter, [][]*modelv1.TagValue, error) {
        if criteria == nil {
                return nil, [][]*modelv1.TagValue{entity}, nil
@@ -48,7 +49,7 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema 
logical.Schema,
                if parsedEntity != nil {
                        return nil, parsedEntity, nil
                }
-               if ok, indexRule := schema.IndexDefined(cond.Name); ok {
+               if ok, indexRule := schema.IndexDefined(cond.Name); ok && 
indexRule.Type == indexRuleType {
                        return parseConditionToFilter(cond, indexRule, expr, 
entity)
                }
                return ENode, [][]*modelv1.TagValue{entity}, nil
@@ -58,16 +59,16 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema 
logical.Schema,
                        return nil, nil, 
errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and 
right) of [%v] are empty", criteria)
                }
                if le.GetLeft() == nil {
-                       return buildLocalFilter(le.Right, schema, entityDict, 
entity)
+                       return buildLocalFilter(le.Right, schema, entityDict, 
entity, indexRuleType)
                }
                if le.GetRight() == nil {
-                       return buildLocalFilter(le.Left, schema, entityDict, 
entity)
+                       return buildLocalFilter(le.Left, schema, entityDict, 
entity, indexRuleType)
                }
-               left, leftEntities, err := buildLocalFilter(le.Left, schema, 
entityDict, entity)
+               left, leftEntities, err := buildLocalFilter(le.Left, schema, 
entityDict, entity, indexRuleType)
                if err != nil {
                        return nil, nil, err
                }
-               right, rightEntities, err := buildLocalFilter(le.Right, schema, 
entityDict, entity)
+               right, rightEntities, err := buildLocalFilter(le.Right, schema, 
entityDict, entity, indexRuleType)
                if err != nil {
                        return nil, nil, err
                }
@@ -75,8 +76,11 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema 
logical.Schema,
                if entities == nil {
                        return nil, nil, nil
                }
-               if left == nil && right == nil {
-                       return nil, entities, nil
+               if left == nil {
+                       return right, entities, nil
+               }
+               if right == nil {
+                       return left, entities, nil
                }
                if left == ENode && right == ENode {
                        return ENode, entities, nil
@@ -113,7 +117,10 @@ func parseConditionToFilter(cond *modelv1.Condition, 
indexRule *databasev1.Index
        case modelv1.Condition_BINARY_OP_EQ:
                return newEq(indexRule, expr), [][]*modelv1.TagValue{entity}, 
nil
        case modelv1.Condition_BINARY_OP_MATCH:
-               return newMatch(indexRule, expr, cond.MatchOption), 
[][]*modelv1.TagValue{entity}, nil
+               if indexRule.Type == databasev1.IndexRule_TYPE_INVERTED {
+                       return newMatch(indexRule, expr, cond.MatchOption), 
[][]*modelv1.TagValue{entity}, nil
+               }
+               return nil, nil, 
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v 
for skipping index", cond)
        case modelv1.Condition_BINARY_OP_NE:
                return newNot(indexRule, newEq(indexRule, expr)), 
[][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_HAVING:
@@ -264,6 +271,19 @@ func (an *andNode) Execute(searcher index.GetSearcher, 
seriesID common.SeriesID,
        return execute(searcher, seriesID, an.node, an, tr)
 }
 
+// ShouldSkip reports whether the block can be skipped for this AND node:
+// if any conjunct cannot match, the whole conjunction cannot match.
+func (an *andNode) ShouldSkip(tagFamilyFilters index.FilterOp) (bool, error) {
+       for _, sn := range an.node.SubNodes {
+               shouldSkip, err := sn.ShouldSkip(tagFamilyFilters)
+               if err != nil {
+                       return shouldSkip, err
+               }
+               if shouldSkip {
+                       return true, nil
+               }
+       }
+       return false, nil
+}
+
 func (an *andNode) MarshalJSON() ([]byte, error) {
        data := make(map[string]interface{}, 1)
        data["and"] = an.node.SubNodes
@@ -309,6 +329,19 @@ func (on *orNode) Execute(searcher index.GetSearcher, 
seriesID common.SeriesID,
        return execute(searcher, seriesID, on.node, on, tr)
 }
 
+// ShouldSkip reports whether the block can be skipped for this OR node:
+// only when every disjunct can be skipped may the disjunction be skipped.
+func (on *orNode) ShouldSkip(tagFamilyFilters index.FilterOp) (bool, error) {
+       for _, sn := range on.node.SubNodes {
+               shouldSkip, err := sn.ShouldSkip(tagFamilyFilters)
+               if err != nil {
+                       return shouldSkip, err
+               }
+               if !shouldSkip {
+                       return false, nil
+               }
+       }
+       return true, nil
+}
+
 func (on *orNode) MarshalJSON() ([]byte, error) {
        data := make(map[string]interface{}, 1)
        data["or"] = on.node.SubNodes
@@ -400,6 +433,10 @@ func (eq *eq) Execute(searcher index.GetSearcher, seriesID 
common.SeriesID, tr *
        return s.MatchTerms(eq.Expr.Field(eq.Key.toIndex(seriesID, tr)))
 }
 
+// ShouldSkip skips the block when the skipping-index filter reports the
+// tag cannot hold the queried value (Eq returning false).
+func (eq *eq) ShouldSkip(tagFamilyFilters index.FilterOp) (bool, error) {
+       return !tagFamilyFilters.Eq(eq.Key.Tags[0], eq.Expr.String()), nil
+}
+
 func (eq *eq) MarshalJSON() ([]byte, error) {
        data := make(map[string]interface{}, 1)
        data["eq"] = eq.leaf
@@ -442,6 +479,10 @@ func (match *match) Execute(searcher index.GetSearcher, 
seriesID common.SeriesID
        )
 }
 
+// ShouldSkip always declines to skip: match conditions cannot be decided
+// by a skipping-index filter, so the block must be scanned.
+func (match *match) ShouldSkip(_ index.FilterOp) (bool, error) {
+       return false, nil
+}
+
 func (match *match) MarshalJSON() ([]byte, error) {
        data := make(map[string]interface{}, 1)
        data["match"] = match.leaf
@@ -474,6 +515,10 @@ func (r *rangeOp) Execute(searcher index.GetSearcher, 
seriesID common.SeriesID,
        return s.Range(r.Key.toIndex(seriesID, tr), r.Opts)
 }
 
+// ShouldSkip delegates the skip decision to the filter's Range check for
+// this tag and range options.
+func (r *rangeOp) ShouldSkip(tagFamilyFilters index.FilterOp) (bool, error) {
+       return tagFamilyFilters.Range(r.Key.Tags[0], r.Opts)
+}
+
 func (r *rangeOp) MarshalJSON() ([]byte, error) {
        data := make(map[string]interface{}, 1)
        var builder strings.Builder
@@ -521,6 +566,10 @@ func (an emptyNode) String() string {
        return "empty"
 }
 
+// ShouldSkip never skips: an empty filter imposes no condition.
+func (an emptyNode) ShouldSkip(_ index.FilterOp) (bool, error) {
+       return false, nil
+}
+
 type bypassList struct{}
 
 func (bl bypassList) Contains(_ uint64) bool {
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go 
b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 3374d8fe..7e465172 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -46,7 +46,8 @@ var (
 
 type localIndexScan struct {
        schema            logical.Schema
-       filter            index.Filter
+       invertedFilter    index.Filter
+       skippingFilter    index.Filter
        result            model.StreamQueryResult
        ec                executor.StreamExecutionContext
        order             *logical.OrderBy
@@ -94,7 +95,8 @@ func (i *localIndexScan) Execute(ctx context.Context) 
([]*streamv1.Element, erro
                Name:           i.metadata.GetName(),
                TimeRange:      &i.timeRange,
                Entities:       i.entities,
-               Filter:         i.filter,
+               InvertedFilter: i.invertedFilter,
+               SkippingFilter: i.skippingFilter,
                Order:          orderBy,
                TagProjection:  i.projectionTags,
                MaxElementSize: i.maxElementSize,
@@ -110,7 +112,7 @@ func (i *localIndexScan) Execute(ctx context.Context) 
([]*streamv1.Element, erro
 func (i *localIndexScan) String() string {
        return fmt.Sprintf("IndexScan: 
startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; 
projection=%s; orderBy=%s; limit=%d",
                i.timeRange.Start.Unix(), i.timeRange.End.Unix(), 
i.metadata.GetGroup(), i.metadata.GetName(),
-               i.filter, logical.FormatTagRefs(", ", i.projectionTagRefs...), 
i.order, i.maxElementSize)
+               i.invertedFilter, logical.FormatTagRefs(", ", 
i.projectionTagRefs...), i.order, i.maxElementSize)
 }
 
 func (i *localIndexScan) Children() []logical.Plan {
diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go 
b/pkg/query/logical/stream/stream_plan_tag_filter.go
index b1a100df..69834adb 100644
--- a/pkg/query/logical/stream/stream_plan_tag_filter.go
+++ b/pkg/query/logical/stream/stream_plan_tag_filter.go
@@ -23,6 +23,7 @@ import (
        "time"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
@@ -56,7 +57,11 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) 
(logical.Plan, error)
                entity[idx] = pbv1.AnyTagValue
        }
        var err error
-       ctx.filter, ctx.entities, err = buildLocalFilter(uis.criteria, s, 
entityDict, entity)
+       ctx.invertedFilter, ctx.entities, err = buildLocalFilter(uis.criteria, 
s, entityDict, entity, databasev1.IndexRule_TYPE_INVERTED)
+       if err != nil {
+               return nil, err
+       }
+       ctx.skippingFilter, _, err = buildLocalFilter(uis.criteria, s, 
entityDict, entity, databasev1.IndexRule_TYPE_SKIPPING)
        if err != nil {
                return nil, err
        }
@@ -97,7 +102,8 @@ func (uis *unresolvedTagFilter) selectIndexScanner(ctx 
*analyzeContext, ec execu
                projectionTagRefs: ctx.projTagsRefs,
                projectionTags:    ctx.projectionTags,
                metadata:          uis.metadata,
-               filter:            ctx.filter,
+               invertedFilter:    ctx.invertedFilter,
+               skippingFilter:    ctx.skippingFilter,
                entities:          ctx.entities,
                l:                 logger.GetLogger("query", "stream", 
"local-index"),
                ec:                ec,
@@ -119,7 +125,8 @@ func tagFilter(startTime, endTime time.Time, metadata 
*commonv1.Metadata, criter
 
 type analyzeContext struct {
        s                logical.Schema
-       filter           index.Filter
+       invertedFilter   index.Filter
+       skippingFilter   index.Filter
        entities         [][]*modelv1.TagValue
        projectionTags   []model.TagProjection
        globalConditions []interface{}
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index 445ddfcc..2433fa70 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -22,10 +22,12 @@ import (
        "fmt"
        "strings"
 
+       "github.com/blugelabs/bluge/analysis"
        "github.com/pkg/errors"
 
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/index/analyzer"
 )
 
 var errUnsupportedLogicalOperation = errors.New("unsupported logical 
operation")
@@ -77,7 +79,7 @@ func BuildSimpleTagFilter(criteria *modelv1.Criteria) 
(TagFilter, error) {
        return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false)
 }
 
-// BuildTagFilter returns a TagFilter if predicates doesn't match any indices.
+// BuildTagFilter returns a TagFilter.
 func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, 
indexChecker IndexChecker, hasGlobalIndex bool) (TagFilter, error) {
        if criteria == nil {
                return DummyFilter, nil
@@ -85,17 +87,17 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict 
map[string]int, index
        switch criteria.GetExp().(type) {
        case *modelv1.Criteria_Condition:
                cond := criteria.GetCondition()
-               expr, err := parseExpr(cond.Value)
+               var expr ComparableExpr
+               var err error
+               _, indexRule := indexChecker.IndexRuleDefined(cond.Name)
+               expr, err = parseExpr(cond.Value, 
analyzer.Analyzers[indexRule.GetAnalyzer()])
                if err != nil {
                        return nil, err
                }
-               if ok, _ := indexChecker.IndexDefined(cond.Name); ok {
-                       return DummyFilter, nil
-               }
                if _, ok := entityDict[cond.Name]; ok {
                        return DummyFilter, nil
                }
-               return parseFilter(cond, expr)
+               return parseFilter(cond, expr, indexChecker)
        case *modelv1.Criteria_Le:
                le := criteria.GetLe()
                left, err := BuildTagFilter(le.Left, entityDict, indexChecker, 
hasGlobalIndex)
@@ -126,7 +128,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict 
map[string]int, index
        return nil, ErrInvalidCriteriaType
 }
 
-func parseFilter(cond *modelv1.Condition, expr ComparableExpr) (TagFilter, 
error) {
+func parseFilter(cond *modelv1.Condition, expr ComparableExpr, indexChecker 
IndexChecker) (TagFilter, error) {
        switch cond.Op {
        case modelv1.Condition_BINARY_OP_GT:
                return newRangeTag(cond.Name, rangeOpts{
@@ -148,6 +150,8 @@ func parseFilter(cond *modelv1.Condition, expr 
ComparableExpr) (TagFilter, error
                }), nil
        case modelv1.Condition_BINARY_OP_EQ:
                return newEqTag(cond.Name, expr), nil
+       case modelv1.Condition_BINARY_OP_MATCH:
+               return newMatchTag(cond.Name, expr, indexChecker), nil
        case modelv1.Condition_BINARY_OP_NE:
                return newNotTag(newEqTag(cond.Name, expr)), nil
        case modelv1.Condition_BINARY_OP_HAVING:
@@ -163,7 +167,21 @@ func parseFilter(cond *modelv1.Condition, expr 
ComparableExpr) (TagFilter, error
        }
 }
 
-func parseExpr(value *modelv1.TagValue) (ComparableExpr, error) {
+func parseExpr(value *modelv1.TagValue, analyzer *analysis.Analyzer) 
(ComparableExpr, error) {
+       if analyzer != nil {
+               if _, ok := value.Value.(*modelv1.TagValue_Str); ok {
+                       tokenStream := 
analyzer.Analyze([]byte(value.GetStr().Value))
+                       strArr := make([]string, 0, len(tokenStream))
+                       for _, token := range tokenStream {
+                               strArr = append(strArr, string(token.Term))
+                       }
+                       return &strArrLiteral{
+                               arr: strArr,
+                       }, nil
+               }
+               return nil, errors.WithMessagef(ErrUnsupportedConditionValue, 
"tag filter parses %v", value)
+       }
+
        switch v := value.Value.(type) {
        case *modelv1.TagValue_Str:
                return &strLiteral{v.Str.GetValue()}, nil
@@ -213,7 +231,7 @@ func (n *logicalNode) append(sub TagFilter) *logicalNode {
        return n
 }
 
-func matchTag(accessor TagValueIndexAccessor, registry TagSpecRegistry, n 
*logicalNode, lp logicalNodeOP) (bool, error) {
+func isMatch(accessor TagValueIndexAccessor, registry TagSpecRegistry, n 
*logicalNode, lp logicalNodeOP) (bool, error) {
        var result *bool
        for _, sn := range n.SubNodes {
                r, err := sn.Match(accessor, registry)
@@ -252,7 +270,7 @@ func (an *andLogicalNode) merge(bb ...bool) bool {
 }
 
 func (an *andLogicalNode) Match(accessor TagValueIndexAccessor, registry 
TagSpecRegistry) (bool, error) {
-       return matchTag(accessor, registry, an.logicalNode, an)
+       return isMatch(accessor, registry, an.logicalNode, an)
 }
 
 func (an *andLogicalNode) MarshalJSON() ([]byte, error) {
@@ -287,7 +305,7 @@ func (on *orLogicalNode) merge(bb ...bool) bool {
 }
 
 func (on *orLogicalNode) Match(accessor TagValueIndexAccessor, registry 
TagSpecRegistry) (bool, error) {
-       return matchTag(accessor, registry, on.logicalNode, on)
+       return isMatch(accessor, registry, on.logicalNode, on)
 }
 
 func (on *orLogicalNode) MarshalJSON() ([]byte, error) {
@@ -356,7 +374,7 @@ func newInTag(tagName string, values LiteralExpr) *inTag {
 }
 
 func (h *inTag) Match(accessor TagValueIndexAccessor, registry 
TagSpecRegistry) (bool, error) {
-       expr, err := tagExpr(accessor, registry, h.Name)
+       expr, err := tagExpr(accessor, registry, h.Name, nil)
        if err != nil {
                return false, err
        }
@@ -377,7 +395,7 @@ func newEqTag(tagName string, values LiteralExpr) *eqTag {
 }
 
 func (eq *eqTag) Match(accessor TagValueIndexAccessor, registry 
TagSpecRegistry) (bool, error) {
-       expr, err := tagExpr(accessor, registry, eq.Name)
+       expr, err := tagExpr(accessor, registry, eq.Name, nil)
        if err != nil {
                return false, err
        }
@@ -416,7 +434,7 @@ func newRangeTag(tagName string, opts rangeOpts) *rangeTag {
 }
 
 func (r *rangeTag) Match(accessor TagValueIndexAccessor, registry 
TagSpecRegistry) (bool, error) {
-       expr, err := tagExpr(accessor, registry, r.Name)
+       expr, err := tagExpr(accessor, registry, r.Name, nil)
        if err != nil {
                return false, err
        }
@@ -475,7 +493,7 @@ func (r *rangeTag) MarshalJSON() ([]byte, error) {
                        builder.WriteString(")")
                }
        }
-       data["key"] = r.tagLeaf
+       data["key"] = r.tagLeaf.Name
        data["range"] = builder.String()
        return json.Marshal(data)
 }
@@ -484,15 +502,49 @@ func (r *rangeTag) String() string {
        return convert.JSONToString(r)
 }
 
-func tagExpr(accessor TagValueIndexAccessor, registry TagSpecRegistry, tagName 
string) (ComparableExpr, error) {
+func tagExpr(accessor TagValueIndexAccessor, registry TagSpecRegistry, tagName 
string, analyzer *analysis.Analyzer) (ComparableExpr, error) {
        if tagSpec := registry.FindTagSpecByName(tagName); tagSpec != nil {
                if tagVal := accessor.GetTagValue(tagSpec.TagFamilyIdx, 
tagSpec.TagIdx); tagVal != nil {
-                       return parseExpr(tagVal)
+                       return parseExpr(tagVal, analyzer)
                }
        }
        return nil, errTagNotDefined
 }
 
+type matchTag struct {
+       *tagLeaf
+       indexChecker IndexChecker
+}
+
+func newMatchTag(tagName string, values LiteralExpr, indexChecker 
IndexChecker) *matchTag {
+       return &matchTag{
+               tagLeaf: &tagLeaf{
+                       Name: tagName,
+                       Expr: values,
+               },
+               indexChecker: indexChecker,
+       }
+}
+
+func (m *matchTag) Match(accessor TagValueIndexAccessor, registry 
TagSpecRegistry) (bool, error) {
+       _, indexRule := m.indexChecker.IndexRuleDefined(m.Name)
+       expr, err := tagExpr(accessor, registry, m.Name, 
analyzer.Analyzers[indexRule.GetAnalyzer()])
+       if err != nil {
+               return false, err
+       }
+       return expr.Contains(m.Expr), nil
+}
+
+func (m *matchTag) MarshalJSON() ([]byte, error) {
+       data := make(map[string]interface{}, 1)
+       data["match"] = m.tagLeaf
+       return json.Marshal(data)
+}
+
+func (m *matchTag) String() string {
+       return convert.JSONToString(m)
+}
+
 type havingTag struct {
        *tagLeaf
 }
@@ -507,7 +559,7 @@ func newHavingTag(tagName string, values LiteralExpr) 
*havingTag {
 }
 
 func (h *havingTag) Match(accessor TagValueIndexAccessor, registry 
TagSpecRegistry) (bool, error) {
-       expr, err := tagExpr(accessor, registry, h.Name)
+       expr, err := tagExpr(accessor, registry, h.Name, nil)
        if err != nil {
                return false, err
        }
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 40b4a1a7..39a7858b 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -87,7 +87,8 @@ type StreamQueryOptions struct {
        Name           string
        TimeRange      *timestamp.TimeRange
        Entities       [][]*modelv1.TagValue
-       Filter         index.Filter
+       InvertedFilter index.Filter
+       SkippingFilter index.Filter
        Order          *index.OrderBy
        TagProjection  []TagProjection
        MaxElementSize int
@@ -98,7 +99,8 @@ func (s *StreamQueryOptions) Reset() {
        s.Name = ""
        s.TimeRange = nil
        s.Entities = nil
-       s.Filter = nil
+       s.InvertedFilter = nil
+       s.SkippingFilter = nil
        s.Order = nil
        s.TagProjection = nil
        s.MaxElementSize = 0
@@ -117,7 +119,8 @@ func (s *StreamQueryOptions) CopyFrom(other 
*StreamQueryOptions) {
                s.Entities = nil
        }
 
-       s.Filter = other.Filter
+       s.InvertedFilter = other.InvertedFilter
+       s.SkippingFilter = other.SkippingFilter
        s.Order = other.Order
 
        // Deep copy if TagProjection is a slice
diff --git a/pkg/test/stream/testdata/index_rules/db.type.json 
b/pkg/test/stream/testdata/index_rules/db.type.json
index f23fd03f..8e39ca3b 100644
--- a/pkg/test/stream/testdata/index_rules/db.type.json
+++ b/pkg/test/stream/testdata/index_rules/db.type.json
@@ -7,6 +7,6 @@
   "tags": [
     "db.type"
   ],
-  "type": "TYPE_INVERTED",
+  "type": "TYPE_SKIPPING",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
diff --git a/pkg/test/stream/testdata/index_rules/endpoint_id.json 
b/pkg/test/stream/testdata/index_rules/endpoint_id.json
index 89272177..a82ec86a 100644
--- a/pkg/test/stream/testdata/index_rules/endpoint_id.json
+++ b/pkg/test/stream/testdata/index_rules/endpoint_id.json
@@ -7,6 +7,6 @@
   "tags": [
     "endpoint_id"
   ],
-  "type": "TYPE_INVERTED",
+  "type": "TYPE_SKIPPING",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
diff --git a/pkg/test/stream/testdata/index_rules/http.method.json 
b/pkg/test/stream/testdata/index_rules/http.method.json
index b6ce4e9b..23bc230e 100644
--- a/pkg/test/stream/testdata/index_rules/http.method.json
+++ b/pkg/test/stream/testdata/index_rules/http.method.json
@@ -7,6 +7,6 @@
   "tags": [
     "http.method"
   ],
-  "type": "TYPE_INVERTED",
+  "type": "TYPE_SKIPPING",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
diff --git a/pkg/test/stream/testdata/index_rules/mq.broker.json 
b/pkg/test/stream/testdata/index_rules/mq.broker.json
index 95180828..1fd748d6 100644
--- a/pkg/test/stream/testdata/index_rules/mq.broker.json
+++ b/pkg/test/stream/testdata/index_rules/mq.broker.json
@@ -7,6 +7,6 @@
   "tags": [
     "mq.broker"
   ],
-  "type": "TYPE_INVERTED",
+  "type": "TYPE_SKIPPING",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
diff --git a/pkg/test/stream/testdata/index_rules/mq.queue.json 
b/pkg/test/stream/testdata/index_rules/mq.queue.json
index de4aa2eb..beb77bb4 100644
--- a/pkg/test/stream/testdata/index_rules/mq.queue.json
+++ b/pkg/test/stream/testdata/index_rules/mq.queue.json
@@ -7,6 +7,6 @@
   "tags": [
     "mq.queue"
   ],
-  "type": "TYPE_INVERTED",
+  "type": "TYPE_SKIPPING",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
diff --git a/pkg/test/stream/testdata/index_rules/mq.topic.json 
b/pkg/test/stream/testdata/index_rules/mq.topic.json
index c292a582..cc5544dd 100644
--- a/pkg/test/stream/testdata/index_rules/mq.topic.json
+++ b/pkg/test/stream/testdata/index_rules/mq.topic.json
@@ -7,6 +7,6 @@
   "tags": [
     "mq.topic"
   ],
-  "type": "TYPE_INVERTED",
+  "type": "TYPE_SKIPPING",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
diff --git a/pkg/test/stream/testdata/index_rules/status_code.json 
b/pkg/test/stream/testdata/index_rules/status_code.json
index dcf04473..ccaa9f8d 100644
--- a/pkg/test/stream/testdata/index_rules/status_code.json
+++ b/pkg/test/stream/testdata/index_rules/status_code.json
@@ -7,6 +7,6 @@
   "tags": [
     "status_code"
   ],
-  "type": "TYPE_INVERTED",
+  "type": "TYPE_SKIPPING",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }
diff --git a/pkg/test/stream/testdata/index_rules/trace_id.json 
b/pkg/test/stream/testdata/index_rules/trace_id.json
index add7fc33..df92f0f9 100644
--- a/pkg/test/stream/testdata/index_rules/trace_id.json
+++ b/pkg/test/stream/testdata/index_rules/trace_id.json
@@ -7,6 +7,6 @@
   "tags": [
     "trace_id"
   ],
-  "type": "TYPE_INVERTED",
+  "type": "TYPE_SKIPPING",
   "updated_at": "2021-04-15T01:30:15.01Z"
 }


Reply via email to