hanahmily commented on code in PR #702:
URL: https://github.com/apache/skywalking-banyandb/pull/702#discussion_r2214597163
##########
docs/operation/property-repair.md:
##########

Review Comment:
   You do not need to create a custom UI page for property data repair. Instead, you should utilize the existing `stream` query page for tracing queries, and the `measure` query page for viewing metrics. The primary objective of this document is to outline the process for storing tracing and metrics data.

   **Tracing Data:**
   1. Create a group named `_property_repair_spans` with a 3-day Time-To-Live (TTL). The group's type will be `stream`.
   2. Design a `stream` to store the spans.

   **Critical Metrics for Operator Action (Alerts):**
   1. **Errors:** Monitor error metrics related to connection issues, repairs, or other types of errors. Additionally, please outline the procedures for effectively handling these errors.
   2. **Unexpected Values:** For instance, you mentioned the `Sync Request Count`. If there is no increment over a specified period, the operator should take appropriate action to address the issue.

##########
docs/concept/property-repair.md:
##########
@@ -0,0 +1,211 @@
+# Property Background Repair Strategy
+
+In Property, the `replicas` configuration allows Property data to be stored across multiple data nodes, providing high availability.
+However, during data updates, some nodes may become temporarily unavailable, which can lead to version inconsistencies of the same data across different machines.
+
+This documentation explains how the system automatically synchronizes Property data across all data nodes in the background,
+ensuring that all replicas eventually converge to the same version.
+
+The process can be roughly divided into the following two steps:
+1. **Build Merkle Tree**: Automatically build anti-entropy Merkle Trees in the background for efficient comparison in subsequent operations.
+2. **Recover Data through Gossip**: Use the gossip protocol among data nodes to detect and repair data inconsistencies.
+
+## Build Merkle Tree
+
+The Merkle Tree is a data structure used to efficiently compare the data held by two different data nodes, allowing
+them to quickly determine whether their data is consistent.
+
+### Structure
+
+In the context of Property data, each shard within a group builds its own Merkle Tree, since the Property data set is finite.
+The tree consists of the following three types of nodes:
+1. **Leaf Node**: Stores the summary information of each Property entry:
+   1. **Entity**: The identifier of the Property data, composed of `group_name` + `property_name` + `id`.
+   2. **SHA Value**: The SHA512 hash of the Property source data (`sha512(source_json_bytes + property_is_deleted)`), used for fast equality comparison when checking the consistency of the same entity across nodes.
+2. **Slot Node**: Each tree contains a fixed number (such as `32`) of slot nodes. When a Property entry is added to the tree, it is placed in a slot based on the hash of its entity (`hash(entity) % slot_count`).
+   Each slot node stores the aggregated SHA value of its leaf nodes and the number of Property entries in that slot.
+3. **Root Node**: The root of the Merkle Tree, which contains the SHA value of the entire tree and the number of slot nodes.
+
+The Merkle Tree therefore has three levels (see the sketch after this list):
+* The top level is a single root node.
+* The middle level is a fixed number of slot nodes.
+* The bottom level is a variable number of leaf nodes.
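+
+To make the three-level structure concrete, below is a minimal Go sketch of how such a tree could be assembled.
+All names (`Leaf`, `Slot`, `Tree`, `LeafSHA`, `Build`) are hypothetical, and details such as the slot-hash function and the byte encoding of the deleted flag are assumptions, not the actual BanyanDB implementation.
+
+```go
+package merkle
+
+import (
+	"crypto/sha512"
+	"encoding/hex"
+	"hash/fnv"
+	"sort"
+)
+
+const slotCount = 32 // fixed number of slot nodes per tree
+
+// Leaf summarizes one Property entry.
+type Leaf struct {
+	Entity string // group_name + property_name + id
+	SHA    string // sha512(source_json_bytes + property_is_deleted)
+}
+
+// Slot aggregates the leaves that hash into it; len(Leaves) is the
+// per-slot entry count described above.
+type Slot struct {
+	SHA    string
+	Leaves []Leaf
+}
+
+type Tree struct {
+	RootSHA string
+	Slots   [slotCount]Slot
+}
+
+// LeafSHA mirrors sha512(source_json_bytes + property_is_deleted); the
+// single-byte encoding of the deleted flag is an assumption.
+func LeafSHA(sourceJSON []byte, deleted bool) string {
+	h := sha512.New()
+	h.Write(sourceJSON)
+	flag := byte(0)
+	if deleted {
+		flag = 1
+	}
+	h.Write([]byte{flag})
+	return hex.EncodeToString(h.Sum(nil))
+}
+
+// slotIndex places an entity into a slot: hash(entity) % slot_count.
+func slotIndex(entity string) int {
+	h := fnv.New32a()
+	h.Write([]byte(entity))
+	return int(h.Sum32() % slotCount)
+}
+
+// Build assembles a tree from the leaf summaries scanned out of a shard.
+func Build(leaves []Leaf) *Tree {
+	t := &Tree{}
+	for _, l := range leaves {
+		i := slotIndex(l.Entity)
+		t.Slots[i].Leaves = append(t.Slots[i].Leaves, l)
+	}
+	rootHash := sha512.New()
+	for i := range t.Slots {
+		// Sort for a deterministic slot hash across nodes.
+		sort.Slice(t.Slots[i].Leaves, func(a, b int) bool {
+			return t.Slots[i].Leaves[a].Entity < t.Slots[i].Leaves[b].Entity
+		})
+		slotHash := sha512.New()
+		for _, l := range t.Slots[i].Leaves {
+			slotHash.Write([]byte(l.SHA))
+		}
+		t.Slots[i].SHA = hex.EncodeToString(slotHash.Sum(nil))
+		rootHash.Write([]byte(t.Slots[i].SHA))
+	}
+	t.RootSHA = hex.EncodeToString(rootHash.Sum(nil))
+	return t
+}
+```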
+### Timing and Building Process
+
+There are two types of triggers for Merkle Tree construction:
+1. **Fixed Interval**: By default, the system automatically triggers construction every hour (this interval is configurable).
+2. **On Update**: When an update in the shard is detected, the system schedules a delayed build after a short wait period (10 minutes by default).
+
+The construction process follows these steps (sketched in the example below):
+1. **Check for Updates**: The system compares the snapshot ID (`XXXXX.snp`) of the previously built tree with the current snapshot ID of the shard.
+   If they differ, data has changed and the process continues. If they match, tree construction is skipped.
+2. **Snapshot the Shard**: A snapshot of the shard data is taken to avoid blocking ongoing business operations during data traversal.
+3. **Build the Tree**: Using a streaming approach, the system scans all data in the snapshot and builds a Merkle Tree for each group individually.
+4. **Save the Snapshot ID**: The snapshot ID used in this build is saved, so it can be used for efficient change detection during the next scheduled run.
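+
+The sketch below illustrates this change-detection flow in Go. The `shard` type and its snapshot helpers are hypothetical stand-ins for the real shard APIs:
+
+```go
+package repair
+
+import "log"
+
+// shard is a stand-in type; only the state needed for change detection is shown.
+type shard struct {
+	lastBuiltSnapshotID string
+}
+
+// Hypothetical snapshot helpers.
+func (s *shard) currentSnapshotID() string { return "00002.snp" } // e.g. latest *.snp name
+func (s *shard) takeSnapshot() string      { return s.currentSnapshotID() }
+func (s *shard) buildTreesFrom(snap string) {
+	log.Printf("streaming scan of %s: one Merkle Tree per group", snap)
+}
+
+// maybeRebuild follows the four construction steps.
+func (s *shard) maybeRebuild() {
+	cur := s.currentSnapshotID()
+	if cur == s.lastBuiltSnapshotID {
+		return // step 1: nothing changed since the last build, skip
+	}
+	snap := s.takeSnapshot()     // step 2: avoid blocking writes during traversal
+	s.buildTreesFrom(snap)       // step 3: build the trees from the snapshot
+	s.lastBuiltSnapshotID = snap // step 4: save the ID for the next change check
+}
+```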
+## Gossip Protocol
+
+In BanyanDB's typical data communication model, data exchange is primarily handled by **liaison** nodes, which interact with **data nodes** through **broadcasting or publishing** mechanisms.
+However, in the context of Property data repair, involving liaison nodes would introduce unnecessary network overhead.
+Therefore, the system adopts the gossip protocol, allowing data nodes to communicate directly with each other and perform data repair in a more efficient and decentralized manner.
+
+Unlike typical gossip-based dissemination, where messages are spread to randomly selected peers, the number of **data nodes** in BanyanDB is fixed and relatively small.
+To ensure greater stability, random peer selection is replaced with a deterministic strategy: the node list is ordered, and each node communicates only with the next node in the sequence.
+Additionally, to minimize network traffic, only one peer node is involved in each round of communication.
+
+BanyanDB already has a built-in [cluster discovery and data transmission](./clustering.md) mechanism, so the gossip protocol can be implemented as an extension on top of the existing cluster protocol.
+Since each node already exposes a gRPC port, there is no need to introduce any additional ports, which simplifies deployment and integration.
+
+### Propagation Message
+
+When initiating gossip message propagation, the sender node must include both the list of participating nodes and the message content.
+The gossip protocol then proceeds through the following steps (a code sketch of the bookkeeping follows the examples below):
+
+1. **Build the Context**: The sender node builds a context object that is attached to each message and includes the following parameters:
+   1. **Node List**: A list of participating nodes, used to determine the next node in the sequence.
+   2. **Maximum Count**: The maximum number of message transmissions, calculated as `node_count * 2 - 3`. The rationale behind this formula is explained in the examples below.
+   3. **Origin Node ID**: The ID of the node that initiated the gossip message propagation, used to return the final result.
+   4. **Original Message ID**: A unique identifier for the original message, allowing the origin node to track when a given gossip propagation process has completed.
+2. **Send to the First Node**: If the first node in the list is the sender itself, the process begins locally. Otherwise, the message is sent to the first node in the list.
+3. **Receive Gossip Message**: Upon receiving the message, the node identifies the next node in the sequence and prepares for peer-to-peer interaction.
+4. **Protocol-Level Handling**: The Property repair logic runs its own protocol between the current node and the next node, ensuring their Property data is synchronized. (Details of the two-node sync protocol are covered in a later section.)
+5. **Handle Result**: If the sync is successful, the process continues. If it fails, the flow jumps to step 7.
+6. **Forward to the Next Node**: The maximum count in the context is decremented by one to indicate a completed round.
+   * If the maximum count reaches zero, the process is considered complete, and it proceeds to step 7.
+   * Otherwise, the message is forwarded to the next node, repeating steps 3–6.
+7. **Send Result to Origin Node**: The current node sends the result (either success or failure) to the origin node specified in the context.
+8. **Origin Node Receives the Result**: The initiating node receives the final outcome of the gossip propagation and can proceed with any post-processing logic.
+
+### Maximum Count Formula
+
+Since no synchronization is needed in a **single-node** setup, the following formula applies only when two or more nodes participate in the repair process:
+
+**Maximum Gossip Count = (number of nodes) × 2 − 3**
+
+This ensures that the newest data has sufficient opportunity to fully propagate across all nodes, even in the worst-case version distribution.
+
+#### Example Scenarios
+
+##### Case 1: 2 Nodes
+
+| Step    | Action                               | A Version | B Version |
+|---------|--------------------------------------|-----------|-----------|
+| Initial | —                                    | v1        | v2        |
+| A → B   | A receives v2 from B → **A updated** | v2        | v2        |
+
+* **Max Gossip Count**: `2 × 2 − 3 = 1`
+* **Final State**: All nodes hold **v2**
+
+##### Case 2: 5 Nodes
+
+| Step    | Action                                        | A Version | B Version | C Version | D Version | E Version |
+|---------|-----------------------------------------------|-----------|-----------|-----------|-----------|-----------|
+| Initial | —                                             | v1        | v2        | v3        | v4        | v5        |
+| A → B   | A receives v2 from B → **A updated**          | v2        | v2        | v3        | v4        | v5        |
+| B → C   | B receives v3 from C → **B updated**          | v2        | v3        | v3        | v4        | v5        |
+| C → D   | C receives v4 from D → **C updated**          | v2        | v3        | v4        | v4        | v5        |
+| D → E   | D receives v5 from E → **D updated**          | v2        | v3        | v4        | v5        | v5        |
+| E → A   | A receives v5 from E → **A updated to final** | v5        | v3        | v4        | v5        | v5        |
+| A → B   | B receives v5 from A → **B updated to final** | v5        | v5        | v4        | v5        | v5        |
+| B → C   | C receives v5 from B → **C updated to final** | v5        | v5        | v5        | v5        | v5        |
+
+* **Max Gossip Count**: `5 × 2 − 3 = 7`
+* **Final State**: All nodes hold **v5**
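+
+Before walking through a concrete example, here is a minimal Go sketch of the per-message bookkeeping in steps 3–7. The context type and the transport stubs are hypothetical, not the actual gossip implementation:
+
+```go
+package gossip
+
+import "log"
+
+// gossipContext models the context attached to every propagation message.
+type gossipContext struct {
+	Nodes     []string // ordered list of participating nodes
+	MaxCount  int      // remaining transmissions, starts at len(Nodes)*2 - 3
+	OriginID  string   // node that initiated the propagation
+	MessageID string   // lets the origin match the final result
+}
+
+// nextNode returns the successor of self in the ordered node list.
+func (c *gossipContext) nextNode(self string) string {
+	for i, n := range c.Nodes {
+		if n == self {
+			return c.Nodes[(i+1)%len(c.Nodes)]
+		}
+	}
+	return ""
+}
+
+// onReceive sketches steps 3–7: sync with the successor, then either
+// forward the message or report the final result to the origin node.
+func onReceive(ctx *gossipContext, self string) {
+	peer := ctx.nextNode(self)
+	if err := syncWith(peer); err != nil {
+		reportResult(ctx, err) // step 5 failed: jump to step 7
+		return
+	}
+	ctx.MaxCount-- // step 6: one round completed
+	if ctx.MaxCount == 0 {
+		reportResult(ctx, nil) // step 7: propagation finished
+		return
+	}
+	forward(peer, ctx) // repeat steps 3–6 on the next node
+}
+
+// Stubs standing in for the two-node repair protocol and the gRPC transport.
+func syncWith(peer string) error              { log.Println("sync with", peer); return nil }
+func forward(peer string, ctx *gossipContext) { log.Println("forward to", peer) }
+func reportResult(ctx *gossipContext, err error) {
+	log.Println("report to", ctx.OriginID, "message", ctx.MessageID, "err:", err)
+}
+```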
+### Example of Message Propagation
+
+To illustrate the gossip message propagation process, consider a scenario with three nodes and versions A (v2), B (v1), and C (v3), where the sender is node B. The sequence of operations is as follows.
+
+The process starts from **Node B** (the initiator).
+
+1. **Initiation (at Node B):** Node B builds the gossip context. The propagation message is sent to the first node in the list, **Node A**.
+2. **Step 1 (at Node A):**
+   * Node A receives the message.
+   * A identifies its successor in the list: **Node B**.
+   * A synchronizes with B. Since A (v2) is newer than B (v1), B's data is updated.
+   * **State:** A(v2), B(v2), C(v3).
+   * A forwards the gossip message to B.
+3. **Step 2 (at Node B):**
+   * Node B receives the forwarded message.
+   * B identifies its successor: **Node C**.
+   * B synchronizes with C. Since C (v3) is newer than B (v2), B's data is updated.
+   * **State:** A(v2), B(v3), C(v3).
+   * B forwards the gossip message to C.
+4. **Step 3 (at Node C):**
+   * Node C receives the forwarded message.
+   * C identifies its successor: **Node A**.
+   * C synchronizes with A. Since C (v3) is newer than A (v2), A's data is updated.
+   * **State:** A(v3), B(v3), C(v3).
+5. **Completion:** The maximum propagation count, `3 × 2 − 3 = 3`, has now been reached, so Node C does not forward the message again. Instead, as the last node in the chain, it sends the final result back to the origin node (Node B). All nodes have converged to the latest version (v3).
+
+### Tracing
+
+Tracing is critically important in a gossip-based protocol, as issues can arise at any stage of the communication and synchronization process.
+New trace spans are created in the following scenarios:
+
+1. **Initiating Node**: The initiating node records the full trace of the entire request, capturing all steps from message dispatch to final result collection.
+2. **Receiving Sync Requests**: When a node receives a sync request and begins communication with another peer node, it creates a new trace span for that interaction to track the request lifecycle independently.
+3. **Business Logic Execution**: During the actual business processing (e.g., data comparison, update, and transmission), custom trace data and metadata can be attached to the active trace context to provide deeper insight into logic-specific behavior.
+
+After each synchronization cycle, the receiving node sends its trace data back to the initiating node, allowing the initiator to aggregate and correlate all spans for end-to-end visibility and debugging.
+
+## Property Repair
+
+Based on the Merkle Tree and gossip concepts above, the system can proceed with the Property repair process.
+The process is scheduled to run on each data node daily at 1:00 AM (configurable via the `property-background-repair-cron` flag), and follows these steps (sketched below):
+1. **Select a Group**: The node retrieves the list of Property groups whose number of **replicas is greater than or equal to 2**, and randomly selects one group to repair.
+2. **Query the Node List**: The node then determines which nodes hold replicas of the selected group and sends the gossip propagation message to those nodes to synchronize the group's Property data.
+3. **Wait for the Result**: The initiating node waits for the final result of the synchronization process before proceeding.
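+
+A minimal Go sketch of one scheduled round is shown below; the group model, node discovery, and gossip calls are hypothetical stand-ins:
+
+```go
+package repair
+
+import (
+	"log"
+	"math/rand"
+)
+
+type group struct {
+	Name     string
+	Replicas int
+}
+
+// runRepairRound performs the three steps of a daily repair run.
+func runRepairRound(groups []group) error {
+	// Step 1: only groups with at least two replicas need repair; pick one at random.
+	var candidates []group
+	for _, g := range groups {
+		if g.Replicas >= 2 {
+			candidates = append(candidates, g)
+		}
+	}
+	if len(candidates) == 0 {
+		return nil // nothing to repair
+	}
+	target := candidates[rand.Intn(len(candidates))]
+
+	// Step 2: resolve the replica nodes and start gossip propagation.
+	nodes := replicaNodes(target.Name)
+	msgID := propagate(target.Name, nodes)
+
+	// Step 3: block until the final result arrives from the chain.
+	return waitForResult(msgID)
+}
+
+// Stubs standing in for node discovery and the gossip layer.
+func replicaNodes(groupName string) []string { return []string{"data-0", "data-1"} }
+func propagate(groupName string, nodes []string) string {
+	log.Println("propagate to", nodes)
+	return "msg-1"
+}
+func waitForResult(msgID string) error { return nil }
+```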
+### Property Synchronization between Two Nodes
+
+When two nodes engage in Property data synchronization, they follow a specific protocol to ensure data consistency.
+Let's refer to the current node as A and the target node as B. The process is as follows:
+
+1. **(A) Establish Connection**: Node A initiates a **bidirectional streaming connection** with node B to enable real-time, two-way data transfer.
+2. **(A) Iterate Over Shards**: Node A retrieves the list of all Property-related shards for the selected group and processes them one by one.
+3. **(A) Send Merkle Tree Summary**: For each shard, node A reads its Merkle Tree and sends a summary (including the root SHA and the slot SHAs) to node B.
+   This allows B to quickly identify which slots may contain differences.
+4. **(B) Verify Merkle Tree Summary and Respond**: Node B compares the received summary against its own Merkle Tree for the same shard and group:
+   * If the root SHAs match, node B returns an empty slot list, indicating no differences.
+   * If the root SHAs differ, node B checks the slot SHAs, identifies the mismatched slots, and sends back all relevant leaf node details, including the **slot index**, **entity**, and **SHA value**.
+5. **(A) Compare Leaf Data**: Node A processes the received leaf data and takes the following actions:
+   * For missing entities (present on B but not on A), A requests the full Property data from B.
+   * For entities present on A but not on B, A sends the full Property data to B.
+   * For SHA mismatches, A sends its full Property data to B for validation.
+6. **(B) Validate Actual Data**: Node B handles the data as follows:
+   * For missing entities, B returns the latest version of the data.
+   * For entities present on A but not on B, B updates its local copy with the data from A.
+   * For SHA mismatches, B uses "last-write-win" strategy. It compares the version numbers. If B’s version is newer, it returns the Property data to A. If A’s version is newer, B updates its local copy and does not return any data. If the versions are the same, it selects the data from the smaller index of the node list; in this case, it would be from A.****

Review Comment:
```suggestion
   * For SHA mismatches, B uses the "last-write-win" strategy. It compares the version numbers. If B’s version is newer, it returns the Property data to A. If A’s version is newer, B updates its local copy and does not return any data. If the versions are the same, it selects the data from the smaller index of the node list; in this case, it would be from A.
```