This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/time-series by this push:
     new 14c6087  Implement series database
14c6087 is described below

commit 14c60870bd0bf675d951b554d94bd94a74f20a2c
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Aug 30 21:39:53 2021 +0800

    Implement series database
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/kv/badger.go          |  14 ++--
 banyand/tsdb/series.go        |  14 ++++
 banyand/tsdb/seriesdb.go      | 170 +++++++++++++++++++++++++++++++++++++++---
 banyand/tsdb/seriesdb_test.go | 137 ++++++++++++++++++++++++++++++++++
 banyand/tsdb/shard.go         |  27 +++----
 banyand/tsdb/tsdb.go          |  13 +++-
 banyand/tsdb/tsdb_test.go     |   4 +-
 7 files changed, 341 insertions(+), 38 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 64c9f29..8a35c29 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -33,11 +33,12 @@ import (
 )
 
 var (
-       _             Store           = (*badgerDB)(nil)
-       _             IndexStore      = (*badgerDB)(nil)
-       _             y.Iterator      = (*mergedIter)(nil)
-       _             TimeSeriesStore = (*badgerTSS)(nil)
-       bitMergeEntry byte            = 1 << 3
+       _              Store           = (*badgerDB)(nil)
+       _              IndexStore      = (*badgerDB)(nil)
+       _              y.Iterator      = (*mergedIter)(nil)
+       _              TimeSeriesStore = (*badgerTSS)(nil)
+       bitMergeEntry  byte            = 1 << 3
+       ErrKeyNotFound                 = badger.ErrKeyNotFound
 )
 
 type badgerTSS struct {
@@ -184,6 +185,9 @@ func (b *badgerDB) Put(key, val []byte) error {
 
 func (b *badgerDB) Get(key []byte) ([]byte, error) {
        v, err := b.db.Get(y.KeyWithTs(key, math.MaxInt64))
+       if err == badger.ErrKeyNotFound {
+               return nil, ErrKeyNotFound
+       }
        if err != nil {
                return nil, err
        }
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 03a6e65..c8f2bc9 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -82,3 +82,17 @@ type SeekerBuilder interface {
 type Seeker interface {
        Seek() Iterator
 }
+
+var _ Series = (*series)(nil)
+
+type series struct {
+       id []byte
+}
+
+func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
+       panic("implement me")
+}
+
+func (s *series) Get(id ItemID) (Item, error) {
+       panic("implement me")
+}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 727a6af..6075b46 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -17,30 +17,178 @@
 
 package tsdb
 
-import "bytes"
+import (
+       "bytes"
+       "context"
+       "io"
+       "math"
+       "sync"
+       "time"
 
-var AllEntry = Entry("*")
+       "go.uber.org/multierr"
 
-type Entity [][]byte
+       "github.com/apache/skywalking-banyandb/banyand/kv"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
 
-type Path struct {
-       prefix      []byte
-       suffixStack []Entry
-}
+var maxIntBytes = convert.Uint64ToBytes(math.MaxUint64)
+var zeroIntBytes = convert.Uint64ToBytes(0)
+
+var AnyEntry = Entry(nil)
 
 type Entry []byte
 
-func (e Entry) Equal(another Entry) bool {
-       return bytes.Equal(e, another)
+type Entity []Entry
+
+type Path struct {
+       prefix   []byte
+       mask     []byte
+       template []byte
+       isFull   bool
 }
 
-func NewPath(entries []Entry) {
-       if entries[0].Equal(AllEntry) {
+func NewPath(entries []Entry) *Path {
+       p := &Path{
+               mask:     make([]byte, 0),
+               template: make([]byte, 0),
+       }
 
+       var offset int
+       var encounterAny bool
+       for _, e := range entries {
+               if e == nil {
+                       encounterAny = true
+                       p.mask = append(p.mask, zeroIntBytes...)
+                       p.template = append(p.template, zeroIntBytes...)
+                       continue
+               }
+               entry := hash(e)
+               if !encounterAny {
+                       offset += 8
+               }
+               p.mask = append(p.mask, maxIntBytes...)
+               p.template = append(p.template, entry...)
+       }
+       if !encounterAny {
+               p.isFull = true
        }
+       p.prefix = p.template[:offset]
+       return p
 }
 
 type SeriesDatabase interface {
+       io.Closer
        Create(entity Entity) error
        List(path Path) ([]Series, error)
 }
+
+var _ SeriesDatabase = (*seriesDB)(nil)
+
+type seriesDB struct {
+       sync.Mutex
+       l *logger.Logger
+
+       lst            []*segment
+       seriesMetadata kv.Store
+}
+
+func (s *seriesDB) Close() error {
+       for _, seg := range s.lst {
+               seg.close()
+       }
+       return s.seriesMetadata.Close()
+}
+
+func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
+       sdb := &seriesDB{}
+       parentLogger := ctx.Value(logger.ContextKey)
+       if parentLogger != nil {
+               if pl, ok := parentLogger.(*logger.Logger); ok {
+                       sdb.l = pl.Named("series")
+               }
+       }
+       var err error
+       sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithLogger(sdb.l))
+       if err != nil {
+               return nil, err
+       }
+       segPath, err := mkdir(segTemplate, path, time.Now().Format(segFormat))
+       if err != nil {
+               return nil, err
+       }
+       seg, err := newSegment(ctx, segPath)
+       if err != nil {
+               return nil, err
+       }
+       {
+               sdb.Lock()
+               defer sdb.Unlock()
+               sdb.lst = append(sdb.lst, seg)
+       }
+       return sdb, nil
+}
+
+func (s *seriesDB) Create(entity Entity) error {
+       key := hashEntity(entity)
+       _, err := s.seriesMetadata.Get(key)
+       if err != nil && err != kv.ErrKeyNotFound {
+               return err
+       }
+       if err == nil {
+               return nil
+       }
+       s.Lock()
+       defer s.Unlock()
+       err = s.seriesMetadata.Put(key, hash(key))
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func (s *seriesDB) List(path Path) ([]Series, error) {
+       if path.isFull {
+               id, err := s.seriesMetadata.Get(path.prefix)
+               if err != nil && err != kv.ErrKeyNotFound {
+                       return nil, err
+               }
+               if err == nil {
+                       return []Series{&series{id: id}}, nil
+               }
+               return nil, nil
+       }
+       result := make([]Series, 0)
+       var err error
+       errScan := s.seriesMetadata.Scan(path.prefix, kv.DefaultScanOpts, func(_ int, key []byte, getVal func() ([]byte, error)) error {
+               comparableKey := make([]byte, len(key))
+               for i, b := range key {
+                       comparableKey[i] = path.mask[i] & b
+               }
+               if bytes.Equal(path.template, comparableKey) {
+                       id, errGetVal := getVal()
+                       if errGetVal != nil {
+                               err = multierr.Append(err, errGetVal)
+                               return nil
+                       }
+                       result = append(result, &series{id: id})
+               }
+               return nil
+       })
+       if errScan != nil {
+               return nil, errScan
+       }
+       return result, err
+}
+
+func hashEntity(entity Entity) []byte {
+       result := make(Entry, 0, len(entity)*8)
+       for _, entry := range entity {
+               result = append(result, hash(entry)...)
+       }
+       return result
+}
+
+func hash(entry []byte) []byte {
+       return convert.Uint64ToBytes(convert.Hash(entry))
+}
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
new file mode 100644
index 0000000..deef085
--- /dev/null
+++ b/banyand/tsdb/seriesdb_test.go
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+       "bytes"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func TestNewPath(t *testing.T) {
+       tester := assert.New(t)
+       tests := []struct {
+               name   string
+               entity Entity
+               want   *Path
+       }{
+               {
+                       name: "general path",
+                       entity: Entity{
+                               Entry("productpage"),
+                               Entry("10.0.0.1"),
+                               Entry(convert.Uint64ToBytes(0)),
+                       },
+                       want: &Path{
+                               isFull: true,
+                               prefix: bytes.Join([][]byte{
+                                       hash([]byte("productpage")),
+                                       hash([]byte("10.0.0.1")),
+                                       hash(convert.Uint64ToBytes(0)),
+                               }, nil),
+                               template: bytes.Join([][]byte{
+                                       hash([]byte("productpage")),
+                                       hash([]byte("10.0.0.1")),
+                                       hash(convert.Uint64ToBytes(0)),
+                               }, nil),
+                               mask: bytes.Join([][]byte{
+                                       maxIntBytes,
+                                       maxIntBytes,
+                                       maxIntBytes,
+                               }, nil),
+                       },
+               },
+               {
+                       name: "the first is anyone",
+                       entity: Entity{
+                               AnyEntry,
+                               Entry("10.0.0.1"),
+                               Entry(convert.Uint64ToBytes(0)),
+                       },
+                       want: &Path{
+                               prefix: []byte{},
+                               template: bytes.Join([][]byte{
+                                       zeroIntBytes,
+                                       hash([]byte("10.0.0.1")),
+                                       hash(convert.Uint64ToBytes(0)),
+                               }, nil),
+                               mask: bytes.Join([][]byte{
+                                       zeroIntBytes,
+                                       maxIntBytes,
+                                       maxIntBytes,
+                               }, nil),
+                       },
+               },
+               {
+                       name: "the second is anyone",
+                       entity: Entity{
+                               Entry("productpage"),
+                               AnyEntry,
+                               Entry(convert.Uint64ToBytes(0)),
+                       },
+                       want: &Path{
+                               prefix: bytes.Join([][]byte{
+                                       hash([]byte("productpage")),
+                               }, nil),
+                               template: bytes.Join([][]byte{
+                                       hash([]byte("productpage")),
+                                       zeroIntBytes,
+                                       hash(convert.Uint64ToBytes(0)),
+                               }, nil),
+                               mask: bytes.Join([][]byte{
+                                       maxIntBytes,
+                                       zeroIntBytes,
+                                       maxIntBytes,
+                               }, nil),
+                       },
+               },
+               {
+                       name: "the last is anyone",
+                       entity: Entity{
+                               Entry("productpage"),
+                               Entry("10.0.0.1"),
+                               AnyEntry,
+                       },
+                       want: &Path{
+                               prefix: bytes.Join([][]byte{
+                                       hash([]byte("productpage")),
+                                       hash([]byte("10.0.0.1")),
+                               }, nil),
+                               template: bytes.Join([][]byte{
+                                       hash([]byte("productpage")),
+                                       hash([]byte("10.0.0.1")),
+                                       zeroIntBytes,
+                               }, nil),
+                               mask: bytes.Join([][]byte{
+                                       maxIntBytes,
+                                       maxIntBytes,
+                                       zeroIntBytes,
+                               }, nil),
+                       },
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       got := NewPath(tt.entity)
+                       tester.Equal(tt.want, got)
+               })
+       }
+}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index ac58b9d..050444e 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -19,17 +19,15 @@ package tsdb
 
 import (
        "context"
-       "sync"
-       "time"
 )
 
 var _ Shard = (*shard)(nil)
 
 type shard struct {
-       id  int
-       lst []*segment
-       sync.Mutex
-       location string
+       id int
+
+       location       string
+       seriesDatabase SeriesDatabase
 }
 
 func (s *shard) Series() SeriesDatabase {
@@ -45,24 +43,19 @@ func newShard(ctx context.Context, id int, location string) (*shard, error) {
                id:       id,
                location: location,
        }
-       segPath, err := mkdir(segTemplate, s.location, time.Now().Format(segFormat))
+       seriesPath, err := mkdir(seriesTemplate, s.location)
        if err != nil {
                return nil, err
        }
-       seg, err := newSegment(ctx, segPath)
+       sdb, err := newSeriesDataBase(ctx, seriesPath)
        if err != nil {
                return nil, err
        }
-       {
-               s.Lock()
-               defer s.Unlock()
-               s.lst = append(s.lst, seg)
-       }
+       s.seriesDatabase = sdb
+
        return s, nil
 }
 
-func (s *shard) stop() {
-       for _, seg := range s.lst {
-               seg.close()
-       }
+func (s *shard) Close() error {
+       return s.seriesDatabase.Close()
 }
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index b14a932..503b0e8 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -33,9 +33,10 @@ import (
 )
 
 const (
-       shardTemplate = "%s/shard-%d"
-       segTemplate   = "%s/seg-%s"
-       blockTemplate = "%s/block-%s"
+       shardTemplate  = "%s/shard-%d"
+       seriesTemplate = "%s/series"
+       segTemplate    = "%s/seg-%s"
+       blockTemplate  = "%s/block-%s"
 
        segFormat   = "20060102"
        blockFormat = "1504"
@@ -49,6 +50,7 @@ type Database interface {
 }
 
 type Shard interface {
+       io.Closer
        Series() SeriesDatabase
        Index() IndexDatabase
 }
@@ -97,7 +99,10 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 }
 
 func (d *database) Close() error {
-       panic("implement me")
+       for _, s := range d.sLst {
+               _ = s.Close()
+       }
+       return nil
 }
 
 func (d *database) Shards() []Shard {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index f791d2b..7b59037 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -36,8 +36,10 @@ func TestOpenDatabase(t *testing.T) {
        defer removeDir(tempDir)
        shardPath := fmt.Sprintf(shardTemplate, tempDir, 0)
        validateDirectory(tester, shardPath)
+       seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
+       validateDirectory(tester, seriesPath)
        now := time.Now()
-       segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segFormat))
+       segPath := fmt.Sprintf(segTemplate, seriesPath, now.Format(segFormat))
        validateDirectory(tester, segPath)
        validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
 }

Reply via email to