This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/time-series by this push:
     new fa138c1  Implement series database
fa138c1 is described below

commit fa138c149703c55930ebf8a89b7b4b5f711bbb2b
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Aug 31 13:26:12 2021 +0800

    Implement series database
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/kv/kv.go                        |   7 +-
 banyand/tsdb/series.go                  |  14 ++-
 banyand/tsdb/seriesdb.go                |  38 ++++--
 banyand/tsdb/seriesdb_test.go           | 209 +++++++++++++++++++++++++++++++-
 banyand/tsdb/tsdb_test.go               |  20 +--
 pkg/logger/logger.go                    |   2 +
 pkg/{logger/logger.go => test/space.go} |  37 +++---
 7 files changed, 272 insertions(+), 55 deletions(-)

diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 67d6938..ccaa8be 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -138,10 +138,15 @@ type StoreOptions func(Store)
 
 // StoreWithLogger sets a external logger into underlying Store
 func StoreWithLogger(l *logger.Logger) StoreOptions {
+       return StoreWithNamedLogger("normal-kv", l)
+}
+
+// StoreWithNamedLogger sets a external logger with a name into underlying Store
+func StoreWithNamedLogger(name string, l *logger.Logger) StoreOptions {
        return func(store Store) {
                if bdb, ok := store.(*badgerDB); ok {
                        bdb.dbOpts = bdb.dbOpts.WithLogger(&badgerLog{
-                               delegated: l.Named("normal-kv"),
+                               delegated: l.Named(name),
                        })
                }
        }
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index c8f2bc9..810aae0 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -20,6 +20,7 @@ package tsdb
 import (
        "time"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 )
 
@@ -50,6 +51,7 @@ type TimeRange struct {
 }
 
 type Series interface {
+       ID() common.SeriesID
        Span(timeRange TimeRange) (SeriesSpan, error)
        Get(id ItemID) (Item, error)
 }
@@ -86,7 +88,17 @@ type Seeker interface {
 var _ Series = (*series)(nil)
 
 type series struct {
-       id []byte
+       id common.SeriesID
+}
+
+func newSeries(id common.SeriesID) *series {
+       return &series{
+               id: id,
+       }
+}
+
+func (s *series) ID() common.SeriesID {
+       return s.id
 }
 
 func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 6075b46..b38c30f 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -27,6 +27,7 @@ import (
 
        "go.uber.org/multierr"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -48,8 +49,8 @@ type Path struct {
        isFull   bool
 }
 
-func NewPath(entries []Entry) *Path {
-       p := &Path{
+func NewPath(entries []Entry) Path {
+       p := Path{
                mask:     make([]byte, 0),
                template: make([]byte, 0),
        }
@@ -80,7 +81,7 @@ func NewPath(entries []Entry) *Path {
 type SeriesDatabase interface {
        io.Closer
        Create(entity Entity) error
-       List(path Path) ([]Series, error)
+       List(path Path) (SeriesList, error)
 }
 
 var _ SeriesDatabase = (*seriesDB)(nil)
@@ -103,13 +104,14 @@ func (s *seriesDB) Close() error {
 func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
        sdb := &seriesDB{}
        parentLogger := ctx.Value(logger.ContextKey)
-       if parentLogger != nil {
-               if pl, ok := parentLogger.(*logger.Logger); ok {
-                       sdb.l = pl.Named("series")
-               }
+       if parentLogger == nil {
+               return nil, logger.ErrNoLoggerInContext
+       }
+       if pl, ok := parentLogger.(*logger.Logger); ok {
+               sdb.l = pl.Named("series")
        }
        var err error
-       sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithLogger(sdb.l))
+       sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
        if err != nil {
                return nil, err
        }
@@ -147,14 +149,14 @@ func (s *seriesDB) Create(entity Entity) error {
        return nil
 }
 
-func (s *seriesDB) List(path Path) ([]Series, error) {
+func (s *seriesDB) List(path Path) (SeriesList, error) {
        if path.isFull {
                id, err := s.seriesMetadata.Get(path.prefix)
                if err != nil && err != kv.ErrKeyNotFound {
                        return nil, err
                }
                if err == nil {
-                       return []Series{&series{id: id}}, nil
+                       return []Series{newSeries(common.SeriesID(convert.BytesToUint64(id)))}, nil
                }
                return nil, nil
        }
@@ -171,7 +173,7 @@ func (s *seriesDB) List(path Path) ([]Series, error) {
                                err = multierr.Append(err, errGetVal)
                                return nil
                        }
-                       result = append(result, &series{id: id})
+                       result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id))))
                }
                return nil
        })
@@ -192,3 +194,17 @@ func hashEntity(entity Entity) []byte {
 func hash(entry []byte) []byte {
        return convert.Uint64ToBytes(convert.Hash(entry))
 }
+
+type SeriesList []Series
+
+func (a SeriesList) Len() int {
+       return len(a)
+}
+
+func (a SeriesList) Less(i, j int) bool {
+       return a[i].ID() < a[j].ID()
+}
+
+func (a SeriesList) Swap(i, j int) {
+       a[i], a[j] = a[j], a[i]
+}
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index deef085..098648a 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -19,11 +19,16 @@ package tsdb
 
 import (
        "bytes"
+       "context"
+       "sort"
        "testing"
 
        "github.com/stretchr/testify/assert"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
 func TestNewPath(t *testing.T) {
@@ -31,7 +36,7 @@ func TestNewPath(t *testing.T) {
        tests := []struct {
                name   string
                entity Entity
-               want   *Path
+               want   Path
        }{
                {
                        name: "general path",
@@ -40,7 +45,7 @@ func TestNewPath(t *testing.T) {
                                Entry("10.0.0.1"),
                                Entry(convert.Uint64ToBytes(0)),
                        },
-                       want: &Path{
+                       want: Path{
                                isFull: true,
                                prefix: bytes.Join([][]byte{
                                        hash([]byte("productpage")),
@@ -66,7 +71,7 @@ func TestNewPath(t *testing.T) {
                                Entry("10.0.0.1"),
                                Entry(convert.Uint64ToBytes(0)),
                        },
-                       want: &Path{
+                       want: Path{
                                prefix: []byte{},
                                template: bytes.Join([][]byte{
                                        zeroIntBytes,
@@ -87,7 +92,7 @@ func TestNewPath(t *testing.T) {
                                AnyEntry,
                                Entry(convert.Uint64ToBytes(0)),
                        },
-                       want: &Path{
+                       want: Path{
                                prefix: bytes.Join([][]byte{
                                        hash([]byte("productpage")),
                                }, nil),
@@ -110,7 +115,7 @@ func TestNewPath(t *testing.T) {
                                Entry("10.0.0.1"),
                                AnyEntry,
                        },
-                       want: &Path{
+                       want: Path{
                                prefix: bytes.Join([][]byte{
                                        hash([]byte("productpage")),
                                        hash([]byte("10.0.0.1")),
@@ -135,3 +140,197 @@ func TestNewPath(t *testing.T) {
                })
        }
 }
+
+func Test_SeriesDatabase_Create(t *testing.T) {
+
+       tests := []struct {
+               name     string
+               entities []Entity
+       }{
+               {
+                       name: "general entity",
+                       entities: []Entity{{
+                               Entry("productpage"),
+                               Entry("10.0.0.1"),
+                               Entry(convert.Uint64ToBytes(0)),
+                       }},
+               },
+               {
+                       name: "duplicated entity",
+                       entities: []Entity{
+                               {
+                                       Entry("productpage"),
+                                       Entry("10.0.0.1"),
+                                       Entry(convert.Uint64ToBytes(0)),
+                               },
+                               {
+                                       Entry("productpage"),
+                                       Entry("10.0.0.1"),
+                                       Entry(convert.Uint64ToBytes(0)),
+                               },
+                       },
+               },
+       }
+       tester := assert.New(t)
+       tester.NoError(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: "debug",
+       }))
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       dir, deferFunc := test.Space(tester)
+                       defer deferFunc()
+                       s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), dir)
+                       tester.NoError(err)
+                       for _, entity := range tt.entities {
+                               tester.NoError(s.Create(entity))
+                       }
+               })
+       }
+}
+
+func Test_SeriesDatabase_List(t *testing.T) {
+       tester := assert.New(t)
+       tester.NoError(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: "debug",
+       }))
+       dir, deferFunc := test.Space(tester)
+       defer deferFunc()
+       s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), dir)
+       tester.NoError(err)
+       data := setUpEntities(tester, s)
+       tests := []struct {
+               name    string
+               path    Path
+               wantErr bool
+               want    SeriesList
+       }{
+               {
+                       name: "equal",
+                       path: NewPath([]Entry{
+                               Entry("productpage"),
+                               Entry("10.0.0.1"),
+                               convert.Uint64ToBytes(0),
+                       }),
+                       want: SeriesList{
+                               newSeries(data[0].id),
+                       },
+               },
+               {
+                       name: "all in an instance",
+                       path: NewPath([]Entry{
+                               Entry("productpage"),
+                               Entry("10.0.0.2"),
+                               AnyEntry,
+                       }),
+                       want: SeriesList{
+                               newSeries(data[1].id),
+                               newSeries(data[2].id),
+                       },
+               },
+               {
+                       name: "all in a service",
+                       path: NewPath([]Entry{
+                               Entry("productpage"),
+                               AnyEntry,
+                               AnyEntry,
+                       }),
+                       want: SeriesList{
+                               newSeries(data[0].id),
+                               newSeries(data[1].id),
+                               newSeries(data[2].id),
+                               newSeries(data[3].id),
+                       },
+               },
+               {
+                       name: "all successful",
+                       path: NewPath([]Entry{
+                               AnyEntry,
+                               AnyEntry,
+                               convert.Uint64ToBytes(0),
+                       }),
+                       want: SeriesList{
+                               newSeries(data[0].id),
+                               newSeries(data[1].id),
+                               newSeries(data[3].id),
+                       },
+               },
+               {
+                       name: "all error",
+                       path: NewPath([]Entry{
+                               AnyEntry,
+                               AnyEntry,
+                               convert.Uint64ToBytes(1),
+                       }),
+                       want: SeriesList{
+                               newSeries(data[2].id),
+                               newSeries(data[4].id),
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       series, err := s.List(tt.path)
+                       if tt.wantErr {
+                               tester.Error(err)
+                               return
+                       }
+                       tester.NoError(err)
+                       sort.Sort(tt.want)
+                       sort.Sort(series)
+                       tester.Equal(tt.want, series)
+               })
+       }
+}
+
+type entityWithID struct {
+       id     common.SeriesID
+       entity Entity
+}
+
+func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
+       data := []*entityWithID{
+               {
+                       entity: Entity{
+                               Entry("productpage"),
+                               Entry("10.0.0.1"),
+                               convert.Uint64ToBytes(0),
+                       },
+               },
+               {
+                       entity: Entity{
+                               Entry("productpage"),
+                               Entry("10.0.0.2"),
+                               convert.Uint64ToBytes(0),
+                       },
+               },
+               {
+                       entity: Entity{
+                               Entry("productpage"),
+                               Entry("10.0.0.2"),
+                               convert.Uint64ToBytes(1),
+                       },
+               },
+               {
+                       entity: Entity{
+                               Entry("productpage"),
+                               Entry("10.0.0.3"),
+                               convert.Uint64ToBytes(0),
+                       },
+               },
+               {
+                       entity: Entity{
+                               Entry("payment"),
+                               Entry("10.0.0.2"),
+                               convert.Uint64ToBytes(1),
+                       },
+               },
+       }
+       for _, d := range data {
+               d.id = common.SeriesID(convert.BytesToUint64(hash(hashEntity(d.entity))))
+               t.NoError(db.Create(d.entity))
+       }
+       return data
+}
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 7b59037..e12e0a6 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -20,7 +20,6 @@ package tsdb
 import (
        "context"
        "fmt"
-       "io/ioutil"
        "os"
        "testing"
        "time"
@@ -28,12 +27,13 @@ import (
        "github.com/stretchr/testify/assert"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
 func TestOpenDatabase(t *testing.T) {
        tester := assert.New(t)
-       tempDir, _ := setUp(tester)
-       defer removeDir(tempDir)
+       tempDir, deferFunc, _ := setUp(tester)
+       defer deferFunc()
        shardPath := fmt.Sprintf(shardTemplate, tempDir, 0)
        validateDirectory(tester, shardPath)
        seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
@@ -44,14 +44,12 @@ func TestOpenDatabase(t *testing.T) {
        validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
 }
 
-func setUp(t *assert.Assertions) (tempDir string, db Database) {
+func setUp(t *assert.Assertions) (tempDir string, deferFunc func(), db Database) {
        t.NoError(logger.Init(logger.Logging{
                Env:   "dev",
                Level: "debug",
        }))
-       var tempDirErr error
-       tempDir, tempDirErr = ioutil.TempDir("", "banyandb-test-*")
-       t.Nil(tempDirErr)
+       tempDir, deferFunc = test.Space(t)
        db, err := OpenDatabase(
                context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")),
                DatabaseOpts{
@@ -60,7 +58,7 @@ func setUp(t *assert.Assertions) (tempDir string, db Database) {
                })
        t.NoError(err)
        t.NotNil(db)
-       return tempDir, db
+       return tempDir, deferFunc, db
 }
 
 func validateDirectory(t *assert.Assertions, dir string) {
@@ -69,9 +67,3 @@ func validateDirectory(t *assert.Assertions, dir string) {
        t.NoError(err, "Directory error: %v", dir)
        t.True(info.IsDir(), "Directory is a file, not a directory: %#v\n", dir)
 }
-
-func removeDir(dir string) {
-       if err := os.RemoveAll(dir); err != nil {
-               fmt.Printf("Error while removing dir: %v\n", err)
-       }
-}
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 15c8e59..1407a4f 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -20,10 +20,12 @@ package logger
 import (
        "strings"
 
+       "github.com/pkg/errors"
        "github.com/rs/zerolog"
 )
 
 var ContextKey = contextKey{}
+var ErrNoLoggerInContext = errors.New("no logger in context")
 
 type contextKey struct{}
 
diff --git a/pkg/logger/logger.go b/pkg/test/space.go
similarity index 60%
copy from pkg/logger/logger.go
copy to pkg/test/space.go
index 15c8e59..c914b94 100644
--- a/pkg/logger/logger.go
+++ b/pkg/test/space.go
@@ -15,32 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package logger
+package test
 
 import (
-       "strings"
+       "fmt"
+       "io/ioutil"
+       "os"
 
-       "github.com/rs/zerolog"
+       "github.com/stretchr/testify/assert"
 )
 
-var ContextKey = contextKey{}
-
-type contextKey struct{}
-
-// Logging is the config info
-type Logging struct {
-       Env   string
-       Level string
-}
-
-// Logger is wrapper for rs/zerolog logger with module, it is singleton.
-type Logger struct {
-       module string
-       *zerolog.Logger
-}
-
-func (l *Logger) Named(name string) *Logger {
-       module := strings.Join([]string{l.module, name}, ".")
-       subLogger := root.Logger.With().Str("module", module).Logger()
-       return &Logger{module: module, Logger: &subLogger}
+func Space(t *assert.Assertions) (tempDir string, deferFunc func()) {
+       var tempDirErr error
+       tempDir, tempDirErr = ioutil.TempDir("", "banyandb-test-*")
+       t.Nil(tempDirErr)
+       return tempDir, func() {
+               if err := os.RemoveAll(tempDir); err != nil {
+                       _, _ = fmt.Fprintf(os.Stderr, "Error while removing dir: %v\n", err)
+               }
+       }
 }

Reply via email to