Copilot commented on code in PR #999: URL: https://github.com/apache/skywalking-banyandb/pull/999#discussion_r2918021971
########## pkg/idgen/snowflake.go: ########## @@ -0,0 +1,90 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package idgen + +import ( + "sync" + "time" + + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/convert" +) + +const ( + epoch int64 = 1609459200000 + + nodebits = 7 + sequenceBits = 10 + + nodeShift = sequenceBits + timeStampShift = nodebits + sequenceBits + + maxNodeID = (1 << nodebits) - 1 + maxSequence = (1 << sequenceBits) - 1 +) + +// [1-bit sign=0] [46-bit ms timestamp] [7-bit node ID] [10-bit sequence] +type Generator struct { + l *logger.Logger + mu sync.Mutex + nodeID int64 + sequence int64 + lastTime int64 +} + +// NewGenerator creates a Generator whose 7-bit node component is derived by hashing the given nodeID string. +func NewGenerator(nodeID string, l *logger.Logger) *Generator { + nid := int64(convert.HashStr(nodeID)) & maxNodeID + return &Generator{ + nodeID: nid, + l: l, + } Review Comment: This file isn’t formatted according to the repo’s Go formatting/import rules (gofumpt + gci). The misaligned struct fields, extra parentheses (`if(now == ...)`), stray whitespace, and import ordering will likely fail CI linters. Run gofumpt/gci (or `make lint`/your usual formatter) on this file. ########## banyand/stream/write_standalone.go: ########## @@ -169,7 +169,12 @@ func processElements(schemaRepo *schemaRepo, elements *elements, writeEvent *str req := writeEvent.Request elements.timestamps = append(elements.timestamps, ts) - eID := convert.HashStr(metadata.Group + "|" + metadata.Name + "|" + req.Element.ElementId) + var eID uint64 + if req.Element.ElementID != "" { + eID = convert.HashStr(metadata.Group + "|" + metadata.Name + "|" + req.Element.ElementId) Review Comment: `req.Element` from stream v1 uses the protobuf field `element_id`, which is generated as `ElementId`/`GetElementId()` in Go. The new check uses `req.Element.ElementID` (and the hash uses `ElementId`), which will not compile and is inconsistent. Use the same accessor (`GetElementId()` or `ElementId`) for both the emptiness check and hashing input. ```suggestion if req.Element.GetElementId() != "" { eID = convert.HashStr(metadata.Group + "|" + metadata.Name + "|" + req.Element.GetElementId()) ``` ########## pkg/idgen/snowflake.go: ########## @@ -0,0 +1,90 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package idgen + +import ( + "sync" + "time" + + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/convert" +) + +const ( + epoch int64 = 1609459200000 + + nodebits = 7 + sequenceBits = 10 + + nodeShift = sequenceBits + timeStampShift = nodebits + sequenceBits + + maxNodeID = (1 << nodebits) - 1 + maxSequence = (1 << sequenceBits) - 1 +) + +// [1-bit sign=0] [46-bit ms timestamp] [7-bit node ID] [10-bit sequence] +type Generator struct { + l *logger.Logger + mu sync.Mutex + nodeID int64 + sequence int64 + lastTime int64 +} + +// NewGenerator creates a Generator whose 7-bit node component is derived by hashing the given nodeID string. +func NewGenerator(nodeID string, l *logger.Logger) *Generator { + nid := int64(convert.HashStr(nodeID)) & maxNodeID Review Comment: `nid := int64(convert.HashStr(nodeID)) & maxNodeID` converts a `uint64` hash to `int64` before masking, which relies on two’s-complement behavior and is harder to reason about. Prefer masking in `uint64` first (or store `nodeID` as `uint64`) and convert only after the value is within range. ```suggestion nid := int64(convert.HashStr(nodeID) & uint64(maxNodeID)) ``` ########## pkg/idgen/snowflake.go: ########## @@ -0,0 +1,90 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package idgen + +import ( + "sync" + "time" + + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/convert" +) + +const ( + epoch int64 = 1609459200000 + + nodebits = 7 + sequenceBits = 10 + + nodeShift = sequenceBits + timeStampShift = nodebits + sequenceBits + + maxNodeID = (1 << nodebits) - 1 + maxSequence = (1 << sequenceBits) - 1 +) + +// [1-bit sign=0] [46-bit ms timestamp] [7-bit node ID] [10-bit sequence] +type Generator struct { + l *logger.Logger + mu sync.Mutex + nodeID int64 + sequence int64 + lastTime int64 +} + +// NewGenerator creates a Generator whose 7-bit node component is derived by hashing the given nodeID string. +func NewGenerator(nodeID string, l *logger.Logger) *Generator { + nid := int64(convert.HashStr(nodeID)) & maxNodeID + return &Generator{ + nodeID: nid, + l: l, + } +} + +// NextID returns the next unique 64-bit ID. +func (g *Generator) NextID() uint64 { + g.mu.Lock() + defer g.mu.Unlock() + + now := time.Now().UnixMilli() - epoch + + if now < g.lastTime { + // clock regression - keep using the last known timestamp + if g.l != nil { + g.l.Warn().Int64("lastTime", g.lastTime).Int64("now", now). + Msg("clock regression detected, using logical clock") + } + now = g.lastTime + } + + if(now == g.lastTime) { + g.sequence++ + if g.sequence > maxSequence { + // sequence exhausted - wait for the next millisecond + for now <= g.lastTime { + now = time.Now().UnixMilli() - epoch + } + g.sequence = 0 + } Review Comment: The sequence-overflow path uses a tight busy-wait loop calling `time.Now().UnixMilli()` until the next millisecond. Under sustained high-throughput (>1024 IDs/ms), this can burn CPU and amplify latency. Consider yielding/sleeping briefly in the loop (or using a clock abstraction) to avoid spinning. ########## banyand/stream/metadata.go: ########## @@ -82,6 +85,8 @@ func newLiaisonSchemaRepo(path string, svc *liaison, streamDataNodeRegistry grpc l: svc.l, path: path, metadata: svc.metadata, + nodeID: nodeID, + idGen: idgen.NewGenerator(nodeID, svc.l), role: databasev1.Role_ROLE_LIAISON, Review Comment: `newLiaisonSchemaRepo` now initializes `schemaRepo.nodeID`/`idGen` but references `nodeID` without it being in scope, and its signature still only accepts `(path, svc, streamDataNodeRegistry)`. This won’t compile. Add a `nodeID string` parameter to the function signature and pass it through from the caller. ########## banyand/stream/write_standalone.go: ########## @@ -169,7 +169,12 @@ func processElements(schemaRepo *schemaRepo, elements *elements, writeEvent *str req := writeEvent.Request elements.timestamps = append(elements.timestamps, ts) - eID := convert.HashStr(metadata.Group + "|" + metadata.Name + "|" + req.Element.ElementId) + var eID uint64 + if req.Element.ElementID != "" { + eID = convert.HashStr(metadata.Group + "|" + metadata.Name + "|" + req.Element.ElementId) + } else { + eID = schemaRepo.idGen.NextID() + } Review Comment: The new behavior (server-generated element ID when `element_id` is empty) changes deduplication semantics. There are many existing stream tests, but none appear to cover the empty `element_id` path; adding a test that writes multiple elements with empty `element_id` and asserts they don’t collapse into one element would prevent regressions. ########## banyand/stream/svc_liaison.go: ########## @@ -186,8 +186,9 @@ func (s *liaison) PreRun(ctx context.Context) error { } else { s.l.Info().Msg("failed parts storage limit disabled (percent set to 0)") } + nodeID := val.(string) streamDataNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicStreamPartSync, s.option.tire2Client, s.dataNodeSelector) - s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, streamDataNodeRegistry) + s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, streamDataNodeRegistry, nodeID) Review Comment: `common.ContextNodeKey` is used elsewhere as a `common.Node` (see standalone stream service). Casting `val` to `string` here will panic at runtime when the context contains a `common.Node`. Type assert to `common.Node` (with `ok` check) and use `node.NodeID` as the string node ID. ########## pkg/idgen/snowflake_test.go: ########## @@ -0,0 +1,121 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package idgen + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGenerator_Uniqueness(t *testing.T) { + g := NewGenerator("test-node-0", nil) + const count = 1_000_000 + ids := make(map[uint64]struct{}, count) + for i := 0; i < count; i++ { + id := g.NextID() + _, exists := ids[id] + require.False(t, exists, "duplicate ID at iteration %d: %d", i, id) + ids[id] = struct{}{} + } + assert.Len(t, ids, count) +} + +func TestGenerator_Uniqueness_Concurrent(t *testing.T) { + g := NewGenerator("test-node-0", nil) + const goroutines = 10 + const perGoroutine = 100_000 + + results := make([][]uint64, goroutines) + var wg sync.WaitGroup + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + i := i + results[i] = make([]uint64, perGoroutine) + go func() { + defer wg.Done() + for j := 0; j < perGoroutine; j++ { + results[i][j] = g.NextID() + } + }() + } + wg.Wait() + + all := make(map[uint64]struct{}, goroutines*perGoroutine) + for _, batch := range results { + for _, id := range batch { + _, exists := all[id] + require.False(t, exists, "concurrent duplicate ID: %d", id) + all[id] = struct{}{} + } + } + assert.Len(t, all, goroutines*perGoroutine) Review Comment: These tests generate and store ~2M IDs in maps (plus concurrent slices), which can make CI slow and memory-hungry and may become flaky on constrained runners. Consider reducing the counts, guarding the large loops behind `testing.Short()`, and/or moving the large-volume checks to a benchmark while keeping a smaller deterministic correctness test in `go test`. ########## banyand/stream/svc_liaison.go: ########## @@ -186,8 +186,9 @@ func (s *liaison) PreRun(ctx context.Context) error { } else { s.l.Info().Msg("failed parts storage limit disabled (percent set to 0)") } + nodeID := val.(string) streamDataNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicStreamPartSync, s.option.tire2Client, s.dataNodeSelector) - s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, streamDataNodeRegistry) + s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, streamDataNodeRegistry, nodeID) s.writeListener = setUpWriteQueueCallback(s.l, &s.schemaRepo, s.maxDiskUsagePercent, s.option.tire2Client) Review Comment: PR description indicates the `CHANGES.md` log still needs updating, but this PR doesn’t include that change. Please add an entry describing the new server-side element ID generation for stream writes (and its impact on deduplication) to keep release notes accurate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
