This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch doc
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 03d2629635113f7a7b4e593012def7e65d95f5a1
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Oct 2 02:07:54 2024 +0000

    Fix some nits
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/Dockerfile                                 |  3 +++
 banyand/measure/query.go                           | 29 ++++++++++++++++++----
 banyand/measure/write.go                           |  8 +-----
 banyand/metadata/schema/etcd.go                    |  6 ++---
 banyand/queue/sub/sub.go                           |  9 +++++--
 banyand/stream/query.go                            | 20 ++++++++++++---
 banyand/stream/write.go                            |  8 +-----
 pkg/index/inverted/inverted.go                     |  2 +-
 pkg/index/inverted/inverted_series.go              |  9 +++++--
 .../logical/stream/stream_plan_indexscan_local.go  |  5 ++++
 test/docker/Dockerfile                             |  2 ++
 11 files changed, 69 insertions(+), 32 deletions(-)

diff --git a/banyand/Dockerfile b/banyand/Dockerfile
index 42db44e5..5f21348f 100644
--- a/banyand/Dockerfile
+++ b/banyand/Dockerfile
@@ -34,6 +34,9 @@ COPY build/bin/windows/${TARGETARCH}/banyand-server-static "/banyand"
 
 FROM build-${TARGETOS} AS final
 
+ENV GRPC_GO_LOG_SEVERITY_LEVEL=ERROR
+ENV GRPC_GO_LOG_FORMATTER=json
+
 EXPOSE 17912
 EXPOSE 17913
 EXPOSE 6060
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index ac1ccfa0..d9d19918 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -41,7 +41,8 @@ import (
 )
 
 const (
-       preloadSize = 100
+       preloadSize    = 100
+       checkDoneEvery = 128
 )
 
 // Query allow to retrieve measure data points.
@@ -252,12 +253,16 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids []
        if tstIter.Error() != nil {
                return fmt.Errorf("cannot init tstIter: %w", tstIter.Error())
        }
+       var hit int
        for tstIter.nextBlock() {
-               select {
-               case <-ctx.Done():
-                       return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(result.data), len(tstIter.piHeap), len(tstIter.piPool))
-               default:
+               if hit%checkDoneEvery == 0 {
+                       select {
+                       case <-ctx.Done():
+                               return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(result.data), len(tstIter.piHeap), len(tstIter.piPool))
+                       default:
+                       }
                }
+               hit++
                bc := generateBlockCursor()
                p := tstIter.piHeap[0]
                bc.init(p.p, p.curBlock, qo)
@@ -426,6 +431,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue {
 
 type queryResult struct {
        ctx              context.Context
+       hit              int
        sidToIndex       map[common.SeriesID]int
        storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue
        tagProjection    []model.TagProjection
@@ -438,6 +444,13 @@ type queryResult struct {
 }
 
 func (qr *queryResult) Pull() *model.MeasureResult {
+       select {
+       case <-qr.ctx.Done():
+               return &model.MeasureResult{
+                       Error: errors.WithMessagef(qr.ctx.Err(), "interrupt: hit %d", qr.hit),
+               }
+       default:
+       }
        if !qr.loaded {
                if len(qr.data) == 0 {
                        return nil
@@ -446,6 +459,12 @@ func (qr *queryResult) Pull() *model.MeasureResult {
                cursorChan := make(chan int, len(qr.data))
                for i := 0; i < len(qr.data); i++ {
                        go func(i int) {
+                               select {
+                               case <-qr.ctx.Done():
+                                       cursorChan <- i
+                                       return
+                               default:
+                               }
                                tmpBlock := generateBlock()
                                defer releaseBlock(tmpBlock)
                                if !qr.data[i].loadData(tmpBlock) {
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index da5ba404..64362ab2 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -238,7 +238,7 @@ func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPoi
        return dpt, nil
 }
 
-func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
+func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) {
        events, ok := message.Data().([]any)
        if !ok {
                w.l.Warn().Msg("invalid event data type")
@@ -250,12 +250,6 @@ func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.
        }
        groups := make(map[string]*dataPointsInGroup)
        for i := range events {
-               select {
-               case <-ctx.Done():
-                       w.l.Warn().Msgf("context is done, handled %d events", i)
-                       break
-               default:
-               }
                var writeEvent *measurev1.InternalWriteRequest
                switch e := events[i].(type) {
                case *measurev1.InternalWriteRequest:
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index b9a7e601..0e112039 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -521,10 +521,8 @@ func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) {
        ctx, cancel := context.WithTimeout(context.Background(), leaseDuration)
        defer cancel()
        _, err := e.client.Lease.Revoke(ctx, lease.ID)
-       if err != nil {
-               if !errors.Is(err, context.DeadlineExceeded) {
-                       e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID)
-               }
+       if err != nil && e.l.Debug().Enabled() {
+               e.l.Debug().Err(err).Msgf("failed to revoke lease %d", lease.ID)
        }
 }
 
diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go
index 0b3f2ae9..46db4174 100644
--- a/banyand/queue/sub/sub.go
+++ b/banyand/queue/sub/sub.go
@@ -39,12 +39,11 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error {
        reply := func(writeEntity *clusterv1.SendRequest, err error, message string) {
                s.log.Error().Stringer("request", writeEntity).Err(err).Msg(message)
                s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic)
-               s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
                if errResp := stream.Send(&clusterv1.SendResponse{
                        MessageId: writeEntity.MessageId,
                        Error:     message,
                }); errResp != nil {
-                       s.log.Err(errResp).AnErr("original", err).Stringer("request", writeEntity).Msg("failed to send error response")
+                       s.log.Error().Err(errResp).AnErr("original", err).Stringer("request", writeEntity).Msg("failed to send error response")
                        s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
                }
        }
@@ -153,6 +152,12 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error {
                case proto.Message:
                        message = d
                case common.Error:
+                       select {
+                       case <-ctx.Done():
+                               s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic)
+                               return ctx.Err()
+                       default:
+                       }
                        reply(writeEntity, nil, d.Msg())
                        continue
                default:
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index ee335c3c..d96e39ee 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -42,6 +42,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
+const checkDoneEvery = 128
+
 func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr model.StreamQueryResult, err error) {
        if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
                return nil, errors.New("invalid query options: timeRange and series are required")
@@ -198,12 +200,16 @@ func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error {
        if ti.Error() != nil {
                return fmt.Errorf("cannot init tstIter: %w", ti.Error())
        }
+       var hit int
        for ti.nextBlock() {
-               select {
-               case <-ctx.Done():
-                       return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap), len(ti.piPool))
-               default:
+               if hit%checkDoneEvery == 0 {
+                       select {
+                       case <-ctx.Done():
+                               return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap), len(ti.piPool))
+                       default:
+                       }
                }
+               hit++
                bc := generateBlockCursor()
                p := ti.piHeap[0]
                bc.init(p.p, p.curBlock, qo)
@@ -229,6 +235,12 @@ func (qr *queryResult) load(ctx context.Context, qo queryOptions) *model.StreamR
                cursorChan := make(chan int, len(qr.data))
                for i := 0; i < len(qr.data); i++ {
                        go func(i int) {
+                               select {
+                               case <-ctx.Done():
+                                       cursorChan <- i
+                                       return
+                               default:
+                               }
                                tmpBlock := generateBlock()
                                defer releaseBlock(tmpBlock)
                                if !qr.data[i].loadData(tmpBlock) {
diff --git a/banyand/stream/write.go b/banyand/stream/write.go
index 1349eeca..610a6781 100644
--- a/banyand/stream/write.go
+++ b/banyand/stream/write.go
@@ -219,7 +219,7 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre
        return dst, nil
 }
 
-func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
+func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) {
        events, ok := message.Data().([]any)
        if !ok {
                w.l.Warn().Msg("invalid event data type")
@@ -232,12 +232,6 @@ func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.
        groups := make(map[string]*elementsInGroup)
        var builder strings.Builder
        for i := range events {
-               select {
-               case <-ctx.Done():
-                       w.l.Warn().Msgf("context is done, handled %d events", i)
-                       break
-               default:
-               }
                var writeEvent *streamv1.InternalWriteRequest
                switch e := events[i].(type) {
                case *streamv1.InternalWriteRequest:
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 4987ddb5..5d35d6aa 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -380,7 +380,7 @@ func (bmi *blugeMatchIterator) Next() bool {
                bmi.err = io.EOF
                return false
        }
-       bmi.hit++
+       bmi.hit = match.HitNumber
        for i := range bmi.current.Values {
                bmi.current.Values[i] = nil
        }
diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go
index f9b700f4..7288dd31 100644
--- a/pkg/index/inverted/inverted_series.go
+++ b/pkg/index/inverted/inverted_series.go
@@ -113,12 +113,17 @@ func (s *store) Search(ctx context.Context,
 func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey) ([]index.SeriesDocument, error) {
        result := make([]index.SeriesDocument, 0, 10)
        next, err := dmi.Next()
+       if err != nil {
+               return nil, errors.WithMessage(err, "iterate document match iterator")
+       }
        docIDMap := make(map[uint64]struct{})
        fields := make([]string, 0, len(loadedFields))
        for i := range loadedFields {
                fields = append(fields, loadedFields[i].Marshal())
        }
+       var hitNumber int
        for err == nil && next != nil {
+               hitNumber = next.HitNumber
                var doc index.SeriesDocument
                if len(loadedFields) > 0 {
                        doc.Fields = make(map[string][]byte)
@@ -144,7 +149,7 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey
                        return true
                })
                if err != nil {
-                       return nil, errors.WithMessagef(err, "visit stored fields, hit: %d", len(result))
+                       return nil, errors.WithMessagef(err, "visit stored fields, hit: %d", hitNumber)
                }
                if doc.Key.ID > 0 {
                        result = append(result, doc)
@@ -152,7 +157,7 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey
                next, err = dmi.Next()
        }
        if err != nil {
-               return nil, errors.WithMessagef(err, "iterate document match iterator, hit: %d", len(result))
+               return nil, errors.WithMessagef(err, "iterate document match iterator, hit: %d", hitNumber)
        }
        return result, nil
 }
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 49723340..f7db768a 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -73,6 +73,11 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) {
 }
 
 func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, error) {
+       select {
+       case <-ctx.Done():
+               return nil, ctx.Err()
+       default:
+       }
        if i.result != nil {
                return BuildElementsFromStreamResult(ctx, i.result), nil
        }
diff --git a/test/docker/Dockerfile b/test/docker/Dockerfile
index a8b068c4..4ce236a5 100644
--- a/test/docker/Dockerfile
+++ b/test/docker/Dockerfile
@@ -26,6 +26,8 @@ COPY banyand/build/bin/linux/${TARGETARCH}/banyand-server-static /banyand
 COPY bydbctl/build/bin/linux/${TARGETARCH}/bydbctl-cli-static /bydbctl
 COPY --from=certs /etc/ssl/certs /etc/ssl/certs
 
+ENV GRPC_GO_LOG_SEVERITY_LEVEL=INFO
+ENV GRPC_GO_LOG_FORMATTER=json
 
 EXPOSE 17912
 EXPOSE 17913

Reply via email to