This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch stream-load
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit cfad43a5bb597d7d5195a123614595e6b534ab3c
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Mon Apr 22 06:32:12 2024 +0000

    Stop storing terms in the index
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 banyand/internal/storage/index.go            |  17 ++--
 banyand/stream/iter.go                       |  69 ++++++---------
 banyand/stream/write.go                      |  10 +--
 pkg/index/index.go                           |  12 +--
 pkg/index/inverted/inverted.go               | 127 +++++++--------------------
 pkg/index/inverted/sort.go                   |  24 +++--
 pkg/index/testcases/duration.go              |  39 ++------
 test/docker/base-compose.yml                 |  10 +--
 test/stress/trace/docker-compose-single.yaml |  16 ++--
 9 files changed, 104 insertions(+), 220 deletions(-)

diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index af7eaed4..acb4086b 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -198,14 +198,11 @@ func (s *seriesIndex) Search(ctx context.Context, series 
*pbv1.Series, filter in
 
        var sortedSeriesList pbv1.SeriesList
        for iter.Next() {
-               pv := iter.Val().Value
-               if err = pv.Intersect(pl); err != nil {
-                       return nil, err
-               }
-               if pv.IsEmpty() {
+               seriesID := iter.Val()
+               if !pl.Contains(seriesID) {
                        continue
                }
-               sortedSeriesList = appendSeriesList(sortedSeriesList, 
seriesList, pv)
+               sortedSeriesList = appendSeriesList(sortedSeriesList, 
seriesList, common.SeriesID(seriesID))
                if err != nil {
                        return nil, err
                }
@@ -223,12 +220,12 @@ func filterSeriesList(seriesList pbv1.SeriesList, filter 
posting.List) pbv1.Seri
        return seriesList
 }
 
-func appendSeriesList(dest, src pbv1.SeriesList, filter posting.List) 
pbv1.SeriesList {
+func appendSeriesList(dest, src pbv1.SeriesList, target common.SeriesID) 
pbv1.SeriesList {
        for i := 0; i < len(src); i++ {
-               if !filter.Contains(uint64(src[i].ID)) {
-                       continue
+               if target == src[i].ID {
+                       dest = append(dest, src[i])
+                       break
                }
-               dest = append(dest, src[i])
        }
        return dest
 }
diff --git a/banyand/stream/iter.go b/banyand/stream/iter.go
index 5f9abb80..94be97f2 100644
--- a/banyand/stream/iter.go
+++ b/banyand/stream/iter.go
@@ -25,14 +25,12 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/index"
-       "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 type searcherIterator struct {
        fieldIterator     index.FieldIterator
-       cur               posting.Iterator
        err               error
        indexFilter       filterFn
        timeFilter        filterFn
@@ -64,46 +62,37 @@ func (s *searcherIterator) Next() bool {
        if s.err != nil {
                return false
        }
-       if s.cur == nil {
-               if s.fieldIterator.Next() {
-                       v := s.fieldIterator.Val()
-                       s.cur = v.Value.Iterator()
-               } else {
-                       s.err = io.EOF
-                       return false
-               }
+       if !s.fieldIterator.Next() {
+               s.err = io.EOF
+               return false
+       }
+       itemID := s.fieldIterator.Val()
+       if !s.timeFilter(itemID) {
+               return s.Next()
+       }
+       if s.indexFilter != nil && !s.indexFilter(itemID) {
+               return s.Next()
+       }
+       if e := s.l.Debug(); e.Enabled() {
+               e.Uint64("series_id", uint64(s.seriesID)).Uint64("item_id", 
itemID).Msg("got an item")
+       }
+       e, c, err := s.table.getElement(s.seriesID, int64(itemID), 
s.tagProjection)
+       if err != nil {
+               s.err = err
+               return false
+       }
+       sv, err := s.sortedTagLocation.getTagValue(e)
+       if err != nil {
+               s.err = err
+               return false
        }
-       if s.cur.Next() {
-               itemID := s.cur.Current()
-               if !s.timeFilter(itemID) {
-                       return s.Next()
-               }
-               if s.indexFilter != nil && !s.indexFilter(itemID) {
-                       return s.Next()
-               }
-               if e := s.l.Debug(); e.Enabled() {
-                       e.Uint64("series_id", 
uint64(s.seriesID)).Uint64("item_id", itemID).Msg("got an item")
-               }
-               e, c, err := s.table.getElement(s.seriesID, int64(itemID), 
s.tagProjection)
-               if err != nil {
-                       s.err = err
-                       return false
-               }
-               sv, err := s.sortedTagLocation.getTagValue(e)
-               if err != nil {
-                       s.err = err
-                       return false
-               }
-               s.currItem = item{
-                       element:        e,
-                       count:          c,
-                       sortedTagValue: sv,
-                       seriesID:       s.seriesID,
-               }
-               return true
+       s.currItem = item{
+               element:        e,
+               count:          c,
+               sortedTagValue: sv,
+               seriesID:       s.seriesID,
        }
-       s.cur = nil
-       return s.Next()
+       return true
 }
 
 func (s *searcherIterator) Val() item {
diff --git a/banyand/stream/write.go b/banyand/stream/write.go
index 5d0533e6..d6070f13 100644
--- a/banyand/stream/write.go
+++ b/banyand/stream/write.go
@@ -222,12 +222,10 @@ func (w *writeCallback) Rev(message bus.Message) (resp 
bus.Message) {
                        es := g.tables[j]
                        es.tsTable.Table().mustAddElements(&es.elements)
                        if len(es.docs) > 0 {
-                               go func() {
-                                       index := es.tsTable.Table().Index()
-                                       if err := index.Write(es.docs); err != 
nil {
-                                               
w.l.Error().Err(err).Msg("cannot write element index")
-                                       }
-                               }()
+                               index := es.tsTable.Table().Index()
+                               if err := index.Write(es.docs); err != nil {
+                                       w.l.Error().Err(err).Msg("cannot write 
element index")
+                               }
                        }
                        es.tsTable.DecRef()
                }
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 739db07b..ac25855c 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -164,7 +164,7 @@ func (r RangeOpts) Between(value []byte) int {
 // FieldIterator allows iterating over a field's posting values.
 type FieldIterator interface {
        Next() bool
-       Val() *PostingValue
+       Val() uint64
        Close() error
 }
 
@@ -177,20 +177,14 @@ func (i *dummyIterator) Next() bool {
        return false
 }
 
-func (i *dummyIterator) Val() *PostingValue {
-       return nil
+func (i *dummyIterator) Val() uint64 {
+       return 0
 }
 
 func (i *dummyIterator) Close() error {
        return nil
 }
 
-// PostingValue is the collection of a field's values.
-type PostingValue struct {
-       Value posting.List
-       Term  []byte
-}
-
 // Document represents a document in a index.
 type Document struct {
        Fields       []Field
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 0ab9e835..93dad96c 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -37,7 +37,6 @@ import (
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       pbytes "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -178,7 +177,6 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange 
index.RangeOpts, ord
        }
        fk := fieldKey.MarshalIndexRule()
        var query bluge.Query
-       shouldDecodeTerm := true
        if fieldKey.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
                query = bluge.NewTermRangeInclusiveQuery(
                        index.FieldStr(fieldKey, termRange.Lower),
@@ -188,7 +186,6 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange 
index.RangeOpts, ord
                ).
                        SetField(fk)
        } else {
-               shouldDecodeTerm = false
                bQuery := bluge.NewBooleanQuery().
                        AddMust(bluge.NewTermRangeInclusiveQuery(
                                string(termRange.Lower),
@@ -208,12 +205,10 @@ func (s *store) Iterator(fieldKey index.FieldKey, 
termRange index.RangeOpts, ord
                sortedKey = "-" + sortedKey
        }
        result := &sortIterator{
-               query:            query,
-               reader:           reader,
-               sortedKey:        sortedKey,
-               fk:               fk,
-               shouldDecodeTerm: shouldDecodeTerm,
-               size:             preLoadSize,
+               query:     query,
+               reader:    reader,
+               sortedKey: sortedKey,
+               size:      preLoadSize,
        }
        return result, nil
 }
@@ -229,11 +224,9 @@ func (s *store) MatchTerms(field index.Field) (list 
posting.List, err error) {
        }
        fk := field.Key.MarshalIndexRule()
        var query bluge.Query
-       shouldDecodeTerm := true
        if field.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
                query = bluge.NewTermQuery(string(field.Marshal())).SetField(fk)
        } else {
-               shouldDecodeTerm = false
                bQuery := bluge.NewBooleanQuery().
                        
AddMust(bluge.NewTermQuery(string(field.Term)).SetField(fk))
                if field.Key.HasSeriesID() {
@@ -245,13 +238,13 @@ func (s *store) MatchTerms(field index.Field) (list 
posting.List, err error) {
        if err != nil {
                return nil, err
        }
-       iter := newBlugeMatchIterator(documentMatchIterator, fk, 
shouldDecodeTerm, reader)
+       iter := newBlugeMatchIterator(documentMatchIterator, reader)
        defer func() {
                err = multierr.Append(err, iter.Close())
        }()
        list = roaring.NewPostingList()
        for iter.Next() {
-               err = multierr.Append(err, list.Union(iter.Val().Value))
+               list.Insert(iter.Val())
        }
        return list, err
 }
@@ -278,13 +271,13 @@ func (s *store) Match(fieldKey index.FieldKey, matches 
[]string) (posting.List,
        if err != nil {
                return nil, err
        }
-       iter := newBlugeMatchIterator(documentMatchIterator, fk, false, reader)
+       iter := newBlugeMatchIterator(documentMatchIterator, reader)
        defer func() {
                err = multierr.Append(err, iter.Close())
        }()
        list := roaring.NewPostingList()
        for iter.Next() {
-               err = multierr.Append(err, list.Union(iter.Val().Value))
+               list.Insert(iter.Val())
        }
        return list, err
 }
@@ -296,7 +289,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts 
index.RangeOpts) (list posti
        }
        list = roaring.NewPostingList()
        for iter.Next() {
-               err = multierr.Append(err, list.Union(iter.Val().Value))
+               list.Insert(iter.Val())
        }
        err = multierr.Append(err, iter.Close())
        return
@@ -372,10 +365,10 @@ func (s *store) run() {
                                                toAddSeriesIDField := false
                                                for _, f := range d.Fields {
                                                        if f.Key.Analyzer == 
databasev1.IndexRule_ANALYZER_UNSPECIFIED {
-                                                               
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), 
f.Marshal()).StoreValue().Sortable())
+                                                               
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), 
f.Marshal()).Sortable())
                                                        } else {
                                                                
toAddSeriesIDField = true
-                                                               
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), 
f.Term).StoreValue().Sortable().
+                                                               
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), 
f.Term).Sortable().
                                                                        
WithAnalyzer(analyzers[f.Key.Analyzer]))
                                                        }
                                                }
@@ -419,113 +412,57 @@ func (s *store) flush() {
 }
 
 type blugeMatchIterator struct {
-       delegated        search.DocumentMatchIterator
-       err              error
-       closer           io.Closer
-       current          *index.PostingValue
-       agg              *index.PostingValue
-       fieldKey         string
-       shouldDecodeTerm bool
-       closed           bool
+       delegated search.DocumentMatchIterator
+       err       error
+       closer    io.Closer
+       docID     uint64
 }
 
-func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey 
string, shouldDecodeTerm bool, closer io.Closer) blugeMatchIterator {
+func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer 
io.Closer) blugeMatchIterator {
        return blugeMatchIterator{
-               delegated:        delegated,
-               fieldKey:         fieldKey,
-               shouldDecodeTerm: shouldDecodeTerm,
-               closer:           closer,
+               delegated: delegated,
+               closer:    closer,
        }
 }
 
 func (bmi *blugeMatchIterator) Next() bool {
-       if bmi.err != nil || bmi.closed {
-               return false
-       }
-       //revive:disable:empty-block
-       for bmi.nextTerm() {
-       }
-       //revive:enable:empty-block
-       if bmi.err != nil || bmi.closed {
-               return false
-       }
-       return true
-}
-
-func (bmi *blugeMatchIterator) nextTerm() bool {
        var match *search.DocumentMatch
        match, bmi.err = bmi.delegated.Next()
        if bmi.err != nil {
                return false
        }
        if match == nil {
-               if bmi.agg == nil {
-                       bmi.closed = true
-               } else {
-                       bmi.current = bmi.agg
-                       bmi.agg = nil
-               }
+               bmi.err = io.EOF
                return false
        }
-       i := 0
-       var docID uint64
-       var term []byte
        bmi.err = match.VisitStoredFields(func(field string, value []byte) bool 
{
                if field == docIDField {
                        if len(value) == 8 {
-                               docID = convert.BytesToUint64(value)
+                               bmi.docID = convert.BytesToUint64(value)
                        } else if len(value) == 16 {
                                // value = seriesID(8bytes)+docID(8bytes)
-                               docID = convert.BytesToUint64(value[8:])
-                       }
-                       i++
-               }
-               if field == bmi.fieldKey {
-                       v := pbytes.Copy(value)
-                       if bmi.shouldDecodeTerm {
-                               term = index.UnmarshalTerm(v)
-                       } else {
-                               term = v
+                               bmi.docID = convert.BytesToUint64(value[8:])
                        }
-                       i++
-               }
-               return i < 2
-       })
-       if i != 2 {
-               // ignore invalid data
-               // TODO: add metric to cumulate ignored docs
-               return true
-       }
-       if bmi.err != nil {
-               return false
-       }
-       if bmi.agg == nil {
-               bmi.agg = &index.PostingValue{
-                       Term:  term,
-                       Value: roaring.NewPostingListWithInitialData(docID),
                }
                return true
-       }
-       if bytes.Equal(bmi.agg.Term, term) {
-               bmi.agg.Value.Insert(docID)
-               return true
-       }
-       bmi.current = bmi.agg
-       bmi.agg = &index.PostingValue{
-               Term:  term,
-               Value: roaring.NewPostingListWithInitialData(docID),
-       }
-       return false
+       })
+       return bmi.err == nil
 }
 
-func (bmi *blugeMatchIterator) Val() *index.PostingValue {
-       return bmi.current
+func (bmi *blugeMatchIterator) Val() uint64 {
+       return bmi.docID
 }
 
 func (bmi *blugeMatchIterator) Close() error {
-       bmi.closed = true
        if bmi.closer == nil {
+               if errors.Is(bmi.err, io.EOF) {
+                       return nil
+               }
                return bmi.err
        }
+       err := bmi.closer.Close()
+       if errors.Is(bmi.err, io.EOF) {
+               return err
+       }
-       return errors.Join(bmi.err, bmi.closer.Close())
+       return errors.Join(bmi.err, err) // reuse err: the closer was already closed above; never close it twice
 }
diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go
index 12835eca..85e878e1 100644
--- a/pkg/index/inverted/sort.go
+++ b/pkg/index/inverted/sort.go
@@ -25,20 +25,16 @@ import (
        "math"
 
        "github.com/blugelabs/bluge"
-
-       "github.com/apache/skywalking-banyandb/pkg/index"
 )
 
 type sortIterator struct {
-       query            bluge.Query
-       err              error
-       reader           *bluge.Reader
-       current          *blugeMatchIterator
-       sortedKey        string
-       fk               string
-       size             int
-       skipped          int
-       shouldDecodeTerm bool
+       query     bluge.Query
+       err       error
+       reader    *bluge.Reader
+       current   *blugeMatchIterator
+       sortedKey string
+       size      int
+       skipped   int
 }
 
 func (si *sortIterator) Next() bool {
@@ -73,7 +69,7 @@ func (si *sortIterator) loadCurrent() bool {
                return false
        }
 
-       iter := newBlugeMatchIterator(documentMatchIterator, si.fk, 
si.shouldDecodeTerm, nil)
+       iter := newBlugeMatchIterator(documentMatchIterator, nil)
        si.current = &iter
        if si.next() {
                return true
@@ -84,13 +80,13 @@ func (si *sortIterator) loadCurrent() bool {
 
 func (si *sortIterator) next() bool {
        if si.current.Next() {
-               si.skipped += si.current.Val().Value.Len()
+               si.skipped++
                return true
        }
        return false
 }
 
-func (si *sortIterator) Val() *index.PostingValue {
+func (si *sortIterator) Val() uint64 {
        return si.current.Val()
 }
 
diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go
index ebbc9d70..9ffdc550 100644
--- a/pkg/index/testcases/duration.go
+++ b/pkg/index/testcases/duration.go
@@ -52,8 +52,7 @@ type args struct {
 }
 
 type result struct {
-       items []int
-       key   int
+       items []uint64
 }
 
 // RunDuration executes duration related cases.
@@ -301,49 +300,23 @@ func RunDuration(t *testing.T, data map[int]posting.List, 
store SimpleStore) {
                                }
                        }()
                        is.NotNil(iter)
-                       got := make([]result, 0)
-                       var currResult result
+                       var got result
                        for iter.Next() {
-                               key := 
int(convert.BytesToInt64(iter.Val().Term))
-                               if currResult.key != key {
-                                       if currResult.key != 0 {
-                                               got = append(got, currResult)
-                                               currResult = result{}
-                                       }
-                                       currResult.key = key
-                               }
-                               currResult.items = append(currResult.items, 
toArray(iter.Val().Value)...)
-                       }
-                       if len(currResult.items) > 0 {
-                               got = append(got, currResult)
+                               got.items = append(got.items, iter.Val())
                        }
                        for i := 0; i < 10; i++ {
                                is.False(iter.Next())
                        }
-                       wants := make([]result, 0, len(tt.want))
+                       var wants result
                        for _, w := range tt.want {
-                               wants = append(wants, result{
-                                       key:   w,
-                                       items: toArray(data[w]),
-                               })
+                               pl := data[w]
+                               wants.items = append(wants.items, 
pl.ToSlice()...)
                        }
                        tester.Equal(wants, got, tt.name)
                })
        }
 }
 
-func toArray(list posting.List) []int {
-       ints := make([]int, 0, list.Len())
-       iter := list.Iterator()
-       defer func(iter posting.Iterator) {
-               _ = iter.Close()
-       }(iter)
-       for iter.Next() {
-               ints = append(ints, int(iter.Current()))
-       }
-       return ints
-}
-
 // SetUpDuration initializes data for testing duration related cases.
 func SetUpDuration(t *assert.Assertions, store index.Writer) 
map[int]posting.List {
        r := map[int]posting.List{
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index 0a0d345c..1d1a32eb 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -20,11 +20,11 @@ services:
       - 2121
       - 6060
     command: standalone
-    # healthcheck:
-    #   test: ["CMD", "./bydbctl", "health", "--config=-", 
"--addr=http://banyandb:17913";]
-    #   interval: 30s
-    #   timeout: 30s
-    #   retries: 120
+    healthcheck:
+      test: ["CMD", "./bydbctl", "health", "--config=-", 
"--addr=http://banyandb:17913";]
+      interval: 30s
+      timeout: 30s
+      retries: 120
 
   liaison:
     hostname: liaison
diff --git a/test/stress/trace/docker-compose-single.yaml 
b/test/stress/trace/docker-compose-single.yaml
index dc868d70..e990ad3b 100644
--- a/test/stress/trace/docker-compose-single.yaml
+++ b/test/stress/trace/docker-compose-single.yaml
@@ -37,11 +37,11 @@ services:
     - 17913:17913
     - 6060:6060
     - 2121:2121
-    # deploy:
-    #   resources:
-    #     limits:
-    #       cpus: "4"
-    #       memory: 8G
+    deploy:
+      resources:
+        limits:
+          cpus: "4"
+          memory: 8G
     networks:
       - test
       - monitoring
@@ -63,9 +63,9 @@ services:
       - ./log4j2.xml:/skywalking/config/log4j2.xml
     networks:
       - test
-    # depends_on:
-    #   banyandb:
-    #     condition: service_healthy
+    depends_on:
+      banyandb:
+        condition: service_healthy
 
   prometheus:
     image: prom/prometheus:latest

Reply via email to