This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new e8c05d059 use lock-free atomic CAS for snowflake ID generator (#999)
e8c05d059 is described below
commit e8c05d05922033f7735cdd2ff47aa35509dbcffa
Author: Tanay Paul <[email protected]>
AuthorDate: Thu Mar 12 17:56:04 2026 +0530
use lock-free atomic CAS for snowflake ID generator (#999)
---
CHANGES.md | 1 +
banyand/stream/metadata.go | 7 ++-
banyand/stream/svc_liaison.go | 3 +-
banyand/stream/write_standalone.go | 7 ++-
pkg/idgen/snowflake.go | 99 ++++++++++++++++++++++++++++++
pkg/idgen/snowflake_test.go | 121 +++++++++++++++++++++++++++++++++++++
6 files changed, 235 insertions(+), 3 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 0b6e13a1d..50071ac63 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -27,6 +27,7 @@ Release Notes.
- Add eBPF-based KTM I/O monitor for FODC agent.
- Support relative paths in configuration.
- Support 'none' node discovery and make it the default.
+- Support server-side element ID generation for stream writes when clients
omit element_id.
### Bug Fixes
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index dca00f739..4bf425cda 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -37,6 +37,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/banyand/queue/pub"
+ "github.com/apache/skywalking-banyandb/pkg/idgen"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
@@ -54,6 +55,7 @@ type schemaRepo struct {
resourceSchema.Repository
l *logger.Logger
metadata metadata.Repo
+ idGen *idgen.Generator
path string
nodeID string
role databasev1.Role
@@ -65,6 +67,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels
map[string]string, n
path: path,
metadata: svc.metadata,
nodeID: nodeID,
+ idGen: idgen.NewGenerator(nodeID, svc.l),
role: databasev1.Role_ROLE_DATA,
Repository: resourceSchema.NewRepository(
svc.metadata,
@@ -77,11 +80,13 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels
map[string]string, n
return sr
}
-func newLiaisonSchemaRepo(path string, svc *liaison, streamDataNodeRegistry
grpc.NodeRegistry) schemaRepo {
+func newLiaisonSchemaRepo(path string, svc *liaison, streamDataNodeRegistry
grpc.NodeRegistry, nodeID string) schemaRepo {
sr := schemaRepo{
l: svc.l,
path: path,
metadata: svc.metadata,
+ nodeID: nodeID,
+ idGen: idgen.NewGenerator(nodeID, svc.l),
role: databasev1.Role_ROLE_LIAISON,
Repository: resourceSchema.NewRepository(
svc.metadata,
diff --git a/banyand/stream/svc_liaison.go b/banyand/stream/svc_liaison.go
index e0247347d..49fe0f286 100644
--- a/banyand/stream/svc_liaison.go
+++ b/banyand/stream/svc_liaison.go
@@ -186,8 +186,9 @@ func (s *liaison) PreRun(ctx context.Context) error {
} else {
s.l.Info().Msg("failed parts storage limit disabled (percent
set to 0)")
}
+ node := val.(common.Node)
streamDataNodeRegistry :=
grpc.NewClusterNodeRegistry(data.TopicStreamPartSync, s.option.tire2Client,
s.dataNodeSelector)
- s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s,
streamDataNodeRegistry)
+ s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s,
streamDataNodeRegistry, node.NodeID)
s.writeListener = setUpWriteQueueCallback(s.l, &s.schemaRepo,
s.maxDiskUsagePercent, s.option.tire2Client)
// Register chunked sync handler for stream data
diff --git a/banyand/stream/write_standalone.go
b/banyand/stream/write_standalone.go
index 7a3db5b31..294210854 100644
--- a/banyand/stream/write_standalone.go
+++ b/banyand/stream/write_standalone.go
@@ -169,7 +169,12 @@ func processElements(schemaRepo *schemaRepo, elements
*elements, writeEvent *str
req := writeEvent.Request
elements.timestamps = append(elements.timestamps, ts)
- eID := convert.HashStr(metadata.Group + "|" + metadata.Name + "|" +
req.Element.ElementId)
+ var eID uint64
+ if req.Element.GetElementId() != "" {
+ eID = convert.HashStr(metadata.Group + "|" + metadata.Name +
"|" + req.Element.GetElementId())
+ } else {
+ eID = schemaRepo.idGen.NextID()
+ }
elements.elementIDs = append(elements.elementIDs, eID)
stm, ok := schemaRepo.loadStream(metadata)
diff --git a/pkg/idgen/snowflake.go b/pkg/idgen/snowflake.go
new file mode 100644
index 000000000..4454ae4ac
--- /dev/null
+++ b/pkg/idgen/snowflake.go
@@ -0,0 +1,99 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package idgen provides a lock-free snowflake ID generator for stream writes.
+package idgen
+
+import (
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
const (
	// epoch is the custom origin of the millisecond timestamp component:
	// 2021-01-01T00:00:00Z in Unix milliseconds.
	epoch int64 = 1609459200000

	// Bit widths of the node and sequence components of an ID.
	nodeBits     = 7
	sequenceBits = 10

	// Shift amounts placing each component inside the 64-bit ID:
	// [sign=0][timestamp][node][sequence].
	nodeShift      = sequenceBits
	timeStampShift = nodeBits + sequenceBits

	// Maximum values representable by the node and sequence fields.
	maxNodeID   int64 = (1 << nodeBits) - 1
	maxSequence int64 = (1 << sequenceBits) - 1
)
+
// Generator produces unique 64-bit snowflake IDs using a lock-free CAS loop.
// Bit layout: [1-bit sign=0] [46-bit ms timestamp] [7-bit node ID] [10-bit sequence].
// The internal state packs lastTime and sequence into a single uint64:
// state = (lastTime << sequenceBits) | sequence.
type Generator struct {
	// l, when non-nil, receives a warning whenever a wall-clock
	// regression forces the generator onto its logical clock.
	l *logger.Logger
	// state is the packed (lastTime, sequence) pair, advanced only via
	// atomic CompareAndSwap; the zero value is a valid starting state.
	state atomic.Uint64
	// nodeID is the 7-bit node component, fixed at construction.
	nodeID int64
}
+
+// NewGenerator creates a Generator whose 7-bit node component is derived by
hashing the given nodeID string.
+func NewGenerator(nodeID string, l *logger.Logger) *Generator {
+ nid := int64(convert.HashStr(nodeID) & uint64(maxNodeID))
+ return &Generator{
+ nodeID: nid,
+ l: l,
+ }
+}
+
// NodeID returns the 7-bit node component used by this generator.
// It is derived from the node ID string given to NewGenerator and is
// constant for the generator's lifetime.
func (g *Generator) NodeID() int64 {
	return g.nodeID
}
+
+// NextID returns the next unique 64-bit ID.
+func (g *Generator) NextID() uint64 {
+ for {
+ old := g.state.Load()
+ oldTime := int64(old >> sequenceBits)
+ oldSeq := int64(old & uint64(maxSequence))
+
+ now := time.Now().UnixMilli() - epoch
+
+ var newTime, newSeq int64
+ if now > oldTime {
+ newTime = now
+ newSeq = 0
+ } else {
+ // Clock is equal or regressed — use logical clock from
state.
+ newTime = oldTime
+ newSeq = oldSeq + 1
+ if newSeq > maxSequence {
+ // Sequence exhausted in this ms — advance
logical clock.
+ newTime++
+ newSeq = 0
+ }
+ }
+
+ newState := (uint64(newTime) << sequenceBits) | uint64(newSeq)
+ if g.state.CompareAndSwap(old, newState) {
+ if now < oldTime && g.l != nil {
+ g.l.Warn().Int64("lastTime",
oldTime).Int64("now", now).
+ Msg("clock regression detected, using
logical clock")
+ }
+ return uint64((newTime << timeStampShift) | (g.nodeID
<< nodeShift) | newSeq)
+ }
+ }
+}
diff --git a/pkg/idgen/snowflake_test.go b/pkg/idgen/snowflake_test.go
new file mode 100644
index 000000000..154caa5eb
--- /dev/null
+++ b/pkg/idgen/snowflake_test.go
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package idgen
+
+import (
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestGenerator_Uniqueness(t *testing.T) {
+ gen := NewGenerator("test-node-0", nil)
+ const count = 100_000
+ ids := make(map[uint64]struct{}, count)
+ for idx := 0; idx < count; idx++ {
+ id := gen.NextID()
+ _, exists := ids[id]
+ require.False(t, exists, "duplicate ID at iteration %d: %d",
idx, id)
+ ids[id] = struct{}{}
+ }
+ assert.Len(t, ids, count)
+}
+
+func TestGenerator_Uniqueness_Concurrent(t *testing.T) {
+ gen := NewGenerator("test-node-0", nil)
+ const goroutines = 10
+ const perGoroutine = 10_000
+
+ results := make([][]uint64, goroutines)
+ var wg sync.WaitGroup
+ wg.Add(goroutines)
+ for gi := 0; gi < goroutines; gi++ {
+ gi := gi
+ results[gi] = make([]uint64, perGoroutine)
+ go func() {
+ defer wg.Done()
+ for ji := 0; ji < perGoroutine; ji++ {
+ results[gi][ji] = gen.NextID()
+ }
+ }()
+ }
+ wg.Wait()
+
+ all := make(map[uint64]struct{}, goroutines*perGoroutine)
+ for _, batch := range results {
+ for _, id := range batch {
+ _, exists := all[id]
+ require.False(t, exists, "concurrent duplicate ID: %d",
id)
+ all[id] = struct{}{}
+ }
+ }
+ assert.Len(t, all, goroutines*perGoroutine)
+}
+
+func TestGenerator_Monotonic(t *testing.T) {
+ gen := NewGenerator("test-node-0", nil)
+ prev := gen.NextID()
+ for idx := 0; idx < 10_000; idx++ {
+ curr := gen.NextID()
+ assert.Greater(t, curr, prev, "ID must be monotonically
increasing")
+ prev = curr
+ }
+}
+
+func TestGenerator_SequenceOverflow(t *testing.T) {
+ gen := NewGenerator("test-node-0", nil)
+ total := int(maxSequence) + 2
+ ids := make(map[uint64]struct{}, total)
+ for idx := 0; idx < total; idx++ {
+ id := gen.NextID()
+ _, exists := ids[id]
+ require.False(t, exists, "duplicate on overflow at %d: %d",
idx, id)
+ ids[id] = struct{}{}
+ }
+ assert.Len(t, ids, total)
+}
+
+func TestGenerator_DifferentNodes(t *testing.T) {
+ gen1 := NewGenerator("node-a", nil)
+ gen2 := NewGenerator("node-b", nil)
+ ids := make(map[uint64]struct{}, 20_000)
+ for idx := 0; idx < 10_000; idx++ {
+ id1 := gen1.NextID()
+ id2 := gen2.NextID()
+ _, exists1 := ids[id1]
+ require.False(t, exists1, "cross-node duplicate from gen1: %d",
id1)
+ _, exists2 := ids[id2]
+ require.False(t, exists2, "cross-node duplicate from gen2: %d",
id2)
+ ids[id1] = struct{}{}
+ ids[id2] = struct{}{}
+ }
+ assert.Len(t, ids, 20_000)
+}
+
+func TestGenerator_BitLayout(t *testing.T) {
+ gen := NewGenerator("test-node-0", nil)
+ id := gen.NextID()
+ // Sign bit must be 0.
+ assert.Equal(t, uint64(0), id>>63, "sign bit must be 0")
+
+ // Node bits are at positions [10..16].
+ extractedNode := int64((id >> nodeShift) & uint64(maxNodeID))
+ assert.Equal(t, gen.NodeID(), extractedNode, "node ID should match")
+}