This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch storage-column
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5df41fc942456848968f023379d893ea897e700b
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Nov 18 06:12:48 2023 +0000

    Add the series index and attach it to tsdb
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/internal/storage/index.go          | 239 +++++++++++++++++++++++++
 banyand/internal/storage/index_test.go     | 134 ++++++++++++++
 banyand/internal/storage/series.go         | 269 +++++++++++++++++++++++++++++
 banyand/internal/storage/series_test.go    | 178 +++++++++++++++++++
 banyand/internal/storage/storage.go        | 116 +++++++++++++
 banyand/internal/storage/tsdb.go           | 192 ++++++++++++++++++++
 pkg/convert/number.go                      |  27 +++
 pkg/fs/file_system.go                      |   2 +
 pkg/fs/local_file_system.go                |  55 +++++-
 pkg/index/index.go                         |  21 +++
 pkg/index/inverted/inverted.go             |  93 ++++++----
 pkg/index/inverted/inverted_series.go      | 129 ++++++++++++++
 pkg/index/inverted/inverted_series_test.go | 214 +++++++++++++++++++++++
 pkg/index/lsm/lsm.go                       |   5 +
 14 files changed, 1637 insertions(+), 37 deletions(-)

diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
new file mode 100644
index 00000000..f496f283
--- /dev/null
+++ b/banyand/internal/storage/index.go
@@ -0,0 +1,239 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "context"
+       "path"
+
+       "github.com/pkg/errors"
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+       "github.com/apache/skywalking-banyandb/pkg/index/posting"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
// OrderBy specifies the order of the result.
type OrderBy struct {
	// Index is the index rule whose indexed values drive the ordering.
	Index *databasev1.IndexRule
	// Sort is the direction (ascending/descending) applied to the index values.
	Sort  modelv1.Sort
}

// seriesIndex maintains the per-series inverted index used to resolve
// series IDs from entity values and to run secondary-index queries.
type seriesIndex struct {
	// store is the underlying inverted-index implementation.
	store    index.SeriesStore
	l        *logger.Logger
	// position records where this index lives in the cluster topology.
	// NOTE(review): not referenced in this file — confirm it is set/used elsewhere.
	position common.Position
}
+
+func newSeriesIndex(ctx context.Context, root string) (*seriesIndex, error) {
+       si := &seriesIndex{
+               l: logger.Fetch(ctx, "series_index"),
+       }
+       var err error
+       if si.store, err = inverted.NewStore(inverted.StoreOpts{
+               Path:   path.Join(root, "idx"),
+               Logger: si.l,
+       }); err != nil {
+               return nil, err
+       }
+       return si, nil
+}
+
// entityKey is the zero-value field key reserved for entity lookups.
// NOTE(review): not referenced anywhere in this file — confirm it is used
// elsewhere in the package or remove it.
var entityKey = index.FieldKey{}
+
+func (s *seriesIndex) createPrimary(series *Series) (*Series, error) {
+       if err := series.marshal(); err != nil {
+               return nil, err
+       }
+       id, err := s.store.Search(series.Buffer)
+       if err != nil {
+               return nil, err
+       }
+       if id > 0 {
+               return series, nil
+       }
+       evv := make([]byte, len(series.Buffer))
+       copy(evv, series.Buffer)
+       if err := s.store.Create(index.Series{
+               ID:           series.ID,
+               EntityValues: evv,
+       }); err != nil {
+               return nil, errors.WithMessagef(err, "create entity values -> 
seriesID: %s", string(evv))
+       }
+       return series, nil
+}
+
// createSecondary writes documents for secondary indexes by delegating to
// the underlying store's batch API.
func (s *seriesIndex) createSecondary(docs index.Documents) error {
	return s.store.Batch(docs)
}

// rangeOpts is the zero-value (unbounded) range used by sorted iterations.
var rangeOpts = index.RangeOpts{}
+
// SearchPrimary resolves the series matching the given template.
// Three query shapes are supported, decided by where AnyEntry appears in
// series.EntityValues:
//   - no AnyEntry: exact match, returning at most one series;
//   - AnyEntry only as a suffix: prefix search over the leading values;
//   - AnyEntry followed by concrete values: wildcard search.
//
// NOTE(review): the prefix path truncates series.EntityValues in place, so
// the caller's Series is mutated — confirm callers do not reuse it afterwards.
func (s *seriesIndex) SearchPrimary(ctx context.Context, series *Series) (SeriesList, error) {
	var hasAny, hasWildcard bool
	var prefixIndex int

	// Classify the query: find the first AnyEntry; any concrete value
	// after it turns a prefix query into a wildcard query.
	for i, tv := range series.EntityValues {
		if tv == nil {
			return nil, errors.New("nil tag value")
		}
		if tv.Value == AnyEntry {
			if !hasAny {
				hasAny = true
				prefixIndex = i
			}
			continue
		}
		if hasAny {
			hasWildcard = true
			break
		}
	}

	var err error

	if hasAny {
		var ss []index.Series
		if hasWildcard {
			if err = series.marshal(); err != nil {
				return nil, err
			}
			ss, err = s.store.SearchWildcard(series.Buffer)
			if err != nil {
				return nil, err
			}
			return convertIndexSeriesToSeriesList(ss)
		}
		// Prefix search: drop the trailing AnyEntry values and match on
		// the marshaled leading values only.
		series.EntityValues = series.EntityValues[:prefixIndex]
		if err = series.marshal(); err != nil {
			return nil, err
		}
		ss, err = s.store.SearchPrefix(series.Buffer)
		if err != nil {
			return nil, err
		}
		return convertIndexSeriesToSeriesList(ss)
	}
	// Exact match: look up the single series by its encoded bytes.
	if err = series.marshal(); err != nil {
		return nil, err
	}
	var seriesID common.SeriesID
	seriesID, err = s.store.Search(series.Buffer)
	if err != nil {
		return nil, err
	}
	if seriesID > 0 {
		series.ID = seriesID
		return SeriesList{series}, nil
	}
	// Not found: an empty result is not an error.
	return nil, nil
}
+
+func convertIndexSeriesToSeriesList(indexSeries []index.Series) (SeriesList, 
error) {
+       seriesList := make(SeriesList, 0, len(indexSeries))
+       for _, s := range indexSeries {
+               var series Series
+               series.ID = s.ID
+               if err := series.unmarshal(s.EntityValues); err != nil {
+                       return nil, err
+               }
+               seriesList = append(seriesList, &series)
+       }
+       return seriesList, nil
+}
+
+func (s *seriesIndex) SearchSecondary(ctx context.Context, series *Series, 
filter index.Filter, order *OrderBy) (SeriesList, error) {
+       seriesList, err := s.SearchPrimary(ctx, series)
+       if err != nil {
+               return nil, err
+       }
+
+       pl := seriesList.toList()
+       if filter != nil {
+               var plFilter posting.List
+               plFilter, err = filter.Execute(func(ruleType 
databasev1.IndexRule_Type) (index.Searcher, error) {
+                       return s.store, nil
+               }, 0)
+               if err != nil {
+                       return nil, err
+               }
+               if err = pl.Intersect(plFilter); err != nil {
+                       return nil, err
+               }
+       }
+
+       if order == nil {
+               return filterSeriesList(seriesList, pl), nil
+       }
+
+       fieldKey := index.FieldKey{
+               IndexRuleID: order.Index.GetMetadata().Id,
+       }
+       iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort)
+       if err != nil {
+               return nil, err
+       }
+       defer func() {
+               err = multierr.Append(err, iter.Close())
+       }()
+
+       var sortedSeriesList SeriesList
+       for iter.Next() {
+               pv := iter.Val().Value
+               if err = pv.Intersect(pl); err != nil {
+                       return nil, err
+               }
+               if pv.IsEmpty() {
+                       continue
+               }
+               sortedSeriesList = appendSeriesList(sortedSeriesList, 
seriesList, pv)
+               if err != nil {
+                       return nil, err
+               }
+       }
+       return sortedSeriesList, err
+}
+
+func filterSeriesList(seriesList SeriesList, filter posting.List) SeriesList {
+       for i := 0; i < len(seriesList); i++ {
+               if !filter.Contains(uint64(seriesList[i].ID)) {
+                       seriesList = append(seriesList[:i], seriesList[i+1:]...)
+                       i--
+               }
+       }
+       return seriesList
+}
+
+func appendSeriesList(dest, src SeriesList, filter posting.List) SeriesList {
+       for i := 0; i < len(src); i++ {
+               if !filter.Contains(uint64(src[i].ID)) {
+                       continue
+               }
+               dest = append(dest, src[i])
+       }
+       return dest
+}
+
// Close releases the underlying inverted-index store.
func (s *seriesIndex) Close() error {
	return s.store.Close()
}
diff --git a/banyand/internal/storage/index_test.go 
b/banyand/internal/storage/index_test.go
new file mode 100644
index 00000000..3b400aca
--- /dev/null
+++ b/banyand/internal/storage/index_test.go
@@ -0,0 +1,134 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "context"
+       "fmt"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
// testSeriesPool recycles Series values across test iterations, exercising the
// pool's reset behavior alongside the index under test.
var testSeriesPool SeriesPool
+
// TestSeriesIndex_Primary writes 100 series into a fresh index, restarts the
// index to prove the data is durable, then verifies the three primary query
// shapes: exact match, prefix (trailing AnyEntry), and wildcard (leading
// AnyEntry).
func TestSeriesIndex_Primary(t *testing.T) {
	ctx := context.Background()
	tester := assert.New(t)
	path, fn := setUp(require.New(t))
	si, err := newSeriesIndex(ctx, path)
	tester.NoError(err)
	// The closure reads the si variable at call time, so after the restart
	// below it closes the reopened index, not the original one.
	defer func() {
		tester.NoError(si.Close())
		fn()
	}()
	for i := 0; i < 100; i++ {
		series := testSeriesPool.Get()
		series.Subject = "service_instance_latency"
		series.EntityValues = []*modelv1.TagValue{
			{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("svc_%d", i)}}},
			{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("svc_%d_instance_%d", i, i)}}},
		}

		// Initialize test data
		series, err = si.createPrimary(series)
		if err != nil {
			t.Fatalf("Failed to create primary series: %v", err)
		}
		tester.True(series.ID > 0)
		testSeriesPool.Put(series)
	}
	// Restart the index to verify the created series survive a close/reopen cycle.
	tester.NoError(si.Close())
	si, err = newSeriesIndex(ctx, path)
	tester.NoError(err)
	tests := []struct {
		name         string
		subject      string
		entityValues []*modelv1.TagValue
		expected     []*modelv1.TagValue
	}{
		{
			// Exact match on both entity values.
			name:    "Search",
			subject: "service_instance_latency",
			entityValues: []*modelv1.TagValue{
				{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}},
				{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}},
			},
			expected: []*modelv1.TagValue{
				{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}},
				{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}},
			},
		},
		{
			// Trailing AnyEntry triggers the prefix-search path.
			name:    "Prefix",
			subject: "service_instance_latency",
			entityValues: []*modelv1.TagValue{
				{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}},
				{Value: AnyEntry},
			},
			expected: []*modelv1.TagValue{
				{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}},
				{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}},
			},
		},
		{
			// Leading AnyEntry followed by a concrete value triggers the wildcard path.
			name:    "Wildcard",
			subject: "service_instance_latency",
			entityValues: []*modelv1.TagValue{
				{Value: AnyEntry},
				{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}},
			},
			expected: []*modelv1.TagValue{
				{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1"}}},
				{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_1_instance_1"}}},
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			seriesQuery := testSeriesPool.Get()
			defer testSeriesPool.Put(seriesQuery)
			seriesQuery.Subject = tt.subject
			seriesQuery.EntityValues = tt.entityValues
			sl, err := si.SearchPrimary(ctx, seriesQuery)
			tester.NoError(err)
			// Each query shape must resolve exactly one series with the expected values.
			tester.Equal(1, len(sl))
			tester.Equal(tt.subject, sl[0].Subject)
			tester.Equal(tt.expected[0].GetStr().GetValue(), sl[0].EntityValues[0].GetStr().GetValue())
			tester.Equal(tt.expected[1].GetStr().GetValue(), sl[0].EntityValues[1].GetStr().GetValue())
			tester.True(sl[0].ID > 0)
		})
	}
}
+
+func setUp(t *require.Assertions) (tempDir string, deferFunc func()) {
+       t.NoError(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       }))
+       tempDir, deferFunc = test.Space(t)
+       return tempDir, deferFunc
+}
diff --git a/banyand/internal/storage/series.go 
b/banyand/internal/storage/series.go
new file mode 100644
index 00000000..a5c81fd1
--- /dev/null
+++ b/banyand/internal/storage/series.go
@@ -0,0 +1,269 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "bytes"
+       "sort"
+       "sync"
+
+       "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/index/posting"
+       "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+)
+
// Series represents a unique entity (e.g. of a stream or measure), identified
// by its subject plus ordered entity tag values.
type Series struct {
	// Subject is the name of the owning resource.
	Subject      string
	// EntityValues are the ordered tag values that identify the entity.
	EntityValues []*modelv1.TagValue
	// Buffer holds the marshaled form of Subject+EntityValues; it is reused
	// across marshal/unmarshal calls to limit allocations.
	Buffer       []byte
	// ID is derived from hashing Buffer during marshal.
	ID           common.SeriesID
}
+
+func (s *Series) marshal() error {
+       s.Buffer = marshalEntityValue(s.Buffer, 
convert.StringToBytes(s.Subject))
+       var err error
+       for _, tv := range s.EntityValues {
+               if s.Buffer, err = marshalTagValue(s.Buffer, tv); err != nil {
+                       return errors.WithMessage(err, "marshal subject and 
entity values")
+               }
+       }
+       s.ID = common.SeriesID(convert.Hash(s.Buffer))
+       return nil
+}
+
// unmarshal decodes src (a marshaled series) into Subject and EntityValues.
// Buffer is used as scratch space and is truncated before each field is
// decoded; the statement order matters because each decoded field must be
// copied out (e.g. via string conversion) before the buffer is reused.
// NOTE(review): s.ID is not derived here — callers that need it must set it
// (e.g. from the stored index entry) — confirm this is intended.
func (s *Series) unmarshal(src []byte) error {
	var err error
	s.Buffer = s.Buffer[:0]
	if s.Buffer, src, err = unmarshalEntityValue(s.Buffer, src); err != nil {
		return errors.WithMessage(err, "unmarshal subject")
	}
	// string() copies the bytes, so reusing Buffer below is safe.
	s.Subject = string(s.Buffer)
	for len(src) > 0 {
		s.Buffer = s.Buffer[:0]
		var tv *modelv1.TagValue
		if s.Buffer, src, tv, err = unmarshalTagValue(s.Buffer, src); err != nil {
			return errors.WithMessage(err, "unmarshal tag value")
		}
		s.EntityValues = append(s.EntityValues, tv)
	}
	return nil
}
+
+func (s *Series) reset() {
+       s.ID = 0
+       s.Subject = ""
+       s.EntityValues = s.EntityValues[:0]
+       s.Buffer = s.Buffer[:0]
+}
+
+type SeriesPool struct {
+       pool sync.Pool
+}
+
+func (sp *SeriesPool) Get() *Series {
+       sv := sp.pool.Get()
+       if sv == nil {
+               return &Series{}
+       }
+       return sv.(*Series)
+}
+
+func (sp *SeriesPool) Put(s *Series) {
+       s.reset()
+       sp.pool.Put(s)
+}
+
// AnyEntry is the `*` for a regular expression. It could match "any" Entry in an Entity.
// NOTE(review): wildcard detection compares by pointer identity (tv.Value == AnyEntry),
// so only this exact sentinel matches; a freshly constructed TagValue_Null does not.
// Confirm that is the intended contract for callers.
var AnyEntry = &modelv1.TagValue_Null{}
+
const (
	// entityDelimiter terminates every encoded entity value.
	entityDelimiter = '|'
	// escape precedes any literal delimiter/escape byte inside a value.
	escape = '\\'
)

// anyWildcard is the encoded payload standing in for AnyEntry.
var anyWildcard = []byte{'*'}

// marshalEntityValue appends src to dest with delimiter/escape bytes escaped,
// then appends the terminating delimiter. A nil src encodes as a bare
// delimiter (the empty value).
func marshalEntityValue(dest, src []byte) []byte {
	if src == nil {
		return append(dest, entityDelimiter)
	}
	// Fast path: nothing to escape, copy wholesale.
	if bytes.IndexByte(src, entityDelimiter) < 0 && bytes.IndexByte(src, escape) < 0 {
		dest = append(dest, src...)
		return append(dest, entityDelimiter)
	}
	for _, c := range src {
		switch c {
		case entityDelimiter, escape:
			dest = append(dest, escape, c)
		default:
			dest = append(dest, c)
		}
	}
	return append(dest, entityDelimiter)
}

// unmarshalEntityValue un-escapes one encoded value from src into dest, up to
// the first unescaped delimiter. It returns the filled dest, the remainder of
// src after the delimiter, and an error for empty input, a dangling escape,
// or a missing terminator.
func unmarshalEntityValue(dest, src []byte) ([]byte, []byte, error) {
	if len(src) == 0 {
		return nil, nil, errors.New("empty entity value")
	}
	for i := 0; i < len(src); i++ {
		switch src[i] {
		case escape:
			if i+1 >= len(src) {
				return nil, nil, errors.New("invalid escape character")
			}
			i++
			dest = append(dest, src[i])
		case entityDelimiter:
			return dest, src[i+1:], nil
		default:
			dest = append(dest, src[i])
		}
	}
	return nil, nil, errors.New("invalid entity value")
}
+
+func marshalTagValue(dest []byte, tv *modelv1.TagValue) ([]byte, error) {
+       if tv.Value == AnyEntry {
+               dest = marshalEntityValue(dest, anyWildcard)
+               return dest, nil
+       }
+       switch tv.Value.(type) {
+       case *modelv1.TagValue_Str:
+               dest = append(dest, 0)
+               dest = marshalEntityValue(dest, 
convert.StringToBytes(tv.GetStr().Value))
+       case *modelv1.TagValue_Int:
+               dest = append(dest, 1)
+               dest = convert.AppendInt64ToBytes(dest, tv.GetInt().Value)
+               dest = marshalEntityValue(dest, nil)
+       case *modelv1.TagValue_BinaryData:
+               dest = append(dest, 2)
+               dest = marshalEntityValue(dest, tv.GetBinaryData())
+       default:
+               return nil, errors.New("unsupported tag value type: " + 
tv.String())
+       }
+       return dest, nil
+}
+
// unmarshalTagValue decodes one tag value from src, dispatching on the
// one-byte type tag (0=string, 1=int64, 2=binary). It returns the scratch
// buffer (now holding the decoded payload), the remaining bytes of src, the
// reconstructed TagValue, and any decoding error.
//
// NOTE(review): the caller is expected to pass a truncated dest (see
// Series.unmarshal); string and int cases read the whole accumulated dest,
// and the int case assumes the decoded payload is exactly the width
// convert.BytesToInt64 expects — confirm both invariants hold for all callers.
func unmarshalTagValue(dest []byte, src []byte) ([]byte, []byte, *modelv1.TagValue, error) {
	if len(src) == 0 {
		return nil, nil, nil, errors.New("empty tag value")
	}
	var err error
	switch src[0] {
	case 0:
		// String payload.
		if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != nil {
			return nil, nil, nil, errors.WithMessage(err, "unmarshal string tag value")
		}
		return dest, src, &modelv1.TagValue{
			Value: &modelv1.TagValue_Str{
				Str: &modelv1.Str{
					Value: string(dest),
				},
			},
		}, nil
	case 1:
		// int64 payload.
		if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != nil {
			return nil, nil, nil, errors.WithMessage(err, "unmarshal int tag value")
		}
		return dest, src, &modelv1.TagValue{
			Value: &modelv1.TagValue_Int{
				Int: &modelv1.Int{
					Value: convert.BytesToInt64(dest),
				},
			},
		}, nil
	case 2:
		// Binary payload: copy out of the scratch buffer so the returned
		// TagValue does not alias dest, which callers reuse.
		if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != nil {
			return nil, nil, nil, errors.WithMessage(err, "unmarshal binary tag value")
		}
		data := make([]byte, len(dest))
		copy(data, dest)
		return dest, src, &modelv1.TagValue{
			Value: &modelv1.TagValue_BinaryData{
				BinaryData: data,
			},
		}, nil
	}
	return nil, nil, nil, errors.New("unsupported tag value type")
}
+
// SeriesList is a collection of Series.
type SeriesList []*Series

// Len implements sort.Interface.
func (a SeriesList) Len() int {
	return len(a)
}

// Less implements sort.Interface; series are ordered by ascending ID.
func (a SeriesList) Less(i, j int) bool {
	return a[i].ID < a[j].ID
}

// Swap implements sort.Interface.
func (a SeriesList) Swap(i, j int) {
	a[i], a[j] = a[j], a[i]
}
+
+// Merge other SeriesList with this one to create a new SeriesList.
+func (a SeriesList) Merge(other SeriesList) SeriesList {
+       if len(other) == 0 {
+               return a
+       }
+       sort.Sort(other)
+       if len(a) == 0 {
+               return other
+       }
+       final := SeriesList{}
+       i := 0
+       j := 0
+       for i < len(a) && j < len(other) {
+               if a[i].ID < other[j].ID {
+                       final = append(final, a[i])
+                       i++
+               } else {
+                       // deduplication
+                       if a[i].ID == other[j].ID {
+                               i++
+                       }
+                       final = append(final, other[j])
+                       j++
+               }
+       }
+       for ; i < len(a); i++ {
+               final = append(final, a[i])
+       }
+       for ; j < len(other); j++ {
+               final = append(final, other[j])
+       }
+       return final
+}
+
+func (a SeriesList) toList() posting.List {
+       pl := roaring.NewPostingList()
+       for _, v := range a {
+               pl.Insert(uint64(v.ID))
+       }
+       return pl
+}
diff --git a/banyand/internal/storage/series_test.go 
b/banyand/internal/storage/series_test.go
new file mode 100644
index 00000000..6862a340
--- /dev/null
+++ b/banyand/internal/storage/series_test.go
@@ -0,0 +1,178 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+)
+
// TestMarshalAndUnmarshalEntityValue verifies that marshalEntityValue and
// unmarshalEntityValue round-trip byte strings, including values containing
// the delimiter ('|') and escape ('\\') characters.
func TestMarshalAndUnmarshalEntityValue(t *testing.T) {
	tests := []struct {
		name string
		src  []byte
	}{
		{
			name: "plain text",
			src:  []byte("plainText"),
		},
		{
			name: "text with entityDelimiter",
			src:  []byte("text|with|delimiter"),
		},
		{
			name: "text with escape",
			src:  []byte("text\\with\\escape"),
		},
		{
			name: "text with both special characters",
			src:  []byte("text|with\\both"),
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			dest := []byte{}

			// Encode the raw value.
			marshaled := marshalEntityValue(dest, tt.src)

			// Encoding always produces at least the terminating delimiter.
			assert.NotNil(t, marshaled)

			// Decode it back; dest has zero capacity, so the append inside
			// unmarshalEntityValue allocates fresh storage (no aliasing).
			unmarshaledDest, unmarshaledSrc, err := unmarshalEntityValue(dest, marshaled)

			assert.NoError(t, err)
			assert.NotNil(t, unmarshaledDest)
			assert.NotNil(t, unmarshaledSrc)

			// The round trip must reproduce the original bytes exactly.
			assert.Equal(t, tt.src, unmarshaledDest)
		})
	}
}
+
// TestMarshalAndUnmarshalTagValue verifies the tag-value codec round-trips
// string, int, and binary values, and that unsupported types are rejected by
// both directions.
func TestMarshalAndUnmarshalTagValue(t *testing.T) {
	tests := []struct {
		src  *modelv1.TagValue
		name string
	}{
		{
			name: "string value",
			src:  &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "stringValue"}}},
		},
		{
			name: "int value",
			src:  &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 123}}},
		},
		{
			name: "binary data",
			src:  &modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: []byte("binaryData")}},
		},
		{
			// A freshly constructed TagValue_Null is not the AnyEntry
			// sentinel (matching is by pointer identity), so the codec must
			// reject it as unsupported.
			name: "unsupported type",
			src:  &modelv1.TagValue{Value: &modelv1.TagValue_Null{}},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			dest := []byte{}
			// Encode the tag value.
			dest, err := marshalTagValue(dest, tt.src)
			if tt.name == "unsupported type" {
				assert.Error(t, err)
				dest = dest[:0]
				// An unknown type tag byte must also fail decoding.
				_, _, _, err = unmarshalTagValue(dest, []byte("unsupported type"))
				assert.Error(t, err)
				return
			}
			marshaled := make([]byte, len(dest))
			copy(marshaled, dest)

			assert.NoError(t, err)
			assert.True(t, len(marshaled) > 0)

			// Decode it back; the scratch buffer is truncated first, as
			// the codec expects from its callers.
			dest = dest[:0]
			dest, marshaled, unmarshaled, err := unmarshalTagValue(dest, marshaled)

			assert.NoError(t, err)
			assert.True(t, len(dest) > 0)
			// The whole encoding must be consumed.
			assert.True(t, len(marshaled) == 0)
			assert.NotNil(t, unmarshaled)

			// The round trip must reproduce the original tag value.
			assert.Equal(t, tt.src, unmarshaled)
		})
	}
}
+
+func TestMarshalAndUnmarshalSeries(t *testing.T) {
+       tests := []struct {
+               src  *Series
+               name string
+       }{
+               {
+                       name: "series with entity values",
+                       src:  &Series{Subject: "subject", EntityValues: 
[]*modelv1.TagValue{{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"stringValue"}}}}},
+               },
+               {
+                       name: "series without entity values",
+                       src:  &Series{Subject: "subject", EntityValues: 
[]*modelv1.TagValue{}},
+               },
+               {
+                       name: "series with multiple entity values",
+                       src: &Series{Subject: "subject", EntityValues: 
[]*modelv1.TagValue{
+                               {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "stringValue"}}},
+                               {Value: &modelv1.TagValue_Int{Int: 
&modelv1.Int{Value: 123}}},
+                       }},
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Test Series.Marshal
+                       err := tt.src.marshal()
+
+                       // Add assertions
+                       assert.NoError(t, err)
+                       assert.True(t, len(tt.src.Buffer) > 0)
+                       marshaled := make([]byte, len(tt.src.Buffer))
+                       copy(marshaled, tt.src.Buffer)
+
+                       // Test Series.Unmarshal
+                       tt.src.reset()
+                       err = tt.src.unmarshal(marshaled)
+
+                       // Add assertions
+                       assert.NoError(t, err)
+
+                       // Check that unmarshaling the marshaled value gives 
the original value
+                       assert.Equal(t, tt.src.Subject, tt.src.Subject)
+                       assert.Equal(t, tt.src.EntityValues, 
tt.src.EntityValues)
+               })
+       }
+}
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
new file mode 100644
index 00000000..ff0c151a
--- /dev/null
+++ b/banyand/internal/storage/storage.go
@@ -0,0 +1,116 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package storage implements a time-series-based storage engine.
+// It provides:
+//   - Partition data based on a time axis.
+//   - Sharding data based on a series id which represents a unique entity of 
stream/measure
+//   - Retrieving data based on index.Filter.
+//   - Cleaning expired data, or the data retention.
+package storage
+
+import (
+       "io"
+       "time"
+
+       "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+)
+
+const (
+       shardPathPrefix = "shard"
+       shardTemplate   = shardPathPrefix + "-%d"
+       metadataPath    = "metadata"
+       segTemplate     = "seg-%s"
+
+       hourFormat = "2006010215"
+       dayFormat  = "20060102"
+
+       dirPerm = 0o700
+)
+
+var (
+       // ErrUnknownShard indicates that the shard is not found.
+       ErrUnknownShard = errors.New("unknown shard")
+       errOpenDatabase = errors.New("fails to open the database")
+)
+
+// Supplier allows getting a tsdb's runtime.
+type Supplier interface {
+       SupplyTSDB() Database
+}
+
+// Database allows listing and getting shard details.
+type Database interface {
+       io.Closer
+       CreateShardsAndGetByID(id common.ShardID) (Shard, error)
+       Shards() []Shard
+       Shard(id common.ShardID) (Shard, error)
+}
+
+// Shard allows accessing data of tsdb.
+type Shard interface {
+       io.Closer
+       ID() common.ShardID
+       // Series() SeriesDatabase
+}
+
+// IntervalUnit denotes the unit of a time point.
+type IntervalUnit int
+
+// Available IntervalUnits. HOUR and DAY are adequate for the APM scenario.
+const (
+       HOUR IntervalUnit = iota
+       DAY
+)
+
+func (iu IntervalUnit) String() string {
+       switch iu {
+       case HOUR:
+               return "hour"
+       case DAY:
+               return "day"
+       }
+       panic("invalid interval unit")
+}
+
+// IntervalRule defines a length of two points in time.
+type IntervalRule struct {
+       Unit IntervalUnit
+       Num  int
+}
+
+func (ir IntervalRule) nextTime(current time.Time) time.Time {
+       switch ir.Unit {
+       case HOUR:
+               return current.Add(time.Hour * time.Duration(ir.Num))
+       case DAY:
+               return current.AddDate(0, 0, ir.Num)
+       }
+       panic("invalid interval unit")
+}
+
+func (ir IntervalRule) estimatedDuration() time.Duration {
+       switch ir.Unit {
+       case HOUR:
+               return time.Hour * time.Duration(ir.Num)
+       case DAY:
+               return 24 * time.Hour * time.Duration(ir.Num)
+       }
+       panic("invalid interval unit")
+}
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
new file mode 100644
index 00000000..aab9c8c1
--- /dev/null
+++ b/banyand/internal/storage/tsdb.go
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package storage implements a time-series-based storage engine.
+// It provides:
+//   - Partition data based on a time axis.
+//   - Sharding data based on a series id which represents a unique entity of 
stream/measure
+//   - Retrieving data based on index.Filter.
+//   - Cleaning expired data, or the data retention.
+package storage
+
+import (
+       "context"
+       "path/filepath"
+       "sync"
+       "sync/atomic"
+
+       "github.com/pkg/errors"
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// IndexGranularity denotes the granularity of the local index.
+type IndexGranularity int
+
+// The options of the local index granularity.
+const (
+       IndexGranularityBlock IndexGranularity = iota
+       IndexGranularitySeries
+)
+
+var _ Database = (*database)(nil)
+
+// DatabaseOpts wraps options to create a tsdb.
+type DatabaseOpts struct {
+       Location         string
+       SegmentInterval  IntervalRule
+       TTL              IntervalRule
+       IndexGranularity IndexGranularity
+       ShardNum         uint32
+}
+
+type (
+       // SegID is the kind of a segment.
+       SegID uint32
+)
+
+// GenerateSegID composes a SegID from an interval unit and a time-derived suffix;
+// the unit occupies the top bit and the suffix the remaining 31 bits.
+func GenerateSegID(unit IntervalUnit, suffix int) SegID {
+       return SegID(unit)<<31 | ((SegID(suffix) << 1) >> 1)
+}
+
+func parseSuffix(id SegID) int {
+       return int((id << 1) >> 1)
+}
+
+func segIDToBytes(id SegID) []byte {
+       return convert.Uint32ToBytes(uint32(id))
+}
+
+func readSegID(data []byte, offset int) (SegID, int) {
+       end := offset + 4
+       return SegID(convert.BytesToUint32(data[offset:end])), end
+}
+
+type database struct {
+       fileSystem  fs.FileSystem
+       logger      *logger.Logger
+       index       *seriesIndex
+       location    string
+       sLst        []Shard
+       segmentSize IntervalRule
+       ttl         IntervalRule
+       sync.RWMutex
+       shardNum           uint32
+       shardCreationState uint32
+}
+
+func (d *database) CreateShardsAndGetByID(id common.ShardID) (Shard, error) {
+       if atomic.LoadUint32(&d.shardCreationState) != 0 {
+               return d.shard(id)
+       }
+       d.Lock()
+       defer d.Unlock()
+       if atomic.LoadUint32(&d.shardCreationState) != 0 {
+               return d.shard(id)
+       }
+       loadedShardsNum := len(d.sLst)
+       if loadedShardsNum < int(d.shardNum) {
+               _, err := createDatabase(d, loadedShardsNum)
+               if err != nil {
+                       return nil, errors.WithMessage(err, "create the 
database failed")
+               }
+       }
+       atomic.StoreUint32(&d.shardCreationState, 1)
+       return d.shard(id)
+}
+
+func (d *database) Shards() []Shard {
+       d.RLock()
+       defer d.RUnlock()
+       return d.sLst
+}
+
+func (d *database) Shard(id common.ShardID) (Shard, error) {
+       d.RLock()
+       defer d.RUnlock()
+       return d.shard(id)
+}
+
+func (d *database) shard(id common.ShardID) (Shard, error) {
+       if uint(id) >= uint(len(d.sLst)) {
+               return nil, ErrUnknownShard
+       }
+       return d.sLst[id], nil
+}
+
+func (d *database) Close() error {
+       d.Lock()
+       defer d.Unlock()
+       var err error
+       for _, s := range d.sLst {
+               innerErr := s.Close()
+               if innerErr != nil {
+                       err = multierr.Append(err, innerErr)
+               }
+       }
+       // NOTE(review): the series index is opened in OpenDatabase and must be
+       // released here as well; otherwise its resources leak on shutdown.
+       if innerErr := d.index.Close(); innerErr != nil {
+               err = multierr.Append(err, innerErr)
+       }
+       return err
+}
+
+// OpenDatabase returns a new tsdb runtime. This constructor will create a new 
database if it's absent,
+// or load an existing one.
+func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
+       if opts.SegmentInterval.Num == 0 {
+               return nil, errors.Wrap(errOpenDatabase, "segment interval is 
absent")
+       }
+       if opts.TTL.Num == 0 {
+               return nil, errors.Wrap(errOpenDatabase, "ttl is absent")
+       }
+       p := common.GetPosition(ctx)
+       l := logger.Fetch(ctx, p.Database)
+       fileSystem := fs.NewLocalFileSystemWithLogger(l)
+       path := filepath.Clean(opts.Location)
+       fileSystem.Mkdir(path, dirPerm)
+       si, err := newSeriesIndex(ctx, path)
+       if err != nil {
+               return nil, errors.Wrap(errOpenDatabase, 
errors.WithMessage(err, "create series index failed").Error())
+       }
+       db := &database{
+               location:    path,
+               shardNum:    opts.ShardNum,
+               logger:      l,
+               segmentSize: opts.SegmentInterval,
+               ttl:         opts.TTL,
+               fileSystem:  fileSystem,
+               index:       si,
+       }
+       db.logger.Info().Str("path", path).Msg("initialized")
+       return db, nil
+}
+
+func createDatabase(db *database, startID int) (Database, error) {
+       var err error
+       for i := startID; i < int(db.shardNum); i++ {
+               db.logger.Info().Int("shard_id", i).Msg("creating a shard")
+               // so, errNewShard := OpenShard(common.ShardID(i),
+               //      db.location, db.segmentSize, db.blockSize, db.ttl, 
defaultBlockQueueSize, defaultMaxBlockQueueSize, db.enableWAL)
+               // if errNewShard != nil {
+               //      err = multierr.Append(err, errNewShard)
+               //      continue
+               // }
+               // db.sLst = append(db.sLst, so)
+       }
+       return db, err
+}
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index 0074c588..55e8fbc2 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -29,6 +29,19 @@ func Uint64ToBytes(u uint64) []byte {
        return bs
 }
 
+// AppendUint64ToBytes appends the BigEndian form of a uint64 to dest and returns dest.
+func AppendUint64ToBytes(dest []byte, u uint64) []byte {
+       // Use BigEndian to marshal uint64.
+       return append(dest,
+               byte(u>>56),
+               byte(u>>48),
+               byte(u>>40),
+               byte(u>>32),
+               byte(u>>24),
+               byte(u>>16),
+               byte(u>>8),
+               byte(u))
+}
+
 // Int64ToBytes converts int64 to bytes.
 func Int64ToBytes(i int64) []byte {
        abs := i
@@ -44,6 +57,20 @@ func Int64ToBytes(i int64) []byte {
        return Uint64ToBytes(u)
 }
 
+// AppendInt64ToBytes appends the order-preserving encoded form of an int64 to dest,
+// mirroring the transformation used by Int64ToBytes.
+func AppendInt64ToBytes(dest []byte, i int64) []byte {
+       abs := i
+       if i < 0 {
+               abs = -abs
+       }
+       u := uint64(abs)
+       if i >= 0 {
+               u |= 1 << 63
+       } else {
+               u = 1<<63 - u
+       }
+       return AppendUint64ToBytes(dest, u)
+}
+
 // Uint32ToBytes converts uint32 to bytes.
 func Uint32ToBytes(u uint32) []byte {
        bs := make([]byte, 4)
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 23f91d83..8d028fbf 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -54,6 +54,8 @@ type File interface {
 
 // FileSystem operation interface.
 type FileSystem interface {
+       // Mkdir creates a new directory with the specified name and permission.
+       Mkdir(path string, permission Mode)
        // Create and open the file by specified name and mode.
        CreateFile(name string, permission Mode) (File, error)
        // Flush mode, which flushes all data to one file.
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 76df0b1f..fdba4e88 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -24,12 +24,13 @@ import (
        "fmt"
        "io"
        "os"
+       "path/filepath"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-// LocalFileSystem implements the File System interface.
-type LocalFileSystem struct {
+// localFileSystem implements the File System interface.
+type localFileSystem struct {
        logger *logger.Logger
 }
 
@@ -40,11 +41,17 @@ type LocalFile struct {
 
 // NewLocalFileSystem is used to create the Local File system.
 func NewLocalFileSystem() FileSystem {
-       return &LocalFileSystem{
+       return &localFileSystem{
                logger: logger.GetLogger(moduleName),
        }
 }
 
+// NewLocalFileSystemWithLogger creates the local file system with a child of the parent logger.
+func NewLocalFileSystemWithLogger(parent *logger.Logger) FileSystem {
+       return &localFileSystem{
+               logger: parent.Named(moduleName),
+       }
+}
+
 func readErrorHandle(operation string, err error, name string, size int) (int, 
error) {
        switch {
        case errors.Is(err, io.EOF):
@@ -67,8 +74,44 @@ func readErrorHandle(operation string, err error, name 
string, size int) (int, e
        }
 }
 
+// Mkdir implements FileSystem.
+func (fs *localFileSystem) Mkdir(path string, permission Mode) {
+       if fs.pathExist(path) {
+               return
+       }
+       // Honor the caller-supplied permission; the previous hardcoded 0o755
+       // silently ignored it (callers pass dirPerm, 0o700).
+       if err := os.MkdirAll(path, os.FileMode(permission)); err != nil {
+               fs.logger.Panic().Str("path", path).Err(err).Msg("failed to create directory")
+       }
+       parentDirPath := filepath.Dir(path)
+       fs.syncPath(parentDirPath)
+}
+
+func (fs *localFileSystem) pathExist(path string) bool {
+       if _, err := os.Stat(path); err != nil {
+               if os.IsNotExist(err) {
+                       return false
+               }
+               fs.logger.Panic().Str("path", path).Err(err).Msg("failed to 
stat path")
+       }
+       return true
+}
+
+func (fs *localFileSystem) syncPath(path string) {
+       d, err := os.Open(path)
+       if err != nil {
+               fs.logger.Panic().Str("path", path).Err(err).Msg("failed to 
open directory")
+       }
+       if err := d.Sync(); err != nil {
+               _ = d.Close()
+               fs.logger.Panic().Str("path", path).Err(err).Msg("failed to 
sync directory")
+       }
+       if err := d.Close(); err != nil {
+               fs.logger.Panic().Str("path", path).Err(err).Msg("failed to close directory")
+       }
+}
+
 // CreateFile is used to create and open the file by specified name and mode.
-func (fs *LocalFileSystem) CreateFile(name string, permission Mode) (File, 
error) {
+func (fs *localFileSystem) CreateFile(name string, permission Mode) (File, 
error) {
        file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 
os.FileMode(permission))
        switch {
        case err == nil:
@@ -94,7 +137,7 @@ func (fs *LocalFileSystem) CreateFile(name string, 
permission Mode) (File, error
 }
 
 // Write flushes all data to one file.
-func (fs *LocalFileSystem) Write(buffer []byte, name string, permission Mode) 
(int, error) {
+func (fs *localFileSystem) Write(buffer []byte, name string, permission Mode) 
(int, error) {
        file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 
os.FileMode(permission))
        if err != nil {
                switch {
@@ -129,7 +172,7 @@ func (fs *LocalFileSystem) Write(buffer []byte, name 
string, permission Mode) (i
 }
 
 // DeleteFile is used to delete the file.
-func (fs *LocalFileSystem) DeleteFile(name string) error {
+func (fs *localFileSystem) DeleteFile(name string) error {
        err := os.Remove(name)
        switch {
        case err == nil:
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 1667d85b..27eacb88 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -191,9 +191,17 @@ type PostingValue struct {
        Term  []byte
 }
 
+// Document represents a single indexable document with its fields and identifier.
+type Document struct {
+       Fields []Field
+       DocID  uint64
+}
+
+// Documents is a batch of documents to be written together.
+type Documents []Document
+
 // Writer allows writing fields and docID in a document to a index.
 type Writer interface {
        Write(fields []Field, docID uint64) error
+       Batch(docs Documents) error
 }
 
 // FieldIterable allows building a FieldIterator.
@@ -218,6 +226,19 @@ type Store interface {
        SizeOnDisk() int64
 }
 
+// Series relates a series identifier to its marshaled entity values.
+type Series struct {
+       ID           common.SeriesID
+       EntityValues []byte
+}
+
+// SeriesStore extends Store with series creation and lookup operations.
+type SeriesStore interface {
+       Store
+       // Create saves a series into the store.
+       Create(Series) error
+       // Search returns the series ID matching the given entity term.
+       Search([]byte) (common.SeriesID, error)
+       // SearchPrefix returns all series whose entity values share the prefix.
+       SearchPrefix([]byte) ([]Series, error)
+       // SearchWildcard returns all series whose entity values match the wildcard.
+       SearchWildcard([]byte) ([]Series, error)
+}
+
 // GetSearcher returns a searcher associated with input index rule type.
 type GetSearcher func(location databasev1.IndexRule_Type) (Searcher, error)
 
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 2437a627..f40283b5 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -50,6 +50,7 @@ const (
        docIDField    = "_id"
        batchSize     = 1024
        seriesIDField = "series_id"
+       entityField   = "entity"
        idField       = "id"
 )
 
@@ -77,11 +78,6 @@ type StoreOpts struct {
        BatchWaitSec int64
 }
 
-type doc struct {
-       fields []index.Field
-       docID  uint64
-}
-
 type flushEvent struct {
        onComplete chan struct{}
 }
@@ -95,8 +91,20 @@ type store struct {
        batchInterval time.Duration
 }
 
+func (s *store) Batch(docs index.Documents) error {
+       if !s.closer.AddRunning() {
+               return nil
+       }
+       defer s.closer.Done()
+       select {
+       case <-s.closer.CloseNotify():
+       case s.ch <- docs:
+       }
+       return nil
+}
+
 // NewStore create a new inverted index repository.
-func NewStore(opts StoreOpts) (index.Store, error) {
+func NewStore(opts StoreOpts) (index.SeriesStore, error) {
        indexConfig := blugeIndex.DefaultConfig(opts.Path)
        indexConfig.MergePlanOptions.MaxSegmentsPerTier = 1
        indexConfig.MergePlanOptions.MaxSegmentSize = 500000
@@ -141,9 +149,9 @@ func (s *store) Write(fields []index.Field, docID uint64) 
error {
        defer s.closer.Done()
        select {
        case <-s.closer.CloseNotify():
-       case s.ch <- doc{
-               fields: fields,
-               docID:  docID,
+       case s.ch <- index.Document{
+               Fields: fields,
+               DocID:  docID,
        }:
        }
        return nil
@@ -315,6 +323,7 @@ func (s *store) run() {
                }
                defer flush()
                var docIDBuffer bytes.Buffer
+               seriesIDBuffer := make([]byte, 8)
                for {
                        timer := time.NewTimer(s.batchInterval)
                        select {
@@ -330,33 +339,55 @@ func (s *store) run() {
                                case flushEvent:
                                        flush()
                                        close(d.onComplete)
-                               case doc:
-                                       // TODO: generate a segment directly.
-                                       fk := d.fields[0].Key
-                                       docIDBuffer.Reset()
-                                       if fk.HasSeriesID() {
-                                               
docIDBuffer.Write(fk.SeriesID.Marshal())
-                                       }
-                                       
docIDBuffer.Write(convert.Uint64ToBytes(d.docID))
-                                       doc := 
bluge.NewDocument(docIDBuffer.String())
-                                       toAddSeriesIDField := false
-                                       for _, f := range d.fields {
-                                               if f.Key.Analyzer == 
databasev1.IndexRule_ANALYZER_UNSPECIFIED {
-                                                       
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), 
f.Marshal()).StoreValue().Sortable())
-                                               } else {
-                                                       toAddSeriesIDField = 
true
-                                                       
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), 
f.Term).StoreValue().Sortable().
-                                                               
WithAnalyzer(analyzers[f.Key.Analyzer]))
-                                               }
-                                       }
-                                       if toAddSeriesIDField && 
fk.HasSeriesID() {
-                                               
doc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, fk.SeriesID.Marshal()))
-                                       }
+                               case index.Series:
+                                       seriesIDBuffer = seriesIDBuffer[:0]
+                                       seriesIDBuffer = 
convert.AppendUint64ToBytes(seriesIDBuffer, uint64(d.ID))
+                                       doc := 
bluge.NewDocument(string(seriesIDBuffer))
+                                       
doc.AddField(bluge.NewKeywordFieldBytes(entityField, 
d.EntityValues).StoreValue())
                                        size++
                                        batch.Update(doc.ID(), doc)
                                        if size >= batchSize {
                                                flush()
                                        }
+                               case index.Document, index.Documents:
+                                       var docs []index.Document
+                                       var isBatch bool
+                                       switch v := d.(type) {
+                                       case index.Document:
+                                               docs = []index.Document{v}
+                                       case index.Documents:
+                                               docs = v
+                                               isBatch = true
+                                       }
+
+                                       for _, d := range docs {
+                                               // TODO: generate a segment 
directly.
+                                               fk := d.Fields[0].Key
+                                               docIDBuffer.Reset()
+                                               if fk.HasSeriesID() {
+                                                       
docIDBuffer.Write(fk.SeriesID.Marshal())
+                                               }
+                                               
docIDBuffer.Write(convert.Uint64ToBytes(d.DocID))
+                                               doc := 
bluge.NewDocument(docIDBuffer.String())
+                                               toAddSeriesIDField := false
+                                               for _, f := range d.Fields {
+                                                       if f.Key.Analyzer == 
databasev1.IndexRule_ANALYZER_UNSPECIFIED {
+                                                               
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), 
f.Marshal()).StoreValue().Sortable())
+                                                       } else {
+                                                               
toAddSeriesIDField = true
+                                                               
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), 
f.Term).StoreValue().Sortable().
+                                                                       
WithAnalyzer(analyzers[f.Key.Analyzer]))
+                                                       }
+                                               }
+                                               if toAddSeriesIDField && 
fk.HasSeriesID() {
+                                                       
doc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, fk.SeriesID.Marshal()))
+                                               }
+                                               size++
+                                               batch.Update(doc.ID(), doc)
+                                       }
+                                       if isBatch || size >= batchSize {
+                                               flush()
+                                       }
                                }
                        case <-timer.C:
                                flush()
diff --git a/pkg/index/inverted/inverted_series.go 
b/pkg/index/inverted/inverted_series.go
new file mode 100644
index 00000000..f3a477ed
--- /dev/null
+++ b/pkg/index/inverted/inverted_series.go
@@ -0,0 +1,129 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package inverted implements a inverted index repository.
+package inverted
+
+import (
+       "context"
+
+       "github.com/blugelabs/bluge"
+       "github.com/blugelabs/bluge/search"
+       "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+)
+
+// Create implements index.SeriesStore.
+func (s *store) Create(series index.Series) error {
+       if !s.closer.AddRunning() {
+               return nil
+       }
+       defer s.closer.Done()
+       select {
+       case <-s.closer.CloseNotify():
+       case s.ch <- series:
+       }
+       return nil
+}
+
+// Search implements index.SeriesStore.
+func (s *store) Search(term []byte) (common.SeriesID, error) {
+       reader, err := s.writer.Reader()
+       if err != nil {
+               return 0, err
+       }
+       // Close the reader to release its segment references; otherwise every
+       // search leaks index resources.
+       defer reader.Close()
+       query := bluge.NewTermQuery(convert.BytesToString(term)).SetField(entityField)
+       dmi, err := reader.Search(context.Background(), bluge.NewAllMatches(query))
+       if err != nil {
+               return 0, err
+       }
+       var result common.SeriesID
+       next, err := dmi.Next()
+       if err == nil && next != nil {
+               err = next.VisitStoredFields(func(field string, value []byte) bool {
+                       if field == docIDField {
+                               result = common.SeriesID(convert.BytesToUint64(value))
+                               return false
+                       }
+                       return true
+               })
+               if err != nil {
+                       return 0, errors.WithMessage(err, "visit stored fields")
+               }
+       }
+       if err != nil {
+               return 0, errors.WithMessage(err, "iterate document match iterator")
+       }
+       return result, nil
+}
+
+// SearchPrefix implements index.SeriesStore.
+func (s *store) SearchPrefix(prefix []byte) ([]index.Series, error) {
+       reader, err := s.writer.Reader()
+       if err != nil {
+               return nil, err
+       }
+       // Release the reader's segment references once the search completes.
+       defer reader.Close()
+       query := bluge.NewPrefixQuery(convert.BytesToString(prefix)).SetField(entityField)
+       dmi, err := reader.Search(context.Background(), bluge.NewAllMatches(query))
+       if err != nil {
+               return nil, err
+       }
+       return parseResult(dmi)
+}
+
+// SearchWildcard implements index.SeriesStore.
+func (s *store) SearchWildcard(wildcard []byte) ([]index.Series, error) {
+       reader, err := s.writer.Reader()
+       if err != nil {
+               return nil, err
+       }
+       // Release the reader's segment references once the search completes.
+       defer reader.Close()
+       query := bluge.NewWildcardQuery(convert.BytesToString(wildcard)).SetField(entityField)
+       dmi, err := reader.Search(context.Background(), bluge.NewAllMatches(query))
+       if err != nil {
+               return nil, err
+       }
+       return parseResult(dmi)
+}
+
+func parseResult(dmi search.DocumentMatchIterator) ([]index.Series, error) {
+       result := make([]index.Series, 0, 10)
+       next, err := dmi.Next()
+       for err == nil && next != nil {
+               var series index.Series
+               err = next.VisitStoredFields(func(field string, value []byte) 
bool {
+                       if field == docIDField {
+                               series.ID = 
common.SeriesID(convert.BytesToUint64(value))
+                       }
+                       if field == entityField {
+                               series.EntityValues = value
+                       }
+                       return true
+               })
+               if err != nil {
+                       return nil, errors.WithMessage(err, "visit stored 
fields")
+               }
+               result = append(result, series)
+               next, err = dmi.Next()
+       }
+       if err != nil {
+               return nil, errors.WithMessage(err, "iterate document match 
iterator")
+       }
+       return result, nil
+}
diff --git a/pkg/index/inverted/inverted_series_test.go 
b/pkg/index/inverted/inverted_series_test.go
new file mode 100644
index 00000000..64daaaa2
--- /dev/null
+++ b/pkg/index/inverted/inverted_series_test.go
@@ -0,0 +1,214 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package inverted
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestStore_Search(t *testing.T) {
+       tester := assert.New(t)
+       path, fn := setUp(require.New(t))
+       s, err := NewStore(StoreOpts{
+               Path:   path,
+               Logger: logger.GetLogger("test"),
+       })
+       tester.NoError(err)
+       defer func() {
+               tester.NoError(s.Close())
+               fn()
+       }()
+
+       // Setup some data
+       setupData(tester, s)
+
+       // Test cases
+       tests := []struct {
+               term []byte
+               want common.SeriesID
+       }{
+               {
+                       term: []byte("test1"),
+                       want: common.SeriesID(1),
+               },
+               {
+                       term: []byte("foo"),
+                       want: common.SeriesID(0),
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(string(tt.term), func(t *testing.T) {
+                       got, err := s.Search(tt.term)
+                       tester.NoError(err)
+                       tester.Equal(tt.want, got)
+               })
+       }
+}
+
+func TestStore_SearchWildcard(t *testing.T) {
+       tester := assert.New(t)
+       path, fn := setUp(require.New(t))
+       s, err := NewStore(StoreOpts{
+               Path:   path,
+               Logger: logger.GetLogger("test"),
+       })
+       tester.NoError(err)
+       defer func() {
+               tester.NoError(s.Close())
+               fn()
+       }()
+
+       // Setup some data
+       setupData(tester, s)
+
+       // Test cases
+       tests := []struct {
+               wildcard []byte
+               want     []index.Series
+       }{
+               {
+                       wildcard: []byte("test*"),
+                       want: []index.Series{
+                               {
+                                       ID:           common.SeriesID(1),
+                                       EntityValues: []byte("test1"),
+                               },
+                               {
+                                       ID:           common.SeriesID(2),
+                                       EntityValues: []byte("test2"),
+                               },
+                               {
+                                       ID:           common.SeriesID(3),
+                                       EntityValues: []byte("test3"),
+                               },
+                       },
+               },
+               {
+                       wildcard: []byte("*2"),
+                       want: []index.Series{
+                               {
+                                       ID:           common.SeriesID(2),
+                                       EntityValues: []byte("test2"),
+                               },
+                       },
+               },
+               {
+                       wildcard: []byte("t*st1"),
+                       want: []index.Series{
+                               {
+                                       ID:           common.SeriesID(1),
+                                       EntityValues: []byte("test1"),
+                               },
+                       },
+               },
+               {
+                       wildcard: []byte("foo*"),
+                       want:     []index.Series{},
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(string(tt.wildcard), func(t *testing.T) {
+                       got, err := s.SearchWildcard(tt.wildcard)
+                       tester.NoError(err)
+                       tester.ElementsMatch(tt.want, got)
+               })
+       }
+}
+
+func TestStore_SearchPrefix(t *testing.T) {
+       tester := assert.New(t)
+       path, fn := setUp(require.New(t))
+       s, err := NewStore(StoreOpts{
+               Path:   path,
+               Logger: logger.GetLogger("test"),
+       })
+       tester.NoError(err)
+       defer func() {
+               tester.NoError(s.Close())
+               fn()
+       }()
+
+       // Setup some data
+       setupData(tester, s)
+
+       // Test cases
+       tests := []struct {
+               prefix []byte
+               want   []index.Series
+       }{
+               {
+                       prefix: []byte("test"),
+                       want: []index.Series{
+                               {
+                                       ID:           common.SeriesID(1),
+                                       EntityValues: []byte("test1"),
+                               },
+                               {
+                                       ID:           common.SeriesID(2),
+                                       EntityValues: []byte("test2"),
+                               },
+                               {
+                                       ID:           common.SeriesID(3),
+                                       EntityValues: []byte("test3"),
+                               },
+                       },
+               },
+               {
+                       prefix: []byte("foo"),
+                       want:   []index.Series{},
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(string(tt.prefix), func(t *testing.T) {
+                       got, err := s.SearchPrefix(tt.prefix)
+                       tester.NoError(err)
+                       tester.ElementsMatch(tt.want, got)
+               })
+       }
+}
+
+func setupData(tester *assert.Assertions, s index.SeriesStore) {
+       series1 := index.Series{
+               ID:           common.SeriesID(1),
+               EntityValues: []byte("test1"),
+       }
+       tester.NoError(s.Create(series1))
+
+       series2 := index.Series{
+               ID:           common.SeriesID(2),
+               EntityValues: []byte("test2"),
+       }
+       tester.NoError(s.Create(series2))
+
+       series3 := index.Series{
+               ID:           common.SeriesID(3),
+               EntityValues: []byte("test3"),
+       }
+       tester.NoError(s.Create(series3))
+       s.(*store).flush()
+}
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index b0ec2bb4..1e95a654 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -39,6 +39,11 @@ type store struct {
        closeOnce sync.Once
 }
 
+// Batch implements index.Store.
+// NOTE(review): intentionally a stub — this LSM-backed store does not
+// support batched document writes in this commit; callers must not invoke
+// it yet. The panic (rather than a returned error) flags misuse loudly.
+func (*store) Batch(docs index.Documents) error {
+       panic("unimplemented")
+}
+
 func (s *store) Close() (err error) {
        s.closeOnce.Do(func() {
                s.closer.Done()

Reply via email to