This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new acede5e0 feat(metadata):expose auto-compaction (#654)
acede5e0 is described below
commit acede5e008032236e43660211d65fb08f398c7a2
Author: Raiki <[email protected]>
AuthorDate: Tue Apr 29 19:44:38 2025 +0800
feat(metadata):expose auto-compaction (#654)
* feat(metadata):expose auto-compaction
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: Gao Hongtao <[email protected]>
---
banyand/metadata/embeddedetcd/server.go | 39 ++++++-
banyand/metadata/embeddedserver/server.go | 83 ++++++++++++++-
banyand/metadata/embeddedserver/server_test.go | 118 +++++++++++++++++++++
banyand/metadata/schema/etcd_test.go | 6 +-
banyand/metadata/schema/register_test.go | 12 ++-
banyand/metadata/schema/watcher_test.go | 6 +-
docs/operation/configuration.md | 4 +
.../distributed/backup/backup_suite_test.go | 6 +-
.../distributed/lifecycle/lifecycle_suite_test.go | 6 +-
.../distributed/query/query_suite_test.go | 6 +-
.../distributed/setup/setup_suite_test.go | 6 +-
test/integration/standalone/other/disk_test.go | 6 +-
12 files changed, 281 insertions(+), 17 deletions(-)
diff --git a/banyand/metadata/embeddedetcd/server.go
b/banyand/metadata/embeddedetcd/server.go
index 0da9153e..6f55c207 100644
--- a/banyand/metadata/embeddedetcd/server.go
+++ b/banyand/metadata/embeddedetcd/server.go
@@ -63,13 +63,40 @@ func ConfigureListener(lcs, lps []string) Option {
}
}
+// AutoCompactionMode sets the auto compaction mode.
+func AutoCompactionMode(mode string) Option {
+ return func(config *config) {
+ config.autoCompactionMode = mode
+ }
+}
+
+// AutoCompactionRetention sets the auto compaction retention.
+func AutoCompactionRetention(retention string) Option {
+ return func(config *config) {
+ config.autoCompactionRetention = retention
+ }
+}
+
+// QuotaBackendBytes sets the quota for backend storage.
+func QuotaBackendBytes(quota int64) Option {
+ return func(config *config) {
+ config.quotaBackendBytes = quota
+ }
+}
+
type config struct {
// rootDir is the root directory for etcd storage
rootDir string
+ // autoCompactionMode is the auto compaction mode
+ autoCompactionMode string
+ // autoCompactionRetention is the auto compaction retention
+ autoCompactionRetention string
// listenerClientURLs is the listener for client
listenerClientURLs []string
// listenerPeerURLs is the listener for peer
listenerPeerURLs []string
+ // quotaBackendBytes is the quota for backend storage
+ quotaBackendBytes int64
}
func (e *server) ReadyNotify() <-chan struct{} {
@@ -93,9 +120,12 @@ func (e *server) Close() error {
// NewServer returns a new etcd server.
func NewServer(options ...Option) (Server, error) {
conf := &config{
- rootDir: os.TempDir(),
- listenerClientURLs: []string{embed.DefaultListenClientURLs},
- listenerPeerURLs: []string{embed.DefaultListenPeerURLs},
+ rootDir: os.TempDir(),
+ listenerClientURLs:
[]string{embed.DefaultListenClientURLs},
+ listenerPeerURLs: []string{embed.DefaultListenPeerURLs},
+ autoCompactionMode: "periodic",
+ autoCompactionRetention: "1h",
+ quotaBackendBytes: 2 * 1024 * 1024 * 1024,
}
for _, opt := range options {
opt(conf)
@@ -152,6 +182,9 @@ func newEmbedEtcdConfig(config *config, logger *zap.Logger)
(*embed.Config, erro
cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs
cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
+ cfg.AutoCompactionMode = config.autoCompactionMode
+ cfg.AutoCompactionRetention = config.autoCompactionRetention
+ cfg.QuotaBackendBytes = config.quotaBackendBytes
cfg.BackendBatchInterval = 500 * time.Millisecond
cfg.BackendBatchLimit = 10000
diff --git a/banyand/metadata/embeddedserver/server.go
b/banyand/metadata/embeddedserver/server.go
index 5a240cc6..f1bbbb5d 100644
--- a/banyand/metadata/embeddedserver/server.go
+++ b/banyand/metadata/embeddedserver/server.go
@@ -22,19 +22,31 @@ import (
"context"
"errors"
"strings"
+ "time"
+
+ "github.com/robfig/cron/v3"
+ clientv3 "go.etcd.io/etcd/client/v3"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type server struct {
metadata.Service
- metaServer embeddedetcd.Server
- rootDir string
- listenClientURL []string
- listenPeerURL []string
+ metaServer embeddedetcd.Server
+ scheduler *timestamp.Scheduler
+ ecli *clientv3.Client
+ rootDir string
+ defragCron string
+ autoCompactionMode string
+ autoCompactionRetention string
+ listenClientURL []string
+ listenPeerURL []string
+ quotaBackendBytes run.Bytes
}
func (s *server) Name() string {
@@ -48,8 +60,12 @@ func (s *server) Role() databasev1.Role {
func (s *server) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("metadata")
fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path
of metadata")
+ fs.StringVar(&s.autoCompactionMode, "etcd-auto-compaction-mode",
"periodic", "auto compaction mode: 'periodic' or 'revision'")
+ fs.StringVar(&s.autoCompactionRetention,
"etcd-auto-compaction-retention", "1h", "auto compaction retention: e.g. '1h',
'30m', '24h' for periodic; '1000' for revision")
+ fs.StringVar(&s.defragCron, "etcd-defrag-cron", "@daily",
"defragmentation cron: e.g. '@daily', '@hourly', '0 0 * * 0', '0 */6 * * *'")
fs.StringSliceVar(&s.listenClientURL, "etcd-listen-client-url",
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
fs.StringSliceVar(&s.listenPeerURL, "etcd-listen-peer-url",
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
+ fs.VarP(&s.quotaBackendBytes, "etcd-quota-backend-bytes", "", "Quota
for backend storage")
return fs
}
@@ -63,6 +79,15 @@ func (s *server) Validate() error {
if s.listenPeerURL == nil {
return errors.New("listenPeerURL is empty")
}
+ if s.autoCompactionMode == "" {
+ return errors.New("autoCompactionMode is empty")
+ }
+ if s.autoCompactionMode != "periodic" && s.autoCompactionMode !=
"revision" {
+ return errors.New("autoCompactionMode is invalid")
+ }
+ if s.autoCompactionRetention == "" {
+ return errors.New("autoCompactionRetention is empty")
+ }
if err := s.Service.FlagSet().Set(metadata.FlagEtcdEndpointsName,
strings.Join(s.listenClientURL, ",")); err != nil {
return err
@@ -72,7 +97,9 @@ func (s *server) Validate() error {
func (s *server) PreRun(ctx context.Context) error {
var err error
- s.metaServer, err =
embeddedetcd.NewServer(embeddedetcd.RootDir(s.rootDir),
embeddedetcd.ConfigureListener(s.listenClientURL, s.listenPeerURL))
+ s.metaServer, err =
embeddedetcd.NewServer(embeddedetcd.RootDir(s.rootDir),
embeddedetcd.ConfigureListener(s.listenClientURL, s.listenPeerURL),
+ embeddedetcd.AutoCompactionMode(s.autoCompactionMode),
embeddedetcd.AutoCompactionRetention(s.autoCompactionRetention),
+ embeddedetcd.QuotaBackendBytes(int64(s.quotaBackendBytes)))
if err != nil {
return err
}
@@ -82,10 +109,17 @@ func (s *server) PreRun(ctx context.Context) error {
func (s *server) Serve() run.StopNotify {
_ = s.Service.Serve()
+ s.registerDefrag()
return s.metaServer.StoppingNotify()
}
func (s *server) GracefulStop() {
+ if s.scheduler != nil {
+ s.scheduler.Close()
+ }
+ if s.ecli != nil {
+ _ = s.ecli.Close()
+ }
s.Service.GracefulStop()
if s.metaServer != nil {
s.metaServer.Close()
@@ -103,3 +137,42 @@ func NewService(_ context.Context) (metadata.Service,
error) {
}
return s, nil
}
+
+func performDefrag(listenURLs []string, ecli *clientv3.Client) error {
+ for _, listenURL := range listenURLs {
+ ctx, cancel := context.WithTimeout(context.Background(),
5*time.Second)
+ defer cancel()
+ _, err := ecli.Defragment(ctx, listenURL)
+ return err
+ }
+ return nil
+}
+
+func (s *server) registerDefrag() {
+ var (
+ err error
+ etcdLogger = logger.GetLogger().Named("etcd-server")
+ defrag = func(_ time.Time, _ *logger.Logger) bool {
+ if err = performDefrag(s.listenClientURL, s.ecli); err
!= nil {
+ etcdLogger.Error().Err(err).Msg("failed to
execute defragmentation")
+ return false
+ }
+ return true
+ }
+ parser = cron.Minute | cron.Hour | cron.Dom | cron.Month |
cron.Dow | cron.Descriptor
+ )
+
+ s.ecli, err = clientv3.New(clientv3.Config{
+ Endpoints: s.listenClientURL,
+ })
+ if err != nil {
+ etcdLogger.Error().Err(err).Msg("failed to create client")
+ return
+ }
+ s.scheduler = timestamp.NewScheduler(etcdLogger, timestamp.NewClock())
+
+ err = s.scheduler.Register("defrag", parser, s.defragCron, defrag)
+ if err != nil {
+ etcdLogger.Error().Err(err).Msg("failed to register
defragmentation")
+ }
+}
diff --git a/banyand/metadata/embeddedserver/server_test.go
b/banyand/metadata/embeddedserver/server_test.go
new file mode 100644
index 00000000..acb557cd
--- /dev/null
+++ b/banyand/metadata/embeddedserver/server_test.go
@@ -0,0 +1,118 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package embeddedserver
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/gomega"
+ clientv3 "go.etcd.io/etcd/client/v3"
+
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestDefragment(t *testing.T) {
+ gomega.RegisterFailHandler(ginkgo.Fail)
+ ginkgo.RunSpecs(t, "Defragment Suite")
+}
+
+var _ = ginkgo.Describe("Defragment", func() {
+ var (
+ etcdClient *clientv3.Client
+ etcdServer embeddedetcd.Server
+ path string
+ defFn func()
+ err error
+ endpoints, peerURLs []string
+ ports []int
+ )
+
+ ginkgo.BeforeEach(func() {
+ gomega.Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "debug",
+ })).To(gomega.Succeed())
+
+ path, defFn, err = test.NewSpace()
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ ports, err = test.AllocateFreePorts(2)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d",
ports[0])}
+ peerURLs = []string{fmt.Sprintf("http://127.0.0.1:%d",
ports[1])}
+
+ etcdServer, err = embeddedetcd.NewServer(
+ embeddedetcd.RootDir(path),
+ embeddedetcd.ConfigureListener(endpoints, peerURLs),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ <-etcdServer.ReadyNotify()
+
+ etcdClient, err = clientv3.New(clientv3.Config{
+ Endpoints: endpoints,
+ DialTimeout: 5 * time.Second,
+ })
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ })
+
+ ginkgo.AfterEach(func() {
+ if etcdClient != nil {
+ err = etcdClient.Close()
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ }
+ if etcdServer != nil {
+ err = etcdServer.Close()
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ <-etcdServer.StopNotify()
+ }
+ defFn()
+ })
+
+ ginkgo.It("should successfully perform defragmentation", func() {
+ ctx := context.Background()
+ for i := 0; i < 100; i++ {
+ _, err = etcdClient.Put(ctx, fmt.Sprintf("key-%d", i),
fmt.Sprintf("value-%d", i))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ }
+
+ for i := 0; i < 50; i++ {
+ _, err = etcdClient.Delete(ctx, fmt.Sprintf("key-%d",
i))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ }
+
+ err = performDefrag(endpoints, etcdClient)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ })
+
+ ginkgo.It("should handle invalid endpoints", func() {
+ invalidEndpoints := []string{"http://invalid-host:12345"}
+ err = performDefrag(invalidEndpoints, etcdClient)
+ gomega.Expect(err).To(gomega.HaveOccurred())
+ })
+})
diff --git a/banyand/metadata/schema/etcd_test.go
b/banyand/metadata/schema/etcd_test.go
index d288d8cb..41b66715 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -108,7 +108,11 @@ func initServerAndRegister(t *testing.T) (schema.Registry,
func()) {
endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
server, err := embeddedetcd.NewServer(
embeddedetcd.ConfigureListener(endpoints,
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(path))
+ embeddedetcd.RootDir(path),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
req.NoError(err)
req.NotNil(server)
<-server.ReadyNotify()
diff --git a/banyand/metadata/schema/register_test.go
b/banyand/metadata/schema/register_test.go
index 1e9ceef8..854a3d92 100644
--- a/banyand/metadata/schema/register_test.go
+++ b/banyand/metadata/schema/register_test.go
@@ -65,7 +65,11 @@ var _ = ginkgo.Describe("etcd_register", func() {
peers = []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}
server, err = embeddedetcd.NewServer(
embeddedetcd.ConfigureListener(endpoints, peers),
- embeddedetcd.RootDir(path))
+ embeddedetcd.RootDir(path),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
<-server.ReadyNotify()
r, err = schema.NewEtcdSchemaRegistry(
@@ -109,7 +113,11 @@ var _ = ginkgo.Describe("etcd_register", func() {
server, err = embeddedetcd.NewServer(
embeddedetcd.ConfigureListener(endpoints, peers),
- embeddedetcd.RootDir(path))
+ embeddedetcd.RootDir(path),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
<-server.ReadyNotify()
diff --git a/banyand/metadata/schema/watcher_test.go
b/banyand/metadata/schema/watcher_test.go
index 20c0b7e1..374f38d3 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -102,7 +102,11 @@ var _ = ginkgo.Describe("Watcher", func() {
endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d",
ports[0])}
server, err = embeddedetcd.NewServer(
embeddedetcd.ConfigureListener(endpoints,
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(path))
+ embeddedetcd.RootDir(path),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
<-server.ReadyNotify()
registry, err = schema.NewEtcdSchemaRegistry(
diff --git a/docs/operation/configuration.md b/docs/operation/configuration.md
index 387ad9dc..77d7abdd 100644
--- a/docs/operation/configuration.md
+++ b/docs/operation/configuration.md
@@ -111,6 +111,10 @@ The following flags are used to configure the stream
storage engine:
The following flags are used to configure the embedded etcd storage engine
which is only used when running as a standalone server:
- `--metadata-root-path string`: The root path of metadata (default: "/tmp").
+- `--etcd-auto-compaction-mode string`: The mode to compact the storage
(default: "periodic").
+- `--etcd-auto-compaction-retention string`: The retention period of the
storage (default: "1h").
+- `--etcd-defrag-cron string`: The scheduled task to free up disk space
(default: "@daily").
+- `--quota-backend-bytes bytes`: Quota for backend storage (default: 2.00GiB).
The following flags are used to configure the memory protector:
diff --git a/test/integration/distributed/backup/backup_suite_test.go
b/test/integration/distributed/backup/backup_suite_test.go
index 1258e7ac..f999b218 100644
--- a/test/integration/distributed/backup/backup_suite_test.go
+++ b/test/integration/distributed/backup/backup_suite_test.go
@@ -79,7 +79,11 @@ var _ = SynchronizedBeforeSuite(func() []byte {
ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
server, err := embeddedetcd.NewServer(
embeddedetcd.ConfigureListener([]string{ep},
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(dir))
+ embeddedetcd.RootDir(dir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
Expect(err).ShouldNot(HaveOccurred())
<-server.ReadyNotify()
By("Loading schema")
diff --git a/test/integration/distributed/lifecycle/lifecycle_suite_test.go
b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
index 4fc99cbd..c5d1f1a6 100644
--- a/test/integration/distributed/lifecycle/lifecycle_suite_test.go
+++ b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
@@ -77,7 +77,11 @@ var _ = SynchronizedBeforeSuite(func() []byte {
ep = fmt.Sprintf("http://127.0.0.1:%d", ports[0])
server, err := embeddedetcd.NewServer(
embeddedetcd.ConfigureListener([]string{ep},
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(srcDir))
+ embeddedetcd.RootDir(srcDir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
Expect(err).ShouldNot(HaveOccurred())
<-server.ReadyNotify()
By("Loading schema")
diff --git a/test/integration/distributed/query/query_suite_test.go
b/test/integration/distributed/query/query_suite_test.go
index 3d8eaad3..ce6a3c5d 100644
--- a/test/integration/distributed/query/query_suite_test.go
+++ b/test/integration/distributed/query/query_suite_test.go
@@ -76,7 +76,11 @@ var _ = SynchronizedBeforeSuite(func() []byte {
ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
server, err := embeddedetcd.NewServer(
embeddedetcd.ConfigureListener([]string{ep},
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(dir))
+ embeddedetcd.RootDir(dir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
Expect(err).ShouldNot(HaveOccurred())
<-server.ReadyNotify()
By("Loading schema")
diff --git a/test/integration/distributed/setup/setup_suite_test.go
b/test/integration/distributed/setup/setup_suite_test.go
index 3a51ab8c..9d1baf71 100644
--- a/test/integration/distributed/setup/setup_suite_test.go
+++ b/test/integration/distributed/setup/setup_suite_test.go
@@ -56,7 +56,11 @@ var _ = SynchronizedBeforeSuite(func() []byte {
ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
server, err := embeddedetcd.NewServer(
embeddedetcd.ConfigureListener([]string{ep},
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(dir))
+ embeddedetcd.RootDir(dir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
Expect(err).ShouldNot(HaveOccurred())
<-server.ReadyNotify()
deferFunc = func() {
diff --git a/test/integration/standalone/other/disk_test.go
b/test/integration/standalone/other/disk_test.go
index 880ca963..d2741447 100644
--- a/test/integration/standalone/other/disk_test.go
+++ b/test/integration/standalone/other/disk_test.go
@@ -82,7 +82,11 @@ var _ = g.Describe("Disk", func() {
ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
server, err := embeddedetcd.NewServer(
embeddedetcd.ConfigureListener([]string{ep},
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(dir))
+ embeddedetcd.RootDir(dir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
gm.Expect(err).ShouldNot(gm.HaveOccurred())
<-server.ReadyNotify()
g.By("Loading schema")