This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch cleanup-todo
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit f5ac356ae52352b1a4ae2354c55e6934ad696047
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Jul 16 01:08:11 2024 +0000

    Clean up todo
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 api/proto/banyandb/measure/v1/topn.proto           |   1 -
 banyand/Makefile                                   |   3 -
 banyand/dquery/measure.go                          |   1 -
 banyand/dquery/stream.go                           |   1 -
 banyand/dquery/topn.go                             |   8 +-
 banyand/internal/bucket/bucket.go                  | 152 ---------
 banyand/internal/bucket/bucket_suite_test.go       |  38 ---
 banyand/internal/bucket/queue.go                   | 290 ----------------
 banyand/internal/bucket/queue_test.go              | 145 --------
 banyand/internal/bucket/strategy.go                | 179 ----------
 banyand/internal/bucket/strategy_test.go           | 154 ---------
 banyand/internal/storage/index.go                  |   2 -
 banyand/internal/storage/segment.go                |  15 +-
 banyand/internal/storage/shard.go                  |   2 +-
 banyand/liaison/grpc/measure.go                    |   7 +-
 banyand/liaison/grpc/stream.go                     |   7 +-
 banyand/measure/column.go                          |   2 -
 banyand/measure/part_iter_test.go                  |   1 -
 banyand/measure/topn.go                            |   2 -
 banyand/metadata/embeddedetcd/server.go            |   1 -
 banyand/query/processor.go                         |   2 -
 banyand/query/processor_topn.go                    |   1 -
 banyand/stream/part_iter_test.go                   |   1 -
 banyand/stream/tag.go                              |   2 -
 bydbctl/internal/cmd/rest.go                       |   1 -
 dist/LICENSE                                       |   1 -
 .../license-github.com-hashicorp-golang-lru-v2.txt | 364 ---------------------
 docs/api-reference.md                              |   2 +-
 docs/observability.md                              |   2 -
 go.mod                                             |   1 -
 go.sum                                             |   2 -
 pkg/cmdsetup/data.go                               |   1 -
 pkg/convert/number_test.go                         |  32 +-
 pkg/flow/streaming/streaming.go                    |   2 -
 pkg/flow/streaming/unary.go                        |   1 -
 pkg/fs/local_file_system.go                        |   3 -
 pkg/query/logical/measure/schema.go                |   1 -
 pkg/schema/cache.go                                |   1 -
 test/docker/base-compose.yml                       |   1 -
 test/integration/load/load_suite_test.go           |   2 +-
 test/stress/trace/docker-compose-cluster.yaml      |   1 -
 test/stress/trace/docker-compose-single.yaml       |   3 +-
 42 files changed, 43 insertions(+), 1395 deletions(-)

diff --git a/api/proto/banyandb/measure/v1/topn.proto b/api/proto/banyandb/measure/v1/topn.proto
index 1b349bc6..dc15a444 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -60,7 +60,6 @@ message TopNRequest {
   // top_n set the how many items should be returned in each list.
   int32 top_n = 4 [(validate.rules).int32.gt = 0];
   // agg aggregates lists grouped by field names in the time_range
-  // TODO validate enum defined_only
   model.v1.AggregationFunction agg = 5;
   // criteria select counters. Only equals are acceptable.
   repeated model.v1.Condition conditions = 6;
diff --git a/banyand/Makefile b/banyand/Makefile
index 65ab37df..f071b537 100644
--- a/banyand/Makefile
+++ b/banyand/Makefile
@@ -21,9 +21,6 @@ SERVER := $(NAME)-server
 BINARIES := $(SERVER)
 DEBUG_BINARIES := $(SERVER)-debug
 
-# // TODO: add jemalloc installation on Linux
-# BUILD_TAGS := jemalloc
-
 IMG_NAME := skywalking-banyandb
 
 include ../scripts/build/version.mk
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index be6a81d5..20f8f76f 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -48,7 +48,6 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type"))
                return
        }
-       // TODO: support multiple groups
        if len(queryCriteria.Groups) > 1 {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("only support one group in the query request"))
                return
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index 8931ebb5..a9b34eec 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -51,7 +51,6 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        if p.log.Debug().Enabled() {
                p.log.Debug().RawJSON("criteria", logger.Proto(queryCriteria)).Msg("received a query request")
        }
-       // TODO: support multiple groups
        if len(queryCriteria.Groups) > 1 {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("only support one group in the query request"))
                return
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index 9aefc08c..f63ae7c5 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -46,8 +46,13 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
                t.log.Warn().Msg("invalid event data type")
                return
        }
+       now := bus.MessageID(request.TimeRange.Begin.Nanos)
        if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
-               t.log.Warn().Msg("invalid requested sort direction")
+               resp = bus.NewMessage(now, common.NewError("unspecified requested sort direction"))
+               return
+       }
+       if request.GetAgg() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+               resp = bus.NewMessage(now, common.NewError("unspecified requested aggregation function"))
                return
        }
        if e := t.log.Debug(); e.Enabled() {
@@ -55,7 +60,6 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        }
        agg := request.Agg
        request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
-       now := bus.MessageID(request.TimeRange.Begin.Nanos)
        ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, bus.NewMessage(now, request))
        if err != nil {
                resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.GetName(), err))
diff --git a/banyand/internal/bucket/bucket.go b/banyand/internal/bucket/bucket.go
deleted file mode 100644
index e05a400c..00000000
--- a/banyand/internal/bucket/bucket.go
+++ /dev/null
@@ -1,152 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package bucket implements a rolling bucket system.
-package bucket
-
-import (
-       "errors"
-       "fmt"
-       "sync/atomic"
-       "time"
-
-       "github.com/robfig/cron/v3"
-
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/timestamp"
-)
-
-var errReporterClosed = errors.New("reporter is closed")
-
-// Controller defines the provider of a Reporter.
-type Controller interface {
-       Current() (Reporter, error)
-       Next() (Reporter, error)
-       OnMove(prev, next Reporter)
-}
-
-// Status is a sample of the Reporter's status.
-type Status struct {
-       Capacity int
-       Volume   int
-}
-
-// Channel reports the status of a Reporter.
-type Channel chan Status
-
-// Reporter allows reporting status to its supervisor.
-type Reporter interface {
-       // TODO: refactor Report to return a status. It's too complicated to 
return a channel
-       Report() (Channel, error)
-       String() string
-}
-
-var (
-       _ Reporter = (*dummyReporter)(nil)
-       _ Reporter = (*timeBasedReporter)(nil)
-       // DummyReporter is a special Reporter to avoid nil errors.
-       DummyReporter = &dummyReporter{}
-)
-
-type dummyReporter struct{}
-
-func (*dummyReporter) Report() (Channel, error) {
-       return nil, errReporterClosed
-}
-
-func (*dummyReporter) Stop() {
-}
-
-func (*dummyReporter) String() string {
-       return "dummy-reporter"
-}
-
-type timeBasedReporter struct {
-       clock     timestamp.Clock
-       scheduler *timestamp.Scheduler
-       count     *atomic.Uint32
-       timestamp.TimeRange
-       name string
-}
-
-// NewTimeBasedReporter returns a Reporter which sends report based on time.
-func NewTimeBasedReporter(name string, timeRange timestamp.TimeRange, clock 
timestamp.Clock, scheduler *timestamp.Scheduler) Reporter {
-       if timeRange.End.Before(clock.Now()) {
-               return DummyReporter
-       }
-       t := &timeBasedReporter{
-               TimeRange: timeRange,
-               scheduler: scheduler,
-               clock:     clock,
-               name:      name,
-               count:     &atomic.Uint32{},
-       }
-       return t
-}
-
-func (tr *timeBasedReporter) Report() (Channel, error) {
-       if tr.scheduler.Closed() {
-               return nil, errReporterClosed
-       }
-       now := tr.clock.Now()
-       if now.After(tr.End) {
-               return nil, errReporterClosed
-       }
-       ch := make(Channel, 1)
-       interval := tr.Duration() >> 4
-       if interval < 100*time.Millisecond {
-               interval = 100 * time.Millisecond
-       }
-       ms := interval / time.Millisecond
-       if err := tr.scheduler.Register(
-               fmt.Sprintf("%s-%d", tr.name, tr.count.Add(1)),
-               cron.Descriptor,
-               fmt.Sprintf("@every %dms", ms),
-               func(now time.Time, l *logger.Logger) bool {
-                       status := Status{
-                               Capacity: int(tr.End.UnixNano() - 
tr.Start.UnixNano()),
-                               Volume:   int(now.UnixNano() - 
tr.Start.UnixNano()),
-                       }
-                       if e := l.Debug(); e.Enabled() {
-                               e.Int("volume", status.Volume).Int("capacity", 
status.Capacity).Int("progress%", 
status.Volume*100/status.Capacity).Msg("reporting a status")
-                       }
-                       select {
-                       case ch <- status:
-                       default:
-                               // TODO: this's too complicated, we should not 
use the channel anymore.
-                               if status.Volume >= status.Capacity {
-                                       l.Warn().Int("volume", 
status.Volume).Int("capacity", status.Capacity).Int("progress%", 
status.Volume*100/status.Capacity).Msg("the end status must be reported")
-                                       ch <- status
-                               } else {
-                                       l.Warn().Int("volume", 
status.Volume).Int("capacity", status.Capacity).Int("progress%", 
status.Volume*100/status.Capacity).Msg("ignore a status")
-                               }
-                       }
-                       l.Info().Int("volume", status.Volume).Int("capacity", 
status.Capacity).Int("progress%", 
status.Volume*100/status.Capacity).Msg("reported a status")
-                       if status.Volume < status.Capacity {
-                               return true
-                       }
-                       close(ch)
-                       return false
-               }); err != nil {
-               close(ch)
-               if errors.Is(err, timestamp.ErrSchedulerClosed) {
-                       return nil, errReporterClosed
-               }
-               return nil, err
-       }
-       return ch, nil
-}
diff --git a/banyand/internal/bucket/bucket_suite_test.go b/banyand/internal/bucket/bucket_suite_test.go
deleted file mode 100644
index 289cd7da..00000000
--- a/banyand/internal/bucket/bucket_suite_test.go
+++ /dev/null
@@ -1,38 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package bucket_test
-
-import (
-       "testing"
-
-       . "github.com/onsi/ginkgo/v2"
-       . "github.com/onsi/gomega"
-
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/test/flags"
-)
-
-func TestBucket(t *testing.T) {
-       RegisterFailHandler(Fail)
-       BeforeSuite(func() {
-               Expect(logger.Init(logger.Logging{
-                       Env:   "dev",
-                       Level: flags.LogLevel,
-               })).Should(Succeed())
-       })
-       RunSpecs(t, "Bucket Suite")
-}
diff --git a/banyand/internal/bucket/queue.go b/banyand/internal/bucket/queue.go
deleted file mode 100644
index a1d757be..00000000
--- a/banyand/internal/bucket/queue.go
+++ /dev/null
@@ -1,290 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package bucket
-
-import (
-       "context"
-       "errors"
-       "fmt"
-       "sync"
-       "time"
-
-       "github.com/hashicorp/golang-lru/v2/simplelru"
-       "github.com/robfig/cron/v3"
-
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/timestamp"
-)
-
-type (
-       // EvictFn is a closure executed on evicting an item.
-       EvictFn func(ctx context.Context, id interface{}) error
-       // OnAddRecentFn is a notifier on adding an item into the recent queue.
-       OnAddRecentFn func() error
-)
-
-// Queue is a LRU queue.
-type Queue interface {
-       Touch(id fmt.Stringer) bool
-       Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error
-       Remove(id fmt.Stringer)
-       Len() int
-       Volume() int
-       All() []fmt.Stringer
-}
-
-const (
-       // QueueName is identity of the queue.
-       QueueName = "block-queue-cleanup"
-
-       defaultRecentRatio    = 0.25
-       defaultEvictBatchSize = 10
-)
-
-var errInvalidSize = errors.New("invalid size")
-
-type lruQueue struct {
-       recent      simplelru.LRUCache[fmt.Stringer, any]
-       frequent    simplelru.LRUCache[fmt.Stringer, any]
-       recentEvict simplelru.LRUCache[fmt.Stringer, any]
-       l           *logger.Logger
-       evictFn     EvictFn
-       size        int
-       recentSize  int
-       evictSize   int
-       lock        sync.RWMutex
-}
-
-// NewQueue return a Queue for blocks eviction.
-func NewQueue(l *logger.Logger, size int, maxSize int, scheduler 
*timestamp.Scheduler, evictFn EvictFn) (Queue, error) {
-       if size <= 0 {
-               return nil, errInvalidSize
-       }
-
-       recentSize := int(float64(size) * defaultRecentRatio)
-       evictSize := maxSize - size
-
-       recent, err := simplelru.NewLRU[fmt.Stringer, any](size, nil)
-       if err != nil {
-               return nil, err
-       }
-       frequent, err := simplelru.NewLRU[fmt.Stringer, any](size, nil)
-       if err != nil {
-               return nil, err
-       }
-       recentEvict, err := simplelru.NewLRU[fmt.Stringer, any](evictSize, nil)
-       if err != nil {
-               return nil, err
-       }
-       c := &lruQueue{
-               size:        size,
-               recentSize:  recentSize,
-               recent:      recent,
-               frequent:    frequent,
-               recentEvict: recentEvict,
-               evictSize:   evictSize,
-               evictFn:     evictFn,
-               l:           l,
-       }
-       if err := scheduler.Register(QueueName, cron.Descriptor, "@every 5m", 
c.cleanEvict); err != nil {
-               return nil, err
-       }
-       return c, nil
-}
-
-func (q *lruQueue) Touch(id fmt.Stringer) bool {
-       q.lock.Lock()
-       defer q.lock.Unlock()
-
-       if q.frequent.Contains(id) {
-               if e := q.l.Debug(); e.Enabled() {
-                       e.Stringer("id", id).Msg("get from frequent")
-               }
-               return true
-       }
-
-       if q.recent.Contains(id) {
-               if e := q.l.Debug(); e.Enabled() {
-                       e.Stringer("id", id).Msg("promote from recent to 
frequent")
-               }
-               q.recent.Remove(id)
-               q.frequent.Add(id, nil)
-               return true
-       }
-       return false
-}
-
-func (q *lruQueue) Push(ctx context.Context, id fmt.Stringer, fn 
OnAddRecentFn) error {
-       q.lock.Lock()
-       defer q.lock.Unlock()
-
-       if q.frequent.Contains(id) {
-               if e := q.l.Debug(); e.Enabled() {
-                       e.Stringer("id", id).Msg("push to frequent")
-               }
-               q.frequent.Add(id, nil)
-               return nil
-       }
-
-       if q.recent.Contains(id) {
-               if e := q.l.Debug(); e.Enabled() {
-                       e.Stringer("id", id).Msg("promote from recent to 
frequent")
-               }
-               q.recent.Remove(id)
-               q.frequent.Add(id, nil)
-               return nil
-       }
-
-       if q.recentEvict.Contains(id) {
-               if e := q.l.Debug(); e.Enabled() {
-                       e.Stringer("id", id).Msg("restore from recentEvict")
-               }
-               if err := q.ensureSpace(ctx, true); err != nil {
-                       return err
-               }
-               q.recentEvict.Remove(id)
-               q.frequent.Add(id, nil)
-               return nil
-       }
-
-       if err := q.ensureSpace(ctx, false); err != nil {
-               return err
-       }
-       q.recent.Add(id, nil)
-       if fn == nil {
-               return nil
-       }
-       return fn()
-}
-
-func (q *lruQueue) Remove(id fmt.Stringer) {
-       q.lock.Lock()
-       defer q.lock.Unlock()
-
-       if q.frequent.Contains(id) {
-               q.frequent.Remove(id)
-               return
-       }
-
-       if q.recent.Contains(id) {
-               q.recent.Remove(id)
-               return
-       }
-
-       if q.recentEvict.Contains(id) {
-               q.recentEvict.Remove(id)
-       }
-}
-
-func (q *lruQueue) Len() int {
-       q.lock.RLock()
-       defer q.lock.RUnlock()
-       return q.recent.Len() + q.frequent.Len()
-}
-
-func (q *lruQueue) Volume() int {
-       return q.size + q.recentSize + q.evictSize
-}
-
-func (q *lruQueue) All() []fmt.Stringer {
-       q.lock.RLock()
-       defer q.lock.RUnlock()
-       all := make([]fmt.Stringer, 
q.recent.Len()+q.frequent.Len()+q.recentEvict.Len())
-       copy(all, q.recent.Keys())
-       copy(all[q.recent.Len():], q.frequent.Keys())
-       copy(all[q.recent.Len()+q.frequent.Len():], q.recentEvict.Keys())
-       return all
-}
-
-func (q *lruQueue) evictLen() int {
-       q.lock.RLock()
-       defer q.lock.RUnlock()
-       return q.recentEvict.Len()
-}
-
-func (q *lruQueue) ensureSpace(ctx context.Context, recentEvict bool) error {
-       recentLen := q.recent.Len()
-       freqLen := q.frequent.Len()
-       if recentLen+freqLen < q.size {
-               return nil
-       }
-       if recentLen > 0 && (recentLen > q.recentSize || (recentLen == 
q.recentSize && !recentEvict)) {
-               k, _, ok := q.recent.GetOldest()
-               if !ok {
-                       return errors.New("failed to get oldest from recent 
queue")
-               }
-               if err := q.addLst(ctx, q.recentEvict, q.evictSize, k); err != 
nil {
-                       return err
-               }
-               q.recent.Remove(k)
-               return nil
-       }
-       return q.removeOldest(ctx, q.frequent)
-}
-
-func (q *lruQueue) addLst(ctx context.Context, lst 
simplelru.LRUCache[fmt.Stringer, any], size int, id fmt.Stringer) error {
-       if lst.Len() < size {
-               lst.Add(id, nil)
-               return nil
-       }
-       if err := q.removeOldest(ctx, lst); err != nil {
-               return err
-       }
-       lst.Add(id, nil)
-       return nil
-}
-
-func (q *lruQueue) removeOldest(ctx context.Context, lst 
simplelru.LRUCache[fmt.Stringer, any]) error {
-       oldestID, _, ok := lst.GetOldest()
-       if ok && q.evictFn != nil {
-               if err := q.evictFn(ctx, oldestID); err != nil {
-                       return err
-               }
-               _ = lst.Remove(oldestID)
-       }
-       return nil
-}
-
-func (q *lruQueue) cleanEvict(now time.Time, l *logger.Logger) bool {
-       if e := l.Debug(); e.Enabled() {
-               e.Time("now", now).Msg("block queue wakes")
-       }
-       if q.evictLen() < 1 {
-               return true
-       }
-       for i := 0; i < defaultEvictBatchSize; i++ {
-               if q.remove() {
-                       break
-               }
-       }
-       return true
-}
-
-func (q *lruQueue) remove() bool {
-       q.lock.Lock()
-       defer q.lock.Unlock()
-       ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-       defer cancel()
-       if err := q.removeOldest(ctx, q.recentEvict); err != nil {
-               q.l.Error().Err(err).Msg("failed to remove oldest blocks")
-       }
-       if q.recentEvict.Len() < 1 {
-               return true
-       }
-       return false
-}
diff --git a/banyand/internal/bucket/queue_test.go b/banyand/internal/bucket/queue_test.go
deleted file mode 100644
index c0defbfd..00000000
--- a/banyand/internal/bucket/queue_test.go
+++ /dev/null
@@ -1,145 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package bucket_test
-
-import (
-       "context"
-       "strconv"
-       "sync"
-       "time"
-
-       . "github.com/onsi/ginkgo/v2"
-       . "github.com/onsi/gomega"
-       "github.com/onsi/gomega/gleak"
-
-       "github.com/apache/skywalking-banyandb/banyand/internal/bucket"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/test/flags"
-       "github.com/apache/skywalking-banyandb/pkg/timestamp"
-)
-
-type queueEntryID struct {
-       first  uint16
-       second uint16
-}
-
-func (q queueEntryID) String() string {
-       return strconv.Itoa(int(q.first))
-}
-
-func entryID(id uint16) queueEntryID {
-       return queueEntryID{
-               first:  id,
-               second: id + 1,
-       }
-}
-
-var _ = Describe("Queue", func() {
-       var lock sync.Mutex
-       var evictLst []queueEntryID
-       var l bucket.Queue
-       var clock timestamp.MockClock
-       var scheduler *timestamp.Scheduler
-       BeforeEach(func() {
-               goods := gleak.Goroutines()
-               DeferCleanup(func() {
-                       Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
-               })
-               evictLst = make([]queueEntryID, 0)
-               clock = timestamp.NewMockClock()
-               clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
-               scheduler = 
timestamp.NewScheduler(logger.GetLogger("queue-test"), clock)
-               var err error
-               l, err = bucket.NewQueue(logger.GetLogger("test"), 128, 192, 
scheduler, func(_ context.Context, id interface{}) error {
-                       lock.Lock()
-                       defer lock.Unlock()
-                       evictLst = append(evictLst, id.(queueEntryID))
-                       return nil
-               })
-               Expect(err).ShouldNot(HaveOccurred())
-               DeferCleanup(func() {
-                       scheduler.Close()
-                       evictLst = evictLst[:0]
-               })
-       })
-       It("pushes to recent", func() {
-               enRecentSize := 0
-               for i := 0; i < 256; i++ {
-                       Expect(l.Push(context.Background(), entryID(uint16(i)), 
func() error {
-                               enRecentSize++
-                               return nil
-                       })).To(Succeed())
-               }
-               Expect(enRecentSize).To(Equal(256))
-               Expect(l.Len()).To(Equal(128))
-               Expect(len(evictLst)).To(Equal(64))
-               for i := 0; i < 64; i++ {
-                       Expect(evictLst[i]).To(Equal(entryID(uint16(i))))
-               }
-       })
-
-       It("promotes to frequent", func() {
-               enRecentSize := 0
-               for i := 0; i < 128; i++ {
-                       Expect(l.Push(context.Background(), entryID(uint16(i)), 
func() error {
-                               enRecentSize++
-                               return nil
-                       })).To(Succeed())
-               }
-               Expect(enRecentSize).To(Equal(128))
-               Expect(l.Len()).To(Equal(128))
-               Expect(len(evictLst)).To(Equal(0))
-               for i := 0; i < 64; i++ {
-                       Expect(l.Touch(entryID(uint16(i)))).To(BeTrue())
-               }
-               enRecentSize = 0
-               for i := 128; i < 256; i++ {
-                       Expect(l.Push(context.Background(), entryID(uint16(i)), 
func() error {
-                               enRecentSize++
-                               return nil
-                       })).To(Succeed())
-               }
-               Expect(enRecentSize).To(Equal(128))
-               Expect(l.Len()).To(Equal(128))
-               Expect(len(evictLst)).To(Equal(64))
-               for i := 0; i < 64; i++ {
-                       Expect(evictLst[i]).To(Equal(entryID(uint16(i + 64))))
-               }
-       })
-
-       It("cleans up evict queue", func() {
-               enRecentSize := 0
-               for i := 0; i < 192; i++ {
-                       Expect(l.Push(context.Background(), entryID(uint16(i)), 
func() error {
-                               enRecentSize++
-                               return nil
-                       })).To(Succeed())
-               }
-               Expect(enRecentSize).To(Equal(192))
-               Expect(l.Len()).To(Equal(128))
-               Expect(len(evictLst)).To(Equal(0))
-               clock.Add(6 * time.Minute)
-               if !scheduler.Trigger(bucket.QueueName) {
-                       Fail("trigger fails")
-               }
-               Eventually(func() int {
-                       lock.Lock()
-                       defer lock.Unlock()
-                       return len(evictLst)
-               
}).WithTimeout(flags.EventuallyTimeout).Should(BeNumerically(">", 1))
-       })
-})
diff --git a/banyand/internal/bucket/strategy.go b/banyand/internal/bucket/strategy.go
deleted file mode 100644
index d9bb7edc..00000000
--- a/banyand/internal/bucket/strategy.go
+++ /dev/null
@@ -1,179 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package bucket
-
-import (
-       "fmt"
-       "math"
-       "sync/atomic"
-
-       "github.com/pkg/errors"
-       "go.uber.org/multierr"
-
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/run"
-)
-
-var (
-       // ErrInvalidParameter denotes input parameters are invalid.
-       ErrInvalidParameter = errors.New("parameters are invalid")
-       // ErrNoMoreBucket denotes the bucket volume reaches the limitation.
-       ErrNoMoreBucket = errors.New("no more buckets")
-)
-
-type ratio float64
-
-// Strategy controls Reporters with Controller's help.
-type Strategy struct {
-       optionsErr   error
-       ctrl         Controller
-       current      atomic.Value
-       logger       *logger.Logger
-       closer       *run.Closer
-       ratio        ratio
-       currentRatio uint64
-}
-
-// StrategyOptions sets how to create a Strategy.
-type StrategyOptions func(*Strategy)
-
-// WithNextThreshold sets a ratio to creat the next Reporter.
-func WithNextThreshold(r ratio) StrategyOptions {
-       return func(s *Strategy) {
-               if r > 1.0 {
-                       s.optionsErr = multierr.Append(s.optionsErr,
-                               errors.Wrapf(ErrInvalidParameter, "ratio %v is 
more than 1.0", r))
-                       return
-               }
-               s.ratio = r
-       }
-}
-
-// WithLogger sets a logger.Logger.
-func WithLogger(logger *logger.Logger) StrategyOptions {
-       return func(s *Strategy) {
-               s.logger = logger
-       }
-}
-
-// NewStrategy returns a Strategy.
-func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, 
error) {
-       if ctrl == nil {
-               return nil, errors.Wrap(ErrInvalidParameter, "controller is 
absent")
-       }
-       strategy := &Strategy{
-               ctrl:   ctrl,
-               ratio:  0.8,
-               closer: run.NewCloser(1),
-       }
-       for _, opt := range options {
-               opt(strategy)
-       }
-       if strategy.optionsErr != nil {
-               return nil, strategy.optionsErr
-       }
-       if strategy.logger == nil {
-               strategy.logger = logger.GetLogger("bucket-strategy")
-       }
-       if err := strategy.resetCurrent(); err != nil {
-               return nil, err
-       }
-       return strategy, nil
-}
-
-func (s *Strategy) resetCurrent() error {
-       c, err := s.ctrl.Current()
-       if err != nil {
-               return err
-       }
-       s.current.Store(c)
-       return nil
-}
-
-// Run the Strategy in the background.
-func (s *Strategy) Run() {
-       go func(s *Strategy) {
-               defer s.closer.Done()
-               for {
-                       c, err := s.current.Load().(Reporter).Report()
-                       if errors.Is(err, errReporterClosed) {
-                               return
-                       }
-                       if err != nil {
-                               s.logger.Error().Err(err).Msg("failed to get 
reporter")
-                               if err := s.resetCurrent(); err != nil {
-                                       panic(err)
-                               }
-                               continue
-                       }
-                       if !s.observe(c) {
-                               return
-                       }
-               }
-       }(s)
-}
-
-func (s *Strategy) String() string {
-       c := s.current.Load()
-       if c == nil {
-               return "nil"
-       }
-       return fmt.Sprintf("%s:%f", c.(Reporter).String(),
-               math.Float64frombits(atomic.LoadUint64(&s.currentRatio)))
-}
-
-func (s *Strategy) observe(c Channel) bool {
-       var next Reporter
-       moreBucket := true
-       for {
-               select {
-               case status, more := <-c:
-                       if !more {
-                               return moreBucket
-                       }
-                       r := ratio(status.Volume) / ratio(status.Capacity)
-                       atomic.StoreUint64(&s.currentRatio, 
math.Float64bits(float64(r)))
-                       if r >= s.ratio && next == nil && moreBucket {
-                               n, err := s.ctrl.Next()
-                               switch {
-                               case errors.Is(err, ErrNoMoreBucket):
-                                       moreBucket = false
-                               case err != nil:
-                                       s.logger.Err(err).Msg("failed to create 
the next bucket")
-                               default:
-                                       s.logger.Info().Stringer("next", 
n).Msg("created the next bucket")
-                                       next = n
-                               }
-                       }
-                       if r >= 1.0 {
-                               s.ctrl.OnMove(s.current.Load().(Reporter), next)
-                               if next != nil {
-                                       s.current.Store(next)
-                               }
-                               return moreBucket
-                       }
-               case <-s.closer.CloseNotify():
-                       return false
-               }
-       }
-}
-
-// Close the Strategy running in the background.
-func (s *Strategy) Close() {
-       s.closer.CloseThenWait()
-}
diff --git a/banyand/internal/bucket/strategy_test.go 
b/banyand/internal/bucket/strategy_test.go
deleted file mode 100644
index 0ca374c0..00000000
--- a/banyand/internal/bucket/strategy_test.go
+++ /dev/null
@@ -1,154 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package bucket_test
-
-import (
-       "sync"
-
-       . "github.com/onsi/ginkgo/v2"
-       . "github.com/onsi/gomega"
-       "github.com/onsi/gomega/gleak"
-
-       "github.com/apache/skywalking-banyandb/banyand/internal/bucket"
-       "github.com/apache/skywalking-banyandb/pkg/test/flags"
-)
-
-var _ = Describe("Strategy", func() {
-       BeforeEach(func() {
-               goods := gleak.Goroutines()
-               DeferCleanup(func() {
-                       Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
-               })
-       })
-       Context("Applying the strategy", func() {
-               var strategy *bucket.Strategy
-               It("uses the golden settings", func() {
-                       ctrl := newController(2, 1)
-                       var err error
-                       strategy, err = bucket.NewStrategy(ctrl)
-                       Expect(err).NotTo(HaveOccurred())
-                       strategy.Run()
-                       Eventually(ctrl.isFull, 
flags.EventuallyTimeout).Should(BeTrue())
-               })
-               It("never reaches the limit", func() {
-                       ctrl := newController(1, 0)
-                       var err error
-                       strategy, err = bucket.NewStrategy(ctrl)
-                       Expect(err).NotTo(HaveOccurred())
-                       strategy.Run()
-                       Consistently(ctrl.isFull).ShouldNot(BeTrue())
-               })
-               It("exceeds the limit", func() {
-                       ctrl := newController(2, 3)
-                       var err error
-                       strategy, err = bucket.NewStrategy(ctrl)
-                       Expect(err).NotTo(HaveOccurred())
-                       strategy.Run()
-                       Eventually(ctrl.isFull, 
flags.EventuallyTimeout).Should(BeTrue())
-               })
-               It("'s first step exceeds the limit", func() {
-                       ctrl := newController(2, 15)
-                       var err error
-                       strategy, err = bucket.NewStrategy(ctrl)
-                       Expect(err).NotTo(HaveOccurred())
-                       strategy.Run()
-                       Eventually(ctrl.isFull, 
flags.EventuallyTimeout).Should(BeTrue())
-               })
-               AfterEach(func() {
-                       if strategy != nil {
-                               strategy.Close()
-                       }
-               })
-       })
-       Context("Invalid parameter", func() {
-               It("passes a ratio > 1.0", func() {
-                       ctrl := newController(2, 3)
-                       _, err := bucket.NewStrategy(ctrl, 
bucket.WithNextThreshold(1.1))
-                       Expect(err).To(MatchError(bucket.ErrInvalidParameter))
-               })
-       })
-})
-
-type controller struct {
-       reporter    *reporter
-       maxBuckets  int
-       usedBuckets int
-       capacity    int
-       step        int
-       mux         sync.RWMutex
-}
-
-func newController(maxBuckets, step int) *controller {
-       ctrl := &controller{step: step, maxBuckets: maxBuckets, capacity: 10}
-       ctrl.newReporter()
-       return ctrl
-}
-
-func (c *controller) Next() (bucket.Reporter, error) {
-       c.mux.Lock()
-       defer c.mux.Unlock()
-       if c.usedBuckets >= c.maxBuckets {
-               return nil, bucket.ErrNoMoreBucket
-       }
-       c.usedBuckets++
-       c.newReporter()
-       return c.reporter, nil
-}
-
-func (c *controller) Current() (bucket.Reporter, error) {
-       c.mux.RLock()
-       defer c.mux.RUnlock()
-       return c.reporter, nil
-}
-
-func (c *controller) OnMove(prev bucket.Reporter, next bucket.Reporter) {
-}
-
-func (c *controller) newReporter() {
-       c.reporter = &reporter{step: c.step, capacity: c.capacity}
-}
-
-func (c *controller) isFull() bool {
-       c.mux.RLock()
-       defer c.mux.RUnlock()
-       return c.usedBuckets >= c.maxBuckets
-}
-
-type reporter struct {
-       capacity int
-       step     int
-}
-
-func (r *reporter) Report() (bucket.Channel, error) {
-       ch := make(bucket.Channel, r.capacity)
-       go func() {
-               var volume int
-               for i := 0; i < r.capacity; i++ {
-                       volume += r.step
-                       ch <- bucket.Status{
-                               Capacity: r.capacity,
-                               Volume:   volume,
-                       }
-               }
-               close(ch)
-       }()
-       return ch, nil
-}
-
-func (r *reporter) String() string {
-       return "default"
-}
diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index 25cd8511..d5c935fd 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -203,7 +203,6 @@ func (s *seriesIndex) Search(ctx context.Context, series 
[]*pbv1.Series, filter
        pl := seriesList.ToList()
        if filter != nil && filter != logical.ENode {
                var plFilter posting.List
-               // TODO: merge searchPrimary and filter
                func() {
                        if tracer != nil {
                                span, _ := tracer.StartSpan(ctx, "filter")
@@ -251,7 +250,6 @@ func (s *seriesIndex) Search(ctx context.Context, series 
[]*pbv1.Series, filter
                        span.Stop()
                }()
        }
-       // TODO:// merge searchPrimary and sort
        iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort, 
preloadSize)
        if err != nil {
                return nil, err
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 60e6727c..507846d6 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -32,7 +32,6 @@ import (
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
-       "github.com/apache/skywalking-banyandb/banyand/internal/bucket"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -42,7 +41,6 @@ import (
 var ErrExpiredData = errors.New("expired data")
 
 type segment[T TSTable] struct {
-       bucket.Reporter
        tsTable  T
        l        *logger.Logger
        position common.Position
@@ -55,7 +53,7 @@ type segment[T TSTable] struct {
 }
 
 func openSegment[T TSTable](ctx context.Context, startTime, endTime time.Time, 
path, suffix string,
-       segmentSize IntervalRule, scheduler *timestamp.Scheduler, tsTable T, p 
common.Position,
+       segmentSize IntervalRule, tsTable T, p common.Position,
 ) (s *segment[T], err error) {
        suffixInteger, err := strconv.Atoi(suffix)
        if err != nil {
@@ -68,14 +66,12 @@ func openSegment[T TSTable](ctx context.Context, startTime, 
endTime time.Time, p
                path:      path,
                suffix:    suffix,
                TimeRange: timeRange,
-               position:  common.GetPosition(ctx),
+               position:  p,
                tsTable:   tsTable,
                refCount:  1,
        }
        l := logger.Fetch(ctx, s.String())
        s.l = l
-       clock, _ := timestamp.GetClock(ctx)
-       s.Reporter = bucket.NewTimeBasedReporter(fmt.Sprintf("%s-%s", p.Shard, 
s.String()), timeRange, clock, scheduler)
        return s, nil
 }
 
@@ -123,7 +119,6 @@ func (s *segment[T]) String() string {
 type segmentController[T TSTable, O any] struct {
        clock          timestamp.Clock
        option         O
-       scheduler      *timestamp.Scheduler
        l              *logger.Logger
        tsTableCreator TSTableCreator[T, O]
        position       common.Position
@@ -135,8 +130,7 @@ type segmentController[T TSTable, O any] struct {
 }
 
 func newSegmentController[T TSTable, O any](ctx context.Context, location 
string,
-       segmentSize IntervalRule, l *logger.Logger, scheduler 
*timestamp.Scheduler,
-       tsTableCreator TSTableCreator[T, O], option O,
+       segmentSize IntervalRule, l *logger.Logger, tsTableCreator 
TSTableCreator[T, O], option O,
 ) *segmentController[T, O] {
        clock, _ := timestamp.GetClock(ctx)
        return &segmentController[T, O]{
@@ -144,7 +138,6 @@ func newSegmentController[T TSTable, O any](ctx 
context.Context, location string
                segmentSize:    segmentSize,
                l:              l,
                clock:          clock,
-               scheduler:      scheduler,
                position:       common.GetPosition(ctx),
                tsTableCreator: tsTableCreator,
                option:         option,
@@ -305,7 +298,7 @@ func (sc *segmentController[T, O]) load(start, end 
time.Time, root string) (seg
        if tsTable, err = sc.tsTableCreator(lfs, segPath, p, sc.l, 
timestamp.NewSectionTimeRange(start, end), sc.option); err != nil {
                return nil, err
        }
-       seg, err = openSegment[T](context.WithValue(context.Background(), 
logger.ContextKey, sc.l), start, end, segPath, suffix, sc.segmentSize, 
sc.scheduler, tsTable, p)
+       seg, err = openSegment[T](context.WithValue(context.Background(), 
logger.ContextKey, sc.l), start, end, segPath, suffix, sc.segmentSize, tsTable, 
p)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/internal/storage/shard.go 
b/banyand/internal/storage/shard.go
index 891b4b1e..d4e32023 100644
--- a/banyand/internal/storage/shard.go
+++ b/banyand/internal/storage/shard.go
@@ -52,7 +52,7 @@ func (d *database[T, O]) openShard(ctx context.Context, id 
common.ShardID) (*sha
                l:        l,
                position: common.GetPosition(shardCtx),
                segmentController: newSegmentController[T](shardCtx, location,
-                       d.opts.SegmentInterval, l, d.scheduler,
+                       d.opts.SegmentInterval, l,
                        d.opts.TSTableCreator, d.opts.Option),
        }
        var err error
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 6bedb050..392fedca 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -122,10 +122,9 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                        }
                }
                iwr := &measurev1.InternalWriteRequest{
-                       Request:    writeRequest,
-                       ShardId:    uint32(shardID),
-                       SeriesHash: pbv1.HashEntity(entity),
-                       // TODO: remove the first value (measure name) of 
tagValues
+                       Request:      writeRequest,
+                       ShardId:      uint32(shardID),
+                       SeriesHash:   pbv1.HashEntity(entity),
                        EntityValues: tagValues[1:].Encode(),
                }
                nodeID, errPickNode := 
ms.nodeRegistry.Locate(writeRequest.GetMetadata().GetGroup(), 
writeRequest.GetMetadata().GetName(), uint32(shardID))
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 7ee22256..4bdaae9a 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -116,10 +116,9 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                        }
                }
                iwr := &streamv1.InternalWriteRequest{
-                       Request:    writeEntity,
-                       ShardId:    uint32(shardID),
-                       SeriesHash: pbv1.HashEntity(entity),
-                       // TODO: remove the first value (stream name) of 
tagValues
+                       Request:      writeEntity,
+                       ShardId:      uint32(shardID),
+                       SeriesHash:   pbv1.HashEntity(entity),
                        EntityValues: tagValues[1:].Encode(),
                }
                nodeID, errPickNode := 
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), 
writeEntity.GetMetadata().GetName(), uint32(shardID))
diff --git a/banyand/measure/column.go b/banyand/measure/column.go
index f208786c..eafa0134 100644
--- a/banyand/measure/column.go
+++ b/banyand/measure/column.go
@@ -57,8 +57,6 @@ func (c *column) mustWriteTo(cm *columnMetadata, columnWriter 
*writer) {
        cm.name = c.name
        cm.valueType = c.valueType
 
-       // TODO: encoding values based on value type
-
        bb := bigValuePool.Generate()
        defer bigValuePool.Release(bb)
 
diff --git a/banyand/measure/part_iter_test.go 
b/banyand/measure/part_iter_test.go
index 45c18d3c..11dae20c 100644
--- a/banyand/measure/part_iter_test.go
+++ b/banyand/measure/part_iter_test.go
@@ -31,7 +31,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
-// TODO: test more scenarios.
 func Test_partIter_nextBlock(t *testing.T) {
        tests := []struct {
                wantErr error
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 85460d5d..bf618242 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -433,7 +433,6 @@ func (manager *topNProcessorManager) buildMapper(fieldName 
string, groupByNames
                                // save string representation of group values 
as the key, i.e. v1
                                "",
                                // field value as v2
-                               // TODO: we only support int64
                                
dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
                                // groupBy tag values as v3
                                nil,
@@ -454,7 +453,6 @@ func (manager *topNProcessorManager) buildMapper(fieldName 
string, groupByNames
                                return 
stringify(extractTagValue(dpWithEvs.DataPointValue, locator))
                        }), "|"),
                        // field value as v2
-                       // TODO: we only support int64
                        dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
                        // groupBy tag values as v3
                        transform(groupLocator, func(locator 
partition.TagLocator) *modelv1.TagValue {
diff --git a/banyand/metadata/embeddedetcd/server.go 
b/banyand/metadata/embeddedetcd/server.go
index dfd87b73..c6833ef2 100644
--- a/banyand/metadata/embeddedetcd/server.go
+++ b/banyand/metadata/embeddedetcd/server.go
@@ -106,7 +106,6 @@ func NewServer(options ...Option) (Server, error) {
        if l, err = zapCfg.Build(); err != nil {
                return nil, err
        }
-       // TODO: allow use cluster setting
        embedConfig, err := newEmbedEtcdConfig(conf, l)
        if err != nil {
                return nil, err
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index f1464e2c..da3640c5 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -87,7 +87,6 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
                        resp = 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
                }
        }()
-       // TODO: support multiple groups
        if len(queryCriteria.Groups) > 1 {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("only 
support one group in the query request"))
                return
@@ -170,7 +169,6 @@ func (p *measureQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
                        resp = 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
                }
        }()
-       // TODO: support multiple groups
        if len(queryCriteria.Groups) > 1 {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("only 
support one group in the query request"))
                return
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 7ebb1d48..b3ca5143 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -55,7 +55,6 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
                t.log.Warn().Msg("invalid event data type")
                return
        }
-       // TODO: support multiple groups
        if len(request.Groups) > 1 {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("only 
support one group in the query request"))
                return
diff --git a/banyand/stream/part_iter_test.go b/banyand/stream/part_iter_test.go
index 5a7d258a..ef42be37 100644
--- a/banyand/stream/part_iter_test.go
+++ b/banyand/stream/part_iter_test.go
@@ -31,7 +31,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
-// TODO: test more scenarios.
 func Test_partIter_nextBlock(t *testing.T) {
        tests := []struct {
                wantErr error
diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go
index 5d595043..7f5459c0 100644
--- a/banyand/stream/tag.go
+++ b/banyand/stream/tag.go
@@ -57,8 +57,6 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer) 
{
        tm.name = t.name
        tm.valueType = t.valueType
 
-       // TODO: encoding values based on value type
-
        bb := bigValuePool.Generate()
        defer bigValuePool.Release(bb)
 
diff --git a/bydbctl/internal/cmd/rest.go b/bydbctl/internal/cmd/rest.go
index 2645f106..1ba5a5a7 100644
--- a/bydbctl/internal/cmd/rest.go
+++ b/bydbctl/internal/cmd/rest.go
@@ -115,7 +115,6 @@ func parseFromYAML(tryParseGroup bool, reader io.Reader) 
(requests []reqBody, er
                }
 
                var group string
-               // TODO:// bydbctl should support multiple groups
                if metadata, ok := data["metadata"].(map[string]interface{}); 
ok {
                        group, ok = metadata["group"].(string)
                        if !ok && tryParseGroup {
diff --git a/dist/LICENSE b/dist/LICENSE
index e640967f..2de4f6be 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -372,7 +372,6 @@ MPL-2.0 licenses
 ========================================================================
 
     github.com/hashicorp/golang-lru v1.0.2 MPL-2.0
-    github.com/hashicorp/golang-lru/v2 v2.0.7 MPL-2.0
     github.com/hashicorp/hcl v1.0.0 MPL-2.0
     github.com/shoenig/go-m1cpu v0.1.6 MPL-2.0
 
diff --git a/dist/licenses/license-github.com-hashicorp-golang-lru-v2.txt 
b/dist/licenses/license-github.com-hashicorp-golang-lru-v2.txt
deleted file mode 100644
index 0e5d580e..00000000
--- a/dist/licenses/license-github.com-hashicorp-golang-lru-v2.txt
+++ /dev/null
@@ -1,364 +0,0 @@
-Copyright (c) 2014 HashiCorp, Inc.
-
-Mozilla Public License, version 2.0
-
-1. Definitions
-
-1.1. "Contributor"
-
-     means each individual or legal entity that creates, contributes to the
-     creation of, or owns Covered Software.
-
-1.2. "Contributor Version"
-
-     means the combination of the Contributions of others (if any) used by a
-     Contributor and that particular Contributor's Contribution.
-
-1.3. "Contribution"
-
-     means Covered Software of a particular Contributor.
-
-1.4. "Covered Software"
-
-     means Source Code Form to which the initial Contributor has attached the
-     notice in Exhibit A, the Executable Form of such Source Code Form, and
-     Modifications of such Source Code Form, in each case including portions
-     thereof.
-
-1.5. "Incompatible With Secondary Licenses"
-     means
-
-     a. that the initial Contributor has attached the notice described in
-        Exhibit B to the Covered Software; or
-
-     b. that the Covered Software was made available under the terms of
-        version 1.1 or earlier of the License, but not also under the terms of
-        a Secondary License.
-
-1.6. "Executable Form"
-
-     means any form of the work other than Source Code Form.
-
-1.7. "Larger Work"
-
-     means a work that combines Covered Software with other material, in a
-     separate file or files, that is not Covered Software.
-
-1.8. "License"
-
-     means this document.
-
-1.9. "Licensable"
-
-     means having the right to grant, to the maximum extent possible, whether
-     at the time of the initial grant or subsequently, any and all of the
-     rights conveyed by this License.
-
-1.10. "Modifications"
-
-     means any of the following:
-
-     a. any file in Source Code Form that results from an addition to,
-        deletion from, or modification of the contents of Covered Software; or
-
-     b. any new file in Source Code Form that contains any Covered Software.
-
-1.11. "Patent Claims" of a Contributor
-
-      means any patent claim(s), including without limitation, method,
-      process, and apparatus claims, in any patent Licensable by such
-      Contributor that would be infringed, but for the grant of the License,
-      by the making, using, selling, offering for sale, having made, import,
-      or transfer of either its Contributions or its Contributor Version.
-
-1.12. "Secondary License"
-
-      means either the GNU General Public License, Version 2.0, the GNU Lesser
-      General Public License, Version 2.1, the GNU Affero General Public
-      License, Version 3.0, or any later versions of those licenses.
-
-1.13. "Source Code Form"
-
-      means the form of the work preferred for making modifications.
-
-1.14. "You" (or "Your")
-
-      means an individual or a legal entity exercising rights under this
-      License. For legal entities, "You" includes any entity that controls, is
-      controlled by, or is under common control with You. For purposes of this
-      definition, "control" means (a) the power, direct or indirect, to cause
-      the direction or management of such entity, whether by contract or
-      otherwise, or (b) ownership of more than fifty percent (50%) of the
-      outstanding shares or beneficial ownership of such entity.
-
-
-2. License Grants and Conditions
-
-2.1. Grants
-
-     Each Contributor hereby grants You a world-wide, royalty-free,
-     non-exclusive license:
-
-     a. under intellectual property rights (other than patent or trademark)
-        Licensable by such Contributor to use, reproduce, make available,
-        modify, display, perform, distribute, and otherwise exploit its
-        Contributions, either on an unmodified basis, with Modifications, or
-        as part of a Larger Work; and
-
-     b. under Patent Claims of such Contributor to make, use, sell, offer for
-        sale, have made, import, and otherwise transfer either its
-        Contributions or its Contributor Version.
-
-2.2. Effective Date
-
-     The licenses granted in Section 2.1 with respect to any Contribution
-     become effective for each Contribution on the date the Contributor first
-     distributes such Contribution.
-
-2.3. Limitations on Grant Scope
-
-     The licenses granted in this Section 2 are the only rights granted under
-     this License. No additional rights or licenses will be implied from the
-     distribution or licensing of Covered Software under this License.
-     Notwithstanding Section 2.1(b) above, no patent license is granted by a
-     Contributor:
-
-     a. for any code that a Contributor has removed from Covered Software; or
-
-     b. for infringements caused by: (i) Your and any other third party's
-        modifications of Covered Software, or (ii) the combination of its
-        Contributions with other software (except as part of its Contributor
-        Version); or
-
-     c. under Patent Claims infringed by Covered Software in the absence of
-        its Contributions.
-
-     This License does not grant any rights in the trademarks, service marks,
-     or logos of any Contributor (except as may be necessary to comply with
-     the notice requirements in Section 3.4).
-
-2.4. Subsequent Licenses
-
-     No Contributor makes additional grants as a result of Your choice to
-     distribute the Covered Software under a subsequent version of this
-     License (see Section 10.2) or under the terms of a Secondary License (if
-     permitted under the terms of Section 3.3).
-
-2.5. Representation
-
-     Each Contributor represents that the Contributor believes its
-     Contributions are its original creation(s) or it has sufficient rights to
-     grant the rights to its Contributions conveyed by this License.
-
-2.6. Fair Use
-
-     This License is not intended to limit any rights You have under
-     applicable copyright doctrines of fair use, fair dealing, or other
-     equivalents.
-
-2.7. Conditions
-
-     Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted in
-     Section 2.1.
-
-
-3. Responsibilities
-
-3.1. Distribution of Source Form
-
-     All distribution of Covered Software in Source Code Form, including any
-     Modifications that You create or to which You contribute, must be under
-     the terms of this License. You must inform recipients that the Source
-     Code Form of the Covered Software is governed by the terms of this
-     License, and how they can obtain a copy of this License. You may not
-     attempt to alter or restrict the recipients' rights in the Source Code
-     Form.
-
-3.2. Distribution of Executable Form
-
-     If You distribute Covered Software in Executable Form then:
-
-     a. such Covered Software must also be made available in Source Code Form,
-        as described in Section 3.1, and You must inform recipients of the
-        Executable Form how they can obtain a copy of such Source Code Form by
-        reasonable means in a timely manner, at a charge no more than the cost
-        of distribution to the recipient; and
-
-     b. You may distribute such Executable Form under the terms of this
-        License, or sublicense it under different terms, provided that the
-        license for the Executable Form does not attempt to limit or alter the
-        recipients' rights in the Source Code Form under this License.
-
-3.3. Distribution of a Larger Work
-
-     You may create and distribute a Larger Work under terms of Your choice,
-     provided that You also comply with the requirements of this License for
-     the Covered Software. If the Larger Work is a combination of Covered
-     Software with a work governed by one or more Secondary Licenses, and the
-     Covered Software is not Incompatible With Secondary Licenses, this
-     License permits You to additionally distribute such Covered Software
-     under the terms of such Secondary License(s), so that the recipient of
-     the Larger Work may, at their option, further distribute the Covered
-     Software under the terms of either this License or such Secondary
-     License(s).
-
-3.4. Notices
-
-     You may not remove or alter the substance of any license notices
-     (including copyright notices, patent notices, disclaimers of warranty, or
-     limitations of liability) contained within the Source Code Form of the
-     Covered Software, except that You may alter any license notices to the
-     extent required to remedy known factual inaccuracies.
-
-3.5. Application of Additional Terms
-
-     You may choose to offer, and to charge a fee for, warranty, support,
-     indemnity or liability obligations to one or more recipients of Covered
-     Software. However, You may do so only on Your own behalf, and not on
-     behalf of any Contributor. You must make it absolutely clear that any
-     such warranty, support, indemnity, or liability obligation is offered by
-     You alone, and You hereby agree to indemnify every Contributor for any
-     liability incurred by such Contributor as a result of warranty, support,
-     indemnity or liability terms You offer. You may include additional
-     disclaimers of warranty and limitations of liability specific to any
-     jurisdiction.
-
-4. Inability to Comply Due to Statute or Regulation
-
-   If it is impossible for You to comply with any of the terms of this License
-   with respect to some or all of the Covered Software due to statute,
-   judicial order, or regulation then You must: (a) comply with the terms of
-   this License to the maximum extent possible; and (b) describe the
-   limitations and the code they affect. Such description must be placed in a
-   text file included with all distributions of the Covered Software under
-   this License. Except to the extent prohibited by statute or regulation,
-   such description must be sufficiently detailed for a recipient of ordinary
-   skill to be able to understand it.
-
-5. Termination
-
-5.1. The rights granted under this License will terminate automatically if You
-     fail to comply with any of its terms. However, if You become compliant,
-     then the rights granted under this License from a particular Contributor
-     are reinstated (a) provisionally, unless and until such Contributor
-     explicitly and finally terminates Your grants, and (b) on an ongoing
-     basis, if such Contributor fails to notify You of the non-compliance by
-     some reasonable means prior to 60 days after You have come back into
-     compliance. Moreover, Your grants from a particular Contributor are
-     reinstated on an ongoing basis if such Contributor notifies You of the
-     non-compliance by some reasonable means, this is the first time You have
-     received notice of non-compliance with this License from such
-     Contributor, and You become compliant prior to 30 days after Your receipt
-     of the notice.
-
-5.2. If You initiate litigation against any entity by asserting a patent
-     infringement claim (excluding declaratory judgment actions,
-     counter-claims, and cross-claims) alleging that a Contributor Version
-     directly or indirectly infringes any patent, then the rights granted to
-     You by any and all Contributors for the Covered Software under Section
-     2.1 of this License shall terminate.
-
-5.3. In the event of termination under Sections 5.1 or 5.2 above, all end user
-     license agreements (excluding distributors and resellers) which have been
-     validly granted by You or Your distributors under this License prior to
-     termination shall survive termination.
-
-6. Disclaimer of Warranty
-
-   Covered Software is provided under this License on an "as is" basis,
-   without warranty of any kind, either expressed, implied, or statutory,
-   including, without limitation, warranties that the Covered Software is free
-   of defects, merchantable, fit for a particular purpose or non-infringing.
-   The entire risk as to the quality and performance of the Covered Software
-   is with You. Should any Covered Software prove defective in any respect,
-   You (not any Contributor) assume the cost of any necessary servicing,
-   repair, or correction. This disclaimer of warranty constitutes an essential
-   part of this License. No use of  any Covered Software is authorized under
-   this License except under this disclaimer.
-
-7. Limitation of Liability
-
-   Under no circumstances and under no legal theory, whether tort (including
-   negligence), contract, or otherwise, shall any Contributor, or anyone who
-   distributes Covered Software as permitted above, be liable to You for any
-   direct, indirect, special, incidental, or consequential damages of any
-   character including, without limitation, damages for lost profits, loss of
-   goodwill, work stoppage, computer failure or malfunction, or any and all
-   other commercial damages or losses, even if such party shall have been
-   informed of the possibility of such damages. This limitation of liability
-   shall not apply to liability for death or personal injury resulting from
-   such party's negligence to the extent applicable law prohibits such
-   limitation. Some jurisdictions do not allow the exclusion or limitation of
-   incidental or consequential damages, so this exclusion and limitation may
-   not apply to You.
-
-8. Litigation
-
-   Any litigation relating to this License may be brought only in the courts
-   of a jurisdiction where the defendant maintains its principal place of
-   business and such litigation shall be governed by laws of that
-   jurisdiction, without reference to its conflict-of-law provisions. Nothing
-   in this Section shall prevent a party's ability to bring cross-claims or
-   counter-claims.
-
-9. Miscellaneous
-
-   This License represents the complete agreement concerning the subject
-   matter hereof. If any provision of this License is held to be
-   unenforceable, such provision shall be reformed only to the extent
-   necessary to make it enforceable. Any law or regulation which provides that
-   the language of a contract shall be construed against the drafter shall not
-   be used to construe this License against a Contributor.
-
-
-10. Versions of the License
-
-10.1. New Versions
-
-      Mozilla Foundation is the license steward. Except as provided in Section
-      10.3, no one other than the license steward has the right to modify or
-      publish new versions of this License. Each version will be given a
-      distinguishing version number.
-
-10.2. Effect of New Versions
-
-      You may distribute the Covered Software under the terms of the version
-      of the License under which You originally received the Covered Software,
-      or under the terms of any subsequent version published by the license
-      steward.
-
-10.3. Modified Versions
-
-      If you create software not governed by this License, and you want to
-      create a new license for such software, you may create and use a
-      modified version of this License if you rename the license and remove
-      any references to the name of the license steward (except to note that
-      such modified license differs from this License).
-
-10.4. Distributing Source Code Form that is Incompatible With Secondary
-      Licenses If You choose to distribute Source Code Form that is
-      Incompatible With Secondary Licenses under the terms of this version of
-      the License, the notice described in Exhibit B of this License must be
-      attached.
-
-Exhibit A - Source Code Form License Notice
-
-      This Source Code Form is subject to the
-      terms of the Mozilla Public License, v.
-      2.0. If a copy of the MPL was not
-      distributed with this file, You can
-      obtain one at
-      http://mozilla.org/MPL/2.0/.
-
-If it is not possible or desirable to put the notice in a particular file,
-then You may include the notice in a location (such as a LICENSE file in a
-relevant directory) where a recipient would be likely to look for such a
-notice.
-
-You may add additional accurate notices of copyright ownership.
-
-Exhibit B - "Incompatible With Secondary Licenses" Notice
-
-      This Source Code Form is "Incompatible
-      With Secondary Licenses", as defined by
-      the Mozilla Public License, v. 2.0.
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 2495c817..299e1ef9 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2558,7 +2558,7 @@ TopNRequest is the request contract for query.
 | name | [string](#string) |  | name is the identity of a measure. |
 | time_range | [banyandb.model.v1.TimeRange](#banyandb-model-v1-TimeRange) |  
| time_range is a range query with begin/end time of entities in the timeunit 
of milliseconds. |
 | top_n | [int32](#int32) |  | top_n set the how many items should be returned 
in each list. |
-| agg | [banyandb.model.v1.AggregationFunction](#banyandb-model-v1-AggregationFunction) |  | agg aggregates lists grouped by field names in the time_range TODO validate enum defined_only |
+| agg | [banyandb.model.v1.AggregationFunction](#banyandb-model-v1-AggregationFunction) |  | agg aggregates lists grouped by field names in the time_range |
 | conditions | [banyandb.model.v1.Condition](#banyandb-model-v1-Condition) | 
repeated | criteria select counters. Only equals are acceptable. |
 | field_value_sort | [banyandb.model.v1.Sort](#banyandb-model-v1-Sort) |  | 
field_value_sort indicates how to sort fields |
 | trace | [bool](#bool) |  | trace is used to enable trace for the query |
diff --git a/docs/observability.md b/docs/observability.md
index e0c7d8d7..2a29e2b7 100644
--- a/docs/observability.md
+++ b/docs/observability.md
@@ -15,5 +15,3 @@ The Docker image is tagged as "prometheus" to facilitate 
cloud-native operations
 Banyand, the server of BanyanDB, supports profiling automatically. The 
profiling data is collected by the `pprof` package and can be accessed through 
the `/debug/pprof` endpoint. The port of the profiling server is `2122` by 
default.
 
 ## Tracing
-
-TODO: Add details about the tracing support in BanyanDB, such as how to enable tracing, available tracing tools, and how to analyze tracing data.
diff --git a/go.mod b/go.mod
index b99f7a25..7e949d0a 100644
--- a/go.mod
+++ b/go.mod
@@ -18,7 +18,6 @@ require (
        github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
        github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
        github.com/hashicorp/golang-lru v1.0.2
-       github.com/hashicorp/golang-lru/v2 v2.0.7
        github.com/kkdai/maglev v0.2.0
        github.com/montanaflynn/stats v0.7.1
        github.com/oklog/run v1.1.0
diff --git a/go.sum b/go.sum
index b05ab194..efbfc08b 100644
--- a/go.sum
+++ b/go.sum
@@ -167,8 +167,6 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 
h1:/c3QmbOGMGTOumP2iT/rCwB7b0Q
 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod 
h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM=
 github.com/hashicorp/golang-lru v1.0.2 
h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
 github.com/hashicorp/golang-lru v1.0.2/go.mod 
h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
-github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
-github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
 github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod 
h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod 
h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 21ab9b4a..50e0c292 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -53,7 +53,6 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
-       // TODO: remove streamSVC and measureSvc from query processor. To use metaSvc instead.
        q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, 
pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
diff --git a/pkg/convert/number_test.go b/pkg/convert/number_test.go
index eceebe9c..9fcd8473 100644
--- a/pkg/convert/number_test.go
+++ b/pkg/convert/number_test.go
@@ -18,17 +18,31 @@
 package convert
 
 import (
+       "bytes"
        "fmt"
        "testing"
 )
 
-// TODO: fix this case.
-func TestInt64ToBytes(_ *testing.T) {
-       fmt.Println(Int64ToBytes(-100))
-       fmt.Println(Int64ToBytes(-2))
-       fmt.Println(Int64ToBytes(-1))
-       fmt.Println(Int64ToBytes(0))
-       fmt.Println(Int64ToBytes(1))
-       fmt.Println(Int64ToBytes(2))
-       fmt.Println(Int64ToBytes(100))
+func TestInt64ToBytes(t *testing.T) {
+       testCases := []struct {
+               expected []byte
+               input    int64
+       }{
+               {[]byte{127, 255, 255, 255, 255, 255, 255, 156}, -100},
+               {[]byte{127, 255, 255, 255, 255, 255, 255, 254}, -2},
+               {[]byte{127, 255, 255, 255, 255, 255, 255, 255}, -1},
+               {[]byte{128, 0, 0, 0, 0, 0, 0, 0}, 0},
+               {[]byte{128, 0, 0, 0, 0, 0, 0, 1}, 1},
+               {[]byte{128, 0, 0, 0, 0, 0, 0, 2}, 2},
+               {[]byte{128, 0, 0, 0, 0, 0, 0, 100}, 100},
+       }
+
+       for _, tc := range testCases {
+               t.Run(fmt.Sprintf("Int64ToBytes(%d)", tc.input), func(t *testing.T) {
+                       result := Int64ToBytes(tc.input)
+                       if !bytes.Equal(result, tc.expected) {
+                               t.Errorf("Expected %v, got %v", tc.expected, result)
+                       }
+               })
+       }
 }
diff --git a/pkg/flow/streaming/streaming.go b/pkg/flow/streaming/streaming.go
index f144aebd..f39fcab9 100644
--- a/pkg/flow/streaming/streaming.go
+++ b/pkg/flow/streaming/streaming.go
@@ -59,8 +59,6 @@ func (f *streamingFlow) prepareContext() {
        if f.ctx == nil {
                f.ctx = context.TODO()
        }
-
-       // TODO: add more runtime utilities
 }
 
 func (f *streamingFlow) To(sink flow.Sink) flow.Flow {
diff --git a/pkg/flow/streaming/unary.go b/pkg/flow/streaming/unary.go
index 6ec94191..e6b389a1 100644
--- a/pkg/flow/streaming/unary.go
+++ b/pkg/flow/streaming/unary.go
@@ -38,7 +38,6 @@ func (f *streamingFlow) Map(mapper flow.UnaryOperation[any]) 
flow.Flow {
 // Transform represents a general unary transformation
 // For example: filter, map, etc.
 func (f *streamingFlow) Transform(op flow.UnaryOperation[any]) flow.Flow {
-       // TODO: support parallelism
        f.ops = append(f.ops, newUnaryOp(op, 1))
        return f
 }
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index b943a6f1..ff7dac5e 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -282,7 +282,6 @@ func (fs *localFileSystem) MustGetFreeSpace(path string) 
uint64 {
 
 // Write adds new data to the end of a file.
 func (file *LocalFile) Write(buffer []byte) (int, error) {
-       // TODO: use bufio.Writer to optimize performance.
        size, err := file.file.Write(buffer)
        switch {
        case err == nil:
@@ -307,7 +306,6 @@ func (file *LocalFile) Write(buffer []byte) (int, error) {
 }
 
 // Writev supports appending consecutive buffers to the end of the file.
-// TODO: Optimizing under Linux.
 func (file *LocalFile) Writev(iov *[][]byte) (int, error) {
        var size int
        for _, buffer := range *iov {
@@ -353,7 +351,6 @@ func (file *LocalFile) Read(offset int64, buffer []byte) 
(int, error) {
 }
 
 // Readv is used to read contiguous regions of a file and disperse them into 
discontinuous buffers.
-// TODO: Optimizing under Linux.
 func (file *LocalFile) Readv(offset int64, iov *[][]byte) (int, error) {
        var size int
        for _, buffer := range *iov {
diff --git a/pkg/query/logical/measure/schema.go 
b/pkg/query/logical/measure/schema.go
index fe8e8947..5ac0b009 100644
--- a/pkg/query/logical/measure/schema.go
+++ b/pkg/query/logical/measure/schema.go
@@ -100,7 +100,6 @@ func (m *schema) ProjFields(fieldRefs ...*logical.FieldRef) 
logical.Schema {
 
 func (m *schema) Equal(s2 logical.Schema) bool {
        if other, ok := s2.(*schema); ok {
-               // TODO: add more equality checks
                return cmp.Equal(other.common.TagSpecMap, m.common.TagSpecMap)
        }
        return false
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 32663de5..5c0ff1f5 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -223,7 +223,6 @@ func (sr *schemaRepo) Watcher() {
                                                        return
                                                default:
                                                }
-                                               // TODO: Reconcile when the retry times is more than 3.
                                                
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. 
retry...")
                                                go func() {
                                                        
sr.SendMetadataEvent(evt)
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index cc2d7f92..77e8c5e6 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -59,7 +59,6 @@ services:
       - sw_agent:/skywalking-java-agent
 
   oap:
-    # TODO: use the main repo image once v0.6.0 is released and merged into the main repo
     image: "ghcr.io/apache/skywalking/oap:${SW_OAP_COMMIT}"
     expose:
       - 11800
diff --git a/test/integration/load/load_suite_test.go 
b/test/integration/load/load_suite_test.go
index f2696d4f..1858e380 100644
--- a/test/integration/load/load_suite_test.go
+++ b/test/integration/load/load_suite_test.go
@@ -129,7 +129,7 @@ var _ = Describe("Load Test Suit", func() {
                allQueryLatencyData := make([]float64, 0)
                for i := 0; i < minutes; i++ {
                        GinkgoWriter.Printf("writing data at %s\n", now)
-                       cases_stream_data.Write(connection, "data.json", now, interval)
+                       cases_stream_data.Write(connection, "sw", now, interval)
                        if now.Sub(lastQueryTime) > queryInterval {
                                latency := queryFn(now, time.Hour)
                                latest1HourQueryLatencyData = 
append(latest1HourQueryLatencyData, float64(latency.Milliseconds()))
diff --git a/test/stress/trace/docker-compose-cluster.yaml 
b/test/stress/trace/docker-compose-cluster.yaml
index ddf14221..6173801f 100644
--- a/test/stress/trace/docker-compose-cluster.yaml
+++ b/test/stress/trace/docker-compose-cluster.yaml
@@ -92,7 +92,6 @@ services:
     extends:
       file: ../../docker/base-compose.yml
       service: oap
-    # TODO: use the main repo image once v0.6.0 is released and merged into 
the main repo
     image: "hanahmily/data-generator:${SW_OAP_COMMIT}"
     environment:
       SW_STORAGE: banyandb
diff --git a/test/stress/trace/docker-compose-single.yaml 
b/test/stress/trace/docker-compose-single.yaml
index 1f6b4f4d..9cbd2191 100644
--- a/test/stress/trace/docker-compose-single.yaml
+++ b/test/stress/trace/docker-compose-single.yaml
@@ -53,8 +53,7 @@ services:
     extends:
       file: ../../docker/base-compose.yml
       service: oap
-    # TODO: use the main repo image once v0.6.0 is released and merged into 
the main repo
-    image: "ghcr.io/apache/skywalking/data-generator:${SW_OAP_COMMIT}"
+    image: "hanahmily/data-generator:${SW_OAP_COMMIT}"
     environment:
       SW_STORAGE: banyandb
       SW_STORAGE_BANYANDB_FLUSH_INTERVAL: 5

Reply via email to