Copilot commented on code in PR #999:
URL: 
https://github.com/apache/skywalking-banyandb/pull/999#discussion_r2918021971


##########
pkg/idgen/snowflake.go:
##########
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package idgen
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+const (
+       epoch int64 = 1609459200000
+
+       nodebits = 7
+       sequenceBits = 10
+
+       nodeShift = sequenceBits
+       timeStampShift = nodebits + sequenceBits
+
+       maxNodeID = (1 << nodebits) - 1
+       maxSequence = (1 << sequenceBits) - 1
+)
+
+// [1-bit sign=0] [46-bit ms timestamp] [7-bit node ID] [10-bit sequence]
+type Generator struct {
+       l        *logger.Logger
+       mu       sync.Mutex
+       nodeID   int64
+       sequence int64
+       lastTime int64
+}
+
+// NewGenerator creates a Generator whose 7-bit node component is derived by 
hashing the given nodeID string.
+func NewGenerator(nodeID string, l *logger.Logger) *Generator {
+       nid := int64(convert.HashStr(nodeID)) & maxNodeID
+       return &Generator{
+               nodeID: nid,
+               l:              l,
+       }

Review Comment:
   This file isn’t formatted according to the repo’s Go formatting/import rules 
(gofumpt + gci). The misaligned struct fields, extra parentheses (`if(now == 
...)`), stray whitespace, and import ordering will likely fail CI linters. Run 
gofumpt/gci (or `make lint`/your usual formatter) on this file.



##########
banyand/stream/write_standalone.go:
##########
@@ -169,7 +169,12 @@ func processElements(schemaRepo *schemaRepo, elements 
*elements, writeEvent *str
        req := writeEvent.Request
 
        elements.timestamps = append(elements.timestamps, ts)
-       eID := convert.HashStr(metadata.Group + "|" + metadata.Name + "|" + 
req.Element.ElementId)
+       var eID uint64
+       if req.Element.ElementID != "" {
+               eID = convert.HashStr(metadata.Group + "|" + metadata.Name + 
"|" + req.Element.ElementId)

Review Comment:
   `req.Element` from stream v1 uses the protobuf field `element_id`, which is 
generated as `ElementId`/`GetElementId()` in Go. The new check uses 
`req.Element.ElementID` (and the hash uses `ElementId`), which will not compile 
and is inconsistent. Use the same accessor (`GetElementId()` or `ElementId`) 
for both the emptiness check and hashing input.
   ```suggestion
        if req.Element.GetElementId() != "" {
                eID = convert.HashStr(metadata.Group + "|" + metadata.Name + 
"|" + req.Element.GetElementId())
   ```
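As a rough illustration of why the getter is a reasonable choice for both the
emptiness check and the hash input, the sketch below models the nil-safe getter
shape that protoc-gen-go emits. The `element` type is a hypothetical stand-in
for the generated message, not the real stream v1 type.

```go
package main

import "fmt"

// element is a hypothetical stand-in for the generated stream v1 message;
// only the field and getter relevant to this comment are modeled.
type element struct {
	ElementId string
}

// GetElementId mirrors the nil-safe getter shape that protoc-gen-go emits:
// a nil receiver yields the zero value instead of panicking.
func (e *element) GetElementId() string {
	if e != nil {
		return e.ElementId
	}
	return ""
}

func main() {
	var missing *element
	fmt.Println(missing.GetElementId() == "")               // true
	fmt.Println((&element{ElementId: "e1"}).GetElementId()) // e1
}
```

With the getter, a nil `Element` falls into the same branch as an empty
`element_id`, whereas direct field access on a nil pointer would panic.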



##########
pkg/idgen/snowflake.go:
##########
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package idgen
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+const (
+       epoch int64 = 1609459200000
+
+       nodebits = 7
+       sequenceBits = 10
+
+       nodeShift = sequenceBits
+       timeStampShift = nodebits + sequenceBits
+
+       maxNodeID = (1 << nodebits) - 1
+       maxSequence = (1 << sequenceBits) - 1
+)
+
+// [1-bit sign=0] [46-bit ms timestamp] [7-bit node ID] [10-bit sequence]
+type Generator struct {
+       l        *logger.Logger
+       mu       sync.Mutex
+       nodeID   int64
+       sequence int64
+       lastTime int64
+}
+
+// NewGenerator creates a Generator whose 7-bit node component is derived by 
hashing the given nodeID string.
+func NewGenerator(nodeID string, l *logger.Logger) *Generator {
+       nid := int64(convert.HashStr(nodeID)) & maxNodeID

Review Comment:
   `nid := int64(convert.HashStr(nodeID)) & maxNodeID` converts a `uint64` hash 
to `int64` before masking. The result is still correct, because Go preserves 
the bit pattern in that conversion, but the intermediate value can be negative 
and the expression relies on two’s-complement behavior, which is harder to 
reason about. Prefer masking in `uint64` first (or store `nodeID` as `uint64`) 
and convert only after the value is within range.
   ```suggestion
        nid := int64(convert.HashStr(nodeID) & uint64(maxNodeID))
   ```
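A minimal sketch of the mask-first form, for illustration only. `hashStr` is a
hypothetical stand-in for `convert.HashStr` (FNV-1a is used here purely so the
example is self-contained; the real hash may differ), and the constants copy
the values from the diff.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const (
	nodeBits  = 7
	maxNodeID = (1 << nodeBits) - 1
)

// hashStr is a hypothetical stand-in for convert.HashStr; it only needs to
// return a uint64 for this illustration.
func hashStr(s string) uint64 {
	h := fnv.New64a()
	_, _ = h.Write([]byte(s))
	return h.Sum64()
}

// nodeComponent masks the hash while it is still unsigned, so the value is
// already within [0, maxNodeID] before it is converted to int64.
func nodeComponent(nodeID string) int64 {
	return int64(hashStr(nodeID) & uint64(maxNodeID))
}

func main() {
	for _, id := range []string{"node-a", "node-b", "node-c"} {
		fmt.Printf("%s -> %d\n", id, nodeComponent(id))
	}
}
```

Both orderings produce the same number; the mask-first form simply never
materializes a negative intermediate value.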



##########
pkg/idgen/snowflake.go:
##########
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package idgen
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+const (
+       epoch int64 = 1609459200000
+
+       nodebits = 7
+       sequenceBits = 10
+
+       nodeShift = sequenceBits
+       timeStampShift = nodebits + sequenceBits
+
+       maxNodeID = (1 << nodebits) - 1
+       maxSequence = (1 << sequenceBits) - 1
+)
+
+// [1-bit sign=0] [46-bit ms timestamp] [7-bit node ID] [10-bit sequence]
+type Generator struct {
+       l        *logger.Logger
+       mu       sync.Mutex
+       nodeID   int64
+       sequence int64
+       lastTime int64
+}
+
+// NewGenerator creates a Generator whose 7-bit node component is derived by 
hashing the given nodeID string.
+func NewGenerator(nodeID string, l *logger.Logger) *Generator {
+       nid := int64(convert.HashStr(nodeID)) & maxNodeID
+       return &Generator{
+               nodeID: nid,
+               l:              l,
+       }
+}
+
+// NextID returns the next unique 64-bit ID.
+func (g *Generator) NextID() uint64 {
+       g.mu.Lock()
+       defer g.mu.Unlock()
+       
+       now := time.Now().UnixMilli() - epoch
+
+       if now < g.lastTime {
+               // clock regression - keep using the last known timestamp
+               if g.l != nil {
+                       g.l.Warn().Int64("lastTime", g.lastTime).Int64("now", 
now).
+                               Msg("clock regression detected, using logical 
clock")
+               }
+               now = g.lastTime
+       }
+
+       if(now == g.lastTime) {
+               g.sequence++
+               if g.sequence > maxSequence {
+                       // sequence exhausted - wait for the next millisecond
+                       for now <= g.lastTime {
+                               now = time.Now().UnixMilli() - epoch
+                       }
+                       g.sequence = 0
+               }

Review Comment:
   The sequence-overflow path busy-waits in a tight loop, calling 
`time.Now().UnixMilli()` until the next millisecond. Under sustained high 
throughput (more than 1024 IDs per millisecond), this burns CPU while the 
generator mutex is held and amplifies latency for every waiting caller. 
Consider yielding or sleeping briefly in the loop (or using a clock 
abstraction) to avoid spinning.



##########
banyand/stream/metadata.go:
##########
@@ -82,6 +85,8 @@ func newLiaisonSchemaRepo(path string, svc *liaison, 
streamDataNodeRegistry grpc
                l:        svc.l,
                path:     path,
                metadata: svc.metadata,
+               nodeID:   nodeID,
+               idGen:    idgen.NewGenerator(nodeID, svc.l),
                role:     databasev1.Role_ROLE_LIAISON,

Review Comment:
   `newLiaisonSchemaRepo` now initializes `schemaRepo.nodeID`/`idGen` but 
references `nodeID` without it being in scope, and its signature still only 
accepts `(path, svc, streamDataNodeRegistry)`. This won’t compile. Add a 
`nodeID string` parameter to the function signature and pass it through from 
the caller.



##########
banyand/stream/write_standalone.go:
##########
@@ -169,7 +169,12 @@ func processElements(schemaRepo *schemaRepo, elements 
*elements, writeEvent *str
        req := writeEvent.Request
 
        elements.timestamps = append(elements.timestamps, ts)
-       eID := convert.HashStr(metadata.Group + "|" + metadata.Name + "|" + 
req.Element.ElementId)
+       var eID uint64
+       if req.Element.ElementID != "" {
+               eID = convert.HashStr(metadata.Group + "|" + metadata.Name + 
"|" + req.Element.ElementId)
+       } else {
+               eID = schemaRepo.idGen.NextID()
+       }

Review Comment:
   The new behavior (server-generated element ID when `element_id` is empty) 
changes deduplication semantics. There are many existing stream tests, but none 
appear to cover the empty `element_id` path; adding a test that writes multiple 
elements with empty `element_id` and asserts they don’t collapse into one 
element would prevent regressions.
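The property such a test would assert can be sketched in isolation. Everything
below is hypothetical scaffolding: `resolveElementID` only mirrors the branch
in the diff, `counter` replaces the snowflake generator with a deterministic
source, and `hashStr` stands in for `convert.HashStr`. A real regression test
would go through the stream write path instead.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// idSource is a hypothetical interface covering the one method used here.
type idSource interface {
	NextID() uint64
}

// counter is a deterministic stand-in for the snowflake generator.
type counter struct{ n uint64 }

func (c *counter) NextID() uint64 {
	c.n++
	return c.n
}

// hashStr is a hypothetical stand-in for convert.HashStr.
func hashStr(s string) uint64 {
	h := fnv.New64a()
	_, _ = h.Write([]byte(s))
	return h.Sum64()
}

// resolveElementID mirrors the branch in the diff: hash a client-supplied ID,
// otherwise ask the generator for a fresh one.
func resolveElementID(group, name, elementID string, gen idSource) uint64 {
	if elementID != "" {
		return hashStr(group + "|" + name + "|" + elementID)
	}
	return gen.NextID()
}

func main() {
	gen := &counter{}
	a := resolveElementID("g", "s", "", gen)
	b := resolveElementID("g", "s", "", gen)
	fmt.Println(a != b) // true: empty IDs do not collapse

	x := resolveElementID("g", "s", "e1", gen)
	y := resolveElementID("g", "s", "e1", gen)
	fmt.Println(x == y) // true: explicit IDs still deduplicate
}
```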



##########
banyand/stream/svc_liaison.go:
##########
@@ -186,8 +186,9 @@ func (s *liaison) PreRun(ctx context.Context) error {
        } else {
                s.l.Info().Msg("failed parts storage limit disabled (percent 
set to 0)")
        }
+       nodeID := val.(string)
        streamDataNodeRegistry := 
grpc.NewClusterNodeRegistry(data.TopicStreamPartSync, s.option.tire2Client, 
s.dataNodeSelector)
-       s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, 
streamDataNodeRegistry)
+       s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, 
streamDataNodeRegistry, nodeID)

Review Comment:
   `common.ContextNodeKey` is used elsewhere to store a `common.Node` (see the 
standalone stream service). The unchecked type assertion `val.(string)` will 
panic at runtime when the context holds a `common.Node`. Assert to 
`common.Node` with the two-value form (checking `ok`) and use `node.NodeID` as 
the string node ID.



##########
pkg/idgen/snowflake_test.go:
##########
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package idgen
+
+import (
+       "sync"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestGenerator_Uniqueness(t *testing.T) {
+       g := NewGenerator("test-node-0", nil)
+       const count = 1_000_000
+       ids := make(map[uint64]struct{}, count)
+       for i := 0; i < count; i++ {
+               id := g.NextID()
+               _, exists := ids[id]
+               require.False(t, exists, "duplicate ID at iteration %d: %d", i, 
id)
+               ids[id] = struct{}{}
+       }
+       assert.Len(t, ids, count)
+}
+
+func TestGenerator_Uniqueness_Concurrent(t *testing.T) {
+       g := NewGenerator("test-node-0", nil)
+       const goroutines = 10
+       const perGoroutine = 100_000
+
+       results := make([][]uint64, goroutines)
+       var wg sync.WaitGroup
+       wg.Add(goroutines)
+       for i := 0; i < goroutines; i++ {
+               i := i
+               results[i] = make([]uint64, perGoroutine)
+               go func() {
+                       defer wg.Done()
+                       for j := 0; j < perGoroutine; j++ {
+                               results[i][j] = g.NextID()
+                       }
+               }()
+       }
+       wg.Wait()
+
+       all := make(map[uint64]struct{}, goroutines*perGoroutine)
+       for _, batch := range results {
+               for _, id := range batch {
+                       _, exists := all[id]
+                       require.False(t, exists, "concurrent duplicate ID: %d", 
id)
+                       all[id] = struct{}{}
+               }
+       }
+       assert.Len(t, all, goroutines*perGoroutine)

Review Comment:
   These tests generate and store ~2M IDs in maps (plus concurrent slices), 
which can make CI slow and memory-hungry and may become flaky on constrained 
runners. Consider reducing the counts, guarding the large loops behind 
`testing.Short()`, and/or moving the large-volume checks to a benchmark while 
keeping a smaller deterministic correctness test in `go test`.



##########
banyand/stream/svc_liaison.go:
##########
@@ -186,8 +186,9 @@ func (s *liaison) PreRun(ctx context.Context) error {
        } else {
                s.l.Info().Msg("failed parts storage limit disabled (percent 
set to 0)")
        }
+       nodeID := val.(string)
        streamDataNodeRegistry := 
grpc.NewClusterNodeRegistry(data.TopicStreamPartSync, s.option.tire2Client, 
s.dataNodeSelector)
-       s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, 
streamDataNodeRegistry)
+       s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, 
streamDataNodeRegistry, nodeID)
        s.writeListener = setUpWriteQueueCallback(s.l, &s.schemaRepo, 
s.maxDiskUsagePercent, s.option.tire2Client)
 

Review Comment:
   The PR description indicates that the `CHANGES.md` log still needs updating, 
but this PR doesn’t include that change. Please add an entry describing the new 
server-side element ID generation for stream writes (and its impact on 
deduplication) to keep release notes accurate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

