This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/time-series by this push:
new fa138c1 Implement series database
fa138c1 is described below
commit fa138c149703c55930ebf8a89b7b4b5f711bbb2b
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Aug 31 13:26:12 2021 +0800
Implement series database
Signed-off-by: Gao Hongtao <[email protected]>
---
banyand/kv/kv.go | 7 +-
banyand/tsdb/series.go | 14 ++-
banyand/tsdb/seriesdb.go | 38 ++++--
banyand/tsdb/seriesdb_test.go | 209 +++++++++++++++++++++++++++++++-
banyand/tsdb/tsdb_test.go | 20 +--
pkg/logger/logger.go | 2 +
pkg/{logger/logger.go => test/space.go} | 37 +++---
7 files changed, 272 insertions(+), 55 deletions(-)
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 67d6938..ccaa8be 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -138,10 +138,15 @@ type StoreOptions func(Store)
// StoreWithLogger sets a external logger into underlying Store
func StoreWithLogger(l *logger.Logger) StoreOptions {
+ return StoreWithNamedLogger("normal-kv", l)
+}
+
+// StoreWithNamedLogger sets a external logger with a name into underlying Store
+func StoreWithNamedLogger(name string, l *logger.Logger) StoreOptions {
return func(store Store) {
if bdb, ok := store.(*badgerDB); ok {
bdb.dbOpts = bdb.dbOpts.WithLogger(&badgerLog{
- delegated: l.Named("normal-kv"),
+ delegated: l.Named(name),
})
}
}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index c8f2bc9..810aae0 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -20,6 +20,7 @@ package tsdb
import (
"time"
+ "github.com/apache/skywalking-banyandb/api/common"
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
)
@@ -50,6 +51,7 @@ type TimeRange struct {
}
type Series interface {
+ ID() common.SeriesID
Span(timeRange TimeRange) (SeriesSpan, error)
Get(id ItemID) (Item, error)
}
@@ -86,7 +88,17 @@ type Seeker interface {
var _ Series = (*series)(nil)
type series struct {
- id []byte
+ id common.SeriesID
+}
+
+func newSeries(id common.SeriesID) *series {
+ return &series{
+ id: id,
+ }
+}
+
+func (s *series) ID() common.SeriesID {
+ return s.id
}
func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 6075b46..b38c30f 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -27,6 +27,7 @@ import (
"go.uber.org/multierr"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -48,8 +49,8 @@ type Path struct {
isFull bool
}
-func NewPath(entries []Entry) *Path {
- p := &Path{
+func NewPath(entries []Entry) Path {
+ p := Path{
mask: make([]byte, 0),
template: make([]byte, 0),
}
@@ -80,7 +81,7 @@ func NewPath(entries []Entry) *Path {
type SeriesDatabase interface {
io.Closer
Create(entity Entity) error
- List(path Path) ([]Series, error)
+ List(path Path) (SeriesList, error)
}
var _ SeriesDatabase = (*seriesDB)(nil)
@@ -103,13 +104,14 @@ func (s *seriesDB) Close() error {
func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
sdb := &seriesDB{}
parentLogger := ctx.Value(logger.ContextKey)
- if parentLogger != nil {
- if pl, ok := parentLogger.(*logger.Logger); ok {
- sdb.l = pl.Named("series")
- }
+ if parentLogger == nil {
+ return nil, logger.ErrNoLoggerInContext
+ }
+ if pl, ok := parentLogger.(*logger.Logger); ok {
+ sdb.l = pl.Named("series")
}
var err error
- sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithLogger(sdb.l))
+ sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
if err != nil {
return nil, err
}
@@ -147,14 +149,14 @@ func (s *seriesDB) Create(entity Entity) error {
return nil
}
-func (s *seriesDB) List(path Path) ([]Series, error) {
+func (s *seriesDB) List(path Path) (SeriesList, error) {
if path.isFull {
id, err := s.seriesMetadata.Get(path.prefix)
if err != nil && err != kv.ErrKeyNotFound {
return nil, err
}
if err == nil {
- return []Series{&series{id: id}}, nil
+ return []Series{newSeries(common.SeriesID(convert.BytesToUint64(id)))}, nil
}
return nil, nil
}
@@ -171,7 +173,7 @@ func (s *seriesDB) List(path Path) ([]Series, error) {
err = multierr.Append(err, errGetVal)
return nil
}
- result = append(result, &series{id: id})
+ result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id))))
}
return nil
})
@@ -192,3 +194,17 @@ func hashEntity(entity Entity) []byte {
func hash(entry []byte) []byte {
return convert.Uint64ToBytes(convert.Hash(entry))
}
+
+type SeriesList []Series
+
+func (a SeriesList) Len() int {
+ return len(a)
+}
+
+func (a SeriesList) Less(i, j int) bool {
+ return a[i].ID() < a[j].ID()
+}
+
+func (a SeriesList) Swap(i, j int) {
+ a[i], a[j] = a[j], a[i]
+}
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index deef085..098648a 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -19,11 +19,16 @@ package tsdb
import (
"bytes"
+ "context"
+ "sort"
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
)
func TestNewPath(t *testing.T) {
@@ -31,7 +36,7 @@ func TestNewPath(t *testing.T) {
tests := []struct {
name string
entity Entity
- want *Path
+ want Path
}{
{
name: "general path",
@@ -40,7 +45,7 @@ func TestNewPath(t *testing.T) {
Entry("10.0.0.1"),
Entry(convert.Uint64ToBytes(0)),
},
- want: &Path{
+ want: Path{
isFull: true,
prefix: bytes.Join([][]byte{
hash([]byte("productpage")),
@@ -66,7 +71,7 @@ func TestNewPath(t *testing.T) {
Entry("10.0.0.1"),
Entry(convert.Uint64ToBytes(0)),
},
- want: &Path{
+ want: Path{
prefix: []byte{},
template: bytes.Join([][]byte{
zeroIntBytes,
@@ -87,7 +92,7 @@ func TestNewPath(t *testing.T) {
AnyEntry,
Entry(convert.Uint64ToBytes(0)),
},
- want: &Path{
+ want: Path{
prefix: bytes.Join([][]byte{
hash([]byte("productpage")),
}, nil),
@@ -110,7 +115,7 @@ func TestNewPath(t *testing.T) {
Entry("10.0.0.1"),
AnyEntry,
},
- want: &Path{
+ want: Path{
prefix: bytes.Join([][]byte{
hash([]byte("productpage")),
hash([]byte("10.0.0.1")),
@@ -135,3 +140,197 @@ func TestNewPath(t *testing.T) {
})
}
}
+
+func Test_SeriesDatabase_Create(t *testing.T) {
+
+ tests := []struct {
+ name string
+ entities []Entity
+ }{
+ {
+ name: "general entity",
+ entities: []Entity{{
+ Entry("productpage"),
+ Entry("10.0.0.1"),
+ Entry(convert.Uint64ToBytes(0)),
+ }},
+ },
+ {
+ name: "duplicated entity",
+ entities: []Entity{
+ {
+ Entry("productpage"),
+ Entry("10.0.0.1"),
+ Entry(convert.Uint64ToBytes(0)),
+ },
+ {
+ Entry("productpage"),
+ Entry("10.0.0.1"),
+ Entry(convert.Uint64ToBytes(0)),
+ },
+ },
+ },
+ }
+ tester := assert.New(t)
+ tester.NoError(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "debug",
+ }))
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ dir, deferFunc := test.Space(tester)
+ defer deferFunc()
+ s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), dir)
+ tester.NoError(err)
+ for _, entity := range tt.entities {
+ tester.NoError(s.Create(entity))
+ }
+ })
+ }
+}
+
+func Test_SeriesDatabase_List(t *testing.T) {
+ tester := assert.New(t)
+ tester.NoError(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "debug",
+ }))
+ dir, deferFunc := test.Space(tester)
+ defer deferFunc()
+ s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), dir)
+ tester.NoError(err)
+ data := setUpEntities(tester, s)
+ tests := []struct {
+ name string
+ path Path
+ wantErr bool
+ want SeriesList
+ }{
+ {
+ name: "equal",
+ path: NewPath([]Entry{
+ Entry("productpage"),
+ Entry("10.0.0.1"),
+ convert.Uint64ToBytes(0),
+ }),
+ want: SeriesList{
+ newSeries(data[0].id),
+ },
+ },
+ {
+ name: "all in an instance",
+ path: NewPath([]Entry{
+ Entry("productpage"),
+ Entry("10.0.0.2"),
+ AnyEntry,
+ }),
+ want: SeriesList{
+ newSeries(data[1].id),
+ newSeries(data[2].id),
+ },
+ },
+ {
+ name: "all in a service",
+ path: NewPath([]Entry{
+ Entry("productpage"),
+ AnyEntry,
+ AnyEntry,
+ }),
+ want: SeriesList{
+ newSeries(data[0].id),
+ newSeries(data[1].id),
+ newSeries(data[2].id),
+ newSeries(data[3].id),
+ },
+ },
+ {
+ name: "all successful",
+ path: NewPath([]Entry{
+ AnyEntry,
+ AnyEntry,
+ convert.Uint64ToBytes(0),
+ }),
+ want: SeriesList{
+ newSeries(data[0].id),
+ newSeries(data[1].id),
+ newSeries(data[3].id),
+ },
+ },
+ {
+ name: "all error",
+ path: NewPath([]Entry{
+ AnyEntry,
+ AnyEntry,
+ convert.Uint64ToBytes(1),
+ }),
+ want: SeriesList{
+ newSeries(data[2].id),
+ newSeries(data[4].id),
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ series, err := s.List(tt.path)
+ if tt.wantErr {
+ tester.Error(err)
+ return
+ }
+ tester.NoError(err)
+ sort.Sort(tt.want)
+ sort.Sort(series)
+ tester.Equal(tt.want, series)
+ })
+ }
+}
+
+type entityWithID struct {
+ id common.SeriesID
+ entity Entity
+}
+
+func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
+ data := []*entityWithID{
+ {
+ entity: Entity{
+ Entry("productpage"),
+ Entry("10.0.0.1"),
+ convert.Uint64ToBytes(0),
+ },
+ },
+ {
+ entity: Entity{
+ Entry("productpage"),
+ Entry("10.0.0.2"),
+ convert.Uint64ToBytes(0),
+ },
+ },
+ {
+ entity: Entity{
+ Entry("productpage"),
+ Entry("10.0.0.2"),
+ convert.Uint64ToBytes(1),
+ },
+ },
+ {
+ entity: Entity{
+ Entry("productpage"),
+ Entry("10.0.0.3"),
+ convert.Uint64ToBytes(0),
+ },
+ },
+ {
+ entity: Entity{
+ Entry("payment"),
+ Entry("10.0.0.2"),
+ convert.Uint64ToBytes(1),
+ },
+ },
+ }
+ for _, d := range data {
+ d.id = common.SeriesID(convert.BytesToUint64(hash(hashEntity(d.entity))))
+ t.NoError(db.Create(d.entity))
+ }
+ return data
+}
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 7b59037..e12e0a6 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -20,7 +20,6 @@ package tsdb
import (
"context"
"fmt"
- "io/ioutil"
"os"
"testing"
"time"
@@ -28,12 +27,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
)
func TestOpenDatabase(t *testing.T) {
tester := assert.New(t)
- tempDir, _ := setUp(tester)
- defer removeDir(tempDir)
+ tempDir, deferFunc, _ := setUp(tester)
+ defer deferFunc()
shardPath := fmt.Sprintf(shardTemplate, tempDir, 0)
validateDirectory(tester, shardPath)
seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
@@ -44,14 +44,12 @@ func TestOpenDatabase(t *testing.T) {
validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
}
-func setUp(t *assert.Assertions) (tempDir string, db Database) {
+func setUp(t *assert.Assertions) (tempDir string, deferFunc func(), db Database) {
t.NoError(logger.Init(logger.Logging{
Env: "dev",
Level: "debug",
}))
- var tempDirErr error
- tempDir, tempDirErr = ioutil.TempDir("", "banyandb-test-*")
- t.Nil(tempDirErr)
+ tempDir, deferFunc = test.Space(t)
db, err := OpenDatabase(
context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")),
DatabaseOpts{
@@ -60,7 +58,7 @@ func setUp(t *assert.Assertions) (tempDir string, db Database) {
})
t.NoError(err)
t.NotNil(db)
- return tempDir, db
+ return tempDir, deferFunc, db
}
func validateDirectory(t *assert.Assertions, dir string) {
@@ -69,9 +67,3 @@ func validateDirectory(t *assert.Assertions, dir string) {
t.NoError(err, "Directory error: %v", dir)
t.True(info.IsDir(), "Directory is a file, not a directory: %#v\n", dir)
}
-
-func removeDir(dir string) {
- if err := os.RemoveAll(dir); err != nil {
- fmt.Printf("Error while removing dir: %v\n", err)
- }
-}
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 15c8e59..1407a4f 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -20,10 +20,12 @@ package logger
import (
"strings"
+ "github.com/pkg/errors"
"github.com/rs/zerolog"
)
var ContextKey = contextKey{}
+var ErrNoLoggerInContext = errors.New("no logger in context")
type contextKey struct{}
diff --git a/pkg/logger/logger.go b/pkg/test/space.go
similarity index 60%
copy from pkg/logger/logger.go
copy to pkg/test/space.go
index 15c8e59..c914b94 100644
--- a/pkg/logger/logger.go
+++ b/pkg/test/space.go
@@ -15,32 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-package logger
+package test
import (
- "strings"
+ "fmt"
+ "io/ioutil"
+ "os"
- "github.com/rs/zerolog"
+ "github.com/stretchr/testify/assert"
)
-var ContextKey = contextKey{}
-
-type contextKey struct{}
-
-// Logging is the config info
-type Logging struct {
- Env string
- Level string
-}
-
-// Logger is wrapper for rs/zerolog logger with module, it is singleton.
-type Logger struct {
- module string
- *zerolog.Logger
-}
-
-func (l *Logger) Named(name string) *Logger {
- module := strings.Join([]string{l.module, name}, ".")
- subLogger := root.Logger.With().Str("module", module).Logger()
- return &Logger{module: module, Logger: &subLogger}
+func Space(t *assert.Assertions) (tempDir string, deferFunc func()) {
+ var tempDirErr error
+ tempDir, tempDirErr = ioutil.TempDir("", "banyandb-test-*")
+ t.Nil(tempDirErr)
+ return tempDir, func() {
+ if err := os.RemoveAll(tempDir); err != nil {
+ _, _ = fmt.Fprintf(os.Stderr, "Error while removing dir: %v\n", err)
+ }
+ }
}