This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 7dd1afa2 Return the tagValue of sorted tag (#465)
7dd1afa2 is described below
commit 7dd1afa2ddde2ff3f8b5e508cac631a087e0c09a
Author: Huang Youliang <[email protected]>
AuthorDate: Thu Jun 13 15:01:06 2024 +0800
Return the tagValue of sorted tag (#465)
* Return the tagValue of sorted tag
* Update IndexRule in schema.proto
* Persist values of sorted tags
---------
Co-authored-by: Gao Hongtao <[email protected]>
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
api/proto/banyandb/database/v1/schema.proto | 2 ++
banyand/internal/storage/index.go | 2 +-
banyand/stream/index.go | 9 +--------
banyand/stream/iter.go | 2 +-
banyand/stream/write.go | 6 ++++--
docs/api-reference.md | 1 +
pkg/index/index.go | 11 ++++++-----
pkg/index/inverted/inverted.go | 27 ++++++++++++++++++---------
pkg/index/inverted/sort.go | 4 ++--
pkg/index/inverted/sort_test.go | 9 +++++++--
pkg/index/testcases/duration.go | 2 +-
pkg/query/logical/common.go | 1 +
pkg/query/logical/plan.go | 3 +++
13 files changed, 48 insertions(+), 31 deletions(-)
diff --git a/api/proto/banyandb/database/v1/schema.proto b/api/proto/banyandb/database/v1/schema.proto
index a7d391fd..7941866c 100644
--- a/api/proto/banyandb/database/v1/schema.proto
+++ b/api/proto/banyandb/database/v1/schema.proto
@@ -171,6 +171,8 @@ message IndexRule {
}
// analyzer analyzes tag value to support the full-text searching for TYPE_INVERTED indices.
Analyzer analyzer = 5;
+ // no_sort indicates whether the index is not for sorting.
+ bool no_sort = 6;
}
// Subject defines which stream or measure would generate indices
diff --git a/banyand/internal/storage/index.go
b/banyand/internal/storage/index.go
index 6980369d..ed2b5ec5 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -216,7 +216,7 @@ func (s *seriesIndex) Search(ctx context.Context, series
[]*pbv1.Series, filter
var sortedSeriesList pbv1.SeriesList
for iter.Next() {
- seriesID, _ := iter.Val()
+ seriesID, _, _ := iter.Val()
if !pl.Contains(seriesID) {
continue
}
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index 32d48d14..e1a1d42a 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -63,16 +63,9 @@ func (e *elementIndex) Sort(sids []common.SeriesID, fieldKey
index.FieldKey, ord
}
func (e *elementIndex) Write(docs index.Documents) error {
- applied := make(chan struct{})
- err := e.store.Batch(index.Batch{
+ return e.store.Batch(index.Batch{
Documents: docs,
- Applied: applied,
})
- if err != nil {
- return err
- }
- <-applied
- return nil
}
func (e *elementIndex) Search(_ context.Context, seriesList pbv1.SeriesList,
filter index.Filter,
diff --git a/banyand/stream/iter.go b/banyand/stream/iter.go
index b2a75edb..39412ca2 100644
--- a/banyand/stream/iter.go
+++ b/banyand/stream/iter.go
@@ -78,7 +78,7 @@ func (s *searcherIterator) Next() bool {
s.err = io.EOF
return false
}
- itemID, seriesID := s.fieldIterator.Val()
+ itemID, seriesID, _ := s.fieldIterator.Val()
if !s.timeFilter(itemID) {
return s.Next()
}
diff --git a/banyand/stream/write.go b/banyand/stream/write.go
index 3ed4960f..c17b8df4 100644
--- a/banyand/stream/write.go
+++ b/banyand/stream/write.go
@@ -153,7 +153,8 @@ func (w *writeCallback) handle(dst
map[string]*elementsInGroup, writeEvent *stre
Analyzer: r.Analyzer,
SeriesID: series.ID,
},
- Term: encodeTagValue.value,
+ Term: encodeTagValue.value,
+ NoSort: r.GetNoSort(),
})
} else {
for _, val := range
encodeTagValue.valueArr {
@@ -163,7 +164,8 @@ func (w *writeCallback) handle(dst
map[string]*elementsInGroup, writeEvent *stre
Analyzer:
r.Analyzer,
SeriesID:
series.ID,
},
- Term: val,
+ Term: val,
+ NoSort: r.GetNoSort(),
})
}
}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 5ac35290..56e5cbcc 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -971,6 +971,7 @@ IndexRule should bind to a subject through an
IndexRuleBinding to generate prope
| type | [IndexRule.Type](#banyandb-database-v1-IndexRule-Type) | | type is
the IndexType of this IndexObject. |
| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | |
updated_at indicates when the IndexRule is updated |
| analyzer | [IndexRule.Analyzer](#banyandb-database-v1-IndexRule-Analyzer) |
| analyzer analyzes tag value to support the full-text searching for
TYPE_INVERTED indices. |
+| no_sort | [bool](#bool) | | no_sort indicates whether the index is not for sorting. |
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 92b5efa3..43883931 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -84,8 +84,9 @@ func (f FieldKey) Equal(other FieldKey) bool {
// Field is a indexed item in a document.
type Field struct {
- Term []byte
- Key FieldKey
+ Term []byte
+ Key FieldKey
+ NoSort bool
}
// Marshal encodes f to bytes.
@@ -165,7 +166,7 @@ func (r RangeOpts) Between(value []byte) int {
// FieldIterator allows iterating over a field's posting values.
type FieldIterator interface {
Next() bool
- Val() (uint64, common.SeriesID)
+ Val() (uint64, common.SeriesID, []byte)
Close() error
}
@@ -178,8 +179,8 @@ func (i *dummyIterator) Next() bool {
return false
}
-func (i *dummyIterator) Val() (uint64, common.SeriesID) {
- return 0, 0
+func (i *dummyIterator) Val() (uint64, common.SeriesID, []byte) {
+ return 0, 0, nil
}
func (i *dummyIterator) Close() error {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 561dc77e..ceb9e739 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -235,13 +235,13 @@ func (s *store) MatchTerms(field index.Field) (list
posting.List, err error) {
if err != nil {
return nil, err
}
- iter := newBlugeMatchIterator(documentMatchIterator, reader)
+ iter := newBlugeMatchIterator(documentMatchIterator, reader, false)
defer func() {
err = multierr.Append(err, iter.Close())
}()
list = roaring.NewPostingList()
for iter.Next() {
- docID, _ := iter.Val()
+ docID, _, _ := iter.Val()
list.Insert(docID)
}
return list, err
@@ -269,13 +269,13 @@ func (s *store) Match(fieldKey index.FieldKey, matches
[]string) (posting.List,
if err != nil {
return nil, err
}
- iter := newBlugeMatchIterator(documentMatchIterator, reader)
+ iter := newBlugeMatchIterator(documentMatchIterator, reader, false)
defer func() {
err = multierr.Append(err, iter.Close())
}()
list := roaring.NewPostingList()
for iter.Next() {
- docID, _ := iter.Val()
+ docID, _, _ := iter.Val()
list.Insert(docID)
}
return list, err
@@ -288,7 +288,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts
index.RangeOpts) (list posti
}
list = roaring.NewPostingList()
for iter.Next() {
- docID, _ := iter.Val()
+ docID, _, _ := iter.Val()
list.Insert(docID)
}
err = multierr.Append(err, iter.Close())
@@ -363,7 +363,10 @@ func (s *store) run() {
docIDBuffer.Write(convert.Uint64ToBytes(d.DocID))
doc :=
bluge.NewDocument(docIDBuffer.String())
for _, f := range d.Fields {
- tf := bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term).Sortable()
+ tf := bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term)
+ if !f.NoSort {
+ tf.StoreValue().Sortable()
+ }
if f.Key.Analyzer !=
databasev1.IndexRule_ANALYZER_UNSPECIFIED {
tf =
tf.WithAnalyzer(analyzers[f.Key.Analyzer])
}
@@ -411,14 +414,17 @@ type blugeMatchIterator struct {
delegated search.DocumentMatchIterator
err error
closer io.Closer
+ term []byte
docID uint64
seriesID common.SeriesID
+ sorted bool
}
-func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer io.Closer) blugeMatchIterator {
+func newBlugeMatchIterator(delegated search.DocumentMatchIterator, closer io.Closer, sorted bool) blugeMatchIterator {
return blugeMatchIterator{
delegated: delegated,
closer: closer,
+ sorted: sorted,
}
}
@@ -440,6 +446,9 @@ func (bmi *blugeMatchIterator) Next() bool {
// value = seriesID(8bytes)+docID(8bytes)
bmi.docID = convert.BytesToUint64(value[8:])
bmi.seriesID =
common.SeriesID(convert.BytesToUint64(value[:8]))
+ if bmi.sorted {
+ bmi.term = index.UnmarshalTerm(value)
+ }
}
}
return true
@@ -447,8 +456,8 @@ func (bmi *blugeMatchIterator) Next() bool {
return bmi.err == nil
}
-func (bmi *blugeMatchIterator) Val() (uint64, common.SeriesID) {
- return bmi.docID, bmi.seriesID
+func (bmi *blugeMatchIterator) Val() (uint64, common.SeriesID, []byte) {
+ return bmi.docID, bmi.seriesID, bmi.term
}
func (bmi *blugeMatchIterator) Close() error {
diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go
index 5d967292..72291ab9 100644
--- a/pkg/index/inverted/sort.go
+++ b/pkg/index/inverted/sort.go
@@ -110,7 +110,7 @@ func (si *sortIterator) loadCurrent() bool {
return false
}
- iter := newBlugeMatchIterator(documentMatchIterator, nil)
+ iter := newBlugeMatchIterator(documentMatchIterator, nil, true)
si.current = &iter
if si.next() {
return true
@@ -127,7 +127,7 @@ func (si *sortIterator) next() bool {
return false
}
-func (si *sortIterator) Val() (uint64, common.SeriesID) {
+func (si *sortIterator) Val() (uint64, common.SeriesID, []byte) {
return si.current.Val()
}
diff --git a/pkg/index/inverted/sort_test.go b/pkg/index/inverted/sort_test.go
index 3db32159..6fe759aa 100644
--- a/pkg/index/inverted/sort_test.go
+++ b/pkg/index/inverted/sort_test.go
@@ -167,8 +167,11 @@ func TestStore_Sort(t *testing.T) {
is.NotNil(iter)
var got result
for iter.Next() {
- docID, _ := iter.Val()
+ docID, _, term := iter.Val()
got.items = append(got.items, docID)
+ if term != nil {
+ got.terms = append(got.terms, term)
+ }
}
for i := 0; i < 10; i++ {
is.False(iter.Next())
@@ -178,7 +181,8 @@ func TestStore_Sort(t *testing.T) {
pl := data[w]
wants.items = append(wants.items,
pl.ToSlice()...)
}
- tester.Equal(wants, got, tt.name)
+ tester.Equal(wants.items, got.items, tt.name)
+ tester.Equal(len(got.items), len(got.terms), tt.name)
})
}
}
@@ -190,6 +194,7 @@ type args struct {
type result struct {
items []uint64
+ terms [][]byte
}
func setUpDuration(t *require.Assertions, store index.Writer)
map[int]posting.List {
diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go
index 0eaaa4da..944b2e4c 100644
--- a/pkg/index/testcases/duration.go
+++ b/pkg/index/testcases/duration.go
@@ -303,7 +303,7 @@ func RunDuration(t *testing.T, data map[int]posting.List,
store SimpleStore) {
is.NotNil(iter)
var got result
for iter.Next() {
- docID, _ := iter.Val()
+ docID, _, _ := iter.Val()
got.items = append(got.items, docID)
}
for i := 0; i < 10; i++ {
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 69bf90aa..88b77220 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -29,6 +29,7 @@ var (
errUnsupportedConditionValue = errors.New("unsupported condition value
type")
errInvalidCriteriaType = errors.New("invalid criteria type")
errIndexNotDefined = errors.New("index is not define for the
tag")
+ errIndexSortingUnsupported = errors.New("index does not support sorting")
)
// Tag represents the combination of tag family and tag name.
diff --git a/pkg/query/logical/plan.go b/pkg/query/logical/plan.go
index b3658f8a..5417b6d8 100644
--- a/pkg/query/logical/plan.go
+++ b/pkg/query/logical/plan.go
@@ -72,6 +72,9 @@ func ParseOrderBy(s Schema, indexRuleName string, sort
modelv1.Sort) (*OrderBy,
if !defined {
return nil, errors.Wrap(errIndexNotDefined, indexRuleName)
}
+ if indexRule.NoSort {
+ return nil, errors.Wrap(errIndexSortingUnsupported, indexRuleName)
+ }
projFieldSpecs, err := s.CreateTagRef(NewTags("",
indexRule.GetTags()...))
if err != nil {