This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 42d7188 Open the tsdb on an existing path (#64)
42d7188 is described below
commit 42d71882260ca197dc59a8d7d4f43c5d5c47fb42
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jan 6 13:09:35 2022 +0800
Open the tsdb on an existing path (#64)
* Open the tsdb on an existing path
Signed-off-by: Gao Hongtao <[email protected]>
---
banyand/tsdb/segment.go | 37 +++++++++++++++++++---------
banyand/tsdb/shard.go | 34 +++++++++++++++++++-------
banyand/tsdb/tsdb.go | 61 +++++++++++++++++++++++++++++++++++++++++------
banyand/tsdb/tsdb_test.go | 28 ++++++++++++++++++----
4 files changed, 128 insertions(+), 32 deletions(-)
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index b2dad50..a2450a4 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -63,20 +63,35 @@ func newSegment(ctx context.Context, path string) (s
*segment, err error) {
if s.globalIndex, err = kv.OpenStore(0, indexPath,
kv.StoreWithLogger(s.l)); err != nil {
return nil, err
}
- blockPath, err := mkdir(blockTemplate, path,
time.Now().Format(blockFormat))
- if err != nil {
- return nil, err
+ loadBlock := func(path string) error {
+ var b *block
+ if b, err = newBlock(context.WithValue(ctx, logger.ContextKey,
s.l), blockOpts{
+ path: path,
+ }); err != nil {
+ return err
+ }
+ {
+ s.Lock()
+ defer s.Unlock()
+ s.lst = append(s.lst, b)
+ }
+ return nil
}
- var b *block
- if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, s.l),
blockOpts{
- path: blockPath,
- }); err != nil {
+ err = walkDir(path, blockPathPrefix, func(name, absolutePath string)
error {
+ return loadBlock(absolutePath)
+ })
+ if err != nil {
return nil, err
}
- {
- s.Lock()
- defer s.Unlock()
- s.lst = append(s.lst, b)
+ if len(s.lst) < 1 {
+ blockPath, err := mkdir(blockTemplate, path,
time.Now().Format(blockFormat))
+ if err != nil {
+ return nil, err
+ }
+ err = loadBlock(blockPath)
+ if err != nil {
+ return nil, err
+ }
}
return s, nil
}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 4bcc40e..09b6ded 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -49,23 +49,39 @@ func (s *shard) Index() IndexDatabase {
return s.indexDatabase
}
-func newShard(ctx context.Context, id common.ShardID, location string)
(*shard, error) {
+func openShard(ctx context.Context, id common.ShardID, location string)
(*shard, error) {
s := &shard{
id: id,
location: location,
}
- segPath, err := mkdir(segTemplate, location,
time.Now().Format(segFormat))
- if err != nil {
- return nil, err
+ loadSeg := func(path string) error {
+ seg, err := newSegment(ctx, path)
+ if err != nil {
+ return err
+ }
+ {
+ s.Lock()
+ defer s.Unlock()
+ s.lst = append(s.lst, seg)
+ }
+ return nil
}
- seg, err := newSegment(ctx, segPath)
+ err := walkDir(location, segPathPrefix, func(_, absolutePath string)
error {
+ return loadSeg(absolutePath)
+ })
if err != nil {
return nil, err
}
- {
- s.Lock()
- defer s.Unlock()
- s.lst = append(s.lst, seg)
+ if len(s.lst) < 1 {
+ var segPath string
+ segPath, err = mkdir(segTemplate, location,
time.Now().Format(segFormat))
+ if err != nil {
+ return nil, err
+ }
+ err = loadSeg(segPath)
+ if err != nil {
+ return nil, err
+ }
}
seriesPath, err := mkdir(seriesTemplate, s.location)
if err != nil {
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index e9ed19a..fbb710a 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -24,6 +24,8 @@ import (
"io/fs"
"io/ioutil"
"os"
+ "strconv"
+ "strings"
"sync"
"github.com/pkg/errors"
@@ -36,11 +38,16 @@ import (
)
const (
- shardTemplate = "%s/shard-%d"
- seriesTemplate = "%s/series"
- segTemplate = "%s/seg-%s"
- blockTemplate = "%s/block-%s"
- globalIndexTemplate = "%s/index"
+ shardPathPrefix = "shard"
+ pathSeparator = string(os.PathSeparator)
+ rootPrefix = "%s" + pathSeparator
+ shardTemplate = rootPrefix + shardPathPrefix + "-%d"
+ seriesTemplate = rootPrefix + "series"
+ segPathPrefix = "seg"
+ segTemplate = rootPrefix + segPathPrefix + "-%s"
+ blockPathPrefix = "block"
+ blockTemplate = rootPrefix + blockPathPrefix + "-%s"
+ globalIndexTemplate = rootPrefix + "index"
segFormat = "20060102"
blockFormat = "1504"
@@ -155,7 +162,7 @@ func createDatabase(ctx context.Context, db *database)
(Database, error) {
err = multierr.Append(err, errInternal)
continue
}
- so, errNewShard := newShard(ctx, common.ShardID(i),
shardLocation)
+ so, errNewShard := openShard(ctx, common.ShardID(i),
shardLocation)
if errNewShard != nil {
err = multierr.Append(err, errNewShard)
continue
@@ -166,10 +173,50 @@ func createDatabase(ctx context.Context, db *database)
(Database, error) {
}
+// loadDatabase reopens the shards that already exist under db.location.
+// While holding the database lock it walks the directories whose names start
+// with shardPathPrefix, parses the shard id from the part after the first "-",
+// opens each shard with openShard and appends it to db.sLst. Directories
+// without an id part, or with an id not below db.shardNum, are skipped. Any
+// error is returned with the message "load the database failed".
func loadDatabase(ctx context.Context, db *database) (Database, error) {
- //TODO: load the existing database
+	//TODO: open the lock file
+	//TODO: open the manifest file
+	db.Lock()
+	defer db.Unlock()
+	err := walkDir(db.location, shardPathPrefix, func(name, absolutePath string) error {
+		shardSegs := strings.Split(name, "-")
+		// walkDir matches on the bare prefix, so names such as "shard" or
+		// "shards" reach this callback; indexing [1] on them would panic.
+		if len(shardSegs) < 2 {
+			return nil
+		}
+		shardID, err := strconv.Atoi(shardSegs[1])
+		if err != nil {
+			return err
+		}
+		if shardID >= int(db.shardNum) {
+			return nil
+		}
+		so, errOpenShard := openShard(ctx, common.ShardID(shardID), absolutePath)
+		if errOpenShard != nil {
+			return errOpenShard
+		}
+		db.sLst = append(db.sLst, so)
+		return nil
+	})
+	if err != nil {
+		return nil, errors.WithMessage(err, "load the database failed")
+	}
return db, nil
}
+// walkFn is the callback walkDir invokes for every matching directory. It
+// receives the entry's base name and the path built from root and that name.
+type walkFn func(name, absolutePath string) error
+
+// walkDir lists the direct children of root and calls walkFn for each
+// directory whose name starts with prefix. Non-directories and other names
+// are ignored. It stops at the first callback error and returns it annotated
+// with the entry name; a failure to read root is returned wrapped as well.
+func walkDir(root, prefix string, walkFn walkFn) error {
+	files, err := ioutil.ReadDir(root)
+	if err != nil {
+		return errors.Wrapf(err, "failed to walk the database path: %s", root)
+	}
+	for _, f := range files {
+		if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) {
+			continue
+		}
+		// Capture the callback's error before wrapping it: err is nil at this
+		// point, and WithMessagef(nil, ...) returns nil, which would hide the
+		// failure from the caller.
+		if err = walkFn(f.Name(), fmt.Sprintf(rootPrefix, root)+f.Name()); err != nil {
+			return errors.WithMessagef(err, "failed to load: %s", f.Name())
+		}
+	}
+	return nil
+}
+
func mkdir(format string, a ...interface{}) (path string, err error) {
path = fmt.Sprintf(format, a...)
if err = os.MkdirAll(path, dirPerm); err != nil {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index c570fdf..bfb9957 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -34,8 +34,27 @@ import (
+// TestOpenDatabase opens a database in a fresh temporary space and checks
+// that the expected directory structure was created.
func TestOpenDatabase(t *testing.T) {
tester := assert.New(t)
- tempDir, deferFunc, _ := setUp(require.New(t))
+	req := require.New(t)
+	tempDir, deferFunc := test.Space(req)
defer deferFunc()
+	db := openDatabase(req, tempDir)
+	// Deferred after deferFunc, so it runs first: the database is closed
+	// before the temporary space is cleaned up.
+	defer func() { tester.NoError(db.Close()) }()
+	verifyDatabaseStructure(tester, tempDir)
+}
+
+// TestReOpenDatabase opens and closes a database twice on the same path,
+// verifying the directory structure after each close.
+func TestReOpenDatabase(t *testing.T) {
+	tester := assert.New(t)
+	req := require.New(t)
+	tempDir, deferFunc := test.Space(req)
+	defer deferFunc()
+	// The first pass creates the database; the second reopens it in place.
+	for round := 0; round < 2; round++ {
+		db := openDatabase(req, tempDir)
+		req.NoError(db.Close())
+		verifyDatabaseStructure(tester, tempDir)
+	}
+}
+
+func verifyDatabaseStructure(tester *assert.Assertions, tempDir string) {
shardPath := fmt.Sprintf(shardTemplate, tempDir, 0)
validateDirectory(tester, shardPath)
seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
@@ -46,16 +65,15 @@ func TestOpenDatabase(t *testing.T) {
validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath,
now.Format(blockFormat)))
}
-func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db
Database) {
+func openDatabase(t *require.Assertions, path string) (db Database) {
t.NoError(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
}))
- tempDir, deferFunc = test.Space(t)
db, err := OpenDatabase(
context.WithValue(context.Background(), logger.ContextKey,
logger.GetLogger("test")),
DatabaseOpts{
- Location: tempDir,
+ Location: path,
ShardNum: 1,
EncodingMethod: EncodingMethod{
EncoderPool: encoding.NewPlainEncoderPool(0),
@@ -64,7 +82,7 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc
func(), db Database
})
t.NoError(err)
t.NotNil(db)
- return tempDir, deferFunc, db
+ return db
}
func validateDirectory(t *assert.Assertions, dir string) {