This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new acede5e0 feat(metadata):expose auto-compaction (#654)
acede5e0 is described below

commit acede5e008032236e43660211d65fb08f398c7a2
Author: Raiki <[email protected]>
AuthorDate: Tue Apr 29 19:44:38 2025 +0800

    feat(metadata):expose auto-compaction (#654)
    
    * feat(metadata):expose auto-compaction
    
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
    Co-authored-by: Gao Hongtao <[email protected]>
---
 banyand/metadata/embeddedetcd/server.go            |  39 ++++++-
 banyand/metadata/embeddedserver/server.go          |  83 ++++++++++++++-
 banyand/metadata/embeddedserver/server_test.go     | 118 +++++++++++++++++++++
 banyand/metadata/schema/etcd_test.go               |   6 +-
 banyand/metadata/schema/register_test.go           |  12 ++-
 banyand/metadata/schema/watcher_test.go            |   6 +-
 docs/operation/configuration.md                    |   4 +
 .../distributed/backup/backup_suite_test.go        |   6 +-
 .../distributed/lifecycle/lifecycle_suite_test.go  |   6 +-
 .../distributed/query/query_suite_test.go          |   6 +-
 .../distributed/setup/setup_suite_test.go          |   6 +-
 test/integration/standalone/other/disk_test.go     |   6 +-
 12 files changed, 281 insertions(+), 17 deletions(-)

diff --git a/banyand/metadata/embeddedetcd/server.go 
b/banyand/metadata/embeddedetcd/server.go
index 0da9153e..6f55c207 100644
--- a/banyand/metadata/embeddedetcd/server.go
+++ b/banyand/metadata/embeddedetcd/server.go
@@ -63,13 +63,40 @@ func ConfigureListener(lcs, lps []string) Option {
        }
 }
 
+// AutoCompactionMode sets the auto compaction mode.
+func AutoCompactionMode(mode string) Option {
+       return func(config *config) {
+               config.autoCompactionMode = mode
+       }
+}
+
+// AutoCompactionRetention sets the auto compaction retention.
+func AutoCompactionRetention(retention string) Option {
+       return func(config *config) {
+               config.autoCompactionRetention = retention
+       }
+}
+
+// QuotaBackendBytes sets the quota for backend storage.
+func QuotaBackendBytes(quota int64) Option {
+       return func(config *config) {
+               config.quotaBackendBytes = quota
+       }
+}
+
 type config struct {
        // rootDir is the root directory for etcd storage
        rootDir string
+       // autoCompactionMode is the auto compaction mode
+       autoCompactionMode string
+       // autoCompactionRetention is the auto compaction retention
+       autoCompactionRetention string
        // listenerClientURLs is the listener for client
        listenerClientURLs []string
        // listenerPeerURLs is the listener for peer
        listenerPeerURLs []string
+       // quotaBackendBytes is the quota for backend storage
+       quotaBackendBytes int64
 }
 
 func (e *server) ReadyNotify() <-chan struct{} {
@@ -93,9 +120,12 @@ func (e *server) Close() error {
 // NewServer returns a new etcd server.
 func NewServer(options ...Option) (Server, error) {
        conf := &config{
-               rootDir:            os.TempDir(),
-               listenerClientURLs: []string{embed.DefaultListenClientURLs},
-               listenerPeerURLs:   []string{embed.DefaultListenPeerURLs},
+               rootDir:                 os.TempDir(),
+               listenerClientURLs:      
[]string{embed.DefaultListenClientURLs},
+               listenerPeerURLs:        []string{embed.DefaultListenPeerURLs},
+               autoCompactionMode:      "periodic",
+               autoCompactionRetention: "1h",
+               quotaBackendBytes:       2 * 1024 * 1024 * 1024,
        }
        for _, opt := range options {
                opt(conf)
@@ -152,6 +182,9 @@ func newEmbedEtcdConfig(config *config, logger *zap.Logger) 
(*embed.Config, erro
        cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs
        cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs
        cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
+       cfg.AutoCompactionMode = config.autoCompactionMode
+       cfg.AutoCompactionRetention = config.autoCompactionRetention
+       cfg.QuotaBackendBytes = config.quotaBackendBytes
 
        cfg.BackendBatchInterval = 500 * time.Millisecond
        cfg.BackendBatchLimit = 10000
diff --git a/banyand/metadata/embeddedserver/server.go 
b/banyand/metadata/embeddedserver/server.go
index 5a240cc6..f1bbbb5d 100644
--- a/banyand/metadata/embeddedserver/server.go
+++ b/banyand/metadata/embeddedserver/server.go
@@ -22,19 +22,31 @@ import (
        "context"
        "errors"
        "strings"
+       "time"
+
+       "github.com/robfig/cron/v3"
+       clientv3 "go.etcd.io/etcd/client/v3"
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type server struct {
        metadata.Service
-       metaServer      embeddedetcd.Server
-       rootDir         string
-       listenClientURL []string
-       listenPeerURL   []string
+       metaServer              embeddedetcd.Server
+       scheduler               *timestamp.Scheduler
+       ecli                    *clientv3.Client
+       rootDir                 string
+       defragCron              string
+       autoCompactionMode      string
+       autoCompactionRetention string
+       listenClientURL         []string
+       listenPeerURL           []string
+       quotaBackendBytes       run.Bytes
 }
 
 func (s *server) Name() string {
@@ -48,8 +60,12 @@ func (s *server) Role() databasev1.Role {
 func (s *server) FlagSet() *run.FlagSet {
        fs := run.NewFlagSet("metadata")
        fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path 
of metadata")
+       fs.StringVar(&s.autoCompactionMode, "etcd-auto-compaction-mode", 
"periodic", "auto compaction mode: 'periodic' or 'revision'")
+       fs.StringVar(&s.autoCompactionRetention, 
"etcd-auto-compaction-retention", "1h", "auto compaction retention: e.g. '1h', 
'30m', '24h' for periodic; '1000' for revision")
+       fs.StringVar(&s.defragCron, "etcd-defrag-cron", "@daily", 
"defragmentation cron: e.g. '@daily', '@hourly', '0 0 * * 0', '0 */6 * * *'")
        fs.StringSliceVar(&s.listenClientURL, "etcd-listen-client-url", 
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
        fs.StringSliceVar(&s.listenPeerURL, "etcd-listen-peer-url", 
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
+       fs.VarP(&s.quotaBackendBytes, "etcd-quota-backend-bytes", "", "Quota 
for backend storage")
        return fs
 }
 
@@ -63,6 +79,15 @@ func (s *server) Validate() error {
        if s.listenPeerURL == nil {
                return errors.New("listenPeerURL is empty")
        }
+       if s.autoCompactionMode == "" {
+               return errors.New("autoCompactionMode is empty")
+       }
+       if s.autoCompactionMode != "periodic" && s.autoCompactionMode != 
"revision" {
+               return errors.New("autoCompactionMode is invalid")
+       }
+       if s.autoCompactionRetention == "" {
+               return errors.New("autoCompactionRetention is empty")
+       }
        if err := s.Service.FlagSet().Set(metadata.FlagEtcdEndpointsName,
                strings.Join(s.listenClientURL, ",")); err != nil {
                return err
@@ -72,7 +97,9 @@ func (s *server) Validate() error {
 
 func (s *server) PreRun(ctx context.Context) error {
        var err error
-       s.metaServer, err = 
embeddedetcd.NewServer(embeddedetcd.RootDir(s.rootDir), 
embeddedetcd.ConfigureListener(s.listenClientURL, s.listenPeerURL))
+       s.metaServer, err = 
embeddedetcd.NewServer(embeddedetcd.RootDir(s.rootDir), 
embeddedetcd.ConfigureListener(s.listenClientURL, s.listenPeerURL),
+               embeddedetcd.AutoCompactionMode(s.autoCompactionMode), 
embeddedetcd.AutoCompactionRetention(s.autoCompactionRetention),
+               embeddedetcd.QuotaBackendBytes(int64(s.quotaBackendBytes)))
        if err != nil {
                return err
        }
@@ -82,10 +109,17 @@ func (s *server) PreRun(ctx context.Context) error {
 
 func (s *server) Serve() run.StopNotify {
        _ = s.Service.Serve()
+       s.registerDefrag()
        return s.metaServer.StoppingNotify()
 }
 
 func (s *server) GracefulStop() {
+       if s.scheduler != nil {
+               s.scheduler.Close()
+       }
+       if s.ecli != nil {
+               _ = s.ecli.Close()
+       }
        s.Service.GracefulStop()
        if s.metaServer != nil {
                s.metaServer.Close()
@@ -103,3 +137,42 @@ func NewService(_ context.Context) (metadata.Service, 
error) {
        }
        return s, nil
 }
+
+func performDefrag(listenURLs []string, ecli *clientv3.Client) error {
+       for _, listenURL := range listenURLs {
+               ctx, cancel := context.WithTimeout(context.Background(), 
5*time.Second)
+               defer cancel()
+               _, err := ecli.Defragment(ctx, listenURL)
+               return err
+       }
+       return nil
+}
+
+func (s *server) registerDefrag() {
+       var (
+               err        error
+               etcdLogger = logger.GetLogger().Named("etcd-server")
+               defrag     = func(_ time.Time, _ *logger.Logger) bool {
+                       if err = performDefrag(s.listenClientURL, s.ecli); err 
!= nil {
+                               etcdLogger.Error().Err(err).Msg("failed to 
execute defragmentation")
+                               return false
+                       }
+                       return true
+               }
+               parser = cron.Minute | cron.Hour | cron.Dom | cron.Month | 
cron.Dow | cron.Descriptor
+       )
+
+       s.ecli, err = clientv3.New(clientv3.Config{
+               Endpoints: s.listenClientURL,
+       })
+       if err != nil {
+               etcdLogger.Error().Err(err).Msg("failed to create client")
+               return
+       }
+       s.scheduler = timestamp.NewScheduler(etcdLogger, timestamp.NewClock())
+
+       err = s.scheduler.Register("defrag", parser, s.defragCron, defrag)
+       if err != nil {
+               etcdLogger.Error().Err(err).Msg("failed to register 
defragmentation")
+       }
+}
diff --git a/banyand/metadata/embeddedserver/server_test.go 
b/banyand/metadata/embeddedserver/server_test.go
new file mode 100644
index 00000000..acb557cd
--- /dev/null
+++ b/banyand/metadata/embeddedserver/server_test.go
@@ -0,0 +1,118 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package embeddedserver
+
+import (
+       "context"
+       "fmt"
+       "testing"
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       clientv3 "go.etcd.io/etcd/client/v3"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestDefragment(t *testing.T) {
+       gomega.RegisterFailHandler(ginkgo.Fail)
+       ginkgo.RunSpecs(t, "Defragment Suite")
+}
+
+var _ = ginkgo.Describe("Defragment", func() {
+       var (
+               etcdClient          *clientv3.Client
+               etcdServer          embeddedetcd.Server
+               path                string
+               defFn               func()
+               err                 error
+               endpoints, peerURLs []string
+               ports               []int
+       )
+
+       ginkgo.BeforeEach(func() {
+               gomega.Expect(logger.Init(logger.Logging{
+                       Env:   "dev",
+                       Level: "debug",
+               })).To(gomega.Succeed())
+
+               path, defFn, err = test.NewSpace()
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               ports, err = test.AllocateFreePorts(2)
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d";, 
ports[0])}
+               peerURLs = []string{fmt.Sprintf("http://127.0.0.1:%d";, 
ports[1])}
+
+               etcdServer, err = embeddedetcd.NewServer(
+                       embeddedetcd.RootDir(path),
+                       embeddedetcd.ConfigureListener(endpoints, peerURLs),
+                       embeddedetcd.AutoCompactionMode("periodic"),
+                       embeddedetcd.AutoCompactionRetention("1h"),
+                       embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+               )
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               <-etcdServer.ReadyNotify()
+
+               etcdClient, err = clientv3.New(clientv3.Config{
+                       Endpoints:   endpoints,
+                       DialTimeout: 5 * time.Second,
+               })
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       })
+
+       ginkgo.AfterEach(func() {
+               if etcdClient != nil {
+                       err = etcdClient.Close()
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               }
+               if etcdServer != nil {
+                       err = etcdServer.Close()
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+                       <-etcdServer.StopNotify()
+               }
+               defFn()
+       })
+
+       ginkgo.It("should successfully perform defragmentation", func() {
+               ctx := context.Background()
+               for i := 0; i < 100; i++ {
+                       _, err = etcdClient.Put(ctx, fmt.Sprintf("key-%d", i), 
fmt.Sprintf("value-%d", i))
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               }
+
+               for i := 0; i < 50; i++ {
+                       _, err = etcdClient.Delete(ctx, fmt.Sprintf("key-%d", 
i))
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               }
+
+               err = performDefrag(endpoints, etcdClient)
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       })
+
+       ginkgo.It("should handle invalid endpoints", func() {
+               invalidEndpoints := []string{"http://invalid-host:12345"}
+               err = performDefrag(invalidEndpoints, etcdClient)
+               gomega.Expect(err).To(gomega.HaveOccurred())
+       })
+})
diff --git a/banyand/metadata/schema/etcd_test.go 
b/banyand/metadata/schema/etcd_test.go
index d288d8cb..41b66715 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -108,7 +108,11 @@ func initServerAndRegister(t *testing.T) (schema.Registry, 
func()) {
        endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d";, ports[0])}
        server, err := embeddedetcd.NewServer(
                embeddedetcd.ConfigureListener(endpoints, 
[]string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}),
-               embeddedetcd.RootDir(path))
+               embeddedetcd.RootDir(path),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
        req.NoError(err)
        req.NotNil(server)
        <-server.ReadyNotify()
diff --git a/banyand/metadata/schema/register_test.go 
b/banyand/metadata/schema/register_test.go
index 1e9ceef8..854a3d92 100644
--- a/banyand/metadata/schema/register_test.go
+++ b/banyand/metadata/schema/register_test.go
@@ -65,7 +65,11 @@ var _ = ginkgo.Describe("etcd_register", func() {
                peers = []string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}
                server, err = embeddedetcd.NewServer(
                        embeddedetcd.ConfigureListener(endpoints, peers),
-                       embeddedetcd.RootDir(path))
+                       embeddedetcd.RootDir(path),
+                       embeddedetcd.AutoCompactionMode("periodic"),
+                       embeddedetcd.AutoCompactionRetention("1h"),
+                       embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+               )
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                <-server.ReadyNotify()
                r, err = schema.NewEtcdSchemaRegistry(
@@ -109,7 +113,11 @@ var _ = ginkgo.Describe("etcd_register", func() {
 
                server, err = embeddedetcd.NewServer(
                        embeddedetcd.ConfigureListener(endpoints, peers),
-                       embeddedetcd.RootDir(path))
+                       embeddedetcd.RootDir(path),
+                       embeddedetcd.AutoCompactionMode("periodic"),
+                       embeddedetcd.AutoCompactionRetention("1h"),
+                       embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+               )
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                <-server.ReadyNotify()
 
diff --git a/banyand/metadata/schema/watcher_test.go 
b/banyand/metadata/schema/watcher_test.go
index 20c0b7e1..374f38d3 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -102,7 +102,11 @@ var _ = ginkgo.Describe("Watcher", func() {
                endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d";, 
ports[0])}
                server, err = embeddedetcd.NewServer(
                        embeddedetcd.ConfigureListener(endpoints, 
[]string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}),
-                       embeddedetcd.RootDir(path))
+                       embeddedetcd.RootDir(path),
+                       embeddedetcd.AutoCompactionMode("periodic"),
+                       embeddedetcd.AutoCompactionRetention("1h"),
+                       embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+               )
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                <-server.ReadyNotify()
                registry, err = schema.NewEtcdSchemaRegistry(
diff --git a/docs/operation/configuration.md b/docs/operation/configuration.md
index 387ad9dc..77d7abdd 100644
--- a/docs/operation/configuration.md
+++ b/docs/operation/configuration.md
@@ -111,6 +111,10 @@ The following flags are used to configure the stream 
storage engine:
 The following flags are used to configure the embedded etcd storage engine 
which is only used when running as a standalone server:
 
 - `--metadata-root-path string`: The root path of metadata (default: "/tmp").
+- `--etcd-auto-compaction-mode string`: The mode to compact the storage 
(default: "periodic").
+- `--etcd-auto-compaction-retention string`: The retention period of the 
storage (default: "1h").
+- `--etcd-defrag-cron string`: The scheduled task to free up disk space 
(default: "@daily").
+- `--quota-backend-bytes bytes`: Quota for backend storage (default: 2.00GiB).
 
 The following flags are used to configure the memory protector:
 
diff --git a/test/integration/distributed/backup/backup_suite_test.go 
b/test/integration/distributed/backup/backup_suite_test.go
index 1258e7ac..f999b218 100644
--- a/test/integration/distributed/backup/backup_suite_test.go
+++ b/test/integration/distributed/backup/backup_suite_test.go
@@ -79,7 +79,11 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        ep := fmt.Sprintf("http://127.0.0.1:%d";, ports[0])
        server, err := embeddedetcd.NewServer(
                embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}),
-               embeddedetcd.RootDir(dir))
+               embeddedetcd.RootDir(dir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
        Expect(err).ShouldNot(HaveOccurred())
        <-server.ReadyNotify()
        By("Loading schema")
diff --git a/test/integration/distributed/lifecycle/lifecycle_suite_test.go 
b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
index 4fc99cbd..c5d1f1a6 100644
--- a/test/integration/distributed/lifecycle/lifecycle_suite_test.go
+++ b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
@@ -77,7 +77,11 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        ep = fmt.Sprintf("http://127.0.0.1:%d";, ports[0])
        server, err := embeddedetcd.NewServer(
                embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}),
-               embeddedetcd.RootDir(srcDir))
+               embeddedetcd.RootDir(srcDir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
        Expect(err).ShouldNot(HaveOccurred())
        <-server.ReadyNotify()
        By("Loading schema")
diff --git a/test/integration/distributed/query/query_suite_test.go 
b/test/integration/distributed/query/query_suite_test.go
index 3d8eaad3..ce6a3c5d 100644
--- a/test/integration/distributed/query/query_suite_test.go
+++ b/test/integration/distributed/query/query_suite_test.go
@@ -76,7 +76,11 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        ep := fmt.Sprintf("http://127.0.0.1:%d";, ports[0])
        server, err := embeddedetcd.NewServer(
                embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}),
-               embeddedetcd.RootDir(dir))
+               embeddedetcd.RootDir(dir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
        Expect(err).ShouldNot(HaveOccurred())
        <-server.ReadyNotify()
        By("Loading schema")
diff --git a/test/integration/distributed/setup/setup_suite_test.go 
b/test/integration/distributed/setup/setup_suite_test.go
index 3a51ab8c..9d1baf71 100644
--- a/test/integration/distributed/setup/setup_suite_test.go
+++ b/test/integration/distributed/setup/setup_suite_test.go
@@ -56,7 +56,11 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        ep := fmt.Sprintf("http://127.0.0.1:%d";, ports[0])
        server, err := embeddedetcd.NewServer(
                embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}),
-               embeddedetcd.RootDir(dir))
+               embeddedetcd.RootDir(dir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
        Expect(err).ShouldNot(HaveOccurred())
        <-server.ReadyNotify()
        deferFunc = func() {
diff --git a/test/integration/standalone/other/disk_test.go 
b/test/integration/standalone/other/disk_test.go
index 880ca963..d2741447 100644
--- a/test/integration/standalone/other/disk_test.go
+++ b/test/integration/standalone/other/disk_test.go
@@ -82,7 +82,11 @@ var _ = g.Describe("Disk", func() {
                ep := fmt.Sprintf("http://127.0.0.1:%d";, ports[0])
                server, err := embeddedetcd.NewServer(
                        embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}),
-                       embeddedetcd.RootDir(dir))
+                       embeddedetcd.RootDir(dir),
+                       embeddedetcd.AutoCompactionMode("periodic"),
+                       embeddedetcd.AutoCompactionRetention("1h"),
+                       embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+               )
                gm.Expect(err).ShouldNot(gm.HaveOccurred())
                <-server.ReadyNotify()
                g.By("Loading schema")

Reply via email to