This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch stream-load in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit cfad43a5bb597d7d5195a123614595e6b534ab3c Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Mon Apr 22 06:32:12 2024 +0000 Won't store term in the index Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- banyand/internal/storage/index.go | 17 ++-- banyand/stream/iter.go | 69 ++++++--------- banyand/stream/write.go | 10 +-- pkg/index/index.go | 12 +-- pkg/index/inverted/inverted.go | 127 +++++++-------------------- pkg/index/inverted/sort.go | 24 +++-- pkg/index/testcases/duration.go | 39 ++------ test/docker/base-compose.yml | 10 +-- test/stress/trace/docker-compose-single.yaml | 16 ++-- 9 files changed, 104 insertions(+), 220 deletions(-) diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index af7eaed4..acb4086b 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -198,14 +198,11 @@ func (s *seriesIndex) Search(ctx context.Context, series *pbv1.Series, filter in var sortedSeriesList pbv1.SeriesList for iter.Next() { - pv := iter.Val().Value - if err = pv.Intersect(pl); err != nil { - return nil, err - } - if pv.IsEmpty() { + seriesID := iter.Val() + if !pl.Contains(seriesID) { continue } - sortedSeriesList = appendSeriesList(sortedSeriesList, seriesList, pv) + sortedSeriesList = appendSeriesList(sortedSeriesList, seriesList, common.SeriesID(seriesID)) if err != nil { return nil, err } @@ -223,12 +220,12 @@ func filterSeriesList(seriesList pbv1.SeriesList, filter posting.List) pbv1.Seri return seriesList } -func appendSeriesList(dest, src pbv1.SeriesList, filter posting.List) pbv1.SeriesList { +func appendSeriesList(dest, src pbv1.SeriesList, target common.SeriesID) pbv1.SeriesList { for i := 0; i < len(src); i++ { - if !filter.Contains(uint64(src[i].ID)) { - continue + if target == src[i].ID { + dest = append(dest, src[i]) + break } - dest = append(dest, src[i]) } return dest } diff --git a/banyand/stream/iter.go b/banyand/stream/iter.go index 5f9abb80..94be97f2 100644 --- a/banyand/stream/iter.go +++ b/banyand/stream/iter.go @@ -25,14 +25,12 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) type searcherIterator struct { fieldIterator index.FieldIterator - cur posting.Iterator err error indexFilter filterFn timeFilter filterFn @@ -64,46 +62,37 @@ func (s *searcherIterator) Next() bool { if s.err != nil { return false } - if s.cur == nil { - if s.fieldIterator.Next() { - v := s.fieldIterator.Val() - s.cur = v.Value.Iterator() - } else { - s.err = io.EOF - return false - } + if !s.fieldIterator.Next() { + s.err = io.EOF + return false + } + itemID := s.fieldIterator.Val() + if !s.timeFilter(itemID) { + return s.Next() + } + if s.indexFilter != nil && !s.indexFilter(itemID) { + return s.Next() + } + if e := s.l.Debug(); e.Enabled() { + e.Uint64("series_id", uint64(s.seriesID)).Uint64("item_id", itemID).Msg("got an item") + } + e, c, err := s.table.getElement(s.seriesID, int64(itemID), s.tagProjection) + if err != nil { + s.err = err + return false + } + sv, err := s.sortedTagLocation.getTagValue(e) + if err != nil { + s.err = err + return false } - if s.cur.Next() { - itemID := s.cur.Current() - if !s.timeFilter(itemID) { - return s.Next() - } - if s.indexFilter != nil && !s.indexFilter(itemID) { - return s.Next() - } - if e := s.l.Debug(); e.Enabled() { - e.Uint64("series_id", uint64(s.seriesID)).Uint64("item_id", itemID).Msg("got an item") - } - e, c, err := s.table.getElement(s.seriesID, int64(itemID), s.tagProjection) - if err != nil { - s.err = err - return false - } - sv, err := s.sortedTagLocation.getTagValue(e) - if err != nil { - s.err = err - return false - } - s.currItem = item{ - element: e, - count: c, - sortedTagValue: sv, - seriesID: s.seriesID, - } - return true + s.currItem = item{ + element: e, + count: c, + sortedTagValue: sv, + seriesID: s.seriesID, } - s.cur = nil - return s.Next() + return true } func (s *searcherIterator) Val() item { diff --git a/banyand/stream/write.go b/banyand/stream/write.go index 5d0533e6..d6070f13 100644 --- a/banyand/stream/write.go +++ b/banyand/stream/write.go @@ -222,12 +222,10 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { es := g.tables[j] es.tsTable.Table().mustAddElements(&es.elements) if len(es.docs) > 0 { - go func() { - index := es.tsTable.Table().Index() - if err := index.Write(es.docs); err != nil { - w.l.Error().Err(err).Msg("cannot write element index") - } - }() + index := es.tsTable.Table().Index() + if err := index.Write(es.docs); err != nil { + w.l.Error().Err(err).Msg("cannot write element index") + } } es.tsTable.DecRef() } diff --git a/pkg/index/index.go b/pkg/index/index.go index 739db07b..ac25855c 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -164,7 +164,7 @@ func (r RangeOpts) Between(value []byte) int { // FieldIterator allows iterating over a field's posting values. type FieldIterator interface { Next() bool - Val() *PostingValue + Val() uint64 Close() error } @@ -177,20 +177,14 @@ func (i *dummyIterator) Next() bool { return false } -func (i *dummyIterator) Val() *PostingValue { - return nil +func (i *dummyIterator) Val() uint64 { + return 0 } func (i *dummyIterator) Close() error { return nil } -// PostingValue is the collection of a field's values. -type PostingValue struct { - Value posting.List - Term []byte -} - // Document represents a document in a index. type Document struct { Fields []Field diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 0ab9e835..93dad96c 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -37,7 +37,6 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - pbytes "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" @@ -178,7 +177,6 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord } fk := fieldKey.MarshalIndexRule() var query bluge.Query - shouldDecodeTerm := true if fieldKey.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED { query = bluge.NewTermRangeInclusiveQuery( index.FieldStr(fieldKey, termRange.Lower), @@ -188,7 +186,6 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord ). SetField(fk) } else { - shouldDecodeTerm = false bQuery := bluge.NewBooleanQuery(). AddMust(bluge.NewTermRangeInclusiveQuery( string(termRange.Lower), @@ -208,12 +205,10 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord sortedKey = "-" + sortedKey } result := &sortIterator{ - query: query, - reader: reader, - sortedKey: sortedKey, - fk: fk, - shouldDecodeTerm: shouldDecodeTerm, - size: preLoadSize, + query: query, + reader: reader, + sortedKey: sortedKey, + size: preLoadSize, } return result, nil } @@ -229,11 +224,9 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) { } fk := field.Key.MarshalIndexRule() var query bluge.Query - shouldDecodeTerm := true if field.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED { query = bluge.NewTermQuery(string(field.Marshal())).SetField(fk) } else { - shouldDecodeTerm = false bQuery := bluge.NewBooleanQuery(). AddMust(bluge.NewTermQuery(string(field.Term)).SetField(fk)) if field.Key.HasSeriesID() { @@ -245,13 +238,13 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) { if err != nil { return nil, err } - iter := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, reader) + iter := newBlugeMatchIterator(documentMatchIterator, reader) defer func() { err = multierr.Append(err, iter.Close()) }() list = roaring.NewPostingList() for iter.Next() { - err = multierr.Append(err, list.Union(iter.Val().Value)) + list.Insert(iter.Val()) } return list, err } @@ -278,13 +271,13 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List, if err != nil { return nil, err } - iter := newBlugeMatchIterator(documentMatchIterator, fk, false, reader) + iter := newBlugeMatchIterator(documentMatchIterator, reader) defer func() { err = multierr.Append(err, iter.Close()) }() list := roaring.NewPostingList() for iter.Next() { - err = multierr.Append(err, list.Union(iter.Val().Value)) + list.Insert(iter.Val()) } return list, err } @@ -296,7 +289,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti } list = roaring.NewPostingList() for iter.Next() { - err = multierr.Append(err, list.Union(iter.Val().Value)) + list.Insert(iter.Val()) } err = multierr.Append(err, iter.Close()) return @@ -372,10 +365,10 @@ func (s *store) run() { toAddSeriesIDField := false for _, f := range d.Fields { if f.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED { - doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Marshal()).StoreValue().Sortable()) + doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Marshal()).Sortable()) } else { toAddSeriesIDField = true - doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term).StoreValue().Sortable(). + doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term).Sortable(). WithAnalyzer(analyzers[f.Key.Analyzer])) } } @@ -419,113 +412,57 @@ func (s *store) flush() { } type blugeMatchIterator struct { - delegated search.DocumentMatchIterator - err error - closer io.Closer - current *index.PostingValue - agg *index.PostingValue - fieldKey string - shouldDecodeTerm bool - closed bool + delegated search.DocumentMatchIterator + err error + closer io.Closer + docID uint64 } -func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey string, shouldDecodeTerm bool, closer io.Closer) blugeMatchIterator { +func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer io.Closer) blugeMatchIterator { return blugeMatchIterator{ - delegated: delegated, - fieldKey: fieldKey, - shouldDecodeTerm: shouldDecodeTerm, - closer: closer, + delegated: delegated, + closer: closer, } } func (bmi *blugeMatchIterator) Next() bool { - if bmi.err != nil || bmi.closed { - return false - } - //revive:disable:empty-block - for bmi.nextTerm() { - } - //revive:enable:empty-block - if bmi.err != nil || bmi.closed { - return false - } - return true -} - -func (bmi *blugeMatchIterator) nextTerm() bool { var match *search.DocumentMatch match, bmi.err = bmi.delegated.Next() if bmi.err != nil { return false } if match == nil { - if bmi.agg == nil { - bmi.closed = true - } else { - bmi.current = bmi.agg - bmi.agg = nil - } + bmi.err = io.EOF return false } - i := 0 - var docID uint64 - var term []byte bmi.err = match.VisitStoredFields(func(field string, value []byte) bool { if field == docIDField { if len(value) == 8 { - docID = convert.BytesToUint64(value) + bmi.docID = convert.BytesToUint64(value) } else if len(value) == 16 { // value = seriesID(8bytes)+docID(8bytes) - docID = convert.BytesToUint64(value[8:]) - } - i++ - } - if field == bmi.fieldKey { - v := pbytes.Copy(value) - if bmi.shouldDecodeTerm { - term = index.UnmarshalTerm(v) - } else { - term = v + bmi.docID = convert.BytesToUint64(value[8:]) } - i++ - } - return i < 2 - }) - if i != 2 { - // ignore invalid data - // TODO: add metric to cumulate ignored docs - return true - } - if bmi.err != nil { - return false - } - if bmi.agg == nil { - bmi.agg = &index.PostingValue{ - Term: term, - Value: roaring.NewPostingListWithInitialData(docID), } return true - } - if bytes.Equal(bmi.agg.Term, term) { - bmi.agg.Value.Insert(docID) - return true - } - bmi.current = bmi.agg - bmi.agg = &index.PostingValue{ - Term: term, - Value: roaring.NewPostingListWithInitialData(docID), - } - return false + }) + return bmi.err == nil } -func (bmi *blugeMatchIterator) Val() *index.PostingValue { - return bmi.current +func (bmi *blugeMatchIterator) Val() uint64 { + return bmi.docID } func (bmi *blugeMatchIterator) Close() error { - bmi.closed = true if bmi.closer == nil { + if errors.Is(bmi.err, io.EOF) { + return nil + } return bmi.err } + err := bmi.closer.Close() + if errors.Is(bmi.err, io.EOF) { + return err + } return errors.Join(bmi.err, bmi.closer.Close()) } diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go index 12835eca..85e878e1 100644 --- a/pkg/index/inverted/sort.go +++ b/pkg/index/inverted/sort.go @@ -25,20 +25,16 @@ import ( "math" "github.com/blugelabs/bluge" - - "github.com/apache/skywalking-banyandb/pkg/index" ) type sortIterator struct { - query bluge.Query - err error - reader *bluge.Reader - current *blugeMatchIterator - sortedKey string - fk string - size int - skipped int - shouldDecodeTerm bool + query bluge.Query + err error + reader *bluge.Reader + current *blugeMatchIterator + sortedKey string + size int + skipped int } func (si *sortIterator) Next() bool { @@ -73,7 +69,7 @@ func (si *sortIterator) loadCurrent() bool { return false } - iter := newBlugeMatchIterator(documentMatchIterator, si.fk, si.shouldDecodeTerm, nil) + iter := newBlugeMatchIterator(documentMatchIterator, nil) si.current = &iter if si.next() { return true @@ -84,13 +80,13 @@ func (si *sortIterator) loadCurrent() bool { func (si *sortIterator) next() bool { if si.current.Next() { - si.skipped += si.current.Val().Value.Len() + si.skipped++ return true } return false } -func (si *sortIterator) Val() *index.PostingValue { +func (si *sortIterator) Val() uint64 { return si.current.Val() } diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go index ebbc9d70..9ffdc550 100644 --- a/pkg/index/testcases/duration.go +++ b/pkg/index/testcases/duration.go @@ -52,8 +52,7 @@ type args struct { } type result struct { - items []int - key int + items []uint64 } // RunDuration executes duration related cases. @@ -301,49 +300,23 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) { } }() is.NotNil(iter) - got := make([]result, 0) - var currResult result + var got result for iter.Next() { - key := int(convert.BytesToInt64(iter.Val().Term)) - if currResult.key != key { - if currResult.key != 0 { - got = append(got, currResult) - currResult = result{} - } - currResult.key = key - } - currResult.items = append(currResult.items, toArray(iter.Val().Value)...) - } - if len(currResult.items) > 0 { - got = append(got, currResult) + got.items = append(got.items, iter.Val()) } for i := 0; i < 10; i++ { is.False(iter.Next()) } - wants := make([]result, 0, len(tt.want)) + var wants result for _, w := range tt.want { - wants = append(wants, result{ - key: w, - items: toArray(data[w]), - }) + pl := data[w] + wants.items = append(wants.items, pl.ToSlice()...) } tester.Equal(wants, got, tt.name) }) } } -func toArray(list posting.List) []int { - ints := make([]int, 0, list.Len()) - iter := list.Iterator() - defer func(iter posting.Iterator) { - _ = iter.Close() - }(iter) - for iter.Next() { - ints = append(ints, int(iter.Current())) - } - return ints -} - // SetUpDuration initializes data for testing duration related cases. func SetUpDuration(t *assert.Assertions, store index.Writer) map[int]posting.List { r := map[int]posting.List{ diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml index 0a0d345c..1d1a32eb 100644 --- a/test/docker/base-compose.yml +++ b/test/docker/base-compose.yml @@ -20,11 +20,11 @@ services: - 2121 - 6060 command: standalone - # healthcheck: - # test: ["CMD", "./bydbctl", "health", "--config=-", "--addr=http://banyandb:17913"] - # interval: 30s - # timeout: 30s - # retries: 120 + healthcheck: + test: ["CMD", "./bydbctl", "health", "--config=-", "--addr=http://banyandb:17913"] + interval: 30s + timeout: 30s + retries: 120 liaison: hostname: liaison diff --git a/test/stress/trace/docker-compose-single.yaml b/test/stress/trace/docker-compose-single.yaml index dc868d70..e990ad3b 100644 --- a/test/stress/trace/docker-compose-single.yaml +++ b/test/stress/trace/docker-compose-single.yaml @@ -37,11 +37,11 @@ services: - 17913:17913 - 6060:6060 - 2121:2121 - # deploy: - # resources: - # limits: - # cpus: "4" - # memory: 8G + deploy: + resources: + limits: + cpus: "4" + memory: 8G networks: - test - monitoring @@ -63,9 +63,9 @@ services: - ./log4j2.xml:/skywalking/config/log4j2.xml networks: - test - # depends_on: - # banyandb: - # condition: service_healthy + depends_on: + banyandb: + condition: service_healthy prometheus: image: prom/prometheus:latest