This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace-sidx
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit ed86318d655dd6dd4318d45e46d423ca1e95a04c
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Aug 15 10:18:37 2025 +0800

    fix: improve error logging during gRPC reconnection attempts
---
 banyand/internal/sidx/DESIGN.md | 865 ++++++++++++++++++++++++++++++++++++++++
 banyand/queue/pub/client.go     |   6 +-
 2 files changed, 867 insertions(+), 4 deletions(-)

diff --git a/banyand/internal/sidx/DESIGN.md b/banyand/internal/sidx/DESIGN.md
new file mode 100644
index 00000000..64d91426
--- /dev/null
+++ b/banyand/internal/sidx/DESIGN.md
@@ -0,0 +1,865 @@
+# Secondary Index File System (sidx) Design
+
+## Overview
+
+The Secondary Index File System (sidx) is a generalized file system abstraction, inspired by BanyanDB's stream module, that uses user-provided int64 keys as ordering values instead of timestamps. This enables efficient secondary indexing for various data types and use cases beyond time-series data.
+
+## Core Design Principle
+
+**The int64 ordering key is PROVIDED BY THE USER/CLIENT**, not generated or interpreted by sidx. The sidx system treats it as an opaque ordering value and only performs numerical comparisons for storage organization and query processing.
+
+### Key Characteristics
+
+- **User-Controlled**: Client decides what the int64 represents
+- **Opaque to sidx**: System doesn't interpret meaning, only orders by numerical value
+- **Monotonic Ordering**: sidx only cares about numerical comparison (`<`, `>`, `==`)
+- **No Semantic Validation**: sidx doesn't validate what the key represents
+- **No Key Generation**: sidx never creates or transforms keys
+
+## Architecture
+
+### Replacing Timestamp-Specific Logic
+
+#### Original Stream Module (Timestamp-based)
+```go
+// Stream knows these are timestamps and interprets them
+type partMetadata struct {
+    MinTimestamp int64  // System knows this is time
+    MaxTimestamp int64  // System knows this is time
+    // ... time-specific logic
+}
+
+// Stream might validate time ranges, format for display, etc.
+func (s *snapshot) getParts(minTime, maxTime int64) []*part {
+    // Time-aware filtering logic
+}
+```
+
+#### New sidx (User-Key-based)
+```go
+// sidx treats as opaque ordering values
+type partMetadata struct {
+    MinKey int64  // Just a number to sidx - no interpretation
+    MaxKey int64  // Just a number to sidx - no interpretation
+    // ... generic ordering logic
+}
+
+// Pure numerical comparison - no time semantics
+func (s *snapshot) getParts(minKey, maxKey int64) []*part {
+    // Generic key-range filtering
+}
+```
+
+### Core Components
+
+#### 1. Part Structure
+```go
+type part struct {
+    primary              fs.Reader                // Block metadata
+    data                 fs.Reader                // User data payloads
+    userKeys             fs.Reader                // User-provided int64 keys
+    fileSystem           fs.FileSystem
+    tagMetadata          map[string]fs.Reader     // Per-tag metadata files (.tm)
+    tags                 map[string]fs.Reader     // Per-tag data files (.td)
+    tagFilters           map[string]fs.Reader     // Per-tag filter files (.tf)
+    path                 string
+    primaryBlockMetadata []primaryBlockMetadata
+    partMetadata         partMetadata             // Contains MinKey/MaxKey
+}
+
+type partMetadata struct {
+    CompressedSizeBytes   uint64
+    UncompressedSizeBytes uint64
+    TotalCount            uint64
+    BlocksCount           uint64
+    MinKey                int64  // Replaces MinTimestamp
+    MaxKey                int64  // Replaces MaxTimestamp
+    ID                    uint64
+}
+```
+
+#### 2. Element Structure
+```go
+type element struct {
+    seriesID  common.SeriesID
+    userKey   int64    // The ordering key from user (replaces timestamp)
+    elementID uint64   // Internal element identifier
+    data      []byte   // User payload data
+    tags      []tag    // Individual tags, not tag families
+}
+
+// Elements are sorted by: seriesID first, then userKey
+func (e *elements) Less(i, j int) bool {
+    if e.seriesIDs[i] != e.seriesIDs[j] {
+        return e.seriesIDs[i] < e.seriesIDs[j]
+    }
+    return e.userKeys[i] < e.userKeys[j]  // Pure numerical comparison
+}
+```
+
+#### 3. Tag Storage Architecture
+
+sidx uses a **tag-based file design** where each tag is stored in its own set of files, unlike the stream module's tag-family grouping approach. This provides better isolation, modularity, and performance characteristics.
+
+##### File Organization per Tag
+Each tag gets three separate files within a part directory:
+- **`tag_<name>.td`** - Tag data file containing encoded tag values
+- **`tag_<name>.tm`** - Tag metadata file with encoding info, value type, and block offsets
+- **`tag_<name>.tf`** - Tag filter file containing bloom filters for fast lookups
+
+##### Tag Data Structures
+```go
+// Runtime tag representation
+type tag struct {
+    name      string
+    values    [][]byte
+    valueType pbv1.ValueType
+    filter    *filter.BloomFilter  // For indexed tags
+    min       []byte               // For int64 tags
+    max       []byte               // For int64 tags
+}
+
+// Persistent tag metadata
+type tagMetadata struct {
+    name        string
+    valueType   pbv1.ValueType
+    dataBlock   dataBlock       // Offset/size in .td file
+    filterBlock dataBlock       // Offset/size in .tf file
+    min         []byte
+    max         []byte
+}
+```
+
+##### File Naming Constants
+```go
+const (
+    // Core file names
+    sidxPrimaryName   = "primary"     // Block metadata
+    sidxDataName      = "data"        // User data payloads
+    sidxMetaName      = "meta"        // Part metadata
+    sidxUserKeysName  = "keys"        // User-provided int64 keys
+    
+    // Tag file prefixes and extensions
+    sidxTagPrefix     = "tag_"
+    tagDataExt       = ".td"          // Tag data
+    tagMetaExt       = ".tm"          // Tag metadata
+    tagFilterExt     = ".tf"          // Tag filter (bloom)
+)
+```
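+
+The constants above compose per-tag file names. A minimal sketch of that composition, assuming `path/filepath`; `tagFilePath` is an illustrative helper, not part of the design:
+
+```go
+// tagFilePath joins a part directory with a tag file name built from
+// the naming constants above (illustrative helper only).
+func tagFilePath(partPath, tagName, ext string) string {
+    return filepath.Join(partPath, sidxTagPrefix+tagName+ext)
+}
+
+// tagFilePath("000000001234abcd", "service_id", tagDataExt)
+// → "000000001234abcd/tag_service_id.td"
+```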
+
+##### Benefits of Tag-Based Design
+- **Isolation**: Each tag is completely independent, reducing coupling
+- **Selective Loading**: Can load only the tags needed for a query
+- **Parallel Processing**: Tags can be processed independently for better concurrency
+- **Schema Evolution**: Adding/removing tags doesn't affect existing tags
+- **Cache Efficiency**: Related data for a single tag is stored contiguously
+- **Debugging**: Each tag's data is in separate, identifiable files
+
+#### 4. Data Storage Architecture
+
+sidx separates user data payloads from metadata to enable efficient querying and data organization.
+
+##### File Organization
+- **`data.bin`** - Contains the actual user data payloads (compressed)
+- **`primary.bin`** - Contains metadata for all data files including:
+  - Data block metadata (offset/size in data.bin)
+  - Key block metadata (offset/size in keys.bin)  
+  - Tag block metadata (references to tag files)
+
+##### Data Block Structure
+```go
+// Metadata for a block of user data in data.bin
+type dataBlockMetadata struct {
+    offset uint64  // Offset in data.bin file
+    size   uint64  // Compressed size in bytes
+}
+
+// Enhanced primary block metadata with data references
+type primaryBlockMetadata struct {
+    seriesID    common.SeriesID
+    minKey      int64                      // Minimum user key in block
+    maxKey      int64                      // Maximum user key in block
+    dataBlock   dataBlockMetadata          // Reference to data in data.bin
+    keysBlock   dataBlockMetadata          // Reference to keys in keys.bin
+    tagsBlocks  map[string]dataBlockMetadata // References to tag files
+}
+```
+
+##### Benefits of Separate Data Storage
+- **Efficient Metadata Scanning**: Can read block metadata without loading actual data
+- **Selective Data Loading**: Load only the data blocks needed for a query
+- **Independent Compression**: Optimize compression strategy per data type
+- **Clean Separation**: Metadata operations don't require data I/O
+- **Better Cache Utilization**: Metadata fits better in memory caches
+
+#### 5. Snapshot Management
+```go
+type snapshot struct {
+    parts   []*partWrapper
+    epoch   uint64
+    creator snapshotCreator
+    ref     int32
+}
+
+// Generic key-range based part filtering
+func (s *snapshot) getParts(dst []*part, minKey, maxKey int64) ([]*part, int) {
+    var count int
+    for _, p := range s.parts {
+        pm := p.p.partMetadata
+        // Pure numerical comparison - no time interpretation
+        if maxKey < pm.MinKey || minKey > pm.MaxKey {
+            continue
+        }
+        dst = append(dst, p.p)
+        count++
+    }
+    return dst, count
+}
+```
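+
+Readers pin a snapshot through the `ref` counter before calling `getParts`. A minimal sketch of that reference counting, assuming `sync/atomic`; the `acquire`/`release` names are illustrative:
+
+```go
+// acquire pins the snapshot for a reader; it fails if the snapshot has
+// already been retired (ref dropped to zero) by the introducer.
+func (s *snapshot) acquire() bool {
+    for {
+        ref := atomic.LoadInt32(&s.ref)
+        if ref <= 0 {
+            return false
+        }
+        if atomic.CompareAndSwapInt32(&s.ref, ref, ref+1) {
+            return true
+        }
+    }
+}
+
+// release drops one reference; once the count reaches zero the
+// snapshot's parts become eligible for cleanup.
+func (s *snapshot) release() {
+    atomic.AddInt32(&s.ref, -1)
+}
+```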
+
+#### 6. Write Interface
+```go
+type WriteRequest struct {
+    SeriesID  common.SeriesID
+    Key       int64    // User-provided ordering key
+    Data      []byte
+    Tags      map[string][]byte
+}
+
+// Write path receives user key as-is
+func (sidx *SIDX) Write(req WriteRequest) error {
+    // Store req.Key directly without interpretation
+    // Organize into parts based on key ranges
+    // No validation of key semantics
+}
+```
+
+#### 7. Query Interface
+```go
+type QueryRequest struct {
+    KeyRange  KeyRange
+    SeriesID  common.SeriesID
+    Order     Order
+    Limit     int
+}
+
+type KeyRange struct {
+    Min int64  // User-provided minimum key
+    Max int64  // User-provided maximum key
+}
+
+type Order int
+const (
+    ASC Order = iota
+    DESC
+)
+
+// Query executes range queries on user keys
+func (sidx *SIDX) Query(req QueryRequest) ([]Element, error) {
+    // Filter parts by key range
+    // Scan blocks within key bounds
+    // Return results ordered by user keys
+}
+```
+
+### Implementation Details
+
+#### Write Path
+1. **Receive user key**: Accept int64 key from client without modification
+2. **Element accumulation**: Group elements by seriesID and sort by user key
+3. **Part creation**: Create memory parts when thresholds reached
+4. **Block organization**: Organize elements into blocks within parts
+5. **Flushing**: Persist memory parts to disk based on configurable policies
+6. **Merging**: Combine multiple small parts into larger ones
+
+#### Read Path
+1. **Query validation**: Validate query parameters (not key semantics)
+2. **Snapshot access**: Get current snapshot with reference counting
+3. **Part filtering**: Select parts that overlap with query key range
+4. **Block scanning**: Scan blocks within selected parts
+5. **Result assembly**: Combine and order results by user keys
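+
+A sketch tying these read-path steps together, reusing the snapshot helpers above; `currentSnapshot`, `scanBlocks`, and `sortElements` are assumed helpers, and error handling is elided:
+
+```go
+func (sidx *SIDX) query(req QueryRequest) ([]element, error) {
+    snap := sidx.currentSnapshot() // step 2: ref-counted snapshot access
+    defer snap.release()
+
+    // step 3: keep only parts whose key range overlaps the query
+    parts, _ := snap.getParts(nil, req.KeyRange.Min, req.KeyRange.Max)
+
+    var result []element
+    for _, p := range parts {
+        // step 4: scan blocks whose [minKey, maxKey] intersects the range
+        result = append(result, scanBlocks(p, req)...)
+    }
+    sortElements(result, req.Order) // step 5: order results by user key
+    return result, nil
+}
+```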
+
+#### Part Organization
+- **Part naming**: Hex-encoded epoch numbers (generation-based)
+- **Key ranges**: Each part covers a range of user keys
+- **Directory structure**: Similar to stream module (`000000001234abcd/`)
+- **File structure**: Core files plus individual tag files
+
+##### Part Directory Example
+```
+000000001234abcd/                    # Part directory (epoch-based name)
+├── metadata.json                    # Part metadata and statistics
+├── primary.bin                      # Block metadata (references to all data files)
+├── data.bin                        # User data payloads (compressed)
+├── meta.bin                        # Part-level metadata (compressed)
+├── keys.bin                        # User-provided int64 ordering keys
+├── tag_service_id.td               # Service ID tag data
+├── tag_service_id.tm               # Service ID tag metadata
+├── tag_service_id.tf               # Service ID tag filter (bloom)
+├── tag_endpoint.td                 # Endpoint tag data
+├── tag_endpoint.tm                 # Endpoint tag metadata
+├── tag_endpoint.tf                 # Endpoint tag filter
+├── tag_latency.td                  # Latency tag data
+├── tag_latency.tm                  # Latency tag metadata
+└── tag_latency.tf                  # Latency tag filter
+```
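+
+Directory names are the part's epoch rendered as zero-padded hex. A minimal sketch; the 16-digit width is an assumption carried over from the stream module:
+
+```go
+// partName formats an epoch as the directory name shown above,
+// e.g. partName(0x1234abcd) → "000000001234abcd".
+func partName(epoch uint64) string {
+    return fmt.Sprintf("%016x", epoch)
+}
+```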
+
+#### Block Metadata Validation
+```go
+// Validation ensures monotonic ordering of blocks
+func validatePrimaryBlockMetadata(blocks []primaryBlockMetadata) error {
+    for i := 1; i < len(blocks); i++ {
+        if blocks[i].seriesID < blocks[i-1].seriesID {
+            return fmt.Errorf("unexpected block with smaller seriesID")
+        }
+        if blocks[i].seriesID == blocks[i-1].seriesID && blocks[i].minKey < blocks[i-1].minKey {
+            return fmt.Errorf("unexpected block with smaller key")
+        }
+    }
+    return nil
+}
+```
+
+## Component Architecture: Introducer, Flusher, and Merger
+
+The sidx design implements a **hybrid approach** that combines user-controlled timing with centralized snapshot management. Users decide when to trigger storage operations, but a background introducer loop ensures snapshot consistency and coordinates all updates.
+
+### Core Design Philosophy
+
+**User-Controlled Timing with Centralized Coordination**: Users control when storage operations occur, but the introducer loop manages snapshot updates to ensure consistency. This provides the benefits of both user control and reliable snapshot management.
+
+### Component Relationships
+
+#### 1. Introducer Loop (Snapshot Coordinator)
+The introducer runs as a background goroutine that coordinates all snapshot updates through channel-based communication:
+
+```go
+// Introduction types for different operations
+type memIntroduction struct {
+    memPart *PartWrapper
+    applied chan struct{}
+}
+
+type flusherIntroduction struct {
+    flushed map[uint64]*PartWrapper
+    applied chan struct{}
+}
+
+type mergerIntroduction struct {
+    merged  map[uint64]struct{}
+    newPart *PartWrapper
+    applied chan struct{}
+}
+
+// Introducer loop coordinates all snapshot updates
+func (sidx *SIDX) introducerLoop(
+    flushCh chan *flusherIntroduction,
+    mergeCh chan *mergerIntroduction,
+    epoch uint64,
+) {
+    for {
+        select {
+        case <-sidx.closeCh:
+            return
+        case next := <-sidx.introductions:
+            // Introduce new memory part
+            sidx.introduceMemPart(next, epoch)
+            epoch++
+        case next := <-flushCh:
+            // Introduce flushed parts, replacing memory parts
+            sidx.introduceFlushed(next, epoch)
+            epoch++
+        case next := <-mergeCh:
+            // Introduce merged part, replacing old parts
+            sidx.introduceMerged(next, epoch)
+            epoch++
+        }
+    }
+}
+```
+
+**Key Characteristics**:
+- **Background Goroutine**: Runs continuously to manage snapshot updates
+- **Channel-based Communication**: Receives updates via channels for thread safety
+- **Epoch Management**: Maintains ordering and consistency through epochs
+- **Single Source of Truth**: Only the introducer can modify snapshots
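+
+A sketch of a single introduction step, assuming copy-on-write snapshots; `currentSnapshot`, `cloneWith`, and `replaceSnapshot` are illustrative names:
+
+```go
+func (sidx *SIDX) introduceMemPart(next *memIntroduction, epoch uint64) {
+    cur := sidx.currentSnapshot()
+    // Copy-on-write: publish a new snapshot that includes the memory
+    // part, stamped with the next epoch; existing readers keep the old
+    // snapshot until they release it.
+    sidx.replaceSnapshot(cur.cloneWith(next.memPart, epoch))
+    cur.release()
+    close(next.applied) // unblock the writer waiting in Write()
+}
+```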
+
+#### 2. Flusher (User-Triggered Persistence)
+The flusher provides a simple interface for user-controlled persistence:
+
+```go
+// Flusher provides user-triggered flush operations
+type Flusher interface {
+    // Flush triggers persistence of memory parts to disk
+    // Returns error if flush operation fails
+    Flush() error
+}
+
+// Internal implementation coordinates with introducer
+func (f *flusher) Flush() error {
+    // Determine which memory parts to flush
+    memParts := f.sidx.getMemPartsToFlush()
+    if len(memParts) == 0 {
+        return nil
+    }
+    
+    // Persist parts to disk
+    flushedParts := make(map[uint64]*PartWrapper)
+    for _, mp := range memParts {
+        part, err := f.flushMemPartToDisk(mp)
+        if err != nil {
+            return err
+        }
+        flushedParts[mp.ID()] = part
+    }
+    
+    // Send introduction to coordinator
+    intro := &flusherIntroduction{
+        flushed: flushedParts,
+        applied: make(chan struct{}),
+    }
+    
+    f.sidx.flushCh <- intro
+    <-intro.applied // Wait for introduction to complete
+    return nil
+}
+```
+
+**User Control**:
+- **When to Flush**: User calls `Flush()` when needed (memory pressure, durability, etc.)
+- **Simple Interface**: Single function without parameters
+- **Internal Logic**: Flusher decides what to flush based on current state
+
+#### 3. Merger (User-Triggered Compaction)
+The merger provides a simple interface for user-controlled compaction:
+
+```go
+// Merger provides user-triggered merge operations
+type Merger interface {
+    // Merge triggers compaction of parts to optimize storage
+    // Returns error if merge operation fails
+    Merge() error
+}
+
+// Internal implementation coordinates with introducer
+func (m *merger) Merge() error {
+    // Determine which parts to merge
+    parts := m.sidx.getPartsToMerge()
+    if len(parts) < 2 {
+        return nil // No merge needed
+    }
+    
+    // Perform merge operation
+    mergedPart, err := m.mergePartsToDisk(parts)
+    if err != nil {
+        return err
+    }
+    
+    // Track which parts were merged
+    mergedIDs := make(map[uint64]struct{})
+    for _, part := range parts {
+        mergedIDs[part.ID()] = struct{}{}
+    }
+    
+    // Send introduction to coordinator
+    intro := &mergerIntroduction{
+        merged:  mergedIDs,
+        newPart: mergedPart,
+        applied: make(chan struct{}),
+    }
+    
+    m.sidx.mergeCh <- intro
+    <-intro.applied // Wait for introduction to complete
+    return nil
+}
+```
+
+**User Control**:
+- **When to Merge**: User calls `Merge()` when optimization is needed
+- **Simple Interface**: Single function without parameters
+- **Internal Logic**: Merger decides what to merge based on current state
+
+### Operational Flow
+
+#### Initialization and Background Loop
+
+```go
+// SIDX initialization starts the introducer loop
+func NewSIDX(options Options) *SIDX {
+    sidx := &SIDX{
+        introductions: make(chan *memIntroduction),
+        flushCh:       make(chan *flusherIntroduction),
+        mergeCh:       make(chan *mergerIntroduction),
+        closeCh:       make(chan struct{}),
+        flusher:       newFlusher(),
+        merger:        newMerger(),
+    }
+    
+    // Start the introducer loop as background goroutine
+    go sidx.introducerLoop(sidx.flushCh, sidx.mergeCh, 0)
+    
+    return sidx
+}
+```
+
+#### Write → Automatic Introduction
+
+```go
+// Write path automatically handles memory part introduction
+func (sidx *SIDX) Write(req WriteRequest) error {
+    // 1. Add element to current memory part
+    memPart, created := sidx.addToMemPart(req)
+    
+    // 2. If new memory part was created, introduce it automatically
+    if created {
+        intro := &memIntroduction{
+            memPart: memPart,
+            applied: make(chan struct{}),
+        }
+        
+        // Send to introducer loop
+        sidx.introductions <- intro
+        <-intro.applied // Wait for introduction to complete
+    }
+    
+    return nil
+}
+```
+
+#### User-Triggered Flush Operations
+
+```go
+// Example: User decides when to flush based on application needs
+func (app *Application) manageStorage() {
+    // User monitors memory usage and triggers flush
+    if app.sidx.memoryUsage() > app.maxMemory {
+        if err := app.sidx.flusher.Flush(); err != nil {
+            app.logger.Error("flush failed", err)
+        }
+    }
+    
+    // User can also flush on shutdown for durability
+    defer func() {
+        app.sidx.flusher.Flush() // Ensure all data is persisted
+    }()
+}
+```
+
+#### User-Triggered Merge Operations
+
+```go
+// Example: User optimizes storage during maintenance windows
+func (app *Application) optimizeStorage() error {
+    // User decides when to merge based on query performance
+    if app.sidx.partCount() > app.maxParts {
+        return app.sidx.merger.Merge()
+    }
+    
+    // User can schedule merges during low-load periods
+    if app.isMaintenanceWindow() {
+        return app.sidx.merger.Merge()
+    }
+    
+    return nil
+}
+```
+
+### Key Differences from Stream Module
+
+| Aspect | Stream Module | SIDX Module |
+|--------|---------------|-------------|
+| **Execution Model** | Background loops for introducer, flusher, merger | Single introducer loop + user-triggered flush/merge |
+| **Decision Making** | Automatic flush/merge based on timers/thresholds | User decides when to flush/merge |
+| **Coordination** | Full async communication via channels | Hybrid: introducer uses channels, user calls are sync |
+| **Lifecycle Management** | System-managed epochs, watchers, and policies | User-managed timing with system-managed epochs |
+| **Resource Control** | System decides when to use CPU/I/O | User decides when to use CPU/I/O |
+| **Interface Complexity** | Internal implementation details | Simple `Flush()` and `Merge()` interfaces |
+
+### Benefits of Hybrid Design
+
+1. **User Control with Safety**: Users control timing while introducer ensures snapshot consistency
+2. **Simple Interface**: Single-function interfaces (`Flush()`, `Merge()`) without parameters
+3. **Predictable Performance**: No unexpected background I/O, but reliable snapshot management
+4. **Application Integration**: Operations can be coordinated with application lifecycle
+5. **Efficient Coordination**: Channel-based introducer prevents race conditions
+6. **Flexible Policies**: Users implement custom flush/merge policies while system handles coordination
+
+### Implementation Considerations
+
+```go
+// Example user policy implementation
+type StorageManager struct {
+    sidx   *SIDX
+    policy StoragePolicy
+}
+
+type StoragePolicy struct {
+    // Flush triggers
+    MaxMemoryParts    int
+    MaxMemorySize     uint64
+    FlushOnShutdown   bool
+    
+    // Merge triggers
+    MaxPartCount      int
+    MaintenanceWindow time.Duration
+}
+
+// User implements custom flush logic
+func (sm *StorageManager) CheckAndFlush() error {
+    if sm.sidx.memoryPartCount() >= sm.policy.MaxMemoryParts ||
+       sm.sidx.memorySize() >= sm.policy.MaxMemorySize {
+        return sm.sidx.flusher.Flush()
+    }
+    return nil
+}
+
+// User implements custom merge logic  
+func (sm *StorageManager) CheckAndMerge() error {
+    if sm.sidx.partCount() >= sm.policy.MaxPartCount ||
+       sm.isMaintenanceWindow() {
+        return sm.sidx.merger.Merge()
+    }
+    return nil
+}
+
+// Internal flusher implementation
+type flusher struct {
+    sidx *SIDX
+}
+
+func (f *flusher) getMemPartsToFlush() []*PartWrapper {
+    // Internal logic to select memory parts for flushing
+    // Could be all memory parts, or based on size/age criteria
+}
+
+// Internal merger implementation  
+type merger struct {
+    sidx *SIDX
+}
+
+func (m *merger) getPartsToMerge() []*PartWrapper {
+    // Internal logic to select parts for merging
+    // Could be based on size ratios, part count, key ranges, etc.
+}
+```
+
+### Architecture Summary
+
+This hybrid architecture provides:
+
+1. **Centralized Coordination**: Single introducer loop manages all snapshot updates
+2. **User Control**: Simple `Flush()` and `Merge()` interfaces give users timing control
+3. **Thread Safety**: Channel-based communication prevents race conditions
+4. **Simplicity**: No complex parameters or configuration - internal logic handles details
+5. **Flexibility**: Users can implement any policy for when to trigger operations
+
+The design ensures that sidx remains focused on efficient storage while providing users with predictable, controllable storage operations that integrate cleanly with application lifecycles.
+
+### Configuration Options
+```go
+type Options struct {
+    // Part configuration
+    MaxKeysPerPart     int    // Maximum keys in memory part
+    KeyRangeSize       int64  // Target size of key ranges per part
+    
+    // Flush configuration
+    FlushTimeout       time.Duration
+    FlushKeyThreshold  int64  // Flush when key range exceeds this
+    
+    // Merge configuration
+    MergeMinParts      int
+    MergeMaxParts      int
+    
+    // Query configuration
+    DefaultOrder       Order  // Default ordering for queries
+    MaxQueryRange      int64  // Maximum allowed query key range
+}
+```
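+
+A hypothetical wiring of these options; the concrete values are illustrative only:
+
+```go
+opts := Options{
+    MaxKeysPerPart: 100000,      // cap elements held in a memory part
+    FlushTimeout:   time.Minute, // upper bound before flushing memory parts
+    MergeMinParts:  4,           // skip merges below this part count
+    MergeMaxParts:  16,          // bound merge fan-in
+    DefaultOrder:   ASC,
+    MaxQueryRange:  1 << 40,     // reject effectively unbounded scans
+}
+sidx := NewSIDX(opts)
+defer sidx.flusher.Flush() // persist remaining memory parts on shutdown
+```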
+
+## User Responsibility
+
+The USER/CLIENT is responsible for:
+
+1. **Key Generation**: Creating meaningful int64 values
+2. **Key Semantics**: Understanding what keys represent
+3. **Key Consistency**: Ensuring keys are comparable within same series
+4. **Range Queries**: Providing meaningful min/max values
+5. **Key Distribution**: Ensuring reasonable distribution for performance
+
+## Use Case Examples
+
+### 1. Timestamp-based Secondary Index
+```go
+// User converts timestamp to int64
+func writeEvent(event Event) {
+    req := WriteRequest{
+        SeriesID: event.ServiceID,
+        Key:      event.Timestamp.UnixNano(),  // User generates
+        Data:     event.Payload,
+        Tags:     event.Tags,
+    }
+    sidx.Write(req)
+}
+
+// User queries with timestamp range
+func queryByTime(serviceID string, start, end time.Time) []Element {
+    req := QueryRequest{
+        SeriesID: serviceID,
+        KeyRange: KeyRange{
+            Min: start.UnixNano(),  // User provides range
+            Max: end.UnixNano(),
+        },
+        Order: ASC,
+    }
+    res, _ := sidx.Query(req)
+    return res
+}
+```
+
+### 2. Latency-based Secondary Index
+```go
+// User converts latency to int64 (microseconds)
+func writeLatency(requestID string, latency time.Duration) {
+    req := WriteRequest{
+        SeriesID: "latency-index",
+        Key:      latency.Microseconds(),  // User scales to int64
+        Data:     []byte(requestID),
+    }
+    sidx.Write(req)
+}
+
+// User queries latency range
+func queryByLatency(minMs, maxMs int) []Element {
+    req := QueryRequest{
+        SeriesID: "latency-index",
+        KeyRange: KeyRange{
+            Min: int64(minMs * 1000),  // User converts to microseconds
+            Max: int64(maxMs * 1000),
+        },
+        Order: DESC,  // Highest latency first
+    }
+    res, _ := sidx.Query(req)
+    return res
+}
+```
+
+### 3. Sequence Number Index
+```go
+// User maintains sequence counter
+var seqCounter int64
+
+func writeMessage(msg Message) {
+    seq := atomic.AddInt64(&seqCounter, 1)
+    req := WriteRequest{
+        SeriesID: msg.TopicID,
+        Key:      seq,  // User-generated sequence
+        Data:     msg.Content,
+    }
+    sidx.Write(req)
+}
+
+// User queries sequence range
+func queryBySequence(topicID string, fromSeq, toSeq int64) []Element {
+    req := QueryRequest{
+        SeriesID: topicID,
+        KeyRange: KeyRange{Min: fromSeq, Max: toSeq},
+        Order:    ASC,
+    }
+    res, _ := sidx.Query(req)
+    return res
+}
+```
+
+### 4. Score-based Index
+```go
+// User scales float score to int64
+func writeScore(userID string, score float64) {
+    req := WriteRequest{
+        SeriesID: "user-scores",
+        Key:      int64(score * 1000000),  // User scales to int64
+        Data:     []byte(userID),
+    }
+    sidx.Write(req)
+}
+
+// User queries score range
+func queryByScore(minScore, maxScore float64) []Element {
+    req := QueryRequest{
+        SeriesID: "user-scores",
+        KeyRange: KeyRange{
+            Min: int64(minScore * 1000000),  // User scales back
+            Max: int64(maxScore * 1000000),
+        },
+        Order: DESC,  // Highest scores first
+    }
+    res, _ := sidx.Query(req)
+    return res
+}
+```
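+
+Note that truncating a scaled float to int64 preserves ordering, but scores that differ by less than 1e-6 collapse into the same key, so any exact tie-breaking must come from the payload.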
+
+## What sidx DOES and DOESN'T Do
+
+### sidx DOES:
+- Store user-provided keys efficiently
+- Maintain ordering by numerical comparison
+- Support range queries on keys
+- Optimize storage with LSM-tree structure
+- Handle concurrent access safely
+- Provide snapshot consistency
+- Compress and encode data efficiently
+- Support part merging and compaction
+
+### sidx DOESN'T:
+- Generate keys
+- Interpret key meaning
+- Validate key semantics
+- Transform keys
+- Assume key represents time
+- Perform time-based operations
+- Convert between key formats
+- Validate business logic
+
+## File Structure
+
+```
+banyand/internal/sidx/
+├── DESIGN.md           # This design document
+├── sidx.go             # Main interface and types
+├── part.go             # Part structures and operations
+├── snapshot.go         # Snapshot management with generic keys
+├── metadata.go         # Metadata structures (MinKey/MaxKey)
+├── tag.go              # Individual tag handling and encoding
+├── tag_metadata.go     # Tag metadata management and persistence
+├── tag_filter.go       # Tag filtering logic and bloom filters
+├── introducer.go       # Introduction system
+├── flusher.go          # Flush operations
+├── merger.go           # Merge operations
+├── query.go            # Query execution with key ranges
+├── writer.go           # Write path implementation
+├── reader.go           # Read path implementation
+├── block.go            # Block organization
+├── options.go          # Configuration options
+└── sidx_test.go        # Comprehensive tests
+```
+
+## Benefits
+
+1. **Flexibility**: Users can implement any int64-based ordering scheme
+2. **Simplicity**: sidx doesn't need domain knowledge about keys
+3. **Performance**: No key generation or transformation overhead
+4. **Extensibility**: New use cases without sidx changes
+5. **Clear Separation**: User controls semantics, sidx handles storage
+6. **Reusability**: One system for multiple index types
+7. **Compatibility**: Can replicate stream behavior exactly with timestamp keys
+
+## Migration and Integration
+
+### Relationship to Stream Module
+- Stream module continues using timestamp-specific logic
+- sidx provides generic alternative for non-time indices
+- Both can coexist and complement each other
+- Future possibility: Stream could migrate to use sidx internally
+
+### Integration Points
+- Uses same filesystem abstractions (`pkg/fs`)
+- Uses same compression and encoding (`pkg/compress`, `pkg/encoding`)
+- Uses same storage patterns (LSM-tree, parts, snapshots)
+- Compatible with existing infrastructure
+
+This design creates a clean, general-purpose secondary index system where users have full control over the ordering semantics while sidx focuses purely on efficient storage and retrieval based on numerical key comparisons.
\ No newline at end of file
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 73f4642c..39daf2af 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -283,13 +283,11 @@ func (p *pub) checkClientHealthAndReconnect(conn *grpc.ClientConn, md schema.Met
                                        }()
                                        return
                                }
-                               if errEvict != nil {
-                                       _ = connEvict.Close()
-                               }
+                               _ = connEvict.Close()
                                if _, ok := p.registered[name]; !ok {
                                        return
                                }
-                               p.log.Error().Err(errEvict).Msgf("failed to re-connect to grpc server after waiting for %s", backoff)
+                               p.log.Error().Err(errEvict).Msgf("failed to re-connect to grpc server %s after waiting for %s", node.GrpcAddress, backoff)
                        case <-en.c:
                                return
                        case <-p.closer.CloseNotify():
