This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch property-ttl in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit f3874f3ecfeb3eb4bb695f4b524fb53c4d1a4235 Author: Gao Hongtao <[email protected]> AuthorDate: Sun Sep 17 02:16:04 2023 +0000 Apply TTL to a property Signed-off-by: Gao Hongtao <[email protected]> --- api/proto/banyandb/property/v1/property.proto | 7 + banyand/metadata/schema/property.go | 163 ++++++++++++++++++--- bydbctl/internal/cmd/property_test.go | 18 ++- docs/api-reference.md | 2 + docs/crud/property.md | 17 +++ test/integration/standalone/other/property_test.go | 42 ++++++ 6 files changed, 221 insertions(+), 28 deletions(-) diff --git a/api/proto/banyandb/property/v1/property.proto b/api/proto/banyandb/property/v1/property.proto index 6e6b5d86..40aac5f6 100644 --- a/api/proto/banyandb/property/v1/property.proto +++ b/api/proto/banyandb/property/v1/property.proto @@ -43,4 +43,11 @@ message Property { repeated model.v1.Tag tags = 2 [(validate.rules).repeated.min_items = 1]; // updated_at indicates when the property is updated google.protobuf.Timestamp updated_at = 3; + // readonly. lease_id is the ID of the lease that is attached to the key. + int64 lease_id = 4; + // ttl indicates the time to live of the property. + // It's a string in the format of "1h", "2m", "3s", "1500ms". + // It defaults to 0s, which means the property never expires. + // The minimum allowed ttl is 1s. 
+ string ttl = 5; } diff --git a/banyand/metadata/schema/property.go b/banyand/metadata/schema/property.go index bb76f0a8..44dad59b 100644 --- a/banyand/metadata/schema/property.go +++ b/banyand/metadata/schema/property.go @@ -20,12 +20,17 @@ package schema import ( "context" "path" + "time" "github.com/pkg/errors" + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) const all = "*" @@ -33,11 +38,28 @@ const all = "*" var propertyKeyPrefix = "/properties/" func (e *etcdSchemaRegistry) GetProperty(ctx context.Context, metadata *propertyv1.Metadata, tags []string) (*propertyv1.Property, error) { - var entity propertyv1.Property - if err := e.get(ctx, formatPropertyKey(transformKey(metadata)), &entity); err != nil { + if !e.closer.AddRunning() { + return nil, ErrClosed + } + defer e.closer.Done() + key := e.prependNamespace(formatPropertyKey(transformKey(metadata))) + resp, err := e.client.Get(ctx, key) + if err != nil { + return nil, err + } + if resp.Count == 0 { + return nil, ErrGRPCResourceNotFound + } + if resp.Count > 1 { + return nil, errUnexpectedNumberOfEntities + } + var property propertyv1.Property + if err = proto.Unmarshal(resp.Kvs[0].Value, &property); err != nil { return nil, err } - return filterTags(&entity, tags), nil + property.Metadata.Container.CreateRevision = resp.Kvs[0].CreateRevision + property.Metadata.Container.ModRevision = resp.Kvs[0].ModRevision + return filterTags(&property, tags), nil } func filterTags(property *propertyv1.Property, tags []string) *propertyv1.Property { @@ -84,14 +106,15 @@ func (e *etcdSchemaRegistry) ListProperty(ctx context.Context, container *common } 
func (e *etcdSchemaRegistry) ApplyProperty(ctx context.Context, property *propertyv1.Property, strategy propertyv1.ApplyRequest_Strategy) (bool, uint32, error) { + if !e.closer.AddRunning() { + return false, 0, ErrClosed + } + defer e.closer.Done() m := transformKey(property.GetMetadata()) group := m.GetGroup() if _, getGroupErr := e.GetGroup(ctx, group); getGroupErr != nil { return false, 0, errors.Wrap(getGroupErr, "group is not exist") } - if property.UpdatedAt != nil { - property.UpdatedAt = timestamppb.Now() - } md := Metadata{ TypeMeta: TypeMeta{ Kind: KindProperty, @@ -100,37 +123,133 @@ func (e *etcdSchemaRegistry) ApplyProperty(ctx context.Context, property *proper }, Spec: property, } - tagsNum := uint32(len(property.Tags)) - err := e.create(ctx, md) - if err == nil { - return true, tagsNum, nil - } - if !errors.Is(err, errGRPCAlreadyExists) { + key, err := md.key() + if err != nil { return false, 0, err } - if strategy != propertyv1.ApplyRequest_STRATEGY_REPLACE { - existed, errGet := e.GetProperty(ctx, property.Metadata, nil) - if errGet != nil { - return false, 0, errGet + key = e.prependNamespace(key) + var ttl int64 + if property.Ttl != "" { + t, err := timestamp.ParseDuration(property.Ttl) + if err != nil { + return false, 0, err } + if t < time.Second { + return false, 0, errors.New("ttl should be greater than 1s") + } + ttl = int64(t / time.Second) + } + if strategy == propertyv1.ApplyRequest_STRATEGY_REPLACE { + return e.replaceProperty(ctx, key, property, ttl) + } + return e.mergeProperty(ctx, key, property, ttl) +} + +func (e *etcdSchemaRegistry) replaceProperty(ctx context.Context, key string, property *propertyv1.Property, ttl int64) (bool, uint32, error) { + val, opts, err := e.marshalProperty(ctx, property, ttl) + if err != nil { + return false, 0, err + } + _, err = e.client.Put(ctx, key, string(val), opts...) 
+ if err != nil { + return false, 0, err + } + return true, uint32(len(property.Tags)), nil +} + +func (e *etcdSchemaRegistry) mergeProperty(ctx context.Context, key string, property *propertyv1.Property, ttl int64) (bool, uint32, error) { + tagsNum := uint32(len(property.Tags)) + existed, errGet := e.GetProperty(ctx, property.Metadata, nil) + if errors.Is(errGet, ErrGRPCResourceNotFound) { + return e.replaceProperty(ctx, key, property, ttl) + } + if errGet != nil { + return false, 0, errGet + } + var prevLeaseID int64 + merge := func(existed *propertyv1.Property) (*propertyv1.Property, error) { + tags := make([]*modelv1.Tag, len(property.Tags)) + copy(tags, property.Tags) for i := 0; i < int(tagsNum); i++ { - t := property.Tags[0] - property.Tags = property.Tags[1:] + t := tags[0] + tags = tags[1:] for _, et := range existed.Tags { if et.Key == t.Key { et.Value = t.Value } } } - existed.Tags = append(existed.Tags, property.Tags...) - md.Spec = existed + existed.Tags = append(existed.Tags, tags...) 
+ prevLeaseID = existed.LeaseId + val, opts, err := e.marshalProperty(ctx, existed, ttl) + if err != nil { + return nil, err + } + txnResp, err := e.client.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(key), "=", existed.Metadata.Container.ModRevision), + ).Then( + clientv3.OpPut(key, string(val), opts...), + ).Else( + clientv3.OpGet(key), + ).Commit() + if err != nil { + return nil, err + } + if txnResp.Succeeded { + return nil, nil + } + getResp := txnResp.Responses[0].GetResponseRange() + if getResp.Count < 1 { + return nil, ErrGRPCResourceNotFound + } + p := new(propertyv1.Property) + if err = proto.Unmarshal(getResp.Kvs[0].Value, p); err != nil { + return nil, err + } + return p, nil } - if err = e.update(ctx, md); err != nil { - return false, 0, err + // Self-spin to merge property + var err error + for i := 0; i < 10; i++ { + existed, err = merge(existed) + if errors.Is(err, ErrGRPCResourceNotFound) { + return e.replaceProperty(ctx, key, property, ttl) + } + if err != nil { + return false, 0, err + } + if existed == nil { + break + } + time.Sleep(time.Millisecond * 100) + } + if existed != nil { + return false, 0, errors.New("merge property failed: retry timeout") + } + if prevLeaseID > 0 { + _, _ = e.client.Revoke(ctx, clientv3.LeaseID(prevLeaseID)) } return false, tagsNum, nil } +func (e *etcdSchemaRegistry) marshalProperty(ctx context.Context, property *propertyv1.Property, ttl int64) ([]byte, []clientv3.OpOption, error) { + var opts []clientv3.OpOption + if ttl > 0 { + lease, err := e.client.Grant(ctx, ttl) + if err != nil { + return nil, nil, err + } + property.LeaseId = int64(lease.ID) + opts = append(opts, clientv3.WithLease(lease.ID)) + } + property.UpdatedAt = timestamppb.Now() + val, err := proto.Marshal(property) + if err != nil { + return nil, nil, err + } + return val, opts, nil +} + func (e *etcdSchemaRegistry) DeleteProperty(ctx context.Context, metadata *propertyv1.Metadata, tags []string) (bool, uint32, error) { if len(tags) == 0 || 
tags[0] == all { m := transformKey(metadata) diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go index 191591c6..86fc241a 100644 --- a/bydbctl/internal/cmd/property_test.go +++ b/bydbctl/internal/cmd/property_test.go @@ -27,6 +27,7 @@ import ( "github.com/zenizh/go-capturer" "google.golang.org/protobuf/testing/protocmp" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd" "github.com/apache/skywalking-banyandb/pkg/test/flags" @@ -34,6 +35,14 @@ import ( "github.com/apache/skywalking-banyandb/pkg/test/setup" ) +var equalsOpts = []cmp.Option{ + protocmp.Transform(), + protocmp.IgnoreUnknown(), + protocmp.IgnoreFields(&propertyv1.Property{}, "updated_at"), + protocmp.IgnoreFields(&commonv1.Metadata{}, "mod_revision"), + protocmp.IgnoreFields(&commonv1.Metadata{}, "create_revision"), +} + var _ = Describe("Property Operation", func() { var addr string var deferFunc func() @@ -115,8 +124,7 @@ metadata: resp := new(propertyv1.GetResponse) helpers.UnmarshalYAML([]byte(out), resp) Expect(cmp.Equal(resp.Property, p1Proto, - protocmp.IgnoreUnknown(), - protocmp.Transform())).To(BeTrue()) + equalsOpts...)).To(BeTrue()) }) It("gets a tag", func() { @@ -149,8 +157,7 @@ metadata: resp := new(propertyv1.GetResponse) helpers.UnmarshalYAML([]byte(out), resp) Expect(cmp.Equal(resp.Property, p1Proto, - protocmp.IgnoreUnknown(), - protocmp.Transform())).To(BeTrue()) + equalsOpts...)).To(BeTrue()) }) It("update property", func() { @@ -182,8 +189,7 @@ tags: helpers.UnmarshalYAML([]byte(out), resp) Expect(cmp.Equal(resp.Property, p2Proto, - protocmp.IgnoreUnknown(), - protocmp.Transform())).To(BeTrue()) + equalsOpts...)).To(BeTrue()) }) It("delete property", func() { diff --git a/docs/api-reference.md b/docs/api-reference.md index 96379185..f20926e6 100644 --- 
a/docs/api-reference.md +++ b/docs/api-reference.md @@ -2637,6 +2637,8 @@ Property stores the user defined data | metadata | [Metadata](#banyandb-property-v1-Metadata) | | metadata is the identity of a property | | tags | [banyandb.model.v1.Tag](#banyandb-model-v1-Tag) | repeated | tag stores the content of a property | | updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | updated_at indicates when the property is updated | +| lease_id | [int64](#int64) | | readonly. lease_id is the ID of the lease that is attached to the key. | +| ttl | [string](#string) | | ttl indicates the time to live of the property. It's a string in the format of "1h", "2m", "3s", "1500ms". It defaults to 0s, which means the property never expires. The minimum allowed ttl is 1s. | diff --git a/docs/crud/property.md b/docs/crud/property.md index 7f28d856..4c010e7c 100644 --- a/docs/crud/property.md +++ b/docs/crud/property.md @@ -61,6 +61,23 @@ tags: EOF ``` +TTL is supported in the operation. + +```shell +$ bydbctl property apply -f - <<EOF +metadata: + container: + group: sw + name: ui_template + id: General-Service +tags: +- key: state + value: + str: + value: "failed" +ttl: "1h" +``` + ## Get operation Get operation gets a property. 
diff --git a/test/integration/standalone/other/property_test.go b/test/integration/standalone/other/property_test.go index 79f8f38b..2e614911 100644 --- a/test/integration/standalone/other/property_test.go +++ b/test/integration/standalone/other/property_test.go @@ -101,6 +101,48 @@ var _ = Describe("Property application", func() { {Key: "t2", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v22"}}}}, })) }) + It("applies a property with TTL", func() { + md := &propertyv1.Metadata{ + Container: &commonv1.Metadata{ + Name: "p", + Group: "g", + }, + Id: "1", + } + resp, err := client.Apply(context.Background(), &propertyv1.ApplyRequest{Property: &propertyv1.Property{ + Metadata: md, + Tags: []*modelv1.Tag{ + {Key: "t1", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v1"}}}}, + {Key: "t2", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v2"}}}}, + }, + Ttl: "1h", + }}) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.Created).To(BeTrue()) + Expect(resp.TagsNum).To(Equal(uint32(2))) + got, err := client.Get(context.Background(), &propertyv1.GetRequest{Metadata: md}) + Expect(err).NotTo(HaveOccurred()) + Expect(got.Property.Tags).To(Equal([]*modelv1.Tag{ + {Key: "t1", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v1"}}}}, + {Key: "t2", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v2"}}}}, + })) + Expect(got.Property.GetTtl()).To(Equal("1h")) + Expect(got.Property.GetLeaseId()).To(BeNumerically(">", 0)) + resp, err = client.Apply(context.Background(), &propertyv1.ApplyRequest{Property: &propertyv1.Property{ + Metadata: md, + Tags: []*modelv1.Tag{ + {Key: "t2", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v22"}}}}, + }, + Ttl: "1s", + }}) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.Created).To(BeFalse()) + Expect(resp.TagsNum).To(Equal(uint32(1))) + Eventually(func() 
error { + _, err := client.Get(context.Background(), &propertyv1.GetRequest{Metadata: md}) + return err + }, flags.EventuallyTimeout).Should(MatchError("rpc error: code = NotFound desc = banyandb: resource not found")) + }) }) var _ = Describe("Property application", func() {
