This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch property-ttl
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit f3874f3ecfeb3eb4bb695f4b524fb53c4d1a4235
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Sep 17 02:16:04 2023 +0000

    Apply TTL to a property
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 api/proto/banyandb/property/v1/property.proto      |   7 +
 banyand/metadata/schema/property.go                | 163 ++++++++++++++++++---
 bydbctl/internal/cmd/property_test.go              |  18 ++-
 docs/api-reference.md                              |   2 +
 docs/crud/property.md                              |  17 +++
 test/integration/standalone/other/property_test.go |  42 ++++++
 6 files changed, 221 insertions(+), 28 deletions(-)

diff --git a/api/proto/banyandb/property/v1/property.proto 
b/api/proto/banyandb/property/v1/property.proto
index 6e6b5d86..40aac5f6 100644
--- a/api/proto/banyandb/property/v1/property.proto
+++ b/api/proto/banyandb/property/v1/property.proto
@@ -43,4 +43,11 @@ message Property {
   repeated model.v1.Tag tags = 2 [(validate.rules).repeated.min_items = 1];
   // updated_at indicates when the property is updated
   google.protobuf.Timestamp updated_at = 3;
+   // readonly. lease_id is the ID of the lease that attached to key.
+  int64 lease_id = 4;
+  // ttl indicates the time to live of the property.
+  // It's a string in the format of "1h", "2m", "3s", "1500ms".
+  // It defaults to 0s, which means the property never expires.
+  // The minimum allowed ttl is 1s.
+  string ttl = 5;
 }
diff --git a/banyand/metadata/schema/property.go 
b/banyand/metadata/schema/property.go
index bb76f0a8..44dad59b 100644
--- a/banyand/metadata/schema/property.go
+++ b/banyand/metadata/schema/property.go
@@ -20,12 +20,17 @@ package schema
 import (
        "context"
        "path"
+       "time"
 
        "github.com/pkg/errors"
+       clientv3 "go.etcd.io/etcd/client/v3"
+       "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/known/timestamppb"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 const all = "*"
@@ -33,11 +38,28 @@ const all = "*"
 var propertyKeyPrefix = "/properties/"
 
 func (e *etcdSchemaRegistry) GetProperty(ctx context.Context, metadata 
*propertyv1.Metadata, tags []string) (*propertyv1.Property, error) {
-       var entity propertyv1.Property
-       if err := e.get(ctx, formatPropertyKey(transformKey(metadata)), 
&entity); err != nil {
+       if !e.closer.AddRunning() {
+               return nil, ErrClosed
+       }
+       defer e.closer.Done()
+       key := e.prependNamespace(formatPropertyKey(transformKey(metadata)))
+       resp, err := e.client.Get(ctx, key)
+       if err != nil {
+               return nil, err
+       }
+       if resp.Count == 0 {
+               return nil, ErrGRPCResourceNotFound
+       }
+       if resp.Count > 1 {
+               return nil, errUnexpectedNumberOfEntities
+       }
+       var property propertyv1.Property
+       if err = proto.Unmarshal(resp.Kvs[0].Value, &property); err != nil {
                return nil, err
        }
-       return filterTags(&entity, tags), nil
+       property.Metadata.Container.CreateRevision = resp.Kvs[0].CreateRevision
+       property.Metadata.Container.ModRevision = resp.Kvs[0].ModRevision
+       return filterTags(&property, tags), nil
 }
 
 func filterTags(property *propertyv1.Property, tags []string) 
*propertyv1.Property {
@@ -84,14 +106,15 @@ func (e *etcdSchemaRegistry) ListProperty(ctx 
context.Context, container *common
 }
 
 func (e *etcdSchemaRegistry) ApplyProperty(ctx context.Context, property 
*propertyv1.Property, strategy propertyv1.ApplyRequest_Strategy) (bool, uint32, 
error) {
+       if !e.closer.AddRunning() {
+               return false, 0, ErrClosed
+       }
+       defer e.closer.Done()
        m := transformKey(property.GetMetadata())
        group := m.GetGroup()
        if _, getGroupErr := e.GetGroup(ctx, group); getGroupErr != nil {
                return false, 0, errors.Wrap(getGroupErr, "group is not exist")
        }
-       if property.UpdatedAt != nil {
-               property.UpdatedAt = timestamppb.Now()
-       }
        md := Metadata{
                TypeMeta: TypeMeta{
                        Kind:  KindProperty,
@@ -100,37 +123,133 @@ func (e *etcdSchemaRegistry) ApplyProperty(ctx 
context.Context, property *proper
                },
                Spec: property,
        }
-       tagsNum := uint32(len(property.Tags))
-       err := e.create(ctx, md)
-       if err == nil {
-               return true, tagsNum, nil
-       }
-       if !errors.Is(err, errGRPCAlreadyExists) {
+       key, err := md.key()
+       if err != nil {
                return false, 0, err
        }
-       if strategy != propertyv1.ApplyRequest_STRATEGY_REPLACE {
-               existed, errGet := e.GetProperty(ctx, property.Metadata, nil)
-               if errGet != nil {
-                       return false, 0, errGet
+       key = e.prependNamespace(key)
+       var ttl int64
+       if property.Ttl != "" {
+               t, err := timestamp.ParseDuration(property.Ttl)
+               if err != nil {
+                       return false, 0, err
                }
+               if t < time.Second {
+                       return false, 0, errors.New("ttl should be greater than 
1s")
+               }
+               ttl = int64(t / time.Second)
+       }
+       if strategy == propertyv1.ApplyRequest_STRATEGY_REPLACE {
+               return e.replaceProperty(ctx, key, property, ttl)
+       }
+       return e.mergeProperty(ctx, key, property, ttl)
+}
+
+func (e *etcdSchemaRegistry) replaceProperty(ctx context.Context, key string, 
property *propertyv1.Property, ttl int64) (bool, uint32, error) {
+       val, opts, err := e.marshalProperty(ctx, property, ttl)
+       if err != nil {
+               return false, 0, err
+       }
+       _, err = e.client.Put(ctx, key, string(val), opts...)
+       if err != nil {
+               return false, 0, err
+       }
+       return true, uint32(len(property.Tags)), nil
+}
+
+func (e *etcdSchemaRegistry) mergeProperty(ctx context.Context, key string, 
property *propertyv1.Property, ttl int64) (bool, uint32, error) {
+       tagsNum := uint32(len(property.Tags))
+       existed, errGet := e.GetProperty(ctx, property.Metadata, nil)
+       if errors.Is(errGet, ErrGRPCResourceNotFound) {
+               return e.replaceProperty(ctx, key, property, ttl)
+       }
+       if errGet != nil {
+               return false, 0, errGet
+       }
+       var prevLeaseID int64
+       merge := func(existed *propertyv1.Property) (*propertyv1.Property, 
error) {
+               tags := make([]*modelv1.Tag, len(property.Tags))
+               copy(tags, property.Tags)
                for i := 0; i < int(tagsNum); i++ {
-                       t := property.Tags[0]
-                       property.Tags = property.Tags[1:]
+                       t := tags[0]
+                       tags = tags[1:]
                        for _, et := range existed.Tags {
                                if et.Key == t.Key {
                                        et.Value = t.Value
                                }
                        }
                }
-               existed.Tags = append(existed.Tags, property.Tags...)
-               md.Spec = existed
+               existed.Tags = append(existed.Tags, tags...)
+               prevLeaseID = existed.LeaseId
+               val, opts, err := e.marshalProperty(ctx, existed, ttl)
+               if err != nil {
+                       return nil, err
+               }
+               txnResp, err := e.client.Txn(ctx).If(
+                       clientv3.Compare(clientv3.ModRevision(key), "=", 
existed.Metadata.Container.ModRevision),
+               ).Then(
+                       clientv3.OpPut(key, string(val), opts...),
+               ).Else(
+                       clientv3.OpGet(key),
+               ).Commit()
+               if err != nil {
+                       return nil, err
+               }
+               if txnResp.Succeeded {
+                       return nil, nil
+               }
+               getResp := txnResp.Responses[0].GetResponseRange()
+               if getResp.Count < 1 {
+                       return nil, ErrGRPCResourceNotFound
+               }
+               p := new(propertyv1.Property)
+               if err = proto.Unmarshal(getResp.Kvs[0].Value, p); err != nil {
+                       return nil, err
+               }
+               return p, nil
        }
-       if err = e.update(ctx, md); err != nil {
-               return false, 0, err
+       // Self-spin to merge property
+       var err error
+       for i := 0; i < 10; i++ {
+               existed, err = merge(existed)
+               if errors.Is(err, ErrGRPCResourceNotFound) {
+                       return e.replaceProperty(ctx, key, property, ttl)
+               }
+               if err != nil {
+                       return false, 0, err
+               }
+               if existed == nil {
+                       break
+               }
+               time.Sleep(time.Millisecond * 100)
+       }
+       if existed != nil {
+               return false, 0, errors.New("merge property failed: retry 
timeout")
+       }
+       if prevLeaseID > 0 {
+               _, _ = e.client.Revoke(ctx, clientv3.LeaseID(prevLeaseID))
        }
        return false, tagsNum, nil
 }
 
+func (e *etcdSchemaRegistry) marshalProperty(ctx context.Context, property 
*propertyv1.Property, ttl int64) ([]byte, []clientv3.OpOption, error) {
+       var opts []clientv3.OpOption
+       if ttl > 0 {
+               lease, err := e.client.Grant(ctx, ttl)
+               if err != nil {
+                       return nil, nil, err
+               }
+               property.LeaseId = int64(lease.ID)
+               opts = append(opts, clientv3.WithLease(lease.ID))
+       }
+       property.UpdatedAt = timestamppb.Now()
+       val, err := proto.Marshal(property)
+       if err != nil {
+               return nil, nil, err
+       }
+       return val, opts, nil
+}
+
 func (e *etcdSchemaRegistry) DeleteProperty(ctx context.Context, metadata 
*propertyv1.Metadata, tags []string) (bool, uint32, error) {
        if len(tags) == 0 || tags[0] == all {
                m := transformKey(metadata)
diff --git a/bydbctl/internal/cmd/property_test.go 
b/bydbctl/internal/cmd/property_test.go
index 191591c6..86fc241a 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -27,6 +27,7 @@ import (
        "github.com/zenizh/go-capturer"
        "google.golang.org/protobuf/testing/protocmp"
 
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
@@ -34,6 +35,14 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
 )
 
+var equalsOpts = []cmp.Option{
+       protocmp.Transform(),
+       protocmp.IgnoreUnknown(),
+       protocmp.IgnoreFields(&propertyv1.Property{}, "updated_at"),
+       protocmp.IgnoreFields(&commonv1.Metadata{}, "mod_revision"),
+       protocmp.IgnoreFields(&commonv1.Metadata{}, "create_revision"),
+}
+
 var _ = Describe("Property Operation", func() {
        var addr string
        var deferFunc func()
@@ -115,8 +124,7 @@ metadata:
                resp := new(propertyv1.GetResponse)
                helpers.UnmarshalYAML([]byte(out), resp)
                Expect(cmp.Equal(resp.Property, p1Proto,
-                       protocmp.IgnoreUnknown(),
-                       protocmp.Transform())).To(BeTrue())
+                       equalsOpts...)).To(BeTrue())
        })
 
        It("gets a tag", func() {
@@ -149,8 +157,7 @@ metadata:
                resp := new(propertyv1.GetResponse)
                helpers.UnmarshalYAML([]byte(out), resp)
                Expect(cmp.Equal(resp.Property, p1Proto,
-                       protocmp.IgnoreUnknown(),
-                       protocmp.Transform())).To(BeTrue())
+                       equalsOpts...)).To(BeTrue())
        })
 
        It("update property", func() {
@@ -182,8 +189,7 @@ tags:
                helpers.UnmarshalYAML([]byte(out), resp)
 
                Expect(cmp.Equal(resp.Property, p2Proto,
-                       protocmp.IgnoreUnknown(),
-                       protocmp.Transform())).To(BeTrue())
+                       equalsOpts...)).To(BeTrue())
        })
 
        It("delete property", func() {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 96379185..f20926e6 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2637,6 +2637,8 @@ Property stores the user defined data
 | metadata | [Metadata](#banyandb-property-v1-Metadata) |  | metadata is the 
identity of a property |
 | tags | [banyandb.model.v1.Tag](#banyandb-model-v1-Tag) | repeated | tag 
stores the content of a property |
 | updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  | 
updated_at indicates when the property is updated |
+| lease_id | [int64](#int64) |  | readonly. lease_id is the ID of the lease 
that attached to key. |
+| ttl | [string](#string) |  | ttl indicates the time to live of the property. 
It&#39;s a string in the format of &#34;1h&#34;, &#34;2m&#34;, &#34;3s&#34;, 
&#34;1500ms&#34;. It defaults to 0s, which means the property never expires. 
The minimum allowed ttl is 1s. |
 
 
 
diff --git a/docs/crud/property.md b/docs/crud/property.md
index 7f28d856..4c010e7c 100644
--- a/docs/crud/property.md
+++ b/docs/crud/property.md
@@ -61,6 +61,23 @@ tags:
 EOF
 ```
 
+TTL is supported in the operation.
+
+```shell
+$ bydbctl property apply -f - <<EOF
+metadata:
+  container:
+    group: sw
+    name: ui_template
+  id: General-Service
+tags:
+- key: state
+  value:
+    str:
+      value: "failed"
+ttl: "1h"
+```
+
 ## Get operation
 
 Get operation gets a property.
diff --git a/test/integration/standalone/other/property_test.go 
b/test/integration/standalone/other/property_test.go
index 79f8f38b..2e614911 100644
--- a/test/integration/standalone/other/property_test.go
+++ b/test/integration/standalone/other/property_test.go
@@ -101,6 +101,48 @@ var _ = Describe("Property application", func() {
                        {Key: "t2", Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v22"}}}},
                }))
        })
+       It("applies a property with TTL", func() {
+               md := &propertyv1.Metadata{
+                       Container: &commonv1.Metadata{
+                               Name:  "p",
+                               Group: "g",
+                       },
+                       Id: "1",
+               }
+               resp, err := client.Apply(context.Background(), 
&propertyv1.ApplyRequest{Property: &propertyv1.Property{
+                       Metadata: md,
+                       Tags: []*modelv1.Tag{
+                               {Key: "t1", Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v1"}}}},
+                               {Key: "t2", Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v2"}}}},
+                       },
+                       Ttl: "1h",
+               }})
+               Expect(err).NotTo(HaveOccurred())
+               Expect(resp.Created).To(BeTrue())
+               Expect(resp.TagsNum).To(Equal(uint32(2)))
+               got, err := client.Get(context.Background(), 
&propertyv1.GetRequest{Metadata: md})
+               Expect(err).NotTo(HaveOccurred())
+               Expect(got.Property.Tags).To(Equal([]*modelv1.Tag{
+                       {Key: "t1", Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v1"}}}},
+                       {Key: "t2", Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v2"}}}},
+               }))
+               Expect(got.Property.GetTtl()).To(Equal("1h"))
+               Expect(got.Property.GetLeaseId()).To(BeNumerically(">", 0))
+               resp, err = client.Apply(context.Background(), 
&propertyv1.ApplyRequest{Property: &propertyv1.Property{
+                       Metadata: md,
+                       Tags: []*modelv1.Tag{
+                               {Key: "t2", Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v22"}}}},
+                       },
+                       Ttl: "1s",
+               }})
+               Expect(err).NotTo(HaveOccurred())
+               Expect(resp.Created).To(BeFalse())
+               Expect(resp.TagsNum).To(Equal(uint32(1)))
+               Eventually(func() error {
+                       _, err := client.Get(context.Background(), 
&propertyv1.GetRequest{Metadata: md})
+                       return err
+               }, flags.EventuallyTimeout).Should(MatchError("rpc error: code 
= NotFound desc = banyandb: resource not found"))
+       })
 })
 
 var _ = Describe("Property application", func() {

Reply via email to