This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 2bad32a5 Add slow query logging (#526)
2bad32a5 is described below

commit 2bad32a58f0eb375c6594ada54909ca90021a209
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Sep 3 20:44:11 2024 +0800

    Add slow query logging (#526)
---
 CHANGES.md                                         |  3 +-
 banyand/dquery/dquery.go                           | 12 ++++++
 banyand/dquery/measure.go                          |  8 +++-
 banyand/dquery/stream.go                           | 14 +++---
 banyand/dquery/topn.go                             | 34 ++++++++++++++-
 banyand/liaison/grpc/measure.go                    |  2 +-
 banyand/liaison/grpc/stream.go                     |  2 +-
 banyand/metadata/embeddedetcd/server.go            |  2 +-
 banyand/metadata/schema/watcher.go                 |  2 +-
 banyand/query/processor.go                         | 50 ++++++----------------
 banyand/query/processor_topn.go                    |  6 +++
 banyand/query/query.go                             | 47 ++++++++++++++++++++
 banyand/queue/sub/sub.go                           | 10 ++---
 docs/operation/configuration.md                    |  2 +
 docs/operation/observability.md                    |  8 ++++
 .../logical/stream/stream_plan_distributed.go      |  2 +-
 16 files changed, 149 insertions(+), 55 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index dd11093b..4cafc211 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -25,6 +25,7 @@ Release Notes.
 - Optimize query performance of series index.
 - Add liaison, remote queue, storage(rotation), time-series tables, metadata cache and scheduler metrics.
 - Add HTTP health check endpoint for the data node.
+- Add slow query log for the distributed query and local query.
 
 ### Bugs
 
@@ -60,7 +61,7 @@ Release Notes.
 - Update CI to publish linux/amd64 and linux/arm64 Docker images.
 - Make the build system compiles the binary based on the platform which is running on.
 - Push "skywalking-banyandb-test" image for e2e and stress test. This image contains bydbctl to do a health check.
-- Set etcd-client log level to "error" and etcd-server log level to "warn".
+- Set etcd-client log level to "error" and etcd-server log level to "error".
 
 ## 0.6.1
 
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index 14e9e775..62ae16f1 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -21,6 +21,7 @@ package dquery
 import (
        "context"
        "errors"
+       "time"
 
        "go.uber.org/multierr"
 
@@ -60,6 +61,7 @@ type queryService struct {
        tqp         *topNQueryProcessor
        closer      *run.Closer
        nodeID      string
+       slowQuery   time.Duration
 }
 
 // NewService return a new query service.
@@ -90,6 +92,16 @@ func (q *queryService) Name() string {
        return moduleName
 }
 
+func (q *queryService) FlagSet() *run.FlagSet {
+       fs := run.NewFlagSet("distributed-query")
+       fs.DurationVar(&q.slowQuery, "dst-slow-query", 0, "distributed slow 
query threshold, 0 means no slow query log")
+       return fs
+}
+
+func (q *queryService) Validate() error {
+       return nil
+}
+
 func (q *queryService) PreRun(ctx context.Context) error {
        val := ctx.Value(common.ContextNodeKey)
        if val == nil {
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index 20f8f76f..04be967a 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -108,7 +108,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
                timeRange:   queryCriteria.TimeRange,
        }))
        if err != nil {
-               ml.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to close the query plan")
+               ml.Error().Err(err).Dur("latency", 
time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to query")
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to execute the query plan for measure %s: %v", meta.GetName(), err))
                return
        }
@@ -144,5 +144,11 @@ func (p *measureQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
                e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
        }
        resp = bus.NewMessage(bus.MessageID(now), qr)
+       if !queryCriteria.Trace && p.slowQuery > 0 {
+               latency := time.Since(n)
+               if latency > p.slowQuery {
+                       p.log.Warn().Dur("latency", latency).RawJSON("req", 
logger.Proto(queryCriteria)).Int("resp_count", len(result)).Msg("measure slow 
query")
+               }
+       }
        return
 }
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index a9b34eec..751a8331 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -24,7 +24,6 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/bus"
@@ -80,9 +79,9 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
                p.log.Debug().Str("plan", plan.String()).Msg("query plan")
        }
        ctx := context.Background()
-       var tracer *query.Tracer
-       var span *query.Span
        if queryCriteria.Trace {
+               var tracer *query.Tracer
+               var span *query.Span
                tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
                span, ctx = tracer.StartSpan(ctx, "distributed-%s", 
p.queryService.nodeID)
                span.Tag("plan", plan.String())
@@ -93,7 +92,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
                                d.Trace = tracer.ToProto()
                        case common.Error:
                                span.Error(errors.New(d.Msg()))
-                               resp = bus.NewMessage(bus.MessageID(now), 
&measurev1.QueryResponse{Trace: tracer.ToProto()})
+                               resp = bus.NewMessage(bus.MessageID(now), 
&streamv1.QueryResponse{Trace: tracer.ToProto()})
                        default:
                                panic("unexpected data type")
                        }
@@ -113,6 +112,11 @@ func (p *streamQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
        }
 
        resp = bus.NewMessage(bus.MessageID(now), 
&streamv1.QueryResponse{Elements: entities})
-
+       if !queryCriteria.Trace && p.slowQuery > 0 {
+               latency := time.Since(n)
+               if latency > p.slowQuery {
+                       p.log.Warn().Dur("latency", latency).RawJSON("req", 
logger.Proto(queryCriteria)).Int("resp_count", len(entities)).Msg("stream slow 
query")
+               }
+       }
        return
 }
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index f63ae7c5..a066b6ff 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -18,6 +18,8 @@
 package dquery
 
 import (
+       "context"
+       "errors"
        "time"
 
        "go.uber.org/multierr"
@@ -30,7 +32,9 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/iter/sort"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       pkgquery "github.com/apache/skywalking-banyandb/pkg/query"
 )
 
 const defaultTopNQueryTimeout = 10 * time.Second
@@ -46,6 +50,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
                t.log.Warn().Msg("invalid event data type")
                return
        }
+       n := time.Now()
        now := bus.MessageID(request.TimeRange.Begin.Nanos)
        if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
                resp = bus.NewMessage(now, common.NewError("unspecified 
requested sort direction"))
@@ -56,7 +61,25 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
                return
        }
        if e := t.log.Debug(); e.Enabled() {
-               e.Stringer("req", request).Msg("received a topN query event")
+               e.RawJSON("req", logger.Proto(request)).Msg("received a topN 
query event")
+       }
+       if request.Trace {
+               tracer, ctx := pkgquery.NewTracer(context.TODO(), 
n.Format(time.RFC3339Nano))
+               span, _ := tracer.StartSpan(ctx, "distributed-client")
+               span.Tag("request", 
convert.BytesToString(logger.Proto(request)))
+               defer func() {
+                       data := resp.Data()
+                       switch d := data.(type) {
+                       case *measurev1.TopNResponse:
+                               d.Trace = tracer.ToProto()
+                       case common.Error:
+                               span.Error(errors.New(d.Msg()))
+                               resp = bus.NewMessage(now, 
&measurev1.TopNResponse{Trace: tracer.ToProto()})
+                       default:
+                               panic("unexpected data type")
+                       }
+                       span.Stop()
+               }()
        }
        agg := request.Agg
        request.Agg = 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
@@ -103,9 +126,16 @@ func (t *topNQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
                resp = bus.NewMessage(now, &measurev1.TopNResponse{})
                return
        }
+       lists := aggregator.Val(tags)
        resp = bus.NewMessage(now, &measurev1.TopNResponse{
-               Lists: aggregator.Val(tags),
+               Lists: lists,
        })
+       if !request.Trace && t.slowQuery > 0 {
+               latency := time.Since(n)
+               if latency > t.slowQuery {
+                       t.log.Warn().Dur("latency", latency).RawJSON("req", 
logger.Proto(request)).Int("resp_count", len(lists)).Msg("top_n slow query")
+               }
+       }
        return
 }
 
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index de1a860d..00adeb36 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -71,7 +71,7 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                }
                ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, 
"measure", "write")
                if errResp := measure.Send(&measurev1.WriteResponse{Metadata: 
metadata, Status: status, MessageId: messageId}); errResp != nil {
-                       logger.Debug().Err(errResp).Msg("failed to send 
response")
+                       logger.Debug().Err(errResp).Msg("failed to send measure 
write response")
                        ms.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, 
"measure", "write")
                }
        }
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index ba264fdc..64458a02 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -71,7 +71,7 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                }
                s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, 
"stream", "write")
                if errResp := stream.Send(&streamv1.WriteResponse{Metadata: 
metadata, Status: status, MessageId: messageId}); errResp != nil {
-                       logger.Debug().Err(errResp).Msg("failed to send 
response")
+                       logger.Debug().Err(errResp).Msg("failed to send stream 
write response")
                        s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, 
"stream", "write")
                }
        }
diff --git a/banyand/metadata/embeddedetcd/server.go 
b/banyand/metadata/embeddedetcd/server.go
index 0271df40..0da9153e 100644
--- a/banyand/metadata/embeddedetcd/server.go
+++ b/banyand/metadata/embeddedetcd/server.go
@@ -100,7 +100,7 @@ func NewServer(options ...Option) (Server, error) {
        for _, opt := range options {
                opt(conf)
        }
-       zapCfg := 
logger.GetLogger("etcd-server").DefaultLevel(zerolog.WarnLevel).ToZapConfig()
+       zapCfg := 
logger.GetLogger("etcd-server").DefaultLevel(zerolog.ErrorLevel).ToZapConfig()
 
        var l *zap.Logger
        var err error
diff --git a/banyand/metadata/schema/watcher.go 
b/banyand/metadata/schema/watcher.go
index e4945866..2464efe4 100644
--- a/banyand/metadata/schema/watcher.go
+++ b/banyand/metadata/schema/watcher.go
@@ -127,7 +127,7 @@ OUTER:
                for {
                        select {
                        case <-w.closer.CloseNotify():
-                               w.l.Warn().Msgf("watcher closed")
+                               w.l.Info().Msgf("watcher closed")
                                return
                        case watchResp, ok := <-wch:
                                if !ok {
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index da3640c5..a6b47bbb 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -24,16 +24,11 @@ import (
        "runtime/debug"
        "time"
 
-       "go.uber.org/multierr"
-
        "github.com/apache/skywalking-banyandb/api/common"
-       "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
-       "github.com/apache/skywalking-banyandb/banyand/metadata"
-       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -55,16 +50,6 @@ var (
        _ bus.MessageListener = (*topNQueryProcessor)(nil)
 )
 
-type queryService struct {
-       metaService metadata.Repo
-       pipeline    queue.Server
-       log         *logger.Logger
-       sqp         *streamQueryProcessor
-       mqp         *measureQueryProcessor
-       tqp         *topNQueryProcessor
-       nodeID      string
-}
-
 type streamQueryProcessor struct {
        streamService stream.Service
        *queryService
@@ -147,6 +132,12 @@ func (p *streamQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
 
        resp = bus.NewMessage(bus.MessageID(now), 
&streamv1.QueryResponse{Elements: entities})
 
+       if !queryCriteria.Trace && p.slowQuery > 0 {
+               latency := time.Since(n)
+               if latency > p.slowQuery {
+                       p.log.Warn().Dur("latency", latency).RawJSON("req", 
logger.Proto(queryCriteria)).Int("resp_count", len(entities)).Msg("stream slow 
query")
+               }
+       }
        return
 }
 
@@ -227,13 +218,13 @@ func (p *measureQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
 
        mIterator, err := 
plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(ctx,
 ec))
        if err != nil {
-               ml.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to close the query plan")
+               ml.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to query")
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to execute the query plan for measure %s: %v", meta.GetName(), err))
                return
        }
        defer func() {
                if err = mIterator.Close(); err != nil {
-                       ml.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to close the query plan")
+                       ml.Error().Err(err).Dur("latency", 
time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close 
the query plan")
                        if span != nil {
                                span.Error(fmt.Errorf("fail to close the query 
plan: %w", err))
                        }
@@ -264,24 +255,11 @@ func (p *measureQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
                e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
        }
        resp = bus.NewMessage(bus.MessageID(now), qr)
-       return
-}
-
-func (q *queryService) Name() string {
-       return moduleName
-}
-
-func (q *queryService) PreRun(ctx context.Context) error {
-       val := ctx.Value(common.ContextNodeKey)
-       if val == nil {
-               return errors.New("node id is empty")
+       if !queryCriteria.Trace && p.slowQuery > 0 {
+               latency := time.Since(n)
+               if latency > p.slowQuery {
+                       p.log.Warn().Dur("latency", latency).RawJSON("req", 
logger.Proto(queryCriteria)).Int("resp_count", len(result)).Msg("measure slow 
query")
+               }
        }
-       node := val.(common.Node)
-       q.nodeID = node.NodeID
-       q.log = logger.GetLogger(moduleName)
-       return multierr.Combine(
-               q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
-               q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
-               q.pipeline.Subscribe(data.TopicTopNQuery, q.tqp),
-       )
+       return
 }
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index b3ca5143..865b2f44 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -171,6 +171,12 @@ func (t *topNQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
        }()
 
        resp = bus.NewMessage(bus.MessageID(now), toTopNResponse(result))
+       if !request.Trace && t.slowQuery > 0 {
+               latency := time.Since(n)
+               if latency > t.slowQuery {
+                       t.log.Warn().Dur("latency", latency).RawJSON("req", 
logger.Proto(request)).Int("resp_count", len(result)).Msg("top_n slow query")
+               }
+       }
        return
 }
 
diff --git a/banyand/query/query.go b/banyand/query/query.go
index 396fe2a0..ef9315af 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -20,14 +20,32 @@ package query
 
 import (
        "context"
+       "errors"
+       "time"
 
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/data"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/stream"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+type queryService struct {
+       metaService metadata.Repo
+       pipeline    queue.Server
+       log         *logger.Logger
+       sqp         *streamQueryProcessor
+       mqp         *measureQueryProcessor
+       tqp         *topNQueryProcessor
+       nodeID      string
+       slowQuery   time.Duration
+}
+
 // NewService return a new query service.
 func NewService(_ context.Context, streamService stream.Service, 
measureService measure.Service,
        metaService metadata.Repo, pipeline queue.Server,
@@ -53,3 +71,32 @@ func NewService(_ context.Context, streamService 
stream.Service, measureService
        }
        return svc, nil
 }
+
+func (q *queryService) Name() string {
+       return moduleName
+}
+
+func (q *queryService) PreRun(ctx context.Context) error {
+       val := ctx.Value(common.ContextNodeKey)
+       if val == nil {
+               return errors.New("node id is empty")
+       }
+       node := val.(common.Node)
+       q.nodeID = node.NodeID
+       q.log = logger.GetLogger(moduleName)
+       return multierr.Combine(
+               q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
+               q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
+               q.pipeline.Subscribe(data.TopicTopNQuery, q.tqp),
+       )
+}
+
+func (q *queryService) FlagSet() *run.FlagSet {
+       fs := run.NewFlagSet("query")
+       fs.DurationVar(&q.slowQuery, "slow-query", 0, "slow query threshold, 0 
means no slow query log")
+       return fs
+}
+
+func (q *queryService) Validate() error {
+       return nil
+}
diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go
index a9e9e3cb..41736bcc 100644
--- a/banyand/queue/sub/sub.go
+++ b/banyand/queue/sub/sub.go
@@ -37,14 +37,14 @@ import (
 
 func (s *server) Send(stream clusterv1.Service_SendServer) error {
        reply := func(writeEntity *clusterv1.SendRequest, err error, message 
string) {
-               s.log.Error().Stringer("written", 
writeEntity).Err(err).Msg(message)
+               s.log.Error().Stringer("request", 
writeEntity).Err(err).Msg(message)
                s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic)
                s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
                if errResp := stream.Send(&clusterv1.SendResponse{
                        MessageId: writeEntity.MessageId,
                        Error:     message,
                }); errResp != nil {
-                       s.log.Err(errResp).Msg("failed to send response")
+                       s.log.Err(errResp).AnErr("original", 
err).Stringer("request", writeEntity).Msg("failed to send error response")
                        s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
                }
        }
@@ -120,7 +120,7 @@ func (s *server) Send(stream clusterv1.Service_SendServer) 
error {
                        if errSend := stream.Send(&clusterv1.SendResponse{
                                MessageId: writeEntity.MessageId,
                        }); errSend != nil {
-                               s.log.Error().Stringer("written", 
writeEntity).Err(errSend).Msg("failed to send response")
+                               s.log.Error().Stringer("written", 
writeEntity).Err(errSend).Msg("failed to send write response")
                                s.metrics.totalMsgSentErr.Inc(1, 
writeEntity.Topic)
                                continue
                        }
@@ -139,7 +139,7 @@ func (s *server) Send(stream clusterv1.Service_SendServer) 
error {
                        if errSend := stream.Send(&clusterv1.SendResponse{
                                MessageId: writeEntity.MessageId,
                        }); errSend != nil {
-                               s.log.Error().Stringer("written", 
writeEntity).Err(errSend).Msg("failed to send response")
+                               s.log.Error().Stringer("request", 
writeEntity).Err(errSend).Msg("failed to send empty response")
                                s.metrics.totalMsgSentErr.Inc(1, 
writeEntity.Topic)
                                continue
                        }
@@ -166,7 +166,7 @@ func (s *server) Send(stream clusterv1.Service_SendServer) 
error {
                        MessageId: writeEntity.MessageId,
                        Body:      anyMessage,
                }); err != nil {
-                       s.log.Error().Stringer("written", 
writeEntity).Err(err).Msg("failed to send response")
+                       s.log.Error().Stringer("request", 
writeEntity).Dur("latency", time.Since(start)).Err(err).Msg("failed to send 
query response")
                        s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
                        continue
                }
diff --git a/docs/operation/configuration.md b/docs/operation/configuration.md
index 4a9b0db6..449725d4 100644
--- a/docs/operation/configuration.md
+++ b/docs/operation/configuration.md
@@ -87,6 +87,8 @@ The following flags are used to configure the embedded etcd 
storage engine which
 - `--observability-listener-addr string`: Listen address for observability (default: ":2121").
 - `--observability-modes strings`: Modes for observability (default: [prometheus]).
 - `--pprof-listener-addr string`: Listen address for pprof (default: ":6060").
+- `--dst-slow-query duration`: distributed slow query threshold, 0 means no slow query log. This is only used for the liaison server (default: 0).
+- `--slow-query duration`: slow query threshold, 0 means no slow query log. This is only used for the data and standalone server (default: 0).
 
 ### Other
 
diff --git a/docs/operation/observability.md b/docs/operation/observability.md
index d28b5f41..7119b4cf 100644
--- a/docs/operation/observability.md
+++ b/docs/operation/observability.md
@@ -14,6 +14,14 @@ BanyanDB uses the [zerolo](https://github.com/rs/zerolog) 
library for logging. T
 --logging-modules=storage --logging-levels=debug
 ```
 
+### Slow Query Logging
+
+BanyanDB supports slow query logging. The `slow-query` flag is used to set the slow query threshold. If a query takes longer than the threshold, it will be logged as a slow query. The default value is `0`, which means no slow query logging. This flag is only used for the data and standalone servers.
+
+The `dst-slow-query` flag is used to set the distributed slow query threshold. This flag is only used for the liaison server. The default value is `0`, which means no distributed slow query logging.
+
+When query tracing is enabled, the slow query log won't be generated.
+
 ## Metrics
 
 BanyanDB has built-in support for metrics collection. Currently, there are two supported metrics provider: `prometheus` and `native`. These can be enabled through `observability-modes` flag, allowing you to activate one or both of them.
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go 
b/pkg/query/logical/stream/stream_plan_distributed.go
index ac55dbdd..6de4ce5d 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -38,7 +38,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
-const defaultQueryTimeout = 10 * time.Second
+const defaultQueryTimeout = 30 * time.Second
 
 var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
 

Reply via email to