This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch client
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5267d6c13c2ef820c766cc551c371c2a82c8c823
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Oct 13 04:27:49 2022 +0000

    Fixes some issues found by Java client
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/liaison/grpc/server.go                 | 18 ++++++++++--------
 banyand/liaison/http/server.go                 | 23 +++++++++++++----------
 banyand/tsdb/index/writer.go                   |  4 +++-
 bydbctl/internal/cmd/cmd_suite_test.go         |  9 +++++++++
 pkg/partition/index.go                         |  4 +++-
 pkg/query/logical/common.go                    |  4 +++-
 pkg/query/logical/index_filter.go              |  8 ++++----
 pkg/query/logical/tag_filter.go                |  9 ++++++---
 test/cases/stream/data/input/global_index.yaml | 15 +++++++++------
 test/cases/stream/stream.go                    |  2 +-
 10 files changed, 61 insertions(+), 35 deletions(-)

diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index aa23da8..66cc2b4 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -178,13 +178,6 @@ func (s *Server) Validate() error {
 }
 
 func (s *Server) Serve() run.StopNotify {
-       lis, err := net.Listen("tcp", s.addr)
-       if err != nil {
-               s.log.Fatal().Err(err).Msg("Failed to listen")
-       }
-       if errValidate := s.Validate(); errValidate != nil {
-               s.log.Fatal().Err(errValidate).Msg("Failed to validate data")
-       }
        var opts []grpclib.ServerOption
        if s.tls {
                opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
@@ -208,8 +201,17 @@ func (s *Server) Serve() run.StopNotify {
 
        s.stopCh = make(chan struct{})
        go func() {
+               lis, err := net.Listen("tcp", s.addr)
+               if err != nil {
+                       s.log.Error().Err(err).Msg("Failed to listen")
+                       close(s.stopCh)
+                       return
+               }
                s.log.Info().Str("addr", s.addr).Msg("Listening to")
-               _ = s.ser.Serve(lis)
+               err = s.ser.Serve(lis)
+               if err != nil {
+                       s.log.Error().Err(err).Msg("server is interrupted")
+               }
                close(s.stopCh)
        }()
        return s.stopCh
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index 7800296..e416286 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -91,7 +91,14 @@ func (p *service) PreRun() error {
        fileServer := stdhttp.FileServer(stdhttp.FS(fSys))
        serveIndex := serveFileContents("index.html", httpFS)
        p.mux.Mount("/", intercept404(fileServer, serveIndex))
+       p.srv = &stdhttp.Server{
+               Addr:    p.listenAddr,
+               Handler: p.mux,
+       }
+       return nil
+}
 
+func (p *service) Serve() run.StopNotify {
        var ctx context.Context
        ctx, p.clientCloser = context.WithCancel(context.Background())
        opts := []grpc.DialOption{
@@ -100,7 +107,9 @@ func (p *service) PreRun() error {
        }
        client, err := newHealthCheckClient(ctx, p.l, p.grpcAddr, opts)
        if err != nil {
-               return err
+               p.l.Error().Err(err).Msg("Failed to health check client")
+               close(p.stopCh)
+               return p.stopCh
        }
        gwMux := runtime.NewServeMux(runtime.WithHealthzEndpoint(client))
        err = multierr.Combine(
@@ -114,17 +123,11 @@ func (p *service) PreRun() error {
                property_v1.RegisterPropertyServiceHandlerFromEndpoint(ctx, 
gwMux, p.grpcAddr, opts),
        )
        if err != nil {
-               return err
+               p.l.Error().Err(err).Msg("Failed to register endpoints")
+               close(p.stopCh)
+               return p.stopCh
        }
        p.mux.Mount("/api", http.StripPrefix("/api", gwMux))
-       p.srv = &stdhttp.Server{
-               Addr:    p.listenAddr,
-               Handler: p.mux,
-       }
-       return nil
-}
-
-func (p *service) Serve() run.StopNotify {
        go func() {
                p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start liaison 
http server")
                if err := p.srv.ListenAndServe(); err != http.ErrServerClosed {
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index a7b72f4..95dcf48 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -242,7 +242,9 @@ func getIndexValue(ruleIndex *partition.IndexRuleLocator, value Value) (val [][]
        val = make([][]byte, 0)
        var existInt bool
        if len(ruleIndex.TagIndices) != 1 {
-               return nil, false, errors.Wrap(ErrUnsupportedIndexType, "the 
index rule didn't support composited tags")
+               return nil, false, errors.WithMessagef(ErrUnsupportedIndexType,
+                       "the index rule %s(%v) didn't support composited tags",
+                       ruleIndex.Rule.Metadata.Name, ruleIndex.Rule.Tags)
        }
        tIndex := ruleIndex.TagIndices[0]
        tag, err := partition.GetTagByOffset(value.TagFamilies, 
tIndex.FamilyOffset, tIndex.TagOffset)
diff --git a/bydbctl/internal/cmd/cmd_suite_test.go b/bydbctl/internal/cmd/cmd_suite_test.go
index 36313e2..6dc56ae 100644
--- a/bydbctl/internal/cmd/cmd_suite_test.go
+++ b/bydbctl/internal/cmd/cmd_suite_test.go
@@ -22,9 +22,18 @@ import (
 
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 func TestCmd(t *testing.T) {
        RegisterFailHandler(Fail)
        RunSpecs(t, "Cmd Suite")
 }
+
+var _ = BeforeSuite(func() {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: "warn",
+       })).To(Succeed())
+})
diff --git a/pkg/partition/index.go b/pkg/partition/index.go
index a6f8b81..0cf38b3 100644
--- a/pkg/partition/index.go
+++ b/pkg/partition/index.go
@@ -36,7 +36,9 @@ func ParseIndexRuleLocators(families []*databasev1.TagFamilySpec, indexRules []*
                                tagIndices = append(tagIndices, 
TagLocator{FamilyOffset: fIndex, TagOffset: tIndex})
                        }
                }
-               locators = append(locators, &IndexRuleLocator{Rule: rule, 
TagIndices: tagIndices})
+               if len(tagIndices) > 0 {
+                       locators = append(locators, &IndexRuleLocator{Rule: 
rule, TagIndices: tagIndices})
+               }
        }
        return locators
 }
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index a80b0fc..3537ce7 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -31,7 +31,9 @@ import (
 var (
        ErrTagNotDefined              = errors.New("tag is not defined")
        ErrFieldNotDefined            = errors.New("field is not defined")
-       ErrInvalidConditionType       = errors.New("invalid pair type")
+       ErrUnsupportedConditionOp     = errors.New("unsupported condition 
operation")
+       ErrUnsupportedConditionValue  = errors.New("unsupported condition value 
type")
+       ErrInvalidCriteriaType        = errors.New("invalid criteria type")
        ErrIncompatibleQueryCondition = errors.New("incompatible query 
condition type")
        ErrIndexNotDefined            = errors.New("index is not define for the 
tag")
        ErrMultipleGlobalIndexes      = errors.New("multiple global indexes are 
not supported")
diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/index_filter.go
index 059ef34..c92ecf8 100644
--- a/pkg/query/logical/index_filter.go
+++ b/pkg/query/logical/index_filter.go
@@ -96,7 +96,7 @@ func BuildLocalFilter(criteria *model_v1.Criteria, schema Schema, entityDict map
                }
 
        }
-       return nil, nil, ErrInvalidConditionType
+       return nil, nil, ErrInvalidCriteriaType
 }
 
 func parseCondition(cond *model_v1.Condition, indexRule 
*database_v1.IndexRule, expr LiteralExpr, entity tsdb.Entity) (index.Filter, 
[]tsdb.Entity, error) {
@@ -140,7 +140,7 @@ func parseCondition(cond *model_v1.Condition, indexRule *database_v1.IndexRule,
                }
                return newNot(indexRule, and), []tsdb.Entity{entity}, nil
        }
-       return nil, nil, ErrInvalidConditionType
+       return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, "index 
filter parses %v", cond)
 }
 
 func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond 
*model_v1.Condition) (LiteralExpr, tsdb.Entity, error) {
@@ -148,7 +148,7 @@ func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *mode
        copy(parsedEntity, entity)
        entityIdx, ok := entityDict[cond.Name]
        if ok && cond.Op != model_v1.Condition_BINARY_OP_EQ {
-               return nil, nil, errors.WithMessagef(ErrInvalidConditionType, 
"tag belongs to the entity only supports EQ operation in condition(%v)", cond)
+               return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, 
"tag belongs to the entity only supports EQ operation in condition(%v)", cond)
        }
        switch v := cond.Value.Value.(type) {
        case *model_v1.TagValue_Str:
@@ -183,7 +183,7 @@ func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *mode
        case *model_v1.TagValue_Null:
                return nullLiteralExpr, nil, nil
        }
-       return nil, nil, ErrInvalidConditionType
+       return nil, nil, errors.WithMessagef(ErrUnsupportedConditionValue, 
"index filter parses %v", cond)
 }
 
 func parseEntities(op model_v1.LogicalExpression_LogicalOp, input tsdb.Entity, 
left, right []tsdb.Entity) []tsdb.Entity {
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index fd0c58a..5e0ef95 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -35,6 +35,9 @@ type TagFilter interface {
 }
 
 func BuildTagFilter(criteria *model_v1.Criteria, entityDict map[string]int, 
schema Schema, hasGlobalIndex bool) (TagFilter, error) {
+       if criteria == nil {
+               return BypassFilter, nil
+       }
        switch criteria.GetExp().(type) {
        case *model_v1.Criteria_Condition:
                cond := criteria.GetCondition()
@@ -77,7 +80,7 @@ func BuildTagFilter(criteria *model_v1.Criteria, entityDict map[string]int, sche
                }
 
        }
-       return nil, ErrInvalidConditionType
+       return nil, ErrInvalidCriteriaType
 }
 
 func parseFilter(cond *model_v1.Condition, expr ComparableExpr) (TagFilter, 
error) {
@@ -109,7 +112,7 @@ func parseFilter(cond *model_v1.Condition, expr ComparableExpr) (TagFilter, erro
        case model_v1.Condition_BINARY_OP_NOT_HAVING:
                return newNotTag(newHavingTag(cond.Name, expr)), nil
        }
-       return nil, ErrInvalidConditionType
+       return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag filter 
parses %v", cond)
 }
 
 func parseExpr(value *model_v1.TagValue) (ComparableExpr, error) {
@@ -133,7 +136,7 @@ func parseExpr(value *model_v1.TagValue) (ComparableExpr, error) {
        case *model_v1.TagValue_Null:
                return nullLiteralExpr, nil
        }
-       return nil, ErrInvalidConditionType
+       return nil, errors.WithMessagef(ErrUnsupportedConditionValue, "tag 
filter parses %v", value)
 }
 
 var BypassFilter = new(emptyFilter)
diff --git a/test/cases/stream/data/input/global_index.yaml b/test/cases/stream/data/input/global_index.yaml
index 84267ef..fb1b2b0 100644
--- a/test/cases/stream/data/input/global_index.yaml
+++ b/test/cases/stream/data/input/global_index.yaml
@@ -25,9 +25,12 @@ projection:
   - name: "data"
     tags: ["data_binary"]
 criteria:
-  condition:
-    name: "trace_id"
-    op: "BINARY_OP_EQ"
-    value:
-      str:
-        value: "1"
+  le:
+    op: "LOGICAL_OP_AND"
+    left:
+      condition:
+        name: "trace_id"
+        op: "BINARY_OP_EQ"
+        value:
+          str:
+            value: "1"
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index 210b973..d99d11c 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -48,7 +48,7 @@ var _ = g.DescribeTable("Scanning Streams", verify,
                End:   timestamppb.New(time.Unix(0, 
math.MaxInt64).Truncate(time.Millisecond)),
        }),
        g.Entry("sort desc", helpers.Args{Input: "sort_desc", Duration: 1 * 
time.Hour}),
-       g.Entry("global index", helpers.Args{Input: "global_index", Duration: 1 
* time.Hour}),
+       g.FEntry("global index", helpers.Args{Input: "global_index", Duration: 
1 * time.Hour}),
        g.Entry("filter by non-indexed tag", helpers.Args{Input: "filter_tag", 
Duration: 1 * time.Hour}),
        g.Entry("get empty result by non-indexed tag", helpers.Args{Input: 
"filter_tag_empty", Duration: 1 * time.Hour, WantEmpty: true}),
        g.Entry("numeric local index: less", helpers.Args{Input: "less", 
Duration: 1 * time.Hour}),

Reply via email to