This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new e8c05d059 use lock-free atomic CAS for snowflake ID generator (#999)
e8c05d059 is described below

commit e8c05d05922033f7735cdd2ff47aa35509dbcffa
Author: Tanay Paul <[email protected]>
AuthorDate: Thu Mar 12 17:56:04 2026 +0530

    use lock-free atomic CAS for snowflake ID generator (#999)
---
 CHANGES.md                         |   1 +
 banyand/stream/metadata.go         |   7 ++-
 banyand/stream/svc_liaison.go      |   3 +-
 banyand/stream/write_standalone.go |   7 ++-
 pkg/idgen/snowflake.go             |  99 ++++++++++++++++++++++++++++++
 pkg/idgen/snowflake_test.go        | 121 +++++++++++++++++++++++++++++++++++++
 6 files changed, 235 insertions(+), 3 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 0b6e13a1d..50071ac63 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -27,6 +27,7 @@ Release Notes.
 - Add eBPF-based KTM I/O monitor for FODC agent.
 - Support relative paths in configuration.
 - Support 'none' node discovery and make it the default.
+- Support server-side element ID generation for stream writes when clients 
omit element_id.
 
 ### Bug Fixes
 
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index dca00f739..4bf425cda 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -37,6 +37,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/banyand/queue/pub"
+       "github.com/apache/skywalking-banyandb/pkg/idgen"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
@@ -54,6 +55,7 @@ type schemaRepo struct {
        resourceSchema.Repository
        l        *logger.Logger
        metadata metadata.Repo
+       idGen    *idgen.Generator
        path     string
        nodeID   string
        role     databasev1.Role
@@ -65,6 +67,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels 
map[string]string, n
                path:     path,
                metadata: svc.metadata,
                nodeID:   nodeID,
+               idGen:    idgen.NewGenerator(nodeID, svc.l),
                role:     databasev1.Role_ROLE_DATA,
                Repository: resourceSchema.NewRepository(
                        svc.metadata,
@@ -77,11 +80,13 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels 
map[string]string, n
        return sr
 }
 
-func newLiaisonSchemaRepo(path string, svc *liaison, streamDataNodeRegistry 
grpc.NodeRegistry) schemaRepo {
+func newLiaisonSchemaRepo(path string, svc *liaison, streamDataNodeRegistry 
grpc.NodeRegistry, nodeID string) schemaRepo {
        sr := schemaRepo{
                l:        svc.l,
                path:     path,
                metadata: svc.metadata,
+               nodeID:   nodeID,
+               idGen:    idgen.NewGenerator(nodeID, svc.l),
                role:     databasev1.Role_ROLE_LIAISON,
                Repository: resourceSchema.NewRepository(
                        svc.metadata,
diff --git a/banyand/stream/svc_liaison.go b/banyand/stream/svc_liaison.go
index e0247347d..49fe0f286 100644
--- a/banyand/stream/svc_liaison.go
+++ b/banyand/stream/svc_liaison.go
@@ -186,8 +186,9 @@ func (s *liaison) PreRun(ctx context.Context) error {
        } else {
                s.l.Info().Msg("failed parts storage limit disabled (percent 
set to 0)")
        }
+       node := val.(common.Node)
        streamDataNodeRegistry := 
grpc.NewClusterNodeRegistry(data.TopicStreamPartSync, s.option.tire2Client, 
s.dataNodeSelector)
-       s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, 
streamDataNodeRegistry)
+       s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, 
streamDataNodeRegistry, node.NodeID)
        s.writeListener = setUpWriteQueueCallback(s.l, &s.schemaRepo, 
s.maxDiskUsagePercent, s.option.tire2Client)
 
        // Register chunked sync handler for stream data
diff --git a/banyand/stream/write_standalone.go 
b/banyand/stream/write_standalone.go
index 7a3db5b31..294210854 100644
--- a/banyand/stream/write_standalone.go
+++ b/banyand/stream/write_standalone.go
@@ -169,7 +169,12 @@ func processElements(schemaRepo *schemaRepo, elements 
*elements, writeEvent *str
        req := writeEvent.Request
 
        elements.timestamps = append(elements.timestamps, ts)
-       eID := convert.HashStr(metadata.Group + "|" + metadata.Name + "|" + 
req.Element.ElementId)
+       var eID uint64
+       if req.Element.GetElementId() != "" {
+               eID = convert.HashStr(metadata.Group + "|" + metadata.Name + 
"|" + req.Element.GetElementId())
+       } else {
+               eID = schemaRepo.idGen.NextID()
+       }
        elements.elementIDs = append(elements.elementIDs, eID)
 
        stm, ok := schemaRepo.loadStream(metadata)
diff --git a/pkg/idgen/snowflake.go b/pkg/idgen/snowflake.go
new file mode 100644
index 000000000..4454ae4ac
--- /dev/null
+++ b/pkg/idgen/snowflake.go
@@ -0,0 +1,99 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package idgen provides a lock-free snowflake ID generator for stream writes.
+package idgen
+
+import (
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
const (
	// epoch is the custom epoch (2021-01-01T00:00:00Z as Unix ms);
	// timestamps are stored relative to it to extend the usable range.
	epoch int64 = 1609459200000

	// Bit widths of the node and sequence components of an ID.
	nodeBits     = 7
	sequenceBits = 10

	// Shift amounts for assembling an ID: [timestamp | node | sequence].
	nodeShift      = sequenceBits
	timeStampShift = nodeBits + sequenceBits

	// Maximum value of each component; also used as bit masks.
	maxNodeID   int64 = (1 << nodeBits) - 1
	maxSequence int64 = (1 << sequenceBits) - 1
)
+
+// Generator produces unique 64-bit snowflake IDs using a lock-free CAS loop.
+// Bit layout: [1-bit sign=0] [46-bit ms timestamp] [7-bit node ID] [10-bit 
sequence].
+// The internal state packs lastTime and sequence into a single uint64:
+// state = (lastTime << sequenceBits) | sequence.
+type Generator struct {
+       l      *logger.Logger
+       state  atomic.Uint64
+       nodeID int64
+}
+
+// NewGenerator creates a Generator whose 7-bit node component is derived by 
hashing the given nodeID string.
+func NewGenerator(nodeID string, l *logger.Logger) *Generator {
+       nid := int64(convert.HashStr(nodeID) & uint64(maxNodeID))
+       return &Generator{
+               nodeID: nid,
+               l:      l,
+       }
+}
+
+// NodeID returns the 7-bit node component used by this generator.
+func (g *Generator) NodeID() int64 {
+       return g.nodeID
+}
+
+// NextID returns the next unique 64-bit ID.
+func (g *Generator) NextID() uint64 {
+       for {
+               old := g.state.Load()
+               oldTime := int64(old >> sequenceBits)
+               oldSeq := int64(old & uint64(maxSequence))
+
+               now := time.Now().UnixMilli() - epoch
+
+               var newTime, newSeq int64
+               if now > oldTime {
+                       newTime = now
+                       newSeq = 0
+               } else {
+                       // Clock is equal or regressed — use logical clock from 
state.
+                       newTime = oldTime
+                       newSeq = oldSeq + 1
+                       if newSeq > maxSequence {
+                               // Sequence exhausted in this ms — advance 
logical clock.
+                               newTime++
+                               newSeq = 0
+                       }
+               }
+
+               newState := (uint64(newTime) << sequenceBits) | uint64(newSeq)
+               if g.state.CompareAndSwap(old, newState) {
+                       if now < oldTime && g.l != nil {
+                               g.l.Warn().Int64("lastTime", 
oldTime).Int64("now", now).
+                                       Msg("clock regression detected, using 
logical clock")
+                       }
+                       return uint64((newTime << timeStampShift) | (g.nodeID 
<< nodeShift) | newSeq)
+               }
+       }
+}
diff --git a/pkg/idgen/snowflake_test.go b/pkg/idgen/snowflake_test.go
new file mode 100644
index 000000000..154caa5eb
--- /dev/null
+++ b/pkg/idgen/snowflake_test.go
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package idgen
+
+import (
+       "sync"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestGenerator_Uniqueness(t *testing.T) {
+       gen := NewGenerator("test-node-0", nil)
+       const count = 100_000
+       ids := make(map[uint64]struct{}, count)
+       for idx := 0; idx < count; idx++ {
+               id := gen.NextID()
+               _, exists := ids[id]
+               require.False(t, exists, "duplicate ID at iteration %d: %d", 
idx, id)
+               ids[id] = struct{}{}
+       }
+       assert.Len(t, ids, count)
+}
+
+func TestGenerator_Uniqueness_Concurrent(t *testing.T) {
+       gen := NewGenerator("test-node-0", nil)
+       const goroutines = 10
+       const perGoroutine = 10_000
+
+       results := make([][]uint64, goroutines)
+       var wg sync.WaitGroup
+       wg.Add(goroutines)
+       for gi := 0; gi < goroutines; gi++ {
+               gi := gi
+               results[gi] = make([]uint64, perGoroutine)
+               go func() {
+                       defer wg.Done()
+                       for ji := 0; ji < perGoroutine; ji++ {
+                               results[gi][ji] = gen.NextID()
+                       }
+               }()
+       }
+       wg.Wait()
+
+       all := make(map[uint64]struct{}, goroutines*perGoroutine)
+       for _, batch := range results {
+               for _, id := range batch {
+                       _, exists := all[id]
+                       require.False(t, exists, "concurrent duplicate ID: %d", 
id)
+                       all[id] = struct{}{}
+               }
+       }
+       assert.Len(t, all, goroutines*perGoroutine)
+}
+
+func TestGenerator_Monotonic(t *testing.T) {
+       gen := NewGenerator("test-node-0", nil)
+       prev := gen.NextID()
+       for idx := 0; idx < 10_000; idx++ {
+               curr := gen.NextID()
+               assert.Greater(t, curr, prev, "ID must be monotonically 
increasing")
+               prev = curr
+       }
+}
+
+func TestGenerator_SequenceOverflow(t *testing.T) {
+       gen := NewGenerator("test-node-0", nil)
+       total := int(maxSequence) + 2
+       ids := make(map[uint64]struct{}, total)
+       for idx := 0; idx < total; idx++ {
+               id := gen.NextID()
+               _, exists := ids[id]
+               require.False(t, exists, "duplicate on overflow at %d: %d", 
idx, id)
+               ids[id] = struct{}{}
+       }
+       assert.Len(t, ids, total)
+}
+
+func TestGenerator_DifferentNodes(t *testing.T) {
+       gen1 := NewGenerator("node-a", nil)
+       gen2 := NewGenerator("node-b", nil)
+       ids := make(map[uint64]struct{}, 20_000)
+       for idx := 0; idx < 10_000; idx++ {
+               id1 := gen1.NextID()
+               id2 := gen2.NextID()
+               _, exists1 := ids[id1]
+               require.False(t, exists1, "cross-node duplicate from gen1: %d", 
id1)
+               _, exists2 := ids[id2]
+               require.False(t, exists2, "cross-node duplicate from gen2: %d", 
id2)
+               ids[id1] = struct{}{}
+               ids[id2] = struct{}{}
+       }
+       assert.Len(t, ids, 20_000)
+}
+
+func TestGenerator_BitLayout(t *testing.T) {
+       gen := NewGenerator("test-node-0", nil)
+       id := gen.NextID()
+       // Sign bit must be 0.
+       assert.Equal(t, uint64(0), id>>63, "sign bit must be 0")
+
+       // Node bits are at positions [10..16].
+       extractedNode := int64((id >> nodeShift) & uint64(maxNodeID))
+       assert.Equal(t, gen.NodeID(), extractedNode, "node ID should match")
+}

Reply via email to