This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 7dd1afa2 Return the tagValue of sorted tag (#465)
7dd1afa2 is described below

commit 7dd1afa2ddde2ff3f8b5e508cac631a087e0c09a
Author: Huang Youliang <[email protected]>
AuthorDate: Thu Jun 13 15:01:06 2024 +0800

    Return the tagValue of sorted tag (#465)
    
    * Return the tagValue of sorted tag
    
    * Update IndexRule in schema.proto
    
    * Persist values of sorted tags
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
 api/proto/banyandb/database/v1/schema.proto |  2 ++
 banyand/internal/storage/index.go           |  2 +-
 banyand/stream/index.go                     |  9 +--------
 banyand/stream/iter.go                      |  2 +-
 banyand/stream/write.go                     |  6 ++++--
 docs/api-reference.md                       |  1 +
 pkg/index/index.go                          | 11 ++++++-----
 pkg/index/inverted/inverted.go              | 27 ++++++++++++++++++---------
 pkg/index/inverted/sort.go                  |  4 ++--
 pkg/index/inverted/sort_test.go             |  9 +++++++--
 pkg/index/testcases/duration.go             |  2 +-
 pkg/query/logical/common.go                 |  1 +
 pkg/query/logical/plan.go                   |  3 +++
 13 files changed, 48 insertions(+), 31 deletions(-)

diff --git a/api/proto/banyandb/database/v1/schema.proto b/api/proto/banyandb/database/v1/schema.proto
index a7d391fd..7941866c 100644
--- a/api/proto/banyandb/database/v1/schema.proto
+++ b/api/proto/banyandb/database/v1/schema.proto
@@ -171,6 +171,8 @@ message IndexRule {
   }
   // analyzer analyzes tag value to support the full-text searching for 
TYPE_INVERTED indices.
   Analyzer analyzer = 5;
+  // no_sort indicates whether the index is not for sorting.
+  bool no_sort = 6;
 }
 
 // Subject defines which stream or measure would generate indices
diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index 6980369d..ed2b5ec5 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -216,7 +216,7 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter
 
        var sortedSeriesList pbv1.SeriesList
        for iter.Next() {
-               seriesID, _ := iter.Val()
+               seriesID, _, _ := iter.Val()
                if !pl.Contains(seriesID) {
                        continue
                }
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index 32d48d14..e1a1d42a 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -63,16 +63,9 @@ func (e *elementIndex) Sort(sids []common.SeriesID, fieldKey index.FieldKey, ord
 }
 
 func (e *elementIndex) Write(docs index.Documents) error {
-       applied := make(chan struct{})
-       err := e.store.Batch(index.Batch{
+       return e.store.Batch(index.Batch{
                Documents: docs,
-               Applied:   applied,
        })
-       if err != nil {
-               return err
-       }
-       <-applied
-       return nil
 }
 
 func (e *elementIndex) Search(_ context.Context, seriesList pbv1.SeriesList, 
filter index.Filter,
diff --git a/banyand/stream/iter.go b/banyand/stream/iter.go
index b2a75edb..39412ca2 100644
--- a/banyand/stream/iter.go
+++ b/banyand/stream/iter.go
@@ -78,7 +78,7 @@ func (s *searcherIterator) Next() bool {
                s.err = io.EOF
                return false
        }
-       itemID, seriesID := s.fieldIterator.Val()
+       itemID, seriesID, _ := s.fieldIterator.Val()
        if !s.timeFilter(itemID) {
                return s.Next()
        }
diff --git a/banyand/stream/write.go b/banyand/stream/write.go
index 3ed4960f..c17b8df4 100644
--- a/banyand/stream/write.go
+++ b/banyand/stream/write.go
@@ -153,7 +153,8 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre
                                                        Analyzer:    r.Analyzer,
                                                        SeriesID:    series.ID,
                                                },
-                                               Term: encodeTagValue.value,
+                                               Term:   encodeTagValue.value,
+                                               NoSort: r.GetNoSort(),
                                        })
                                } else {
                                        for _, val := range 
encodeTagValue.valueArr {
@@ -163,7 +164,8 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre
                                                                Analyzer:    
r.Analyzer,
                                                                SeriesID:    
series.ID,
                                                        },
-                                                       Term: val,
+                                                       Term:   val,
+                                                       NoSort: r.GetNoSort(),
                                                })
                                        }
                                }
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 5ac35290..56e5cbcc 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -971,6 +971,7 @@ IndexRule should bind to a subject through an IndexRuleBinding to generate prope
 | type | [IndexRule.Type](#banyandb-database-v1-IndexRule-Type) |  | type is the IndexType of this IndexObject. |
 | updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  | updated_at indicates when the IndexRule is updated |
 | analyzer | [IndexRule.Analyzer](#banyandb-database-v1-IndexRule-Analyzer) |  | analyzer analyzes tag value to support the full-text searching for TYPE_INVERTED indices. |
+| no_sort | [bool](#bool) |  | no_sort indicates whether the index is not for sorting. |
 
 
 
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 92b5efa3..43883931 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -84,8 +84,9 @@ func (f FieldKey) Equal(other FieldKey) bool {
 
 // Field is a indexed item in a document.
 type Field struct {
-       Term []byte
-       Key  FieldKey
+       Term   []byte
+       Key    FieldKey
+       NoSort bool
 }
 
 // Marshal encodes f to bytes.
@@ -165,7 +166,7 @@ func (r RangeOpts) Between(value []byte) int {
 // FieldIterator allows iterating over a field's posting values.
 type FieldIterator interface {
        Next() bool
-       Val() (uint64, common.SeriesID)
+       Val() (uint64, common.SeriesID, []byte)
        Close() error
 }
 
@@ -178,8 +179,8 @@ func (i *dummyIterator) Next() bool {
        return false
 }
 
-func (i *dummyIterator) Val() (uint64, common.SeriesID) {
-       return 0, 0
+func (i *dummyIterator) Val() (uint64, common.SeriesID, []byte) {
+       return 0, 0, nil
 }
 
 func (i *dummyIterator) Close() error {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 561dc77e..ceb9e739 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -235,13 +235,13 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
        if err != nil {
                return nil, err
        }
-       iter := newBlugeMatchIterator(documentMatchIterator, reader)
+       iter := newBlugeMatchIterator(documentMatchIterator, reader, false)
        defer func() {
                err = multierr.Append(err, iter.Close())
        }()
        list = roaring.NewPostingList()
        for iter.Next() {
-               docID, _ := iter.Val()
+               docID, _, _ := iter.Val()
                list.Insert(docID)
        }
        return list, err
@@ -269,13 +269,13 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List,
        if err != nil {
                return nil, err
        }
-       iter := newBlugeMatchIterator(documentMatchIterator, reader)
+       iter := newBlugeMatchIterator(documentMatchIterator, reader, false)
        defer func() {
                err = multierr.Append(err, iter.Close())
        }()
        list := roaring.NewPostingList()
        for iter.Next() {
-               docID, _ := iter.Val()
+               docID, _, _ := iter.Val()
                list.Insert(docID)
        }
        return list, err
@@ -288,7 +288,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
        }
        list = roaring.NewPostingList()
        for iter.Next() {
-               docID, _ := iter.Val()
+               docID, _, _ := iter.Val()
                list.Insert(docID)
        }
        err = multierr.Append(err, iter.Close())
@@ -363,7 +363,10 @@ func (s *store) run() {
                                                
docIDBuffer.Write(convert.Uint64ToBytes(d.DocID))
                                                doc := 
bluge.NewDocument(docIDBuffer.String())
                                                for _, f := range d.Fields {
-                                                       tf := 
bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term).Sortable()
+                                                       tf := 
bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term)
+                                                       if !f.NoSort {
+                                                               
tf.StoreValue().Sortable()
+                                                       }
                                                        if f.Key.Analyzer != 
databasev1.IndexRule_ANALYZER_UNSPECIFIED {
                                                                tf = 
tf.WithAnalyzer(analyzers[f.Key.Analyzer])
                                                        }
@@ -411,14 +414,17 @@ type blugeMatchIterator struct {
        delegated search.DocumentMatchIterator
        err       error
        closer    io.Closer
+       term      []byte
        docID     uint64
        seriesID  common.SeriesID
+       sorted    bool
 }
 
-func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer 
io.Closer) blugeMatchIterator {
+func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer 
io.Closer, sorted bool) blugeMatchIterator {
        return blugeMatchIterator{
                delegated: delegated,
                closer:    closer,
+               sorted:    sorted,
        }
 }
 
@@ -440,6 +446,9 @@ func (bmi *blugeMatchIterator) Next() bool {
                                // value = seriesID(8bytes)+docID(8bytes)
                                bmi.docID = convert.BytesToUint64(value[8:])
                                bmi.seriesID = 
common.SeriesID(convert.BytesToUint64(value[:8]))
+                               if bmi.sorted {
+                                       bmi.term = index.UnmarshalTerm(value)
+                               }
                        }
                }
                return true
@@ -447,8 +456,8 @@ func (bmi *blugeMatchIterator) Next() bool {
        return bmi.err == nil
 }
 
-func (bmi *blugeMatchIterator) Val() (uint64, common.SeriesID) {
-       return bmi.docID, bmi.seriesID
+func (bmi *blugeMatchIterator) Val() (uint64, common.SeriesID, []byte) {
+       return bmi.docID, bmi.seriesID, bmi.term
 }
 
 func (bmi *blugeMatchIterator) Close() error {
diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go
index 5d967292..72291ab9 100644
--- a/pkg/index/inverted/sort.go
+++ b/pkg/index/inverted/sort.go
@@ -110,7 +110,7 @@ func (si *sortIterator) loadCurrent() bool {
                return false
        }
 
-       iter := newBlugeMatchIterator(documentMatchIterator, nil)
+       iter := newBlugeMatchIterator(documentMatchIterator, nil, true)
        si.current = &iter
        if si.next() {
                return true
@@ -127,7 +127,7 @@ func (si *sortIterator) next() bool {
        return false
 }
 
-func (si *sortIterator) Val() (uint64, common.SeriesID) {
+func (si *sortIterator) Val() (uint64, common.SeriesID, []byte) {
        return si.current.Val()
 }
 
diff --git a/pkg/index/inverted/sort_test.go b/pkg/index/inverted/sort_test.go
index 3db32159..6fe759aa 100644
--- a/pkg/index/inverted/sort_test.go
+++ b/pkg/index/inverted/sort_test.go
@@ -167,8 +167,11 @@ func TestStore_Sort(t *testing.T) {
                        is.NotNil(iter)
                        var got result
                        for iter.Next() {
-                               docID, _ := iter.Val()
+                               docID, _, term := iter.Val()
                                got.items = append(got.items, docID)
+                               if term != nil {
+                                       got.terms = append(got.terms, term)
+                               }
                        }
                        for i := 0; i < 10; i++ {
                                is.False(iter.Next())
@@ -178,7 +181,8 @@ func TestStore_Sort(t *testing.T) {
                                pl := data[w]
                                wants.items = append(wants.items, 
pl.ToSlice()...)
                        }
-                       tester.Equal(wants, got, tt.name)
+                       tester.Equal(wants.items, got.items, tt.name)
+                       tester.Equal(len(got.items), len(got.terms), tt.name)
                })
        }
 }
@@ -190,6 +194,7 @@ type args struct {
 
 type result struct {
        items []uint64
+       terms [][]byte
 }
 
 func setUpDuration(t *require.Assertions, store index.Writer) 
map[int]posting.List {
diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go
index 0eaaa4da..944b2e4c 100644
--- a/pkg/index/testcases/duration.go
+++ b/pkg/index/testcases/duration.go
@@ -303,7 +303,7 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
                        is.NotNil(iter)
                        var got result
                        for iter.Next() {
-                               docID, _ := iter.Val()
+                               docID, _, _ := iter.Val()
                                got.items = append(got.items, docID)
                        }
                        for i := 0; i < 10; i++ {
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 69bf90aa..88b77220 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -29,6 +29,7 @@ var (
        errUnsupportedConditionValue = errors.New("unsupported condition value 
type")
        errInvalidCriteriaType       = errors.New("invalid criteria type")
        errIndexNotDefined           = errors.New("index is not define for the 
tag")
+       errIndexSortingUnsupported   = errors.New("index does not support 
sorting")
 )
 
 // Tag represents the combination of  tag family and tag name.
diff --git a/pkg/query/logical/plan.go b/pkg/query/logical/plan.go
index b3658f8a..5417b6d8 100644
--- a/pkg/query/logical/plan.go
+++ b/pkg/query/logical/plan.go
@@ -72,6 +72,9 @@ func ParseOrderBy(s Schema, indexRuleName string, sort modelv1.Sort) (*OrderBy,
        if !defined {
                return nil, errors.Wrap(errIndexNotDefined, indexRuleName)
        }
+       if indexRule.NoSort {
+               return nil, errors.Wrap(errIndexSortingUnsupported, 
indexRuleName)
+       }
 
        projFieldSpecs, err := s.CreateTagRef(NewTags("", 
indexRule.GetTags()...))
        if err != nil {

Reply via email to