This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch doc in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 03d2629635113f7a7b4e593012def7e65d95f5a1 Author: Gao Hongtao <[email protected]> AuthorDate: Wed Oct 2 02:07:54 2024 +0000 Fix some nits Signed-off-by: Gao Hongtao <[email protected]> --- banyand/Dockerfile | 3 +++ banyand/measure/query.go | 29 ++++++++++++++++++---- banyand/measure/write.go | 8 +----- banyand/metadata/schema/etcd.go | 6 ++--- banyand/queue/sub/sub.go | 9 +++++-- banyand/stream/query.go | 20 ++++++++++++--- banyand/stream/write.go | 8 +----- pkg/index/inverted/inverted.go | 2 +- pkg/index/inverted/inverted_series.go | 9 +++++-- .../logical/stream/stream_plan_indexscan_local.go | 5 ++++ test/docker/Dockerfile | 2 ++ 11 files changed, 69 insertions(+), 32 deletions(-) diff --git a/banyand/Dockerfile b/banyand/Dockerfile index 42db44e5..5f21348f 100644 --- a/banyand/Dockerfile +++ b/banyand/Dockerfile @@ -34,6 +34,9 @@ COPY build/bin/windows/${TARGETARCH}/banyand-server-static "/banyand" FROM build-${TARGETOS} AS final +ENV GRPC_GO_LOG_SEVERITY_LEVEL=ERROR +ENV GRPC_GO_LOG_FORMATTER=json + EXPOSE 17912 EXPOSE 17913 EXPOSE 6060 diff --git a/banyand/measure/query.go b/banyand/measure/query.go index ac1ccfa0..d9d19918 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -41,7 +41,8 @@ import ( ) const ( - preloadSize = 100 + preloadSize = 100 + checkDoneEvery = 128 ) // Query allow to retrieve measure data points. @@ -252,12 +253,16 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids [] if tstIter.Error() != nil { return fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) } + var hit int for tstIter.nextBlock() { - select { - case <-ctx.Done(): - return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(result.data), len(tstIter.piHeap), len(tstIter.piPool)) - default: + if hit%checkDoneEvery == 0 { + select { + case <-ctx.Done(): + return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(result.data), len(tstIter.piHeap), len(tstIter.piPool)) + default: + } } + hit++ bc := generateBlockCursor() p := tstIter.piHeap[0] bc.init(p.p, p.curBlock, qo) @@ -426,6 +431,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue { type queryResult struct { ctx context.Context + hit int sidToIndex map[common.SeriesID]int storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue tagProjection []model.TagProjection @@ -438,6 +444,13 @@ type queryResult struct { } func (qr *queryResult) Pull() *model.MeasureResult { + select { + case <-qr.ctx.Done(): + return &model.MeasureResult{ + Error: errors.WithMessagef(qr.ctx.Err(), "interrupt: hit %d", qr.hit), + } + default: + } if !qr.loaded { if len(qr.data) == 0 { return nil @@ -446,6 +459,12 @@ func (qr *queryResult) Pull() *model.MeasureResult { cursorChan := make(chan int, len(qr.data)) for i := 0; i < len(qr.data); i++ { go func(i int) { + select { + case <-qr.ctx.Done(): + cursorChan <- i + return + default: + } tmpBlock := generateBlock() defer releaseBlock(tmpBlock) if !qr.data[i].loadData(tmpBlock) { diff --git a/banyand/measure/write.go b/banyand/measure/write.go index da5ba404..64362ab2 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -238,7 +238,7 @@ func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPoi return dpt, nil } -func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { +func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) { events, ok := message.Data().([]any) if !ok { w.l.Warn().Msg("invalid event data type") @@ -250,12 +250,6 @@ func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus. } groups := make(map[string]*dataPointsInGroup) for i := range events { - select { - case <-ctx.Done(): - w.l.Warn().Msgf("context is done, handled %d events", i) - break - default: - } var writeEvent *measurev1.InternalWriteRequest switch e := events[i].(type) { case *measurev1.InternalWriteRequest: diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go index b9a7e601..0e112039 100644 --- a/banyand/metadata/schema/etcd.go +++ b/banyand/metadata/schema/etcd.go @@ -521,10 +521,8 @@ func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) { ctx, cancel := context.WithTimeout(context.Background(), leaseDuration) defer cancel() _, err := e.client.Lease.Revoke(ctx, lease.ID) - if err != nil { - if !errors.Is(err, context.DeadlineExceeded) { - e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID) - } + if err != nil && e.l.Debug().Enabled() { + e.l.Debug().Err(err).Msgf("failed to revoke lease %d", lease.ID) } } diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go index 0b3f2ae9..46db4174 100644 --- a/banyand/queue/sub/sub.go +++ b/banyand/queue/sub/sub.go @@ -39,12 +39,11 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error { reply := func(writeEntity *clusterv1.SendRequest, err error, message string) { s.log.Error().Stringer("request", writeEntity).Err(err).Msg(message) s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic) - s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic) if errResp := stream.Send(&clusterv1.SendResponse{ MessageId: writeEntity.MessageId, Error: message, }); errResp != nil { - s.log.Err(errResp).AnErr("original", err).Stringer("request", writeEntity).Msg("failed to send error response") + s.log.Error().Err(errResp).AnErr("original", err).Stringer("request", writeEntity).Msg("failed to send error response") s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic) } } @@ -153,6 +152,12 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error { case proto.Message: message = d case common.Error: + select { + case <-ctx.Done(): + s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic) + return ctx.Err() + default: + } reply(writeEntity, nil, d.Msg()) continue default: diff --git a/banyand/stream/query.go b/banyand/stream/query.go index ee335c3c..d96e39ee 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -42,6 +42,8 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/model" ) +const checkDoneEvery = 128 + func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr model.StreamQueryResult, err error) { if sqo.TimeRange == nil || len(sqo.Entities) < 1 { return nil, errors.New("invalid query options: timeRange and series are required") @@ -198,12 +200,16 @@ func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error { if ti.Error() != nil { return fmt.Errorf("cannot init tstIter: %w", ti.Error()) } + var hit int for ti.nextBlock() { - select { - case <-ctx.Done(): - return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap), len(ti.piPool)) - default: + if hit%checkDoneEvery == 0 { + select { + case <-ctx.Done(): + return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap), len(ti.piPool)) + default: + } } + hit++ bc := generateBlockCursor() p := ti.piHeap[0] bc.init(p.p, p.curBlock, qo) @@ -229,6 +235,12 @@ func (qr *queryResult) load(ctx context.Context, qo queryOptions) *model.StreamR cursorChan := make(chan int, len(qr.data)) for i := 0; i < len(qr.data); i++ { go func(i int) { + select { + case <-ctx.Done(): + cursorChan <- i + return + default: + } tmpBlock := generateBlock() defer releaseBlock(tmpBlock) if !qr.data[i].loadData(tmpBlock) { diff --git a/banyand/stream/write.go b/banyand/stream/write.go index 1349eeca..610a6781 100644 --- a/banyand/stream/write.go +++ b/banyand/stream/write.go @@ -219,7 +219,7 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre return dst, nil } -func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { +func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) { events, ok := message.Data().([]any) if !ok { w.l.Warn().Msg("invalid event data type") @@ -232,12 +232,6 @@ func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus. groups := make(map[string]*elementsInGroup) var builder strings.Builder for i := range events { - select { - case <-ctx.Done(): - w.l.Warn().Msgf("context is done, handled %d events", i) - break - default: - } var writeEvent *streamv1.InternalWriteRequest switch e := events[i].(type) { case *streamv1.InternalWriteRequest: diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 4987ddb5..5d35d6aa 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -380,7 +380,7 @@ func (bmi *blugeMatchIterator) Next() bool { bmi.err = io.EOF return false } - bmi.hit++ + bmi.hit = match.HitNumber for i := range bmi.current.Values { bmi.current.Values[i] = nil } diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index f9b700f4..7288dd31 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -113,12 +113,17 @@ func (s *store) Search(ctx context.Context, func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey) ([]index.SeriesDocument, error) { result := make([]index.SeriesDocument, 0, 10) next, err := dmi.Next() + if err != nil { + return nil, errors.WithMessage(err, "iterate document match iterator") + } docIDMap := make(map[uint64]struct{}) fields := make([]string, 0, len(loadedFields)) for i := range loadedFields { fields = append(fields, loadedFields[i].Marshal()) } + var hitNumber int for err == nil && next != nil { + hitNumber = next.HitNumber var doc index.SeriesDocument if len(loadedFields) > 0 { doc.Fields = make(map[string][]byte) @@ -144,7 +149,7 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey return true }) if err != nil { - return nil, errors.WithMessagef(err, "visit stored fields, hit: %d", len(result)) + return nil, errors.WithMessagef(err, "visit stored fields, hit: %d", hitNumber) } if doc.Key.ID > 0 { result = append(result, doc) @@ -152,7 +157,7 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey next, err = dmi.Next() } if err != nil { - return nil, errors.WithMessagef(err, "iterate document match iterator, hit: %d", len(result)) + return nil, errors.WithMessagef(err, "iterate document match iterator, hit: %d", hitNumber) } return result, nil } diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go index 49723340..f7db768a 100644 --- a/pkg/query/logical/stream/stream_plan_indexscan_local.go +++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go @@ -73,6 +73,11 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) { } func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } if i.result != nil { return BuildElementsFromStreamResult(ctx, i.result), nil } diff --git a/test/docker/Dockerfile b/test/docker/Dockerfile index a8b068c4..4ce236a5 100644 --- a/test/docker/Dockerfile +++ b/test/docker/Dockerfile @@ -26,6 +26,8 @@ COPY banyand/build/bin/linux/${TARGETARCH}/banyand-server-static /banyand COPY bydbctl/build/bin/linux/${TARGETARCH}/bydbctl-cli-static /bydbctl COPY --from=certs /etc/ssl/certs /etc/ssl/certs +ENV GRPC_GO_LOG_SEVERITY_LEVEL=INFO +ENV GRPC_GO_LOG_FORMATTER=json EXPOSE 17912 EXPOSE 17913
