This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new bc60047d Implement versioning properties and replace physical deletion with the tombstone mechanism for the property database (#681) bc60047d is described below commit bc60047db2f2d8acd9ca80d3a633e93612046b16 Author: mrproliu <741550...@qq.com> AuthorDate: Tue Jun 17 10:33:13 2025 +0800 Implement versioning properties and replace physical deletion with the tombstone mechanism for the property database (#681) --- CHANGES.md | 1 + api/proto/banyandb/property/v1/rpc.proto | 3 + banyand/liaison/grpc/property.go | 261 ++++++++++++++++-------- banyand/liaison/grpc/server.go | 9 +- banyand/property/db.go | 13 +- banyand/property/listener.go | 12 +- banyand/property/property.go | 19 +- banyand/property/shard.go | 105 ++++++++-- bydbctl/internal/cmd/property_test.go | 327 ++++++++++++++++++++++--------- docs/api-reference.md | 1 + pkg/convert/number.go | 16 ++ pkg/convert/number_test.go | 19 ++ pkg/index/index.go | 4 +- pkg/index/inverted/inverted.go | 4 + pkg/index/inverted/inverted_series.go | 9 + pkg/test/setup/setup.go | 47 +++-- ui/README.md | 2 +- 17 files changed, 627 insertions(+), 225 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e819a022..43cc1e2b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ Release Notes. - Replica: Support configurable replica count on Group. - Replica: Move the TopN pre-calculation flow from the Data Node to the Liaison Node. - Add a wait and retry to write handlers to avoid the local metadata cache being loaded. +- Implement versioning properties and replace physical deletion with the tombstone mechanism for the property database. ### Bug Fixes diff --git a/api/proto/banyandb/property/v1/rpc.proto b/api/proto/banyandb/property/v1/rpc.proto index 8f70291b..4ea60cef 100644 --- a/api/proto/banyandb/property/v1/rpc.proto +++ b/api/proto/banyandb/property/v1/rpc.proto @@ -122,4 +122,7 @@ message InternalDeleteRequest { message InternalQueryResponse { repeated bytes sources = 1; common.v1.Trace trace = 2; + // deletes indicates the property is deleted or not + // it's mapping to the sources in the same order + repeated bool deletes = 3; } diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go index b26569be..61c4808a 100644 --- a/banyand/liaison/grpc/property.go +++ b/banyand/liaison/grpc/property.go @@ -37,8 +37,6 @@ package grpc import ( "context" "math" - "strconv" - "strings" "time" "github.com/pkg/errors" @@ -54,6 +52,7 @@ import ( propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + propertypkg "github.com/apache/skywalking-banyandb/banyand/property" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" @@ -67,6 +66,7 @@ const defaultQueryTimeout = 10 * time.Second type propertyServer struct { propertyv1.UnimplementedPropertyServiceServer + *discoveryService schemaRegistry metadata.Repo pipeline queue.Client nodeRegistry NodeRegistry @@ -139,7 +139,7 @@ func (ps *propertyServer) Apply(ctx context.Context, req *propertyv1.ApplyReques return nil, errors.Errorf("property %s tag %s not found", property.Metadata.Name, tag.Key) } } - qResp, err := ps.Query(ctx, &propertyv1.QueryRequest{ + nodeProperties, _, err := ps.queryProperties(ctx, &propertyv1.QueryRequest{ Groups: []string{g}, Name: property.Metadata.Name, Ids: []string{property.Id}, @@ -147,41 +147,71 @@ func (ps *propertyServer) Apply(ctx context.Context, req *propertyv1.ApplyReques if err != nil { return nil, err } - var prev *propertyv1.Property - if len(qResp.Properties) > 0 { - prev = qResp.Properties[0] - defer func() { - if err == nil { - var ids [][]byte - for _, p := range qResp.Properties { - ids = append(ids, getPropertyID(p)) - } - if err = ps.remove(ids); err != nil { - err = multierr.Append(err, errors.New("fail to remove old properties")) - } - } - }() - } - entity := getEntity(property) + prevPropertyWithMetadata, olderProperties := ps.findPrevAndOlderProperties(nodeProperties) + entity := propertypkg.GetEntity(property) id, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum) if err != nil { return nil, err } - node, err := ps.nodeRegistry.Locate(g, entity, uint32(id), 0) - if err != nil { - return nil, err + copies, ok := ps.groupRepo.copies(property.Metadata.GetGroup()) + if !ok { + return nil, errors.New("failed to get group copies") + } + + nodes := make([]string, 0, copies) + var nodeID string + for i := range copies { + nodeID, err = ps.nodeRegistry.Locate(property.GetMetadata().GetGroup(), property.GetMetadata().GetName(), uint32(id), i) + if err != nil { + return nil, err + } + nodes = append(nodes, nodeID) } + var prev *propertyv1.Property + if prevPropertyWithMetadata != nil && !prevPropertyWithMetadata.deleted { + prev = prevPropertyWithMetadata.Property + } + defer func() { + // if their no older properties or have error when apply the new property + // then ignore cleanup the older properties + if len(olderProperties) == 0 || err != nil { + return + } + var ids [][]byte + for _, p := range olderProperties { + ids = append(ids, propertypkg.GetPropertyID(p.Property)) + } + _ = ps.remove(ids) + }() if req.Strategy == propertyv1.ApplyRequest_STRATEGY_REPLACE { - return ps.replaceProperty(ctx, start, uint64(id), node, prev, property) + return ps.replaceProperty(ctx, start, uint64(id), nodes, prev, property) + } + return ps.mergeProperty(ctx, start, uint64(id), nodes, prev, property) +} + +func (ps *propertyServer) findPrevAndOlderProperties(nodeProperties [][]*propertyWithMetadata) (*propertyWithMetadata, []*propertyWithMetadata) { + var prevPropertyWithMetadata *propertyWithMetadata + var olderProperties []*propertyWithMetadata + for _, properties := range nodeProperties { + for _, p := range properties { + // if the property is not deleted, then added to the older properties list to delete after apply success + if !p.deleted { + olderProperties = append(olderProperties, p) + } + // update the prov property + if prevPropertyWithMetadata == nil || p.Metadata.ModRevision > prevPropertyWithMetadata.Metadata.ModRevision { + prevPropertyWithMetadata = p + } + } } - return ps.mergeProperty(ctx, start, uint64(id), node, prev, property) + return prevPropertyWithMetadata, olderProperties } -func (ps *propertyServer) mergeProperty(ctx context.Context, now time.Time, shardID uint64, node string, +func (ps *propertyServer) mergeProperty(ctx context.Context, now time.Time, shardID uint64, nodes []string, prev, cur *propertyv1.Property, ) (*propertyv1.ApplyResponse, error) { if prev == nil { - return ps.replaceProperty(ctx, now, shardID, node, prev, cur) + return ps.replaceProperty(ctx, now, shardID, nodes, prev, cur) } tagCount, err := tagLen(prev) if err != nil { @@ -202,7 +232,7 @@ func (ps *propertyServer) mergeProperty(ctx context.Context, now time.Time, shar } } cur.Tags = append(cur.Tags, tags...) - return ps.replaceProperty(ctx, now, shardID, node, prev, cur) + return ps.replaceProperty(ctx, now, shardID, nodes, prev, cur) } func tagLen(property *propertyv1.Property) (uint32, error) { @@ -214,7 +244,7 @@ func tagLen(property *propertyv1.Property) (uint32, error) { return tagsNum, nil } -func (ps *propertyServer) replaceProperty(ctx context.Context, now time.Time, shardID uint64, node string, +func (ps *propertyServer) replaceProperty(ctx context.Context, now time.Time, shardID uint64, nodes []string, prev, cur *propertyv1.Property, ) (*propertyv1.ApplyResponse, error) { ns := now.UnixNano() @@ -225,17 +255,38 @@ func (ps *propertyServer) replaceProperty(ctx context.Context, now time.Time, sh } cur.Metadata.ModRevision = ns cur.UpdatedAt = timestamppb.New(now) - f, err := ps.pipeline.Publish(ctx, data.TopicPropertyUpdate, bus.NewMessageWithNode(bus.MessageID(time.Now().Unix()), node, &propertyv1.InternalUpdateRequest{ + req := &propertyv1.InternalUpdateRequest{ ShardId: shardID, - Id: getPropertyID(cur), + Id: propertypkg.GetPropertyID(cur), Property: cur, - })) - if err != nil { - return nil, err } - if _, err := f.Get(); err != nil { - return nil, err + futures := make([]bus.Future, 0, len(nodes)) + for _, node := range nodes { + f, err := ps.pipeline.Publish(ctx, data.TopicPropertyUpdate, + bus.NewMessageWithNode(bus.MessageID(time.Now().Unix()), node, req)) + if err != nil { + return nil, errors.Wrapf(err, "failed to publish property update to node %s", node) + } + futures = append(futures, f) + } + // Wait for all futures to complete, and which should last have one success + haveSuccess := false + var lastestError error + for _, f := range futures { + _, err := f.Get() + if err == nil { + haveSuccess = true + } else { + lastestError = multierr.Append(lastestError, err) + } + } + if !haveSuccess { + if lastestError != nil { + return nil, lastestError + } + return nil, errors.New("failed to apply property, no replicas success") } + return &propertyv1.ApplyResponse{ Created: prev == nil, TagsNum: uint32(len(cur.Tags)), @@ -266,16 +317,22 @@ func (ps *propertyServer) Delete(ctx context.Context, req *propertyv1.DeleteRequ if len(req.Id) > 0 { qReq.Ids = []string{req.Id} } - qResp, err := ps.Query(ctx, qReq) + nodeProperties, _, err := ps.queryProperties(ctx, qReq) if err != nil { return nil, err } - if len(qResp.Properties) == 0 { + if len(nodeProperties) == 0 { return &propertyv1.DeleteResponse{Deleted: false}, nil } var ids [][]byte - for _, p := range qResp.Properties { - ids = append(ids, getPropertyID(p)) + for _, properties := range nodeProperties { + for _, p := range properties { + // if the property already delete, then ignore execute twice + if p.deleted { + continue + } + ids = append(ids, propertypkg.GetPropertyID(p.Property)) + } } if err := ps.remove(ids); err != nil { return nil, err @@ -299,6 +356,64 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques if req.Limit == 0 { req.Limit = 100 } + + nodeProperties, trace, err := ps.queryProperties(ctx, req) + if err != nil { + return nil, err + } + res := make(map[string]*propertyWithMetadata) + for _, nodeWithProperties := range nodeProperties { + for _, propertyMetadata := range nodeWithProperties { + entity := propertypkg.GetEntity(propertyMetadata.Property) + cur, ok := res[entity] + if !ok { + res[entity] = propertyMetadata + continue + } + if cur.Metadata.ModRevision < propertyMetadata.Metadata.ModRevision { + res[entity] = propertyMetadata + // TODO(mrproliu) handle the case where the property detected multiple versions + } + } + } + if len(res) == 0 { + return &propertyv1.QueryResponse{Properties: nil, Trace: trace}, nil + } + properties := make([]*propertyv1.Property, 0, len(res)) + for _, p := range res { + // ignore deleted property + if p.deleted { + continue + } + if len(req.TagProjection) > 0 { + var tags []*modelv1.Tag + for _, tag := range p.Tags { + for _, tp := range req.TagProjection { + if tp == tag.Key { + tags = append(tags, tag) + break + } + } + } + p.Tags = tags + } + properties = append(properties, p.Property) + if len(properties) >= int(req.Limit) { + break + } + } + return &propertyv1.QueryResponse{Properties: properties, Trace: trace}, nil +} + +// queryProperties internal properties query, return all properties with related nodes. +func (ps *propertyServer) queryProperties( + ctx context.Context, + req *propertyv1.QueryRequest, +) (nodeProperties [][]*propertyWithMetadata, trace *commonv1.Trace, err error) { + start := time.Now() + if req.Limit == 0 { + req.Limit = 100 + } var span *query.Span if req.Trace { tracer, _ := query.NewTracer(ctx, start.Format(time.RFC3339Nano)) @@ -308,23 +423,23 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques if err != nil { span.Error(err) } else { - resp.Trace = tracer.ToProto() + trace = tracer.ToProto() } span.Stop() }() } for _, gn := range req.Groups { if g, getGroupErr := ps.schemaRegistry.GroupRegistry().GetGroup(ctx, gn); getGroupErr != nil { - return nil, errors.Errorf("group %s not found", gn) + return nil, trace, errors.Errorf("group %s not found", gn) } else if g.Catalog != commonv1.Catalog_CATALOG_PROPERTY { - return nil, errors.Errorf("group %s is not allowed to have properties", gn) + return nil, trace, errors.Errorf("group %s is not allowed to have properties", gn) } } ff, err := ps.pipeline.Broadcast(defaultQueryTimeout, data.TopicPropertyQuery, bus.NewMessage(bus.MessageID(start.Unix()), req)) if err != nil { - return nil, err + return nil, trace, err } - res := make(map[string]*propertyv1.Property) + res := make([][]*propertyWithMetadata, 0) for _, f := range ff { if m, getErr := f.Get(); getErr != nil { err = multierr.Append(err, getErr) @@ -333,62 +448,41 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques if d == nil { continue } + nodeWithProperties := make([]*propertyWithMetadata, 0) switch v := d.(type) { case *propertyv1.InternalQueryResponse: - for _, s := range v.Sources { + for i, s := range v.Sources { var p propertyv1.Property + var deleted bool err = protojson.Unmarshal(s, &p) if err != nil { - return nil, err + return nil, trace, err } - entity := getEntity(&p) - cur, ok := res[entity] - if !ok { - res[entity] = &p - continue + if i < len(v.Deletes) { + deleted = v.Deletes[i] } - if cur.Metadata.ModRevision < p.Metadata.ModRevision { - res[entity] = &p - err = ps.remove([][]byte{getPropertyID(cur)}) - if err != nil { - return nil, err - } + property := &propertyWithMetadata{ + Property: &p, + deleted: deleted, } + nodeWithProperties = append(nodeWithProperties, property) } if span != nil { span.AddSubTrace(v.Trace) } + res = append(res, nodeWithProperties) case *common.Error: err = multierr.Append(err, errors.New(v.Error())) } } } if err != nil { - return nil, err + return nil, trace, err } if len(res) == 0 { - return &propertyv1.QueryResponse{Properties: nil}, nil - } - properties := make([]*propertyv1.Property, 0, len(res)) - for _, p := range res { - if len(req.TagProjection) > 0 { - var tags []*modelv1.Tag - for _, tag := range p.Tags { - for _, tp := range req.TagProjection { - if tp == tag.Key { - tags = append(tags, tag) - break - } - } - } - p.Tags = tags - } - properties = append(properties, p) - if len(properties) >= int(req.Limit) { - break - } + return res, trace, nil } - return &propertyv1.QueryResponse{Properties: properties}, nil + return res, trace, nil } func (ps *propertyServer) remove(ids [][]byte) error { @@ -406,10 +500,7 @@ func (ps *propertyServer) remove(ids [][]byte) error { return nil } -func getPropertyID(prop *propertyv1.Property) []byte { - return convert.StringToBytes(getEntity(prop) + "/" + strconv.FormatInt(prop.Metadata.ModRevision, 10)) -} - -func getEntity(prop *propertyv1.Property) string { - return strings.Join([]string{prop.Metadata.Group, prop.Metadata.Name, prop.Id}, "/") +type propertyWithMetadata struct { + *propertyv1.Property + deleted bool } diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index 00c384c4..b12d1839 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -147,9 +147,10 @@ func NewServer(_ context.Context, pipeline, broadcaster queue.Client, topNPipeli schemaRegistry: schemaRegistry, }, propertyServer: &propertyServer{ - schemaRegistry: schemaRegistry, - pipeline: pipeline, - nodeRegistry: nr.PropertyNodeRegistry, + schemaRegistry: schemaRegistry, + pipeline: pipeline, + nodeRegistry: nr.PropertyNodeRegistry, + discoveryService: newDiscoveryService(schema.KindProperty, schemaRegistry, nr.MeasureNodeRegistry), }, propertyRegistryServer: &propertyRegistryServer{ schemaRegistry: schemaRegistry, @@ -164,9 +165,11 @@ func (s *server) PreRun(_ context.Context) error { s.log = logger.GetLogger("liaison-grpc") s.streamSVC.setLogger(s.log) s.measureSVC.setLogger(s.log) + s.propertyServer.SetLogger(s.log) components := []*discoveryService{ s.streamSVC.discoveryService, s.measureSVC.discoveryService, + s.propertyServer.discoveryService, } for _, c := range components { c.SetLogger(s.log) diff --git a/banyand/property/db.go b/banyand/property/db.go index 8cc28629..34c552e1 100644 --- a/banyand/property/db.go +++ b/banyand/property/db.go @@ -111,19 +111,19 @@ func (db *database) update(ctx context.Context, shardID common.ShardID, id []byt return nil } -func (db *database) delete(docIDs [][]byte) error { +func (db *database) delete(ctx context.Context, docIDs [][]byte) error { sLst := db.sLst.Load() if sLst == nil { return nil } var err error for _, s := range *sLst { - multierr.AppendInto(&err, s.delete(docIDs)) + multierr.AppendInto(&err, s.delete(ctx, docIDs)) } return err } -func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([][]byte, error) { +func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]*queryProperty, error) { iq, err := inverted.BuildPropertyQuery(req, groupField, entityID) if err != nil { return nil, err @@ -132,7 +132,7 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([] if sLst == nil { return nil, nil } - var res [][]byte + var res []*queryProperty for _, s := range *sLst { r, err := s.search(ctx, iq, int(req.Limit)) if err != nil { @@ -224,3 +224,8 @@ func walkDir(root, prefix string, wf walkFn) error { } return nil } + +type queryProperty struct { + source []byte + deleted bool +} diff --git a/banyand/property/listener.go b/banyand/property/listener.go index f02b7ac2..f204816f 100644 --- a/banyand/property/listener.go +++ b/banyand/property/listener.go @@ -110,7 +110,7 @@ type deleteListener struct { s *service } -func (h *deleteListener) Rev(_ context.Context, message bus.Message) (resp bus.Message) { +func (h *deleteListener) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { n := time.Now() now := n.UnixNano() var protoReq proto.Message @@ -130,7 +130,7 @@ func (h *deleteListener) Rev(_ context.Context, message bus.Message) (resp bus.M resp = bus.NewMessage(bus.MessageID(now), common.NewError("id is empty")) return } - err := h.s.db.delete(d.Ids) + err := h.s.db.delete(ctx, d.Ids) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to delete property: %v", err)) return @@ -180,7 +180,7 @@ func (h *queryListener) Rev(ctx context.Context, message bus.Message) (resp bus. span.Stop() }() } - sources, err := h.s.db.query(ctx, d) + properties, err := h.s.db.query(ctx, d) if err != nil { if tracer != nil { span.Error(err) @@ -192,8 +192,10 @@ func (h *queryListener) Rev(ctx context.Context, message bus.Message) (resp bus. resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to query property: %v", err)) return } - qResp := &propertyv1.InternalQueryResponse{ - Sources: sources, + qResp := &propertyv1.InternalQueryResponse{} + for _, p := range properties { + qResp.Sources = append(qResp.Sources, p.source) + qResp.Deletes = append(qResp.Deletes, p.deleted) } if tracer != nil { qResp.Trace = tracer.ToProto() diff --git a/banyand/property/property.go b/banyand/property/property.go index 59602ff4..98f33304 100644 --- a/banyand/property/property.go +++ b/banyand/property/property.go @@ -18,7 +18,14 @@ // Package property provides the property service interface. package property -import "github.com/apache/skywalking-banyandb/pkg/run" +import ( + "strconv" + "strings" + + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/run" +) // Service is the interface for property service. type Service interface { @@ -26,3 +33,13 @@ type Service interface { run.Config run.Service } + +// GetPropertyID returns the property ID based on the property metadata and revision. +func GetPropertyID(prop *propertyv1.Property) []byte { + return convert.StringToBytes(GetEntity(prop) + "/" + strconv.FormatInt(prop.Metadata.ModRevision, 10)) +} + +// GetEntity returns the entity string for the property. +func GetEntity(prop *propertyv1.Property) string { + return strings.Join([]string{prop.Metadata.Group, prop.Metadata.Name, prop.Id}, "/") +} diff --git a/banyand/property/shard.go b/banyand/property/shard.go index 3fd8d3bd..7a4bef78 100644 --- a/banyand/property/shard.go +++ b/banyand/property/shard.go @@ -22,6 +22,7 @@ import ( "fmt" "path" "strconv" + "sync" "google.golang.org/protobuf/encoding/protojson" @@ -42,14 +43,16 @@ const ( groupField = "_group" nameField = index.IndexModeName entityID = "_entity_id" + deleteField = "_deleted" ) var ( - sourceFieldKey = index.FieldKey{TagName: sourceField} - entityFieldKey = index.FieldKey{TagName: entityID} - groupFieldKey = index.FieldKey{TagName: groupField} - nameFieldKey = index.FieldKey{TagName: nameField} - projection = []index.FieldKey{sourceFieldKey} + sourceFieldKey = index.FieldKey{TagName: sourceField} + entityFieldKey = index.FieldKey{TagName: entityID} + groupFieldKey = index.FieldKey{TagName: groupField} + nameFieldKey = index.FieldKey{TagName: nameField} + deletedFieldKey = index.FieldKey{TagName: deleteField} + projection = []index.FieldKey{sourceFieldKey, deletedFieldKey} ) type shard struct { @@ -66,8 +69,7 @@ func (s *shard) close() error { return nil } -func (db *database) newShard(ctx context.Context, id common.ShardID, flushTimeoutSeconds int64, -) (*shard, error) { +func (db *database) newShard(ctx context.Context, id common.ShardID, _ int64) (*shard, error) { location := path.Join(db.location, fmt.Sprintf(shardTemplate, int(id))) sName := "shard" + strconv.Itoa(int(id)) si := &shard{ @@ -79,7 +81,7 @@ func (db *database) newShard(ctx context.Context, id common.ShardID, flushTimeou Path: location, Logger: si.l, Metrics: inverted.NewMetrics(db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard": sName}))), - BatchWaitSec: flushTimeoutSeconds, + BatchWaitSec: 0, } var err error if si.store, err = inverted.NewStore(opts); err != nil { @@ -89,9 +91,17 @@ func (db *database) newShard(ctx context.Context, id common.ShardID, flushTimeou } func (s *shard) update(id []byte, property *propertyv1.Property) error { + document, err := s.buildUpdateDocument(id, property) + if err != nil { + return fmt.Errorf("build update document failure: %w", err) + } + return s.updateDocuments(index.Documents{*document}) +} + +func (s *shard) buildUpdateDocument(id []byte, property *propertyv1.Property) (*index.Document, error) { pj, err := protojson.Marshal(property) if err != nil { - return err + return nil, err } sourceField := index.NewBytesField(sourceFieldKey, pj) sourceField.NoSort = true @@ -110,24 +120,78 @@ func (s *shard) update(id []byte, property *propertyv1.Property) error { for _, t := range property.Tags { tv, err := pbv1.MarshalTagValue(t.Value) if err != nil { - return err + return nil, err } tagField := index.NewBytesField(index.FieldKey{IndexRuleID: uint32(convert.HashStr(t.Key))}, tv) tagField.Index = true tagField.NoSort = true doc.Fields = append(doc.Fields, tagField) } - return s.store.UpdateSeriesBatch(index.Batch{ - Documents: index.Documents{doc}, - }) + return &doc, nil } -func (s *shard) delete(docID [][]byte) error { - return s.store.Delete(docID) +func (s *shard) delete(ctx context.Context, docID [][]byte) error { + // search the original documents by docID + seriesMatchers := make([]index.SeriesMatcher, 0, len(docID)) + for _, id := range docID { + seriesMatchers = append(seriesMatchers, index.SeriesMatcher{ + Match: id, + Type: index.SeriesMatcherTypeExact, + }) + } + iq, err := s.store.BuildQuery(seriesMatchers, nil, nil) + if err != nil { + return fmt.Errorf("build property query failure: %w", err) + } + exisingDocList, err := s.search(ctx, iq, len(docID)) + if err != nil { + return fmt.Errorf("search existing documents failure: %w", err) + } + removeDocList := make(index.Documents, 0, len(exisingDocList)) + for _, property := range exisingDocList { + p := &propertyv1.Property{} + if err := protojson.Unmarshal(property.source, p); err != nil { + return fmt.Errorf("unmarshal property failure: %w", err) + } + // update the property to mark it as deleted + document, err := s.buildUpdateDocument(GetPropertyID(p), p) + if err != nil { + return fmt.Errorf("build delete document failure: %w", err) + } + // mark the document as deleted + document.Deleted = true + removeDocList = append(removeDocList, *document) + } + return s.updateDocuments(removeDocList) +} + +func (s *shard) updateDocuments(docs index.Documents) error { + if len(docs) == 0 { + return nil + } + var updateErr, persistentError error + wg := sync.WaitGroup{} + wg.Add(1) + + updateErr = s.store.UpdateSeriesBatch(index.Batch{ + Documents: docs, + PersistentCallback: func(err error) { + persistentError = err + wg.Done() + }, + }) + if updateErr != nil { + return updateErr + } + wg.Wait() + if persistentError != nil { + return fmt.Errorf("persistent failure: %w", persistentError) + } + return nil } func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, -) (data [][]byte, err error) { +) (data []*queryProperty, err error) { tracer := query.GetTracer(ctx) if tracer != nil { span, _ := tracer.StartSpan(ctx, "property.search") @@ -150,9 +214,14 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, if len(ss) == 0 { return nil, nil } - data = make([][]byte, 0, len(ss)) + data = make([]*queryProperty, 0, len(ss)) for _, s := range ss { - data = append(data, s.Fields[sourceField]) + bytes := s.Fields[sourceField] + deleted := convert.BytesToBool(s.Fields[deleteField]) + data = append(data, &queryProperty{ + source: bytes, + deleted: deleted, + }) } return data, nil } diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go index 9ef1d04b..b4813cf3 100644 --- a/bydbctl/internal/cmd/property_test.go +++ b/bydbctl/internal/cmd/property_test.go @@ -18,6 +18,8 @@ package cmd_test import ( + "errors" + "fmt" "strings" . "github.com/onsi/ginkgo/v2" @@ -27,46 +29,55 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" ) -var _ = Describe("Property Operation", func() { - var addr string - var deferFunc func() - var rootCmd *cobra.Command - p1YAML := ` +var ( + propertyGroup = "ui-template" + propertyTagCount = 2 + p1YAML = fmt.Sprintf(` metadata: - group: ui-template + group: %s name: service id: kubernetes tags: - key: content value: str: - value: foo + value: foo111 - key: state value: int: value: 1 -` - p2YAML := ` +`, propertyGroup) + + p2YAML = fmt.Sprintf(` metadata: - group: ui-template + group: %s name: service id: kubernetes tags: - key: content value: str: - value: foo + value: foo333 - key: state value: int: value: 3 -` +`, propertyGroup) +) + +var _ = Describe("Property Operation", func() { + var addr string + var deferFunc func() + var rootCmd *cobra.Command p1Proto := new(propertyv1.Property) helpers.UnmarshalYAML([]byte(p1YAML), p1Proto) @@ -78,72 +89,35 @@ tags: // extracting the operation of creating property schema rootCmd = &cobra.Command{Use: "root"} cmd.RootCmdFlags(rootCmd) - rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"}) - createGroup := func() string { - rootCmd.SetIn(strings.NewReader(` -metadata: - name: ui-template -catalog: CATALOG_PROPERTY -resource_opts: - shard_num: 2 -`)) - return capturer.CaptureStdout(func() { - err := rootCmd.Execute() - if err != nil { - GinkgoWriter.Printf("execution fails:%v", err) - } - }) - } - Eventually(createGroup, flags.EventuallyTimeout).Should(ContainSubstring("group ui-template is created")) - rootCmd.SetArgs([]string{"property", "schema", "create", "-a", addr, "-f", "-"}) - createPropertySchema := func() string { - rootCmd.SetIn(strings.NewReader(` -metadata: - name: service - group: ui-template -tags: - - name: content - type: TAG_TYPE_STRING - - name: state - type: TAG_TYPE_INT -`)) - return capturer.CaptureStdout(func() { - err := rootCmd.Execute() - if err != nil { - GinkgoWriter.Printf("execution fails:%v", err) - } - }) - } - Eventually(createPropertySchema, flags.EventuallyTimeout).Should(ContainSubstring("property schema ui-template.service is created")) - rootCmd.SetArgs([]string{"property", "data", "apply", "-a", addr, "-f", "-"}) - rootCmd.SetIn(strings.NewReader(p1YAML)) + defUITemplateWithSchema(rootCmd, addr, 2) + applyData(rootCmd, addr, p1YAML, true, propertyTagCount) + }) + + It("update property", func() { + // update the property + applyData(rootCmd, addr, p2YAML, false, propertyTagCount) + + // check the property + queryData(rootCmd, addr, propertyGroup, 1, func(data string, resp *propertyv1.QueryResponse) { + Expect(data).To(ContainSubstring("foo333")) + }) + }) + + It("apply same property after delete", func() { + // delete + rootCmd.SetArgs([]string{"property", "data", "delete", "-g", "ui-template", "-n", "service", "-i", "kubernetes"}) out := capturer.CaptureStdout(func() { err := rootCmd.Execute() Expect(err).NotTo(HaveOccurred()) }) - GinkgoWriter.Println(out) - Expect(out).To(ContainSubstring("created: true")) - Expect(out).To(ContainSubstring("tagsNum: 2")) + Expect(out).To(ContainSubstring("deleted: true")) + + // apply property(created should be true) + applyData(rootCmd, addr, p2YAML, true, propertyTagCount) }) It("query all properties", func() { - rootCmd.SetArgs([]string{"property", "data", "query", "-a", addr, "-f", "-"}) - issue := func() string { - rootCmd.SetIn(strings.NewReader(` -groups: ["ui-template"] -`)) - return capturer.CaptureStdout(func() { - err := rootCmd.Execute() - Expect(err).NotTo(HaveOccurred()) - }) - } - Eventually(func() int { - out := issue() - resp := new(propertyv1.QueryResponse) - helpers.UnmarshalYAML([]byte(out), resp) - GinkgoWriter.Println(resp) - return len(resp.Properties) - }, flags.EventuallyTimeout).Should(Equal(1)) + queryData(rootCmd, addr, propertyGroup, 1, nil) }) It("delete property", func() { @@ -154,23 +128,8 @@ groups: ["ui-template"] Expect(err).NotTo(HaveOccurred()) }) Expect(out).To(ContainSubstring("deleted: true")) - rootCmd.SetArgs([]string{"property", "data", "query", "-a", addr, "-f", "-"}) - issue := func() string { - rootCmd.SetIn(strings.NewReader(` -groups: ["ui-template"] -`)) - return capturer.CaptureStdout(func() { - err := rootCmd.Execute() - Expect(err).NotTo(HaveOccurred()) - }) - } - Eventually(func() int { - out := issue() - resp := new(propertyv1.QueryResponse) - helpers.UnmarshalYAML([]byte(out), resp) - GinkgoWriter.Println(resp) - return len(resp.Properties) - }, flags.EventuallyTimeout).Should(Equal(0)) + + queryData(rootCmd, addr, propertyGroup, 0, nil) }) AfterEach(func() { @@ -368,8 +327,8 @@ metadata: name: endpoint group: ui-template tags: - - name: content - type: TAG_TYPE_STRING +- name: content + type: TAG_TYPE_STRING `)) out = capturer.CaptureStdout(func() { err := rootCmd.Execute() @@ -395,3 +354,191 @@ tags: deferFunc() }) }) + +var _ = Describe("Property Cluster Operation", func() { + Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + })).To(Succeed()) + + var addr string + var deferFunc func() + var rootCmd *cobra.Command + BeforeEach(func() { + rootCmd = &cobra.Command{Use: "root"} + cmd.RootCmdFlags(rootCmd) + var ports []int + + // first creating + By("Starting node1 with data") + node1Dir, spaceDef1, err := test.NewSpace() + Expect(err).NotTo(HaveOccurred()) + ports, err = test.AllocateFreePorts(4) + Expect(err).NotTo(HaveOccurred()) + _, node1Addr, node1Defer := setup.ClosableStandalone(node1Dir, ports) + node1Addr = httpSchema + node1Addr + defUITemplateWithSchema(rootCmd, node1Addr, 1) + applyData(rootCmd, node1Addr, p1YAML, true, propertyTagCount) + queryData(rootCmd, node1Addr, propertyGroup, 1, func(data string, resp *propertyv1.QueryResponse) { + Expect(data).To(ContainSubstring("foo111")) + }) + node1Defer() + + By("Starting node2 with data") + node2Dir, spaceDef2, err := test.NewSpace() + Expect(err).NotTo(HaveOccurred()) + ports, err = test.AllocateFreePorts(4) + Expect(err).NotTo(HaveOccurred()) + _, node2Addr, node2Defer := setup.ClosableStandalone(node2Dir, ports) + node2Addr = httpSchema + node2Addr + defUITemplateWithSchema(rootCmd, node2Addr, 1) + applyData(rootCmd, node2Addr, p2YAML, true, propertyTagCount) + queryData(rootCmd, node2Addr, propertyGroup, 1, func(data string, resp *propertyv1.QueryResponse) { + Expect(data).To(ContainSubstring("foo333")) + }) + node2Defer() + + // setup cluster with two data nodes + By("Starting etcd server") + ports, err = test.AllocateFreePorts(2) + Expect(err).NotTo(HaveOccurred()) + dir, spaceDef, err := test.NewSpace() + Expect(err).NotTo(HaveOccurred()) + ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0]) + server, err := embeddedetcd.NewServer( + embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), + embeddedetcd.RootDir(dir), + ) + Expect(err).ShouldNot(HaveOccurred()) + <-server.ReadyNotify() + By("Starting data node 0") + closeDataNode0 := setup.DataNodeFromDataDir(ep, node1Dir) + By("Starting data node 1") + closeDataNode1 := setup.DataNodeFromDataDir(ep, node2Dir) + By("Starting liaison node") + _, liaisonHTTPAddr, closerLiaisonNode := setup.LiaisonNodeWithHTTP(ep) + By("Initializing test cases") + + deferFunc = func() { + closerLiaisonNode() + closeDataNode0() + closeDataNode1() + _ = server.Close() + <-server.StopNotify() + spaceDef() + spaceDef1() + spaceDef2() + } + addr = httpSchema + liaisonHTTPAddr + // creating schema + defUITemplateWithSchema(rootCmd, addr, 1) + }) + + AfterEach(func() { + deferFunc() + }) + + It("query from difference version", func() { + queryData(rootCmd, addr, propertyGroup, 1, func(data string, resp *propertyv1.QueryResponse) { + Expect(data).Should(ContainSubstring("foo333")) + }) + }) + + It("delete property", func() { + // delete properties + rootCmd.SetArgs([]string{"property", "data", "delete", "-g", "ui-template", "-n", "service", "-i", "kubernetes"}) + out := capturer.CaptureStdout(func() { + err := rootCmd.Execute() + Expect(err).NotTo(HaveOccurred()) + }) + Expect(out).To(ContainSubstring("deleted: true")) + + // should no properties after deletion + queryData(rootCmd, addr, propertyGroup, 0, nil) + + // created again, the created should be true + applyData(rootCmd, addr, p1YAML, true, propertyTagCount) + }) +}) + +func defUITemplateWithSchema(rootCmd *cobra.Command, addr string, shardCount int) { + rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"}) + createGroup := func() string { + rootCmd.SetIn(strings.NewReader(fmt.Sprintf(` +metadata: + name: ui-template +catalog: CATALOG_PROPERTY +resource_opts: + shard_num: %d +`, shardCount))) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("execution fails:%v", err) + } + }) + } + Eventually(createGroup, flags.EventuallyTimeout).Should(ContainSubstring("group ui-template is created")) + + rootCmd.SetArgs([]string{"property", "schema", "create", "-a", addr, "-f", "-"}) + createPropertySchema := func() string { + rootCmd.SetIn(strings.NewReader(` +metadata: + name: service + group: ui-template +tags: + - name: content + type: TAG_TYPE_STRING + - name: state + type: TAG_TYPE_INT +`)) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("execution fails:%v", err) + } + }) + } + Eventually(createPropertySchema, flags.EventuallyTimeout).Should(ContainSubstring("property schema ui-template.service is created")) +} + +func applyData(rootCmd *cobra.Command, addr, data string, created bool, tagsNum int) { + rootCmd.SetArgs([]string{"property", "data", "apply", "-a", addr, "-f", "-"}) + rootCmd.SetIn(strings.NewReader(data)) + out := capturer.CaptureStdout(func() { + err := rootCmd.Execute() + Expect(err).NotTo(HaveOccurred()) + }) + GinkgoWriter.Println(out) + Expect(out).To(ContainSubstring(fmt.Sprintf("created: %t", created))) + Expect(out).To(ContainSubstring(fmt.Sprintf("tagsNum: %d", tagsNum))) +} + +func queryData(rootCmd *cobra.Command, addr, group string, dataCount int, verify func(data string, resp *propertyv1.QueryResponse)) { + rootCmd.SetArgs([]string{"property", "data", "query", "-a", addr, "-f", "-"}) + issue := func() string { + rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`groups: ["%s"]`, group))) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + Expect(err).NotTo(HaveOccurred()) + }) + } + Eventually(func() error { + out := issue() + resp := new(propertyv1.QueryResponse) + helpers.UnmarshalYAML([]byte(out), resp) + GinkgoWriter.Println(resp) + + failures := InterceptGomegaFailures(func() { + Expect(len(resp.Properties)).To(Equal(dataCount)) + if verify != nil { + verify(out, resp) + } + }) + + if len(failures) > 0 { + return errors.New(strings.Join(failures, "\n")) + } + return nil + }, flags.EventuallyTimeout).Should(Succeed()) +} diff --git a/docs/api-reference.md b/docs/api-reference.md index 40f72f7b..72185c0d 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -3401,6 +3401,7 @@ Property stores the user defined data | ----- | ---- | ----- | ----------- | | sources | [bytes](#bytes) | repeated | | | trace | [banyandb.common.v1.Trace](#banyandb-common-v1-Trace) | | | +| deletes | [bool](#bool) | repeated | deletes indicates the property is deleted or not it's mapping to the sources in the same order | diff --git a/pkg/convert/number.go b/pkg/convert/number.go index 0074c588..4667b088 100644 --- a/pkg/convert/number.go +++ b/pkg/convert/number.go @@ -51,6 +51,14 @@ func Uint32ToBytes(u uint32) []byte { return bs } +// BoolToBytes converts bool to bytes. +func BoolToBytes(b bool) []byte { + if b { + return []byte{1} + } + return []byte{0} +} + // BytesToInt64 converts bytes to int64. func BytesToInt64(b []byte) int64 { u := binary.BigEndian.Uint64(b) @@ -87,3 +95,11 @@ func Float64ToBytes(f float64) []byte { func BytesToFloat64(b []byte) float64 { return math.Float64frombits(binary.BigEndian.Uint64(b)) } + +// BytesToBool converts bytes to bool. +func BytesToBool(b []byte) bool { + if len(b) == 0 { + return false + } + return b[0] != 0 +} diff --git a/pkg/convert/number_test.go b/pkg/convert/number_test.go index 9fcd8473..1a5b310c 100644 --- a/pkg/convert/number_test.go +++ b/pkg/convert/number_test.go @@ -46,3 +46,22 @@ func TestInt64ToBytes(t *testing.T) { }) } } + +func TestBoolToBytes(t *testing.T) { + testCases := []struct { + expected []byte + input bool + }{ + {[]byte{1}, true}, + {[]byte{0}, false}, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("BoolToBytes(%t)", tc.input), func(t *testing.T) { + result := BoolToBytes(tc.input) + if !bytes.Equal(result, tc.expected) { + t.Errorf("Expected %v, got %v", tc.expected, result) + } + }) + } +} diff --git a/pkg/index/index.go b/pkg/index/index.go index e13334b2..9ec98899 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -300,6 +300,7 @@ type Document struct { Timestamp int64 DocID uint64 Version int64 + Deleted bool // for logical deletion } // Documents is a collection of documents. @@ -307,7 +308,8 @@ type Documents []Document // Batch is a collection of documents. type Batch struct { - Documents Documents + PersistentCallback func(error) + Documents Documents } // Writer allows writing fields and docID in a document to an index. diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 9ac005af..8450e647 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -52,6 +52,7 @@ const ( timestampField = "_timestamp" versionField = "_version" sourceField = "_source" + deletedField = "_deleted" ) var ( @@ -141,6 +142,9 @@ func (s *store) Batch(batch index.Batch) error { if d.Timestamp > 0 { doc.AddField(bluge.NewDateTimeField(timestampField, time.Unix(0, d.Timestamp)).StoreValue()) } + if d.Deleted { + doc.AddField(bluge.NewStoredOnlyField(deletedField, convert.BoolToBytes(true)).StoreValue()) + } b.Insert(doc) } return s.writer.Batch(b) diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index 93c95c88..c99e5aa5 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -47,6 +47,9 @@ func (s *store) InsertSeriesBatch(batch index.Batch) error { } defer s.closer.Done() b := generateBatch() + if batch.PersistentCallback != nil { + b.SetPersistedCallback(batch.PersistentCallback) + } defer releaseBatch(b) for _, d := range batch.Documents { doc, ff := toDoc(d, true) @@ -65,6 +68,9 @@ func (s *store) UpdateSeriesBatch(batch index.Batch) error { defer s.closer.Done() b := generateBatch() defer releaseBatch(b) + if batch.PersistentCallback != nil { + b.SetPersistedCallback(batch.PersistentCallback) + } for _, d := range batch.Documents { doc, _ := toDoc(d, false) b.Update(doc.ID(), doc) @@ -121,6 +127,9 @@ func toDoc(d index.Document, toParseFieldNames bool) (*bluge.Document, []string) vf := bluge.NewStoredOnlyField(versionField, convert.Int64ToBytes(d.Version)) doc.AddField(vf) } + if d.Deleted { + doc.AddField(bluge.NewStoredOnlyField(deletedField, convert.BoolToBytes(d.Deleted)).StoreValue()) + } return doc, fieldNames } diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 53f87cad..6cd6b7ba 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -204,10 +204,7 @@ func CMD(flags ...string) func() { } } -func startDataNode(etcdEndpoint string, flags ...string) (string, string, func(), func()) { - path, deferFn, err := test.NewSpace() - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - +func startDataNode(etcdEndpoint, dataDir string, flags ...string) (string, func()) { ports, err := test.AllocateFreePorts(1) gomega.Expect(err).NotTo(gomega.HaveOccurred()) @@ -218,9 +215,9 @@ func startDataNode(etcdEndpoint string, flags ...string) (string, string, func() "data", "--grpc-host="+host, fmt.Sprintf("--grpc-port=%d", ports[0]), - "--stream-root-path="+path, - "--measure-root-path="+path, - "--property-root-path="+path, + "--stream-root-path="+dataDir, + "--measure-root-path="+dataDir, + "--property-root-path="+dataDir, "--etcd-endpoints", etcdEndpoint, "--node-host-provider", "flag", "--node-host", nodeHost, @@ -236,48 +233,64 @@ func startDataNode(etcdEndpoint string, flags ...string) (string, string, func() return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0])) }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1)) - return addr, path, closeFn, deferFn + return addr, closeFn } // DataNode runs a data node. func DataNode(etcdEndpoint string, flags ...string) func() { - _, _, closeFn, deferFn := startDataNode(etcdEndpoint, flags...) + path, deferFn, err := test.NewSpace() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + closeFn := DataNodeFromDataDir(etcdEndpoint, path, flags...) return func() { closeFn() deferFn() } } +// DataNodeFromDataDir runs a data node with a specific data directory. +func DataNodeFromDataDir(etcdEndpoint, dataDir string, flags ...string) func() { + _, closeFn := startDataNode(etcdEndpoint, dataDir, flags...) + return closeFn +} + // DataNodeWithAddrAndDir runs a data node and returns the address and root path. func DataNodeWithAddrAndDir(etcdEndpoint string, flags ...string) (string, string, func()) { - addr, dir, closeFn, deferFn := startDataNode(etcdEndpoint, flags...) - return addr, dir, func() { + path, deferFn, err := test.NewSpace() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + addr, closeFn := startDataNode(etcdEndpoint, path, flags...) + return addr, path, func() { closeFn() deferFn() } } // LiaisonNode runs a liaison node. -func LiaisonNode(etcdEndpoint string, flags ...string) (string, func()) { +func LiaisonNode(etcdEndpoint string, flags ...string) (grpcAddr string, closeFn func()) { + grpcAddr, _, closeFn = LiaisonNodeWithHTTP(etcdEndpoint, flags...) + return +} + +// LiaisonNodeWithHTTP runs a liaison node with HTTP enabled and returns the gRPC and HTTP addresses. +func LiaisonNodeWithHTTP(etcdEndpoint string, flags ...string) (grpcAddr, httpAddr string, closeFn func()) { ports, err := test.AllocateFreePorts(2) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - addr := fmt.Sprintf("%s:%d", host, ports[0]) - httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) + grpcAddr = fmt.Sprintf("%s:%d", host, ports[0]) + httpAddr = fmt.Sprintf("%s:%d", host, ports[1]) nodeHost := "127.0.0.1" flags = append(flags, "liaison", "--grpc-host="+host, fmt.Sprintf("--grpc-port=%d", ports[0]), "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), - "--http-grpc-addr="+addr, + "--http-grpc-addr="+grpcAddr, "--etcd-endpoints", etcdEndpoint, "--node-host-provider", "flag", "--node-host", nodeHost, ) - closeFn := CMD(flags...) + closeFn = CMD(flags...) gomega.Eventually(helpers.HTTPHealthCheck(httpAddr, ""), testflags.EventuallyTimeout).Should(gomega.Succeed()) gomega.Eventually(func() (map[string]*databasev1.Node, error) { return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0])) }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1)) - return addr, closeFn + return } diff --git a/ui/README.md b/ui/README.md index 612ab135..435f9a9c 100644 --- a/ui/README.md +++ b/ui/README.md @@ -4,7 +4,7 @@ This template should help get you started developing with Vue 3 in Vite. ## Recommended IDE Setup -[VSCode](https://code.visualstudio.com/) + [Volar](https://marketplace.visualstudio.com/items?itemName=Vue.volar) (and disable Vetur) + [TypeScript Vue Plugin (Volar)](https://marketplace.visualstudio.com/items?itemName=Vue.vscode-typescript-vue-plugin). +VSCode + Volar (and disable Vetur) + TypeScript Vue Plugin (Volar). ## Setting up Environment Variables