This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch client in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5267d6c13c2ef820c766cc551c371c2a82c8c823 Author: Gao Hongtao <[email protected]> AuthorDate: Thu Oct 13 04:27:49 2022 +0000 Fixes some issues found by Java client Signed-off-by: Gao Hongtao <[email protected]> --- banyand/liaison/grpc/server.go | 18 ++++++++++-------- banyand/liaison/http/server.go | 23 +++++++++++++---------- banyand/tsdb/index/writer.go | 4 +++- bydbctl/internal/cmd/cmd_suite_test.go | 9 +++++++++ pkg/partition/index.go | 4 +++- pkg/query/logical/common.go | 4 +++- pkg/query/logical/index_filter.go | 8 ++++---- pkg/query/logical/tag_filter.go | 9 ++++++--- test/cases/stream/data/input/global_index.yaml | 15 +++++++++------ test/cases/stream/stream.go | 2 +- 10 files changed, 61 insertions(+), 35 deletions(-) diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index aa23da8..66cc2b4 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -178,13 +178,6 @@ func (s *Server) Validate() error { } func (s *Server) Serve() run.StopNotify { - lis, err := net.Listen("tcp", s.addr) - if err != nil { - s.log.Fatal().Err(err).Msg("Failed to listen") - } - if errValidate := s.Validate(); errValidate != nil { - s.log.Fatal().Err(errValidate).Msg("Failed to validate data") - } var opts []grpclib.ServerOption if s.tls { opts = []grpclib.ServerOption{grpclib.Creds(s.creds)} @@ -208,8 +201,17 @@ func (s *Server) Serve() run.StopNotify { s.stopCh = make(chan struct{}) go func() { + lis, err := net.Listen("tcp", s.addr) + if err != nil { + s.log.Error().Err(err).Msg("Failed to listen") + close(s.stopCh) + return + } s.log.Info().Str("addr", s.addr).Msg("Listening to") - _ = s.ser.Serve(lis) + err = s.ser.Serve(lis) + if err != nil { + s.log.Error().Err(err).Msg("server is interrupted") + } close(s.stopCh) }() return s.stopCh diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go index 7800296..e416286 100644 --- a/banyand/liaison/http/server.go +++ b/banyand/liaison/http/server.go @@ -91,7 +91,14 @@ func (p *service) PreRun() error { fileServer := stdhttp.FileServer(stdhttp.FS(fSys)) serveIndex := serveFileContents("index.html", httpFS) p.mux.Mount("/", intercept404(fileServer, serveIndex)) + p.srv = &stdhttp.Server{ + Addr: p.listenAddr, + Handler: p.mux, + } + return nil +} +func (p *service) Serve() run.StopNotify { var ctx context.Context ctx, p.clientCloser = context.WithCancel(context.Background()) opts := []grpc.DialOption{ @@ -100,7 +107,9 @@ func (p *service) PreRun() error { } client, err := newHealthCheckClient(ctx, p.l, p.grpcAddr, opts) if err != nil { - return err + p.l.Error().Err(err).Msg("Failed to health check client") + close(p.stopCh) + return p.stopCh } gwMux := runtime.NewServeMux(runtime.WithHealthzEndpoint(client)) err = multierr.Combine( @@ -114,17 +123,11 @@ func (p *service) PreRun() error { property_v1.RegisterPropertyServiceHandlerFromEndpoint(ctx, gwMux, p.grpcAddr, opts), ) if err != nil { - return err + p.l.Error().Err(err).Msg("Failed to register endpoints") + close(p.stopCh) + return p.stopCh } p.mux.Mount("/api", http.StripPrefix("/api", gwMux)) - p.srv = &stdhttp.Server{ - Addr: p.listenAddr, - Handler: p.mux, - } - return nil -} - -func (p *service) Serve() run.StopNotify { go func() { p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start liaison http server") if err := p.srv.ListenAndServe(); err != http.ErrServerClosed { diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go index a7b72f4..95dcf48 100644 --- a/banyand/tsdb/index/writer.go +++ b/banyand/tsdb/index/writer.go @@ -242,7 +242,9 @@ func getIndexValue(ruleIndex *partition.IndexRuleLocator, value Value) (val [][] val = make([][]byte, 0) var existInt bool if len(ruleIndex.TagIndices) != 1 { - return nil, false, errors.Wrap(ErrUnsupportedIndexType, "the index rule didn't support composited tags") + return nil, false, errors.WithMessagef(ErrUnsupportedIndexType, + "the index rule %s(%v) didn't support composited tags", + ruleIndex.Rule.Metadata.Name, ruleIndex.Rule.Tags) } tIndex := ruleIndex.TagIndices[0] tag, err := partition.GetTagByOffset(value.TagFamilies, tIndex.FamilyOffset, tIndex.TagOffset) diff --git a/bydbctl/internal/cmd/cmd_suite_test.go b/bydbctl/internal/cmd/cmd_suite_test.go index 36313e2..6dc56ae 100644 --- a/bydbctl/internal/cmd/cmd_suite_test.go +++ b/bydbctl/internal/cmd/cmd_suite_test.go @@ -22,9 +22,18 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + + "github.com/apache/skywalking-banyandb/pkg/logger" ) func TestCmd(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Cmd Suite") } + +var _ = BeforeSuite(func() { + Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: "warn", + })).To(Succeed()) +}) diff --git a/pkg/partition/index.go b/pkg/partition/index.go index a6f8b81..0cf38b3 100644 --- a/pkg/partition/index.go +++ b/pkg/partition/index.go @@ -36,7 +36,9 @@ func ParseIndexRuleLocators(families []*databasev1.TagFamilySpec, indexRules []* tagIndices = append(tagIndices, TagLocator{FamilyOffset: fIndex, TagOffset: tIndex}) } } - locators = append(locators, &IndexRuleLocator{Rule: rule, TagIndices: tagIndices}) + if len(tagIndices) > 0 { + locators = append(locators, &IndexRuleLocator{Rule: rule, TagIndices: tagIndices}) + } } return locators } diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go index a80b0fc..3537ce7 100644 --- a/pkg/query/logical/common.go +++ b/pkg/query/logical/common.go @@ -31,7 +31,9 @@ import ( var ( ErrTagNotDefined = errors.New("tag is not defined") ErrFieldNotDefined = errors.New("field is not defined") - ErrInvalidConditionType = errors.New("invalid pair type") + ErrUnsupportedConditionOp = errors.New("unsupported condition operation") + ErrUnsupportedConditionValue = errors.New("unsupported condition value type") + ErrInvalidCriteriaType = errors.New("invalid criteria type") ErrIncompatibleQueryCondition = errors.New("incompatible query condition type") ErrIndexNotDefined = errors.New("index is not define for the tag") ErrMultipleGlobalIndexes = errors.New("multiple global indexes are not supported") diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/index_filter.go index 059ef34..c92ecf8 100644 --- a/pkg/query/logical/index_filter.go +++ b/pkg/query/logical/index_filter.go @@ -96,7 +96,7 @@ func BuildLocalFilter(criteria *model_v1.Criteria, schema Schema, entityDict map } } - return nil, nil, ErrInvalidConditionType + return nil, nil, ErrInvalidCriteriaType } func parseCondition(cond *model_v1.Condition, indexRule *database_v1.IndexRule, expr LiteralExpr, entity tsdb.Entity) (index.Filter, []tsdb.Entity, error) { @@ -140,7 +140,7 @@ func parseCondition(cond *model_v1.Condition, indexRule *database_v1.IndexRule, } return newNot(indexRule, and), []tsdb.Entity{entity}, nil } - return nil, nil, ErrInvalidConditionType + return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, "index filter parses %v", cond) } func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *model_v1.Condition) (LiteralExpr, tsdb.Entity, error) { @@ -148,7 +148,7 @@ func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *mode copy(parsedEntity, entity) entityIdx, ok := entityDict[cond.Name] if ok && cond.Op != model_v1.Condition_BINARY_OP_EQ { - return nil, nil, errors.WithMessagef(ErrInvalidConditionType, "tag belongs to the entity only supports EQ operation in condition(%v)", cond) + return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag belongs to the entity only supports EQ operation in condition(%v)", cond) } switch v := cond.Value.Value.(type) { case *model_v1.TagValue_Str: @@ -183,7 +183,7 @@ func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *mode case *model_v1.TagValue_Null: return nullLiteralExpr, nil, nil } - return nil, nil, ErrInvalidConditionType + return nil, nil, errors.WithMessagef(ErrUnsupportedConditionValue, "index filter parses %v", cond) } func parseEntities(op model_v1.LogicalExpression_LogicalOp, input tsdb.Entity, left, right []tsdb.Entity) []tsdb.Entity { diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go index fd0c58a..5e0ef95 100644 --- a/pkg/query/logical/tag_filter.go +++ b/pkg/query/logical/tag_filter.go @@ -35,6 +35,9 @@ type TagFilter interface { } func BuildTagFilter(criteria *model_v1.Criteria, entityDict map[string]int, schema Schema, hasGlobalIndex bool) (TagFilter, error) { + if criteria == nil { + return BypassFilter, nil + } switch criteria.GetExp().(type) { case *model_v1.Criteria_Condition: cond := criteria.GetCondition() @@ -77,7 +80,7 @@ func BuildTagFilter(criteria *model_v1.Criteria, entityDict map[string]int, sche } } - return nil, ErrInvalidConditionType + return nil, ErrInvalidCriteriaType } func parseFilter(cond *model_v1.Condition, expr ComparableExpr) (TagFilter, error) { @@ -109,7 +112,7 @@ func parseFilter(cond *model_v1.Condition, expr ComparableExpr) (TagFilter, erro case model_v1.Condition_BINARY_OP_NOT_HAVING: return newNotTag(newHavingTag(cond.Name, expr)), nil } - return nil, ErrInvalidConditionType + return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag filter parses %v", cond) } func parseExpr(value *model_v1.TagValue) (ComparableExpr, error) { @@ -133,7 +136,7 @@ func parseExpr(value *model_v1.TagValue) (ComparableExpr, error) { case *model_v1.TagValue_Null: return nullLiteralExpr, nil } - return nil, ErrInvalidConditionType + return nil, errors.WithMessagef(ErrUnsupportedConditionValue, "tag filter parses %v", value) } var BypassFilter = new(emptyFilter) diff --git a/test/cases/stream/data/input/global_index.yaml b/test/cases/stream/data/input/global_index.yaml index 84267ef..fb1b2b0 100644 --- a/test/cases/stream/data/input/global_index.yaml +++ b/test/cases/stream/data/input/global_index.yaml @@ -25,9 +25,12 @@ projection: - name: "data" tags: ["data_binary"] criteria: - condition: - name: "trace_id" - op: "BINARY_OP_EQ" - value: - str: - value: "1" + le: + op: "LOGICAL_OP_AND" + left: + condition: + name: "trace_id" + op: "BINARY_OP_EQ" + value: + str: + value: "1" diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go index 210b973..d99d11c 100644 --- a/test/cases/stream/stream.go +++ b/test/cases/stream/stream.go @@ -48,7 +48,7 @@ var _ = g.DescribeTable("Scanning Streams", verify, End: timestamppb.New(time.Unix(0, math.MaxInt64).Truncate(time.Millisecond)), }), g.Entry("sort desc", helpers.Args{Input: "sort_desc", Duration: 1 * time.Hour}), - g.Entry("global index", helpers.Args{Input: "global_index", Duration: 1 * time.Hour}), + g.FEntry("global index", helpers.Args{Input: "global_index", Duration: 1 * time.Hour}), g.Entry("filter by non-indexed tag", helpers.Args{Input: "filter_tag", Duration: 1 * time.Hour}), g.Entry("get empty result by non-indexed tag", helpers.Args{Input: "filter_tag_empty", Duration: 1 * time.Hour, WantEmpty: true}), g.Entry("numeric local index: less", helpers.Args{Input: "less", Duration: 1 * time.Hour}),
