This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/time-series by this push:
new 14c6087 Implement series database
14c6087 is described below
commit 14c60870bd0bf675d951b554d94bd94a74f20a2c
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Aug 30 21:39:53 2021 +0800
Implement series database
Signed-off-by: Gao Hongtao <[email protected]>
---
banyand/kv/badger.go | 14 ++--
banyand/tsdb/series.go | 14 ++++
banyand/tsdb/seriesdb.go | 170 +++++++++++++++++++++++++++++++++++++++---
banyand/tsdb/seriesdb_test.go | 137 ++++++++++++++++++++++++++++++++++
banyand/tsdb/shard.go | 27 +++----
banyand/tsdb/tsdb.go | 13 +++-
banyand/tsdb/tsdb_test.go | 4 +-
7 files changed, 341 insertions(+), 38 deletions(-)
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 64c9f29..8a35c29 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -33,11 +33,12 @@ import (
)
var (
- _ Store = (*badgerDB)(nil)
- _ IndexStore = (*badgerDB)(nil)
- _ y.Iterator = (*mergedIter)(nil)
- _ TimeSeriesStore = (*badgerTSS)(nil)
- bitMergeEntry byte = 1 << 3
+ _ Store = (*badgerDB)(nil)
+ _ IndexStore = (*badgerDB)(nil)
+ _ y.Iterator = (*mergedIter)(nil)
+ _ TimeSeriesStore = (*badgerTSS)(nil)
+ bitMergeEntry byte = 1 << 3
+ ErrKeyNotFound = badger.ErrKeyNotFound
)
type badgerTSS struct {
@@ -184,6 +185,9 @@ func (b *badgerDB) Put(key, val []byte) error {
func (b *badgerDB) Get(key []byte) ([]byte, error) {
v, err := b.db.Get(y.KeyWithTs(key, math.MaxInt64))
+ if err == badger.ErrKeyNotFound {
+ return nil, ErrKeyNotFound
+ }
if err != nil {
return nil, err
}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 03a6e65..c8f2bc9 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -82,3 +82,17 @@ type SeekerBuilder interface {
type Seeker interface {
Seek() Iterator
}
+
+var _ Series = (*series)(nil)
+
+type series struct {
+ id []byte
+}
+
+func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
+ panic("implement me")
+}
+
+func (s *series) Get(id ItemID) (Item, error) {
+ panic("implement me")
+}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 727a6af..6075b46 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -17,30 +17,178 @@
package tsdb
-import "bytes"
+import (
+ "bytes"
+ "context"
+ "io"
+ "math"
+ "sync"
+ "time"
-var AllEntry = Entry("*")
+ "go.uber.org/multierr"
-type Entity [][]byte
+ "github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
-type Path struct {
- prefix []byte
- suffixStack []Entry
-}
+var maxIntBytes = convert.Uint64ToBytes(math.MaxUint64)
+var zeroIntBytes = convert.Uint64ToBytes(0)
+
+var AnyEntry = Entry(nil)
type Entry []byte
-func (e Entry) Equal(another Entry) bool {
- return bytes.Equal(e, another)
+type Entity []Entry
+
+type Path struct {
+ prefix []byte
+ mask []byte
+ template []byte
+ isFull bool
}
-func NewPath(entries []Entry) {
- if entries[0].Equal(AllEntry) {
+func NewPath(entries []Entry) *Path {
+ p := &Path{
+ mask: make([]byte, 0),
+ template: make([]byte, 0),
+ }
+ var offset int
+ var encounterAny bool
+ for _, e := range entries {
+ if e == nil {
+ encounterAny = true
+ p.mask = append(p.mask, zeroIntBytes...)
+ p.template = append(p.template, zeroIntBytes...)
+ continue
+ }
+ entry := hash(e)
+ if !encounterAny {
+ offset += 8
+ }
+ p.mask = append(p.mask, maxIntBytes...)
+ p.template = append(p.template, entry...)
+ }
+ if !encounterAny {
+ p.isFull = true
}
+ p.prefix = p.template[:offset]
+ return p
}
type SeriesDatabase interface {
+ io.Closer
Create(entity Entity) error
List(path Path) ([]Series, error)
}
+
+var _ SeriesDatabase = (*seriesDB)(nil)
+
+type seriesDB struct {
+ sync.Mutex
+ l *logger.Logger
+
+ lst []*segment
+ seriesMetadata kv.Store
+}
+
+func (s *seriesDB) Close() error {
+ for _, seg := range s.lst {
+ seg.close()
+ }
+ return s.seriesMetadata.Close()
+}
+
+func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
+ sdb := &seriesDB{}
+ parentLogger := ctx.Value(logger.ContextKey)
+ if parentLogger != nil {
+ if pl, ok := parentLogger.(*logger.Logger); ok {
+ sdb.l = pl.Named("series")
+ }
+ }
+ var err error
+ sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithLogger(sdb.l))
+ if err != nil {
+ return nil, err
+ }
+ segPath, err := mkdir(segTemplate, path, time.Now().Format(segFormat))
+ if err != nil {
+ return nil, err
+ }
+ seg, err := newSegment(ctx, segPath)
+ if err != nil {
+ return nil, err
+ }
+ {
+ sdb.Lock()
+ defer sdb.Unlock()
+ sdb.lst = append(sdb.lst, seg)
+ }
+ return sdb, nil
+}
+
+func (s *seriesDB) Create(entity Entity) error {
+ key := hashEntity(entity)
+ _, err := s.seriesMetadata.Get(key)
+ if err != nil && err != kv.ErrKeyNotFound {
+ return err
+ }
+ if err == nil {
+ return nil
+ }
+ s.Lock()
+ defer s.Unlock()
+ err = s.seriesMetadata.Put(key, hash(key))
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s *seriesDB) List(path Path) ([]Series, error) {
+ if path.isFull {
+ id, err := s.seriesMetadata.Get(path.prefix)
+ if err != nil && err != kv.ErrKeyNotFound {
+ return nil, err
+ }
+ if err == nil {
+ return []Series{&series{id: id}}, nil
+ }
+ return nil, nil
+ }
+ result := make([]Series, 0)
+ var err error
+ errScan := s.seriesMetadata.Scan(path.prefix, kv.DefaultScanOpts, func(_ int, key []byte, getVal func() ([]byte, error)) error {
+ comparableKey := make([]byte, len(key))
+ for i, b := range key {
+ comparableKey[i] = path.mask[i] & b
+ }
+ if bytes.Equal(path.template, comparableKey) {
+ id, errGetVal := getVal()
+ if errGetVal != nil {
+ err = multierr.Append(err, errGetVal)
+ return nil
+ }
+ result = append(result, &series{id: id})
+ }
+ return nil
+ })
+ if errScan != nil {
+ return nil, errScan
+ }
+ return result, err
+}
+
+func hashEntity(entity Entity) []byte {
+ result := make(Entry, 0, len(entity)*8)
+ for _, entry := range entity {
+ result = append(result, hash(entry)...)
+ }
+ return result
+}
+
+func hash(entry []byte) []byte {
+ return convert.Uint64ToBytes(convert.Hash(entry))
+}
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
new file mode 100644
index 0000000..deef085
--- /dev/null
+++ b/banyand/tsdb/seriesdb_test.go
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func TestNewPath(t *testing.T) {
+ tester := assert.New(t)
+ tests := []struct {
+ name string
+ entity Entity
+ want *Path
+ }{
+ {
+ name: "general path",
+ entity: Entity{
+ Entry("productpage"),
+ Entry("10.0.0.1"),
+ Entry(convert.Uint64ToBytes(0)),
+ },
+ want: &Path{
+ isFull: true,
+ prefix: bytes.Join([][]byte{
+ hash([]byte("productpage")),
+ hash([]byte("10.0.0.1")),
+ hash(convert.Uint64ToBytes(0)),
+ }, nil),
+ template: bytes.Join([][]byte{
+ hash([]byte("productpage")),
+ hash([]byte("10.0.0.1")),
+ hash(convert.Uint64ToBytes(0)),
+ }, nil),
+ mask: bytes.Join([][]byte{
+ maxIntBytes,
+ maxIntBytes,
+ maxIntBytes,
+ }, nil),
+ },
+ },
+ {
+ name: "the first is anyone",
+ entity: Entity{
+ AnyEntry,
+ Entry("10.0.0.1"),
+ Entry(convert.Uint64ToBytes(0)),
+ },
+ want: &Path{
+ prefix: []byte{},
+ template: bytes.Join([][]byte{
+ zeroIntBytes,
+ hash([]byte("10.0.0.1")),
+ hash(convert.Uint64ToBytes(0)),
+ }, nil),
+ mask: bytes.Join([][]byte{
+ zeroIntBytes,
+ maxIntBytes,
+ maxIntBytes,
+ }, nil),
+ },
+ },
+ {
+ name: "the second is anyone",
+ entity: Entity{
+ Entry("productpage"),
+ AnyEntry,
+ Entry(convert.Uint64ToBytes(0)),
+ },
+ want: &Path{
+ prefix: bytes.Join([][]byte{
+ hash([]byte("productpage")),
+ }, nil),
+ template: bytes.Join([][]byte{
+ hash([]byte("productpage")),
+ zeroIntBytes,
+ hash(convert.Uint64ToBytes(0)),
+ }, nil),
+ mask: bytes.Join([][]byte{
+ maxIntBytes,
+ zeroIntBytes,
+ maxIntBytes,
+ }, nil),
+ },
+ },
+ {
+ name: "the last is anyone",
+ entity: Entity{
+ Entry("productpage"),
+ Entry("10.0.0.1"),
+ AnyEntry,
+ },
+ want: &Path{
+ prefix: bytes.Join([][]byte{
+ hash([]byte("productpage")),
+ hash([]byte("10.0.0.1")),
+ }, nil),
+ template: bytes.Join([][]byte{
+ hash([]byte("productpage")),
+ hash([]byte("10.0.0.1")),
+ zeroIntBytes,
+ }, nil),
+ mask: bytes.Join([][]byte{
+ maxIntBytes,
+ maxIntBytes,
+ zeroIntBytes,
+ }, nil),
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := NewPath(tt.entity)
+ tester.Equal(tt.want, got)
+ })
+ }
+}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index ac58b9d..050444e 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -19,17 +19,15 @@ package tsdb
import (
"context"
- "sync"
- "time"
)
var _ Shard = (*shard)(nil)
type shard struct {
- id int
- lst []*segment
- sync.Mutex
- location string
+ id int
+
+ location string
+ seriesDatabase SeriesDatabase
}
func (s *shard) Series() SeriesDatabase {
@@ -45,24 +43,19 @@ func newShard(ctx context.Context, id int, location string) (*shard, error) {
id: id,
location: location,
}
- segPath, err := mkdir(segTemplate, s.location, time.Now().Format(segFormat))
+ seriesPath, err := mkdir(seriesTemplate, s.location)
if err != nil {
return nil, err
}
- seg, err := newSegment(ctx, segPath)
+ sdb, err := newSeriesDataBase(ctx, seriesPath)
if err != nil {
return nil, err
}
- {
- s.Lock()
- defer s.Unlock()
- s.lst = append(s.lst, seg)
- }
+ s.seriesDatabase = sdb
+
return s, nil
}
-func (s *shard) stop() {
- for _, seg := range s.lst {
- seg.close()
- }
+func (s *shard) Close() error {
+ return s.seriesDatabase.Close()
}
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index b14a932..503b0e8 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -33,9 +33,10 @@ import (
)
const (
- shardTemplate = "%s/shard-%d"
- segTemplate = "%s/seg-%s"
- blockTemplate = "%s/block-%s"
+ shardTemplate = "%s/shard-%d"
+ seriesTemplate = "%s/series"
+ segTemplate = "%s/seg-%s"
+ blockTemplate = "%s/block-%s"
segFormat = "20060102"
blockFormat = "1504"
@@ -49,6 +50,7 @@ type Database interface {
}
type Shard interface {
+ io.Closer
Series() SeriesDatabase
Index() IndexDatabase
}
@@ -97,7 +99,10 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
}
func (d *database) Close() error {
- panic("implement me")
+ for _, s := range d.sLst {
+ _ = s.Close()
+ }
+ return nil
}
func (d *database) Shards() []Shard {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index f791d2b..7b59037 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -36,8 +36,10 @@ func TestOpenDatabase(t *testing.T) {
defer removeDir(tempDir)
shardPath := fmt.Sprintf(shardTemplate, tempDir, 0)
validateDirectory(tester, shardPath)
+ seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
+ validateDirectory(tester, seriesPath)
now := time.Now()
- segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segFormat))
+ segPath := fmt.Sprintf(segTemplate, seriesPath, now.Format(segFormat))
validateDirectory(tester, segPath)
validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
}