This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch storage-column in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5df41fc942456848968f023379d893ea897e700b Author: Gao Hongtao <[email protected]> AuthorDate: Sat Nov 18 06:12:48 2023 +0000 Add the series index and attach it to tsdb Signed-off-by: Gao Hongtao <[email protected]> --- banyand/internal/storage/index.go | 239 +++++++++++++++++++++++++ banyand/internal/storage/index_test.go | 134 ++++++++++++++ banyand/internal/storage/series.go | 269 +++++++++++++++++++++++++++++ banyand/internal/storage/series_test.go | 178 +++++++++++++++++++ banyand/internal/storage/storage.go | 116 +++++++++++++ banyand/internal/storage/tsdb.go | 192 ++++++++++++++++++++ pkg/convert/number.go | 27 +++ pkg/fs/file_system.go | 2 + pkg/fs/local_file_system.go | 55 +++++- pkg/index/index.go | 21 +++ pkg/index/inverted/inverted.go | 93 ++++++---- pkg/index/inverted/inverted_series.go | 129 ++++++++++++++ pkg/index/inverted/inverted_series_test.go | 214 +++++++++++++++++++++++ pkg/index/lsm/lsm.go | 5 + 14 files changed, 1637 insertions(+), 37 deletions(-) diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go new file mode 100644 index 00000000..f496f283 --- /dev/null +++ b/banyand/internal/storage/index.go @@ -0,0 +1,239 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package storage + +import ( + "context" + "path" + + "github.com/pkg/errors" + "go.uber.org/multierr" + + "github.com/apache/skywalking-banyandb/api/common" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" + "github.com/apache/skywalking-banyandb/pkg/index/posting" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// OrderBy specifies the order of the result. +type OrderBy struct { + Index *databasev1.IndexRule + Sort modelv1.Sort +} + +type seriesIndex struct { + store index.SeriesStore + l *logger.Logger + position common.Position +} + +func newSeriesIndex(ctx context.Context, root string) (*seriesIndex, error) { + si := &seriesIndex{ + l: logger.Fetch(ctx, "series_index"), + } + var err error + if si.store, err = inverted.NewStore(inverted.StoreOpts{ + Path: path.Join(root, "idx"), + Logger: si.l, + }); err != nil { + return nil, err + } + return si, nil +} + +var entityKey = index.FieldKey{} + +func (s *seriesIndex) createPrimary(series *Series) (*Series, error) { + if err := series.marshal(); err != nil { + return nil, err + } + id, err := s.store.Search(series.Buffer) + if err != nil { + return nil, err + } + if id > 0 { + return series, nil + } + evv := make([]byte, len(series.Buffer)) + copy(evv, series.Buffer) + if err := s.store.Create(index.Series{ + ID: series.ID, + EntityValues: evv, + }); err != nil { + return nil, errors.WithMessagef(err, "create entity values -> seriesID: %s", string(evv)) + } + return series, nil +} + +func (s *seriesIndex) createSecondary(docs index.Documents) error { + return s.store.Batch(docs) +} + +var rangeOpts = index.RangeOpts{} + +func (s *seriesIndex) SearchPrimary(ctx 
context.Context, series *Series) (SeriesList, error) { + var hasAny, hasWildcard bool + var prefixIndex int + + for i, tv := range series.EntityValues { + if tv == nil { + return nil, errors.New("nil tag value") + } + if tv.Value == AnyEntry { + if !hasAny { + hasAny = true + prefixIndex = i + } + continue + } + if hasAny { + hasWildcard = true + break + } + } + + var err error + + if hasAny { + var ss []index.Series + if hasWildcard { + if err = series.marshal(); err != nil { + return nil, err + } + ss, err = s.store.SearchWildcard(series.Buffer) + if err != nil { + return nil, err + } + return convertIndexSeriesToSeriesList(ss) + } + series.EntityValues = series.EntityValues[:prefixIndex] + if err = series.marshal(); err != nil { + return nil, err + } + ss, err = s.store.SearchPrefix(series.Buffer) + if err != nil { + return nil, err + } + return convertIndexSeriesToSeriesList(ss) + } + if err = series.marshal(); err != nil { + return nil, err + } + var seriesID common.SeriesID + seriesID, err = s.store.Search(series.Buffer) + if err != nil { + return nil, err + } + if seriesID > 0 { + series.ID = seriesID + return SeriesList{series}, nil + } + return nil, nil +} + +func convertIndexSeriesToSeriesList(indexSeries []index.Series) (SeriesList, error) { + seriesList := make(SeriesList, 0, len(indexSeries)) + for _, s := range indexSeries { + var series Series + series.ID = s.ID + if err := series.unmarshal(s.EntityValues); err != nil { + return nil, err + } + seriesList = append(seriesList, &series) + } + return seriesList, nil +} + +func (s *seriesIndex) SearchSecondary(ctx context.Context, series *Series, filter index.Filter, order *OrderBy) (SeriesList, error) { + seriesList, err := s.SearchPrimary(ctx, series) + if err != nil { + return nil, err + } + + pl := seriesList.toList() + if filter != nil { + var plFilter posting.List + plFilter, err = filter.Execute(func(ruleType databasev1.IndexRule_Type) (index.Searcher, error) { + return s.store, nil + }, 0) + if 
err != nil {
+			return nil, err
+		}
+		if err = pl.Intersect(plFilter); err != nil {
+			return nil, err
+		}
+	}
+
+	if order == nil {
+		return filterSeriesList(seriesList, pl), nil
+	}
+
+	fieldKey := index.FieldKey{
+		IndexRuleID: order.Index.GetMetadata().Id,
+	}
+	iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort)
+	if err != nil {
+		return nil, err
+	}
+	// NOTE(review): err is not a named result, so this deferred assignment
+	// cannot reach the caller; an error from iter.Close() is silently
+	// dropped. Consider naming the results to propagate it.
+	defer func() {
+		err = multierr.Append(err, iter.Close())
+	}()
+
+	var sortedSeriesList SeriesList
+	for iter.Next() {
+		pv := iter.Val().Value
+		if err = pv.Intersect(pl); err != nil {
+			return nil, err
+		}
+		if pv.IsEmpty() {
+			continue
+		}
+		sortedSeriesList = appendSeriesList(sortedSeriesList, seriesList, pv)
+	}
+	return sortedSeriesList, err
+}
+
+// filterSeriesList keeps only the series whose IDs are present in filter.
+// It filters in place, reusing the backing array, and preserves order.
+func filterSeriesList(seriesList SeriesList, filter posting.List) SeriesList {
+	result := seriesList[:0]
+	for _, series := range seriesList {
+		if filter.Contains(uint64(series.ID)) {
+			result = append(result, series)
+		}
+	}
+	return result
+}
+
+// appendSeriesList appends the series of src whose IDs are present in filter to dest.
+func appendSeriesList(dest, src SeriesList, filter posting.List) SeriesList {
+	for _, series := range src {
+		if filter.Contains(uint64(series.ID)) {
+			dest = append(dest, series)
+		}
+	}
+	return dest
+}
+
+// Close releases the resources held by the underlying series store.
+func (s *seriesIndex) Close() error {
+	return s.store.Close()
+}
diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go
new file mode 100644
index 00000000..3b400aca
--- /dev/null
+++ b/banyand/internal/storage/index_test.go
@@ -0,0 +1,134 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package storage + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" +) + +var testSeriesPool SeriesPool + +func TestSeriesIndex_Primary(t *testing.T) { + ctx := context.Background() + tester := assert.New(t) + path, fn := setUp(require.New(t)) + si, err := newSeriesIndex(ctx, path) + tester.NoError(err) + defer func() { + tester.NoError(si.Close()) + fn() + }() + for i := 0; i < 100; i++ { + series := testSeriesPool.Get() + series.Subject = "service_instance_latency" + series.EntityValues = []*modelv1.TagValue{ + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("svc_%d", i)}}}, + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("svc_%d_instance_%d", i, i)}}}, + } + + // Initialize test data + series, err = si.createPrimary(series) + if err != nil { + t.Fatalf("Failed to create primary series: %v", err) + } + tester.True(series.ID > 0) + testSeriesPool.Put(series) + } + // Restart the index + tester.NoError(si.Close()) + si, err = newSeriesIndex(ctx, path) + tester.NoError(err) + tests := []struct { + name string + subject string + entityValues []*modelv1.TagValue + expected []*modelv1.TagValue + }{ + { + name: "Search", + subject: "service_instance_latency", + entityValues: []*modelv1.TagValue{ + 
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}}, + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}}, + }, + expected: []*modelv1.TagValue{ + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}}, + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}}, + }, + }, + { + name: "Prefix", + subject: "service_instance_latency", + entityValues: []*modelv1.TagValue{ + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}}, + {Value: AnyEntry}, + }, + expected: []*modelv1.TagValue{ + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}}, + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}}, + }, + }, + { + name: "Wildcard", + subject: "service_instance_latency", + entityValues: []*modelv1.TagValue{ + {Value: AnyEntry}, + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}}, + }, + expected: []*modelv1.TagValue{ + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}}, + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + seriesQuery := testSeriesPool.Get() + defer testSeriesPool.Put(seriesQuery) + seriesQuery.Subject = tt.subject + seriesQuery.EntityValues = tt.entityValues + sl, err := si.SearchPrimary(ctx, seriesQuery) + tester.NoError(err) + tester.Equal(1, len(sl)) + tester.Equal(tt.subject, sl[0].Subject) + tester.Equal(tt.expected[0].GetStr().GetValue(), sl[0].EntityValues[0].GetStr().GetValue()) + tester.Equal(tt.expected[1].GetStr().GetValue(), sl[0].EntityValues[1].GetStr().GetValue()) + tester.True(sl[0].ID > 0) + }) + } +} + +func setUp(t *require.Assertions) (tempDir string, deferFunc func()) { + t.NoError(logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + })) + tempDir, deferFunc = test.Space(t) + return tempDir, deferFunc +} diff --git a/banyand/internal/storage/series.go 
b/banyand/internal/storage/series.go new file mode 100644 index 00000000..a5c81fd1 --- /dev/null +++ b/banyand/internal/storage/series.go @@ -0,0 +1,269 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package storage + +import ( + "bytes" + "sort" + "sync" + + "github.com/pkg/errors" + + "github.com/apache/skywalking-banyandb/api/common" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index/posting" + "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" +) + +type Series struct { + Subject string + EntityValues []*modelv1.TagValue + Buffer []byte + ID common.SeriesID +} + +func (s *Series) marshal() error { + s.Buffer = marshalEntityValue(s.Buffer, convert.StringToBytes(s.Subject)) + var err error + for _, tv := range s.EntityValues { + if s.Buffer, err = marshalTagValue(s.Buffer, tv); err != nil { + return errors.WithMessage(err, "marshal subject and entity values") + } + } + s.ID = common.SeriesID(convert.Hash(s.Buffer)) + return nil +} + +func (s *Series) unmarshal(src []byte) error { + var err error + s.Buffer = s.Buffer[:0] + if s.Buffer, src, 
err = unmarshalEntityValue(s.Buffer, src); err != nil { + return errors.WithMessage(err, "unmarshal subject") + } + s.Subject = string(s.Buffer) + for len(src) > 0 { + s.Buffer = s.Buffer[:0] + var tv *modelv1.TagValue + if s.Buffer, src, tv, err = unmarshalTagValue(s.Buffer, src); err != nil { + return errors.WithMessage(err, "unmarshal tag value") + } + s.EntityValues = append(s.EntityValues, tv) + } + return nil +} + +func (s *Series) reset() { + s.ID = 0 + s.Subject = "" + s.EntityValues = s.EntityValues[:0] + s.Buffer = s.Buffer[:0] +} + +type SeriesPool struct { + pool sync.Pool +} + +func (sp *SeriesPool) Get() *Series { + sv := sp.pool.Get() + if sv == nil { + return &Series{} + } + return sv.(*Series) +} + +func (sp *SeriesPool) Put(s *Series) { + s.reset() + sp.pool.Put(s) +} + +// AnyEntry is the `*` for a regular expression. It could match "any" Entry in an Entity. +var AnyEntry = &modelv1.TagValue_Null{} + +const ( + entityDelimiter = '|' + escape = '\\' +) + +var anyWildcard = []byte{'*'} + +func marshalEntityValue(dest, src []byte) []byte { + if src == nil { + dest = append(dest, entityDelimiter) + return dest + } + if bytes.IndexByte(src, entityDelimiter) < 0 && bytes.IndexByte(src, escape) < 0 { + dest = append(dest, src...) 
+		dest = append(dest, entityDelimiter)
+		return dest
+	}
+	for _, b := range src {
+		if b == entityDelimiter || b == escape {
+			dest = append(dest, escape)
+		}
+		dest = append(dest, b)
+	}
+	dest = append(dest, entityDelimiter)
+	return dest
+}
+
+// unmarshalEntityValue reads one delimiter-terminated, escape-decoded value
+// from src into dest. It returns the filled dest and the remainder of src.
+func unmarshalEntityValue(dest, src []byte) ([]byte, []byte, error) {
+	if len(src) == 0 {
+		return nil, nil, errors.New("empty entity value")
+	}
+	if src[0] == entityDelimiter {
+		return dest, src[1:], nil
+	}
+	for len(src) > 0 {
+		if src[0] == escape {
+			if len(src) < 2 {
+				return nil, nil, errors.New("invalid escape character")
+			}
+			src = src[1:]
+			dest = append(dest, src[0])
+		} else if src[0] == entityDelimiter {
+			return dest, src[1:], nil
+		} else {
+			dest = append(dest, src[0])
+		}
+		src = src[1:]
+	}
+	return nil, nil, errors.New("invalid entity value")
+}
+
+// marshalTagValue appends one tag value to dest using the type-byte plus
+// delimiter encoding understood by unmarshalTagValue.
+func marshalTagValue(dest []byte, tv *modelv1.TagValue) ([]byte, error) {
+	// NOTE(review): this is a pointer-identity check; only the shared
+	// AnyEntry sentinel matches — a fresh TagValue_Null falls through to
+	// the "unsupported" error below. Confirm that is intentional.
+	if tv.Value == AnyEntry {
+		dest = marshalEntityValue(dest, anyWildcard)
+		return dest, nil
+	}
+	switch tv.Value.(type) {
+	case *modelv1.TagValue_Str:
+		dest = append(dest, 0)
+		dest = marshalEntityValue(dest, convert.StringToBytes(tv.GetStr().Value))
+	case *modelv1.TagValue_Int:
+		dest = append(dest, 1)
+		// Route the 8 encoded bytes through marshalEntityValue so that any
+		// byte equal to the delimiter ('|') or escape ('\') is escaped.
+		// Appending them raw (the previous behavior) corrupts
+		// unmarshalTagValue's delimiter scan for values such as 124 or 92.
+		dest = marshalEntityValue(dest, convert.Int64ToBytes(tv.GetInt().Value))
+	case *modelv1.TagValue_BinaryData:
+		dest = append(dest, 2)
+		dest = marshalEntityValue(dest, tv.GetBinaryData())
+	default:
+		return nil, errors.New("unsupported tag value type: " + tv.String())
+	}
+	return dest, nil
+}
+
+// unmarshalTagValue decodes one tag value from src. It returns the scratch
+// buffer, the remainder of src, and the decoded value.
+func unmarshalTagValue(dest []byte, src []byte) ([]byte, []byte, *modelv1.TagValue, error) {
+	if len(src) == 0 {
+		return nil, nil, nil, errors.New("empty tag value")
+	}
+	var err error
+	switch src[0] {
+	case 0:
+		if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != nil {
+			return nil, nil, nil, errors.WithMessage(err, "unmarshal string tag value")
+		}
+		return dest, src, &modelv1.TagValue{
+			Value: &modelv1.TagValue_Str{
+				Str: &modelv1.Str{
+ Value: string(dest), + }, + }, + }, nil + case 1: + if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != nil { + return nil, nil, nil, errors.WithMessage(err, "unmarshal int tag value") + } + return dest, src, &modelv1.TagValue{ + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + Value: convert.BytesToInt64(dest), + }, + }, + }, nil + case 2: + if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != nil { + return nil, nil, nil, errors.WithMessage(err, "unmarshal binary tag value") + } + data := make([]byte, len(dest)) + copy(data, dest) + return dest, src, &modelv1.TagValue{ + Value: &modelv1.TagValue_BinaryData{ + BinaryData: data, + }, + }, nil + } + return nil, nil, nil, errors.New("unsupported tag value type") +} + +// SeriesList is a collection of Series. +type SeriesList []*Series + +func (a SeriesList) Len() int { + return len(a) +} + +func (a SeriesList) Less(i, j int) bool { + return a[i].ID < a[j].ID +} + +func (a SeriesList) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +// Merge other SeriesList with this one to create a new SeriesList. 
+func (a SeriesList) Merge(other SeriesList) SeriesList { + if len(other) == 0 { + return a + } + sort.Sort(other) + if len(a) == 0 { + return other + } + final := SeriesList{} + i := 0 + j := 0 + for i < len(a) && j < len(other) { + if a[i].ID < other[j].ID { + final = append(final, a[i]) + i++ + } else { + // deduplication + if a[i].ID == other[j].ID { + i++ + } + final = append(final, other[j]) + j++ + } + } + for ; i < len(a); i++ { + final = append(final, a[i]) + } + for ; j < len(other); j++ { + final = append(final, other[j]) + } + return final +} + +func (a SeriesList) toList() posting.List { + pl := roaring.NewPostingList() + for _, v := range a { + pl.Insert(uint64(v.ID)) + } + return pl +} diff --git a/banyand/internal/storage/series_test.go b/banyand/internal/storage/series_test.go new file mode 100644 index 00000000..6862a340 --- /dev/null +++ b/banyand/internal/storage/series_test.go @@ -0,0 +1,178 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" +) + +func TestMarshalAndUnmarshalEntityValue(t *testing.T) { + tests := []struct { + name string + src []byte + }{ + { + name: "plain text", + src: []byte("plainText"), + }, + { + name: "text with entityDelimiter", + src: []byte("text|with|delimiter"), + }, + { + name: "text with escape", + src: []byte("text\\with\\escape"), + }, + { + name: "text with both special characters", + src: []byte("text|with\\both"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dest := []byte{} + + // Test marshalEntityValue + marshaled := marshalEntityValue(dest, tt.src) + + // Add assertions + assert.NotNil(t, marshaled) + + // Test unmarshalEntityValue + unmarshaledDest, unmarshaledSrc, err := unmarshalEntityValue(dest, marshaled) + + // Add assertions + assert.NoError(t, err) + assert.NotNil(t, unmarshaledDest) + assert.NotNil(t, unmarshaledSrc) + + // Check that unmarshaling the marshaled value gives the original value + assert.Equal(t, tt.src, unmarshaledDest) + }) + } +} + +func TestMarshalAndUnmarshalTagValue(t *testing.T) { + tests := []struct { + src *modelv1.TagValue + name string + }{ + { + name: "string value", + src: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "stringValue"}}}, + }, + { + name: "int value", + src: &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 123}}}, + }, + { + name: "binary data", + src: &modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: []byte("binaryData")}}, + }, + { + name: "unsupported type", + src: &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dest := []byte{} + // Test marshalTagValue + dest, err := marshalTagValue(dest, tt.src) + if tt.name == "unsupported type" { + assert.Error(t, err) + dest = 
dest[:0]
+				_, _, _, err = unmarshalTagValue(dest, []byte("unsupported type"))
+				assert.Error(t, err)
+				return
+			}
+			marshaled := make([]byte, len(dest))
+			copy(marshaled, dest)
+
+			// Add assertions
+			assert.NoError(t, err)
+			assert.True(t, len(marshaled) > 0)
+
+			// Test unmarshalTagValue
+			dest = dest[:0]
+			dest, marshaled, unmarshaled, err := unmarshalTagValue(dest, marshaled)
+
+			// Add assertions
+			assert.NoError(t, err)
+			assert.True(t, len(dest) > 0)
+			assert.True(t, len(marshaled) == 0)
+			assert.NotNil(t, unmarshaled)
+
+			// Check that unmarshaling the marshaled value gives the original value
+			assert.Equal(t, tt.src, unmarshaled)
+		})
+	}
+}
+
+func TestMarshalAndUnmarshalSeries(t *testing.T) {
+	tests := []struct {
+		src  *Series
+		name string
+	}{
+		{
+			name: "series with entity values",
+			src:  &Series{Subject: "subject", EntityValues: []*modelv1.TagValue{{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "stringValue"}}}}},
+		},
+		{
+			name: "series without entity values",
+			src:  &Series{Subject: "subject", EntityValues: []*modelv1.TagValue{}},
+		},
+		{
+			name: "series with multiple entity values",
+			src: &Series{Subject: "subject", EntityValues: []*modelv1.TagValue{
+				{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "stringValue"}}},
+				{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 123}}},
+			}},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Capture the expected state BEFORE marshal/reset mutate tt.src;
+			// the previous assertions compared tt.src against itself after
+			// unmarshal, which passes trivially and verifies nothing.
+			wantSubject := tt.src.Subject
+			wantEntityValues := append([]*modelv1.TagValue{}, tt.src.EntityValues...)
+
+			// Test Series.marshal
+			err := tt.src.marshal()
+			assert.NoError(t, err)
+			assert.True(t, len(tt.src.Buffer) > 0)
+			marshaled := make([]byte, len(tt.src.Buffer))
+			copy(marshaled, tt.src.Buffer)
+
+			// Test Series.unmarshal
+			tt.src.reset()
+			err = tt.src.unmarshal(marshaled)
+			assert.NoError(t, err)
+
+			// Check that unmarshaling the marshaled value restores the original value
+			assert.Equal(t, wantSubject, tt.src.Subject)
+			assert.Equal(t, wantEntityValues, tt.src.EntityValues)
+		})
+	}
+}
diff --git
a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
new file mode 100644
index 00000000..ff0c151a
--- /dev/null
+++ b/banyand/internal/storage/storage.go
@@ -0,0 +1,116 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package storage implements a time-series-based storage engine.
+// It provides:
+//   - Partition data based on a time axis.
+//   - Sharding data based on a series id which represents a unique entity of stream/measure
+//   - Retrieving data based on index.Filter.
+//   - Cleaning expired data, or the data retention.
+package storage
+
+import (
+	"io"
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+)
+
+const (
+	shardPathPrefix = "shard"
+	shardTemplate   = shardPathPrefix + "-%d"
+	metadataPath    = "metadata"
+	segTemplate     = "seg-%s"
+
+	hourFormat = "2006010215"
+	dayFormat  = "20060102"
+
+	dirPerm = 0o700
+)
+
+var (
+	// ErrUnknownShard indicates that the shard is not found.
+	ErrUnknownShard = errors.New("unknown shard")
+	errOpenDatabase = errors.New("fails to open the database")
+)
+
+// Supplier allows getting a tsdb's runtime.
+type Supplier interface { + SupplyTSDB() Database +} + +// Database allows listing and getting shard details. +type Database interface { + io.Closer + CreateShardsAndGetByID(id common.ShardID) (Shard, error) + Shards() []Shard + Shard(id common.ShardID) (Shard, error) +} + +// Shard allows accessing data of tsdb. +type Shard interface { + io.Closer + ID() common.ShardID + // Series() SeriesDatabase +} + +// IntervalUnit denotes the unit of a time point. +type IntervalUnit int + +// Available IntervalUnits. HOUR and DAY are adequate for the APM scenario. +const ( + HOUR IntervalUnit = iota + DAY +) + +func (iu IntervalUnit) String() string { + switch iu { + case HOUR: + return "hour" + case DAY: + return "day" + } + panic("invalid interval unit") +} + +// IntervalRule defines a length of two points in time. +type IntervalRule struct { + Unit IntervalUnit + Num int +} + +func (ir IntervalRule) nextTime(current time.Time) time.Time { + switch ir.Unit { + case HOUR: + return current.Add(time.Hour * time.Duration(ir.Num)) + case DAY: + return current.AddDate(0, 0, ir.Num) + } + panic("invalid interval unit") +} + +func (ir IntervalRule) estimatedDuration() time.Duration { + switch ir.Unit { + case HOUR: + return time.Hour * time.Duration(ir.Num) + case DAY: + return 24 * time.Hour * time.Duration(ir.Num) + } + panic("invalid interval unit") +} diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go new file mode 100644 index 00000000..aab9c8c1 --- /dev/null +++ b/banyand/internal/storage/tsdb.go @@ -0,0 +1,192 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package tsdb implements a time-series-based storage engine. +// It provides: +// - Partition data based on a time axis. +// - Sharding data based on a series id which represents a unique entity of stream/measure +// - Retrieving data based on index.Filter. +// - Cleaning expired data, or the data retention. +package storage + +import ( + "context" + "path/filepath" + "sync" + "sync/atomic" + + "github.com/pkg/errors" + "go.uber.org/multierr" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// IndexGranularity denotes the granularity of the local index. +type IndexGranularity int + +// The options of the local index granularity. +const ( + IndexGranularityBlock IndexGranularity = iota + IndexGranularitySeries +) + +var _ Database = (*database)(nil) + +// DatabaseOpts wraps options to create a tsdb. +type DatabaseOpts struct { + Location string + SegmentInterval IntervalRule + TTL IntervalRule + IndexGranularity IndexGranularity + ShardNum uint32 +} + +type ( + // SegID is the kind of a segment. 
+ SegID uint32 +) + +func GenerateSegID(unit IntervalUnit, suffix int) SegID { + return SegID(unit)<<31 | ((SegID(suffix) << 1) >> 1) +} + +func parseSuffix(id SegID) int { + return int((id << 1) >> 1) +} + +func segIDToBytes(id SegID) []byte { + return convert.Uint32ToBytes(uint32(id)) +} + +func readSegID(data []byte, offset int) (SegID, int) { + end := offset + 4 + return SegID(convert.BytesToUint32(data[offset:end])), end +} + +type database struct { + fileSystem fs.FileSystem + logger *logger.Logger + index *seriesIndex + location string + sLst []Shard + segmentSize IntervalRule + ttl IntervalRule + sync.RWMutex + shardNum uint32 + shardCreationState uint32 +} + +func (d *database) CreateShardsAndGetByID(id common.ShardID) (Shard, error) { + if atomic.LoadUint32(&d.shardCreationState) != 0 { + return d.shard(id) + } + d.Lock() + defer d.Unlock() + if atomic.LoadUint32(&d.shardCreationState) != 0 { + return d.shard(id) + } + loadedShardsNum := len(d.sLst) + if loadedShardsNum < int(d.shardNum) { + _, err := createDatabase(d, loadedShardsNum) + if err != nil { + return nil, errors.WithMessage(err, "create the database failed") + } + } + atomic.StoreUint32(&d.shardCreationState, 1) + return d.shard(id) +} + +func (d *database) Shards() []Shard { + d.RLock() + defer d.RUnlock() + return d.sLst +} + +func (d *database) Shard(id common.ShardID) (Shard, error) { + d.RLock() + defer d.RUnlock() + return d.shard(id) +} + +func (d *database) shard(id common.ShardID) (Shard, error) { + if uint(id) >= uint(len(d.sLst)) { + return nil, ErrUnknownShard + } + return d.sLst[id], nil +} + +func (d *database) Close() error { + d.Lock() + defer d.Unlock() + var err error + for _, s := range d.sLst { + innerErr := s.Close() + if innerErr != nil { + err = multierr.Append(err, innerErr) + } + } + return err +} + +// OpenDatabase returns a new tsdb runtime. This constructor will create a new database if it's absent, +// or load an existing one. 
+func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) { + if opts.SegmentInterval.Num == 0 { + return nil, errors.Wrap(errOpenDatabase, "segment interval is absent") + } + if opts.TTL.Num == 0 { + return nil, errors.Wrap(errOpenDatabase, "ttl is absent") + } + p := common.GetPosition(ctx) + l := logger.Fetch(ctx, p.Database) + fileSystem := fs.NewLocalFileSystemWithLogger(l) + path := filepath.Clean(opts.Location) + fileSystem.Mkdir(path, dirPerm) + si, err := newSeriesIndex(ctx, path) + if err != nil { + return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index failed").Error()) + } + db := &database{ + location: path, + shardNum: opts.ShardNum, + logger: logger.Fetch(ctx, p.Database), + segmentSize: opts.SegmentInterval, + ttl: opts.TTL, + fileSystem: fileSystem, + index: si, + } + db.logger.Info().Str("path", opts.Location).Msg("initialized") + return db, nil +} + +func createDatabase(db *database, startID int) (Database, error) { + var err error + for i := startID; i < int(db.shardNum); i++ { + db.logger.Info().Int("shard_id", i).Msg("creating a shard") + // so, errNewShard := OpenShard(common.ShardID(i), + // db.location, db.segmentSize, db.blockSize, db.ttl, defaultBlockQueueSize, defaultMaxBlockQueueSize, db.enableWAL) + // if errNewShard != nil { + // err = multierr.Append(err, errNewShard) + // continue + // } + // db.sLst = append(db.sLst, so) + } + return db, err +} diff --git a/pkg/convert/number.go b/pkg/convert/number.go index 0074c588..55e8fbc2 100644 --- a/pkg/convert/number.go +++ b/pkg/convert/number.go @@ -29,6 +29,19 @@ func Uint64ToBytes(u uint64) []byte { return bs } +func AppendUint64ToBytes(dest []byte, u uint64) []byte { + // Use BigEndian to marshal uint64. + return append(dest, + byte(u>>56), + byte(u>>48), + byte(u>>40), + byte(u>>32), + byte(u>>24), + byte(u>>16), + byte(u>>8), + byte(u)) +} + // Int64ToBytes converts int64 to bytes. 
// AppendInt64ToBytes appends an order-preserving big-endian encoding of i to
// dest and returns the extended slice: the unsigned byte-wise order of the
// output matches the signed order of the input (math.MinInt64 maps to all
// zero bytes, -1 to 0x7FFF…FF, 0 to 0x8000…00, math.MaxInt64 to all 0xFF).
// It produces exactly the same bytes as Int64ToBytes for every input.
func AppendInt64ToBytes(dest []byte, i int64) []byte {
	// Flipping the sign bit is equivalent to the original branchy form:
	// for i >= 0 it sets the top bit (u | 1<<63); for i < 0, two's complement
	// gives uint64(i) = 1<<64 + i, so the flip yields 1<<63 + i, which equals
	// 1<<63 - uint64(-i) — including the i == math.MinInt64 edge case.
	u := uint64(i) ^ (1 << 63)
	return append(dest,
		byte(u>>56),
		byte(u>>48),
		byte(u>>40),
		byte(u>>32),
		byte(u>>24),
		byte(u>>16),
		byte(u>>8),
		byte(u))
}
func NewLocalFileSystem() FileSystem { - return &LocalFileSystem{ + return &localFileSystem{ logger: logger.GetLogger(moduleName), } } +func NewLocalFileSystemWithLogger(parent *logger.Logger) FileSystem { + return &localFileSystem{ + logger: parent.Named(moduleName), + } +} + func readErrorHandle(operation string, err error, name string, size int) (int, error) { switch { case errors.Is(err, io.EOF): @@ -67,8 +74,44 @@ func readErrorHandle(operation string, err error, name string, size int) (int, e } } +// Mkdir implements FileSystem. +func (fs *localFileSystem) Mkdir(path string, permission Mode) { + if fs.pathExist(path) { + return + } + if err := os.MkdirAll(path, 0o755); err != nil { + fs.logger.Panic().Str("path", path).Err(err).Msg("failed to create directory") + } + parentDirPath := filepath.Dir(path) + fs.syncPath(parentDirPath) +} + +func (fs *localFileSystem) pathExist(path string) bool { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return false + } + fs.logger.Panic().Str("path", path).Err(err).Msg("failed to stat path") + } + return true +} + +func (fs *localFileSystem) syncPath(path string) { + d, err := os.Open(path) + if err != nil { + fs.logger.Panic().Str("path", path).Err(err).Msg("failed to open directory") + } + if err := d.Sync(); err != nil { + _ = d.Close() + fs.logger.Panic().Str("path", path).Err(err).Msg("failed to sync directory") + } + if err := d.Close(); err != nil { + fs.logger.Panic().Str("path", path).Err(err).Msg("ailed to sync directory") + } +} + // CreateFile is used to create and open the file by specified name and mode. 
-func (fs *LocalFileSystem) CreateFile(name string, permission Mode) (File, error) { +func (fs *localFileSystem) CreateFile(name string, permission Mode) (File, error) { file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission)) switch { case err == nil: @@ -94,7 +137,7 @@ func (fs *LocalFileSystem) CreateFile(name string, permission Mode) (File, error } // Write flushes all data to one file. -func (fs *LocalFileSystem) Write(buffer []byte, name string, permission Mode) (int, error) { +func (fs *localFileSystem) Write(buffer []byte, name string, permission Mode) (int, error) { file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission)) if err != nil { switch { @@ -129,7 +172,7 @@ func (fs *LocalFileSystem) Write(buffer []byte, name string, permission Mode) (i } // DeleteFile is used to delete the file. -func (fs *LocalFileSystem) DeleteFile(name string) error { +func (fs *localFileSystem) DeleteFile(name string) error { err := os.Remove(name) switch { case err == nil: diff --git a/pkg/index/index.go b/pkg/index/index.go index 1667d85b..27eacb88 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -191,9 +191,17 @@ type PostingValue struct { Term []byte } +type Document struct { + Fields []Field + DocID uint64 +} + +type Documents []Document + // Writer allows writing fields and docID in a document to a index. type Writer interface { Write(fields []Field, docID uint64) error + Batch(docs Documents) error } // FieldIterable allows building a FieldIterator. @@ -218,6 +226,19 @@ type Store interface { SizeOnDisk() int64 } +type Series struct { + ID common.SeriesID + EntityValues []byte +} + +type SeriesStore interface { + Store + Create(Series) error + Search([]byte) (common.SeriesID, error) + SearchPrefix([]byte) ([]Series, error) + SearchWildcard([]byte) ([]Series, error) +} + // GetSearcher returns a searcher associated with input index rule type. 
type GetSearcher func(location databasev1.IndexRule_Type) (Searcher, error) diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 2437a627..f40283b5 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -50,6 +50,7 @@ const ( docIDField = "_id" batchSize = 1024 seriesIDField = "series_id" + entityField = "entity" idField = "id" ) @@ -77,11 +78,6 @@ type StoreOpts struct { BatchWaitSec int64 } -type doc struct { - fields []index.Field - docID uint64 -} - type flushEvent struct { onComplete chan struct{} } @@ -95,8 +91,20 @@ type store struct { batchInterval time.Duration } +func (s *store) Batch(docs index.Documents) error { + if !s.closer.AddRunning() { + return nil + } + defer s.closer.Done() + select { + case <-s.closer.CloseNotify(): + case s.ch <- docs: + } + return nil +} + // NewStore create a new inverted index repository. -func NewStore(opts StoreOpts) (index.Store, error) { +func NewStore(opts StoreOpts) (index.SeriesStore, error) { indexConfig := blugeIndex.DefaultConfig(opts.Path) indexConfig.MergePlanOptions.MaxSegmentsPerTier = 1 indexConfig.MergePlanOptions.MaxSegmentSize = 500000 @@ -141,9 +149,9 @@ func (s *store) Write(fields []index.Field, docID uint64) error { defer s.closer.Done() select { case <-s.closer.CloseNotify(): - case s.ch <- doc{ - fields: fields, - docID: docID, + case s.ch <- index.Document{ + Fields: fields, + DocID: docID, }: } return nil @@ -315,6 +323,7 @@ func (s *store) run() { } defer flush() var docIDBuffer bytes.Buffer + seriesIDBuffer := make([]byte, 8) for { timer := time.NewTimer(s.batchInterval) select { @@ -330,33 +339,55 @@ func (s *store) run() { case flushEvent: flush() close(d.onComplete) - case doc: - // TODO: generate a segment directly. 
- fk := d.fields[0].Key - docIDBuffer.Reset() - if fk.HasSeriesID() { - docIDBuffer.Write(fk.SeriesID.Marshal()) - } - docIDBuffer.Write(convert.Uint64ToBytes(d.docID)) - doc := bluge.NewDocument(docIDBuffer.String()) - toAddSeriesIDField := false - for _, f := range d.fields { - if f.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED { - doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Marshal()).StoreValue().Sortable()) - } else { - toAddSeriesIDField = true - doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term).StoreValue().Sortable(). - WithAnalyzer(analyzers[f.Key.Analyzer])) - } - } - if toAddSeriesIDField && fk.HasSeriesID() { - doc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, fk.SeriesID.Marshal())) - } + case index.Series: + seriesIDBuffer = seriesIDBuffer[:0] + seriesIDBuffer = convert.AppendUint64ToBytes(seriesIDBuffer, uint64(d.ID)) + doc := bluge.NewDocument(string(seriesIDBuffer)) + doc.AddField(bluge.NewKeywordFieldBytes(entityField, d.EntityValues).StoreValue()) size++ batch.Update(doc.ID(), doc) if size >= batchSize { flush() } + case index.Document, index.Documents: + var docs []index.Document + var isBatch bool + switch v := d.(type) { + case index.Document: + docs = []index.Document{v} + case index.Documents: + docs = v + isBatch = true + } + + for _, d := range docs { + // TODO: generate a segment directly. + fk := d.Fields[0].Key + docIDBuffer.Reset() + if fk.HasSeriesID() { + docIDBuffer.Write(fk.SeriesID.Marshal()) + } + docIDBuffer.Write(convert.Uint64ToBytes(d.DocID)) + doc := bluge.NewDocument(docIDBuffer.String()) + toAddSeriesIDField := false + for _, f := range d.Fields { + if f.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED { + doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Marshal()).StoreValue().Sortable()) + } else { + toAddSeriesIDField = true + doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term).StoreValue().Sortable(). 
+ WithAnalyzer(analyzers[f.Key.Analyzer])) + } + } + if toAddSeriesIDField && fk.HasSeriesID() { + doc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, fk.SeriesID.Marshal())) + } + size++ + batch.Update(doc.ID(), doc) + } + if isBatch || size >= batchSize { + flush() + } } case <-timer.C: flush() diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go new file mode 100644 index 00000000..f3a477ed --- /dev/null +++ b/pkg/index/inverted/inverted_series.go @@ -0,0 +1,129 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package inverted implements a inverted index repository. +package inverted + +import ( + "context" + + "github.com/blugelabs/bluge" + "github.com/blugelabs/bluge/search" + "github.com/pkg/errors" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index" +) + +// Create implements index.SeriesStore. 
+func (s *store) Create(series index.Series) error { + if !s.closer.AddRunning() { + return nil + } + defer s.closer.Done() + select { + case <-s.closer.CloseNotify(): + case s.ch <- series: + } + return nil +} + +// Search implements index.SeriesStore. +func (s *store) Search(term []byte) (common.SeriesID, error) { + reader, err := s.writer.Reader() + if err != nil { + return 0, err + } + query := bluge.NewTermQuery(convert.BytesToString(term)).SetField(entityField) + dmi, err := reader.Search(context.Background(), bluge.NewAllMatches(query)) + if err != nil { + return 0, err + } + var result common.SeriesID + next, err := dmi.Next() + if err == nil && next != nil { + err = next.VisitStoredFields(func(field string, value []byte) bool { + if field == docIDField { + result = common.SeriesID(convert.BytesToUint64(value)) + return false + } + return true + }) + if err != nil { + return 0, errors.WithMessage(err, "visit stored fields") + } + } + if err != nil { + return 0, errors.WithMessage(err, "iterate document match iterator") + } + return result, nil +} + +// SearchPrefix implements index.SeriesStore. +func (s *store) SearchPrefix(prefix []byte) ([]index.Series, error) { + reader, err := s.writer.Reader() + if err != nil { + return nil, err + } + query := bluge.NewPrefixQuery(convert.BytesToString(prefix)).SetField(entityField) + dmi, err := reader.Search(context.Background(), bluge.NewAllMatches(query)) + if err != nil { + return nil, err + } + return parseResult(dmi) +} + +// SearchWildcard implements index.SeriesStore. 
+func (s *store) SearchWildcard(wildcard []byte) ([]index.Series, error) { + reader, err := s.writer.Reader() + if err != nil { + return nil, err + } + query := bluge.NewWildcardQuery(convert.BytesToString(wildcard)).SetField(entityField) + dmi, err := reader.Search(context.Background(), bluge.NewAllMatches(query)) + if err != nil { + return nil, err + } + return parseResult(dmi) +} + +func parseResult(dmi search.DocumentMatchIterator) ([]index.Series, error) { + result := make([]index.Series, 0, 10) + next, err := dmi.Next() + for err == nil && next != nil { + var series index.Series + err = next.VisitStoredFields(func(field string, value []byte) bool { + if field == docIDField { + series.ID = common.SeriesID(convert.BytesToUint64(value)) + } + if field == entityField { + series.EntityValues = value + } + return true + }) + if err != nil { + return nil, errors.WithMessage(err, "visit stored fields") + } + result = append(result, series) + next, err = dmi.Next() + } + if err != nil { + return nil, errors.WithMessage(err, "iterate document match iterator") + } + return result, nil +} diff --git a/pkg/index/inverted/inverted_series_test.go b/pkg/index/inverted/inverted_series_test.go new file mode 100644 index 00000000..64daaaa2 --- /dev/null +++ b/pkg/index/inverted/inverted_series_test.go @@ -0,0 +1,214 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inverted + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +func TestStore_Search(t *testing.T) { + tester := assert.New(t) + path, fn := setUp(require.New(t)) + s, err := NewStore(StoreOpts{ + Path: path, + Logger: logger.GetLogger("test"), + }) + tester.NoError(err) + defer func() { + tester.NoError(s.Close()) + fn() + }() + + // Setup some data + setupData(tester, s) + + // Test cases + tests := []struct { + term []byte + want common.SeriesID + }{ + { + term: []byte("test1"), + want: common.SeriesID(1), + }, + { + term: []byte("foo"), + want: common.SeriesID(0), + }, + } + + for _, tt := range tests { + t.Run(string(tt.term), func(t *testing.T) { + got, err := s.Search(tt.term) + tester.NoError(err) + tester.Equal(tt.want, got) + }) + } +} + +func TestStore_SearchWildcard(t *testing.T) { + tester := assert.New(t) + path, fn := setUp(require.New(t)) + s, err := NewStore(StoreOpts{ + Path: path, + Logger: logger.GetLogger("test"), + }) + tester.NoError(err) + defer func() { + tester.NoError(s.Close()) + fn() + }() + + // Setup some data + setupData(tester, s) + + // Test cases + tests := []struct { + wildcard []byte + want []index.Series + }{ + { + wildcard: []byte("test*"), + want: []index.Series{ + { + ID: common.SeriesID(1), + EntityValues: []byte("test1"), + }, + { + ID: common.SeriesID(2), + EntityValues: 
[]byte("test2"), + }, + { + ID: common.SeriesID(3), + EntityValues: []byte("test3"), + }, + }, + }, + { + wildcard: []byte("*2"), + want: []index.Series{ + { + ID: common.SeriesID(2), + EntityValues: []byte("test2"), + }, + }, + }, + { + wildcard: []byte("t*st1"), + want: []index.Series{ + { + ID: common.SeriesID(1), + EntityValues: []byte("test1"), + }, + }, + }, + { + wildcard: []byte("foo*"), + want: []index.Series{}, + }, + } + + for _, tt := range tests { + t.Run(string(tt.wildcard), func(t *testing.T) { + got, err := s.SearchWildcard(tt.wildcard) + tester.NoError(err) + tester.ElementsMatch(tt.want, got) + }) + } +} + +func TestStore_SearchPrefix(t *testing.T) { + tester := assert.New(t) + path, fn := setUp(require.New(t)) + s, err := NewStore(StoreOpts{ + Path: path, + Logger: logger.GetLogger("test"), + }) + tester.NoError(err) + defer func() { + tester.NoError(s.Close()) + fn() + }() + + // Setup some data + setupData(tester, s) + + // Test cases + tests := []struct { + prefix []byte + want []index.Series + }{ + { + prefix: []byte("test"), + want: []index.Series{ + { + ID: common.SeriesID(1), + EntityValues: []byte("test1"), + }, + { + ID: common.SeriesID(2), + EntityValues: []byte("test2"), + }, + { + ID: common.SeriesID(3), + EntityValues: []byte("test3"), + }, + }, + }, + { + prefix: []byte("foo"), + want: []index.Series{}, + }, + } + + for _, tt := range tests { + t.Run(string(tt.prefix), func(t *testing.T) { + got, err := s.SearchPrefix(tt.prefix) + tester.NoError(err) + tester.ElementsMatch(tt.want, got) + }) + } +} + +func setupData(tester *assert.Assertions, s index.SeriesStore) { + series1 := index.Series{ + ID: common.SeriesID(1), + EntityValues: []byte("test1"), + } + tester.NoError(s.Create(series1)) + + series2 := index.Series{ + ID: common.SeriesID(2), + EntityValues: []byte("test2"), + } + tester.NoError(s.Create(series2)) + + series3 := index.Series{ + ID: common.SeriesID(3), + EntityValues: []byte("test3"), + } + 
tester.NoError(s.Create(series3)) + s.(*store).flush() +} diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go index b0ec2bb4..1e95a654 100644 --- a/pkg/index/lsm/lsm.go +++ b/pkg/index/lsm/lsm.go @@ -39,6 +39,11 @@ type store struct { closeOnce sync.Once } +// Batch implements index.Store. +func (*store) Batch(docs index.Documents) error { + panic("unimplemented") +} + func (s *store) Close() (err error) { s.closeOnce.Do(func() { s.closer.Done()
