This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 5604682e0 Add storage-node post-trace pipeline: design, pipeline.v1 
proto, Go SDK (#1189)
5604682e0 is described below

commit 5604682e06cb20b0032898947ec9ae78eb514796
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Jun 24 09:53:03 2026 +0800

    Add storage-node post-trace pipeline: design, pipeline.v1 proto, Go SDK 
(#1189)
---
 .../banyandb/pipeline/v1/trace_pipeline.proto      | 300 ++++++++++
 docs/api-reference.md                              | 387 +++++++++++++
 docs/design/post-trace-pipeline.md                 | 608 +++++++++++++++++++++
 .../sdk/_example/segment-tail-sampler/README.md    |  67 +++
 .../sdk/_example/segment-tail-sampler/main.go      | 321 +++++++++++
 pkg/pipeline/sdk/decode.go                         | 129 +++++
 pkg/pipeline/sdk/sdk.go                            | 207 +++++++
 pkg/pipeline/sdk/sdk_test.go                       | 164 ++++++
 8 files changed, 2183 insertions(+)

diff --git a/api/proto/banyandb/pipeline/v1/trace_pipeline.proto 
b/api/proto/banyandb/pipeline/v1/trace_pipeline.proto
new file mode 100644
index 000000000..aae2159a3
--- /dev/null
+++ b/api/proto/banyandb/pipeline/v1/trace_pipeline.proto
@@ -0,0 +1,300 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+package banyandb.pipeline.v1;
+
+import "banyandb/common/v1/common.proto";
+import "google/api/annotations.proto";
+import "google/protobuf/duration.proto";
+import "google/protobuf/struct.proto";
+import "protoc-gen-openapiv2/options/annotations.proto";
+import "validate/validate.proto";
+
+option go_package = 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/pipeline/v1";
+option java_package = "org.apache.skywalking.banyandb.pipeline.v1";
+option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = 
{base_path: "/api"};
+
+// PipelineEvent identifies a pipeline-wide event that can be independently
+// enabled. Per-stage retention (StageRule.plugins) fires implicitly at the
+// stage's migration-out boundary and is not toggleable via this enum.
+enum PipelineEvent {
+  // Zero value; not a real event. TracePipelineConfig.enabled_events rejects
+  // it (its validation rule lists 0 under `not_in`).
+  PIPELINE_EVENT_UNSPECIFIED = 0;
+  // In-merge filter during Hot-phase LSM compaction merges (Warm/Cold
+  // compactions stay lossless). Per-trace drops are gated by `merge_grace` so
+  // partial traces are not destroyed prematurely. Cheap, runs often; verdicts
+  // wait for trace maturity (see §7.1).
+  PIPELINE_EVENT_MERGE = 1;
+  // Tail-sampling gate at Hot-phase segment finalization, after the segment
+  // has settled (event-time watermark past `segment.End + finalize_grace`).
+  // Heavy but authoritative; sees the complete trace (see §7.3).
+  PIPELINE_EVENT_FINALIZE = 2;
+}
+
+// TracePipelineConfig is the root configuration for a storage-node trace 
pipeline.
+// It reuses existing catalog identifiers (group via metadata, stage names, 
schema
+// names) for targeting instead of declaring a parallel metadata model.
+//
+// The pipeline has up to three filter points:
+//   1. PIPELINE_EVENT_MERGE — in-merge filter during Hot-phase LSM compaction 
(default).
+//   2. PIPELINE_EVENT_FINALIZE — tail-sampling gate at Hot-phase finalization.
+//   3. Per-stage retention via StageRule.plugins, applied at the stage's
+//      migration-out boundary (when the segment migrates to the next stage).
+//      Always implicit when any StageRule carries a plugin chain.
+// Events 1 and 2 are toggleable via `enabled_events`. The gating policy those
+// events evaluate is the `plugins` chain (a sequential pipe of native Go
+// plugins; today each link is a sampler).
+message TracePipelineConfig {
+  // Identity and revision tracking; metadata.group is the Group this pipeline
+  // lives in and applies to, consistent with every other schema resource.
+  // Required: every config needs a name/group for registry handling.
+  common.v1.Metadata metadata = 1 [(validate.rules).message = {required: 
true}];
+  // Active status of the pipeline.
+  bool enabled = 2;
+  // Per-stage retention rules: which lifecycle stages this pipeline acts on,
+  // with the retention plugin chain for each. Each rule fires at its stage's
+  // migration-out boundary regardless of `enabled_events`. Empty means the
+  // only filters are the `enabled_events` events (no per-stage drop).
+  repeated StageRule stages = 3;
+  // Explicit schema names to target within the Group (exact match on 
Metadata.name).
+  // Each entry must be non-empty; cross-element uniqueness is enforced 
server-side.
+  repeated string schema_names = 4 
[(validate.rules).repeated.items.string.min_len = 1];
+  // RE2 regular expression matched against schema names. A schema is targeted 
if it
+  // is listed in schema_names OR matches this pattern. When both are empty, 
every
+  // schema in the Group is targeted.
+  string schema_name_regex = 5;
+  // Gating policy: an ordered chain of plugins (a sequential pipe) evaluated 
by
+  // any enabled event (PIPELINE_EVENT_MERGE and/or PIPELINE_EVENT_FINALIZE).
+  // Links run in declared order, each processing the traces the previous link
+  // kept; a link that fails is bypassed (fail-open). Today every link is a
+  // sampler, so the chain is the conjunction of the links' keep/drop verdicts.
+  // Empty means the only retention is the per-stage StageRule plugin chain(s)
+  // at migration-out.
+  repeated Plugin plugins = 6;
+  // Pipeline-wide events to run. Empty defaults to [PIPELINE_EVENT_MERGE] —
+  // the in-merge filter is on, the finalization gate is off. To enable the
+  // finalization gate, include PIPELINE_EVENT_FINALIZE; to disable the merge
+  // filter, list only [PIPELINE_EVENT_FINALIZE]. Leaving the list empty (the
+  // field's default) is an accepted way to say "merge only". Each element
+  // must be a defined, non-UNSPECIFIED value; duplicates are normalized to a
+  // set server-side.
+  repeated PipelineEvent enabled_events = 7 
[(validate.rules).repeated.items.enum = {
+    defined_only: true
+    not_in: [0]
+  }];
+  // Per-trace maturity window for the in-merge filter (§7.1). A trace is
+  // eligible for dropping during an LSM compaction merge only once its latest
+  // span timestamp is older than `now - merge_grace`; younger traces pass
+  // through the merge unchanged. Bounds the expected intra-trace span arrival
+  // spread (typically seconds). Used iff `enabled_events` contains
+  // PIPELINE_EVENT_MERGE. Strictly positive if set; engine default 30s if 
unset.
+  google.protobuf.Duration merge_grace = 8 [(validate.rules).duration = {
+    gt: {seconds: 0}
+  }];
+  // Per-segment settling window for the scheduled finalization pass (§7.3). A
+  // segment is treated as settled, and the authoritative final filter runs,
+  // once the event-time watermark exceeds `segment.End + finalize_grace`.
+  // Bounds segment-wide late arrival (typically minutes). Used iff
+  // `enabled_events` contains PIPELINE_EVENT_FINALIZE. Strictly positive if
+  // set; engine default 5m if unset.
+  google.protobuf.Duration finalize_grace = 9 [(validate.rules).duration = {
+    gt: {seconds: 0}
+  }];
+}
+
+// StageRule binds the pipeline to one lifecycle stage of the targeted Group
+// and declares that stage's retention plugin chain. The rule fires at the
+// stage's migration-out boundary (i.e. when a segment migrates from this stage
+// to the next stage); routine compaction is governed by PIPELINE_EVENT_MERGE 
on
+// TracePipelineConfig, not by StageRule.
+//
+// Per-stage retention uses the SAME chain mechanism as gating: each stage's
+// `plugins` chain owns the keep/drop verdict for traces leaving that stage. A
+// StageRule with an empty `plugins` chain has no filtering effect — every 
trace
+// at this stage migrates unchanged. The "rising bar" across stages (Hot keeps
+// more, Cold keeps less) is expressed by each stage's plugin config (see §4.2
+// of docs/design/post-trace-pipeline.md), not by a fixed predicate vocabulary.
+message StageRule {
+  // Stage name from the Group's ResourceOpts.stages (e.g. "hot", "warm", 
"cold").
+  // Must be non-empty; an empty stage name cannot match any lifecycle stage.
+  string stage = 1 [(validate.rules).string.min_len = 1];
+  // Per-stage retention chain: the ordered plugins (a sequential pipe) that
+  // decide keep/drop for traces leaving this stage at its migration-out
+  // boundary. Empty means no per-stage drop (every trace migrates). Same
+  // contract and composition as the gating chain (see Plugin).
+  repeated Plugin plugins = 2;
+}
+
+// Plugin is one link in a pipeline's processing chain — a generic, kind-tagged
+// envelope around a user-supplied native Go plugin. The set oneof arm selects
+// the kind; today the only kind is a sampler (SamplerPlugin). Adding a new 
kind
+// is purely additive: define its payload message and add a new arm to `kind`,
+// leaving existing arms and field numbers untouched.
+//
+// A chain of Plugin (TracePipelineConfig.plugins, StageRule.plugins) is a
+// sequential pipe: links run in declared order, each link processes the traces
+// the previous link kept, and a link that fails is bypassed (its input passes
+// through unchanged). The plugin↔engine contract for each kind (the vectorized
+// batch, the projection handshake, the verdict) lives in the pinned Go SDK
+// module `pkg/pipeline/sdk`, not in this proto.
+message Plugin {
+  // Operator-facing identity for this link, used in diagnostics and admission
+  // errors. Must be non-empty; cross-element uniqueness within a chain is
+  // enforced server-side.
+  string name = 1 [(validate.rules).string.min_len = 1];
+  // Plugin kind; the set arm is the kind discriminator. Exactly one arm must
+  // be set: `validate.required` rejects a Plugin whose kind is unset.
+  oneof kind {
+    option (validate.required) = true;
+    // Sampler kind: owns a keep/drop verdict over a vectorized batch of 
traces.
+    SamplerPlugin sampler = 2;
+  }
+}
+
+// SamplerPlugin configures a user-supplied native Go plugin (a .so loaded
+// in-process via the Go `plugin` package) that owns a keep/drop verdict over a
+// vectorized batch of traces. It is the sampler kind of Plugin 
(Plugin.sampler)
+// and is the keep/drop link wherever a chain runs: in a TracePipelineConfig
+// chain it gates at the enabled PipelineEvent(s) (merge / finalization); in a
+// StageRule chain it is per-stage retention at a stage's migration-out 
boundary.
+//
+// The full plugin↔engine contract (the vectorized batch type, the projection
+// handshake, and the verdict shape) lives in the pinned Go SDK module
+// `pkg/pipeline/sdk`, not in this proto; this message only locates and admits
+// the plugin. Three properties of that contract:
+//   - Strong compatibility: the boundary exchanges only stdlib/primitive types
+//     defined in the pinned SDK, so no third-party struct version is pinned
+//     across the .so boundary. The plugin must be built with the SAME Go
+//     toolchain, build tags, and flags (-trimpath, CGO) and the SAME pinned 
SDK
+//     as the running data node; `abi_version` is checked at load.
+//   - Vectorized input: the plugin's Decide is called once per columnar batch 
of
+//     traces, not once per trace.
+//   - Projection: the plugin declares the columns it needs (SDK Project →
+//     Projection{Tags, SpanIDs, Spans}); the engine materializes only those 
tag
+//     columns and, only when requested, the spans stream — like a query
+//     projection.
+//
+// Operational constraints (Go `plugin`): Linux/macOS only; plugins cannot be
+// unloaded, so changing one requires a node restart (no hot-reload); a plugin
+// panic is contained with recover() and fails open (the whole batch is
+// retained). Loading arbitrary code is operator-only and gated behind a server
+// flag plus a trusted plugin directory.
+message SamplerPlugin {
+  // Plugin .so filename, resolved within the data node's trusted plugin
+  // directory. The engine rejects any path that escapes that directory.
+  string path = 1 [(validate.rules).string.min_len = 1];
+  // Constructor symbol the engine looks up; defaults to "NewSampler" if empty.
+  string symbol = 2;
+  // ABI version the plugin was built against. The engine refuses to load the
+  // plugin unless this equals its own compiled sdk.ABIVersion. The `gte: 1`
+  // rule rejects the zero value, so the field must be set explicitly.
+  uint32 abi_version = 3 [(validate.rules).uint32 = {gte: 1}];
+  // Plugin-defined configuration, set directly in the pipeline config as a
+  // structured object. The engine does not interpret its keys: it serializes
+  // the Struct to canonical JSON and hands the bytes to the plugin's
+  // constructor (SDK NewSampler([]byte)), which unmarshals them into the
+  // plugin's own typed config and validates them — a malformed config fails
+  // the load. Optional: a plugin that needs no configuration leaves it unset.
+  google.protobuf.Struct config = 4;
+}
+
+// TracePipelineRegistryServiceCreateRequest is the input of
+// TracePipelineRegistryService.Create.
+message TracePipelineRegistryServiceCreateRequest {
+  // The pipeline config to create. Required by the validation rule.
+  TracePipelineConfig trace_pipeline_config = 1 [(validate.rules).message = 
{required: true}];
+}
+
+// TracePipelineRegistryServiceCreateResponse is the output of
+// TracePipelineRegistryService.Create.
+message TracePipelineRegistryServiceCreateResponse {
+  // NOTE(review): presumably the store revision assigned to the newly created
+  // config — confirm against the server implementation.
+  int64 mod_revision = 1;
+}
+
+// TracePipelineRegistryServiceUpdateRequest is the input of
+// TracePipelineRegistryService.Update.
+message TracePipelineRegistryServiceUpdateRequest {
+  // The pipeline config to submit. Required by the validation rule. Over
+  // HTTP, its metadata.group and metadata.name are bound from the URL path.
+  TracePipelineConfig trace_pipeline_config = 1 [(validate.rules).message = 
{required: true}];
+}
+
+// TracePipelineRegistryServiceUpdateResponse is the output of
+// TracePipelineRegistryService.Update.
+message TracePipelineRegistryServiceUpdateResponse {
+  // NOTE(review): presumably the store revision of the config after the
+  // update — confirm against the server implementation.
+  int64 mod_revision = 1;
+}
+
+// TracePipelineRegistryServiceDeleteRequest is the input of
+// TracePipelineRegistryService.Delete.
+message TracePipelineRegistryServiceDeleteRequest {
+  // Identifies the config to delete. Required by the validation rule. Over
+  // HTTP, group and name are bound from the URL path.
+  common.v1.Metadata metadata = 1 [(validate.rules).message = {required: 
true}];
+}
+
+// TracePipelineRegistryServiceDeleteResponse is the output of
+// TracePipelineRegistryService.Delete.
+message TracePipelineRegistryServiceDeleteResponse {
+  // NOTE(review): presumably true when a config was actually removed —
+  // confirm against the server implementation.
+  bool deleted = 1;
+  // delete_time is the server-assigned tombstone timestamp in unix nanos.
+  int64 delete_time = 2;
+  // mod_revision is the etcd revision of the tombstone; zero if the server 
did not record one.
+  int64 mod_revision = 3;
+}
+
+// TracePipelineRegistryServiceGetRequest is the input of
+// TracePipelineRegistryService.Get.
+message TracePipelineRegistryServiceGetRequest {
+  // Identifies the config to fetch. Required by the validation rule. Over
+  // HTTP, group and name are bound from the URL path.
+  common.v1.Metadata metadata = 1 [(validate.rules).message = {required: 
true}];
+}
+
+// TracePipelineRegistryServiceGetResponse is the output of
+// TracePipelineRegistryService.Get.
+message TracePipelineRegistryServiceGetResponse {
+  // The config returned for the requested metadata.
+  TracePipelineConfig trace_pipeline_config = 1;
+}
+
+// TracePipelineRegistryServiceExistRequest is the input of
+// TracePipelineRegistryService.Exist.
+message TracePipelineRegistryServiceExistRequest {
+  // Identifies the config to probe. Required by the validation rule.
+  common.v1.Metadata metadata = 1 [(validate.rules).message = {required: 
true}];
+}
+
+// TracePipelineRegistryServiceExistResponse is the output of
+// TracePipelineRegistryService.Exist.
+message TracePipelineRegistryServiceExistResponse {
+  // NOTE(review): presumably whether the group named in the request's
+  // metadata exists — confirm against the server implementation.
+  bool has_group = 1;
+  // NOTE(review): presumably whether a config with the requested name exists
+  // in that group — confirm against the server implementation.
+  bool has_trace_pipeline_config = 2;
+}
+
+// TracePipelineRegistryServiceListRequest is the input of
+// TracePipelineRegistryService.List.
+message TracePipelineRegistryServiceListRequest {
+  // Group to list configs for. Must be non-empty. Over HTTP it is bound from
+  // the URL path.
+  string group = 1 [(validate.rules).string.min_len = 1];
+}
+
+// TracePipelineRegistryServiceListResponse is the output of
+// TracePipelineRegistryService.List.
+message TracePipelineRegistryServiceListResponse {
+  // The configs returned for the requested group.
+  repeated TracePipelineConfig trace_pipeline_config = 1;
+}
+
+// TracePipelineRegistryService manages TracePipelineConfig resources, 
mirroring
+// the registry services of every other schema resource. Create/Update run the
+// admission and conflict checks of §2.3/§2.4 of the design.
+service TracePipelineRegistryService {
+  // Create submits a new TracePipelineConfig.
+  // HTTP: POST /v1/trace-pipeline/schema, with the whole request as the body.
+  rpc Create(TracePipelineRegistryServiceCreateRequest) returns 
(TracePipelineRegistryServiceCreateResponse) {
+    option (google.api.http) = {
+      post: "/v1/trace-pipeline/schema"
+      body: "*"
+    };
+  }
+
+  // Update submits a TracePipelineConfig for the group/name given in the URL.
+  // HTTP: PUT; the path binds trace_pipeline_config.metadata.group and
+  // trace_pipeline_config.metadata.name, the rest comes from the body.
+  rpc Update(TracePipelineRegistryServiceUpdateRequest) returns 
(TracePipelineRegistryServiceUpdateResponse) {
+    option (google.api.http) = {
+      put: 
"/v1/trace-pipeline/schema/{trace_pipeline_config.metadata.group}/{trace_pipeline_config.metadata.name}"
+      body: "*"
+    };
+  }
+
+  // Delete removes the config identified by metadata.group / metadata.name.
+  // HTTP: DELETE, with both values bound from the path.
+  rpc Delete(TracePipelineRegistryServiceDeleteRequest) returns 
(TracePipelineRegistryServiceDeleteResponse) {
+    option (google.api.http) = {delete: 
"/v1/trace-pipeline/schema/{metadata.group}/{metadata.name}"};
+  }
+
+  // Get returns the config identified by metadata.group / metadata.name.
+  // HTTP: GET, with both values bound from the path.
+  rpc Get(TracePipelineRegistryServiceGetRequest) returns 
(TracePipelineRegistryServiceGetResponse) {
+    option (google.api.http) = {get: 
"/v1/trace-pipeline/schema/{metadata.group}/{metadata.name}"};
+  }
+
+  // List returns the configs of one group. HTTP: GET, group bound from the
+  // path.
+  // NOTE(review): this template also fits Get's
+  // `{metadata.group}/{metadata.name}` pattern with group "lists"; confirm
+  // the gateway routes such requests to List.
+  rpc List(TracePipelineRegistryServiceListRequest) returns 
(TracePipelineRegistryServiceListResponse) {
+    option (google.api.http) = {get: 
"/v1/trace-pipeline/schema/lists/{group}"};
+  }
+
+  // Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch 
Get instead.
+  rpc Exist(TracePipelineRegistryServiceExistRequest) returns 
(TracePipelineRegistryServiceExistResponse);
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index ff5fd5760..4026f3260 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -378,6 +378,28 @@
   
     - [MeasureService](#banyandb-measure-v1-MeasureService)
   
+- 
[banyandb/pipeline/v1/trace_pipeline.proto](#banyandb_pipeline_v1_trace_pipeline-proto)
+    - [Plugin](#banyandb-pipeline-v1-Plugin)
+    - [SamplerPlugin](#banyandb-pipeline-v1-SamplerPlugin)
+    - [StageRule](#banyandb-pipeline-v1-StageRule)
+    - [TracePipelineConfig](#banyandb-pipeline-v1-TracePipelineConfig)
+    - 
[TracePipelineRegistryServiceCreateRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceCreateRequest)
+    - 
[TracePipelineRegistryServiceCreateResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceCreateResponse)
+    - 
[TracePipelineRegistryServiceDeleteRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteRequest)
+    - 
[TracePipelineRegistryServiceDeleteResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteResponse)
+    - 
[TracePipelineRegistryServiceExistRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceExistRequest)
+    - 
[TracePipelineRegistryServiceExistResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceExistResponse)
+    - 
[TracePipelineRegistryServiceGetRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceGetRequest)
+    - 
[TracePipelineRegistryServiceGetResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceGetResponse)
+    - 
[TracePipelineRegistryServiceListRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceListRequest)
+    - 
[TracePipelineRegistryServiceListResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceListResponse)
+    - 
[TracePipelineRegistryServiceUpdateRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateRequest)
+    - 
[TracePipelineRegistryServiceUpdateResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateResponse)
+  
+    - [PipelineEvent](#banyandb-pipeline-v1-PipelineEvent)
+  
+    - 
[TracePipelineRegistryService](#banyandb-pipeline-v1-TracePipelineRegistryService)
+  
 - [banyandb/property/v1/gossip.proto](#banyandb_property_v1_gossip-proto)
     - [PropagationContext](#banyandb-property-v1-PropagationContext)
     - [PropagationRequest](#banyandb-property-v1-PropagationRequest)
@@ -5899,6 +5921,371 @@ WriteResponse is the response contract for write
 
 
 
+<a name="banyandb_pipeline_v1_trace_pipeline-proto"></a>
+<p align="right"><a href="#top">Top</a></p>
+
+## banyandb/pipeline/v1/trace_pipeline.proto
+
+
+
+<a name="banyandb-pipeline-v1-Plugin"></a>
+
+### Plugin
+Plugin is one link in a pipeline&#39;s processing chain — a generic, 
kind-tagged
+envelope around a user-supplied native Go plugin. The set oneof arm selects
+the kind; today the only kind is a sampler (SamplerPlugin). Adding a new kind
+is purely additive: define its payload message and add a new arm to `kind`,
+leaving existing arms and field numbers untouched.
+
+A chain of Plugin (TracePipelineConfig.plugins, StageRule.plugins) is a
+sequential pipe: links run in declared order, each link processes the traces
+the previous link kept, and a link that fails is bypassed (its input passes
+through unchanged). The plugin↔engine contract for each kind (the vectorized
+batch, the projection handshake, the verdict) lives in the pinned Go SDK
+module `pkg/pipeline/sdk`, not in this proto.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| name | [string](#string) |  | Operator-facing identity for this link, used 
in diagnostics and admission errors. Must be non-empty; cross-element 
uniqueness within a chain is enforced server-side. |
+| sampler | [SamplerPlugin](#banyandb-pipeline-v1-SamplerPlugin) |  | Sampler 
kind: owns a keep/drop verdict over a vectorized batch of traces. |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-SamplerPlugin"></a>
+
+### SamplerPlugin
+SamplerPlugin configures a user-supplied native Go plugin (a .so loaded
+in-process via the Go `plugin` package) that owns a keep/drop verdict over a
+vectorized batch of traces. It is the sampler kind of Plugin (Plugin.sampler)
+and is the keep/drop link wherever a chain runs: in a TracePipelineConfig
+chain it gates at the enabled PipelineEvent(s) (merge / finalization); in a
+StageRule chain it is per-stage retention at a stage&#39;s migration-out 
boundary.
+
+The full plugin↔engine contract (the vectorized batch type, the projection
+handshake, and the verdict shape) lives in the pinned Go SDK module
+`pkg/pipeline/sdk`, not in this proto; this message only locates and admits
+the plugin. Three properties of that contract:
+  - Strong compatibility: the boundary exchanges only stdlib/primitive types
+    defined in the pinned SDK, so no third-party struct version is pinned
+    across the .so boundary. The plugin must be built with the SAME Go
+    toolchain, build tags, and flags (-trimpath, CGO) and the SAME pinned SDK
+    as the running data node; `abi_version` is checked at load.
+  - Vectorized input: the plugin&#39;s Decide is called once per columnar 
batch of
+    traces, not once per trace.
+  - Projection: the plugin declares the columns it needs (SDK Project →
+    Projection{Tags, SpanIDs, Spans}); the engine materializes only those tag
+    columns and, only when requested, the spans stream — like a query
+    projection.
+
+Operational constraints (Go `plugin`): Linux/macOS only; plugins cannot be
+unloaded, so changing one requires a node restart (no hot-reload); a plugin
+panic is contained with recover() and fails open (the whole batch is
+retained). Loading arbitrary code is operator-only and gated behind a server
+flag plus a trusted plugin directory.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| path | [string](#string) |  | Plugin .so filename, resolved within the data 
node&#39;s trusted plugin directory. The engine rejects any path that escapes 
that directory. |
+| symbol | [string](#string) |  | Constructor symbol the engine looks up; 
defaults to &#34;NewSampler&#34; if empty. |
+| abi_version | [uint32](#uint32) |  | ABI version the plugin was built 
against. The engine refuses to load the plugin unless this equals its own 
compiled sdk.ABIVersion. |
+| config | [google.protobuf.Struct](#google-protobuf-Struct) |  | 
Plugin-defined configuration, set directly in the pipeline config as a 
structured object. The engine does not interpret its keys: it serializes the 
Struct to canonical JSON and hands the bytes to the plugin&#39;s constructor 
(SDK NewSampler([]byte)), which unmarshals them into the plugin&#39;s own typed 
config and validates them — a malformed config fails the load. Optional: a 
plugin that needs no configuration leaves it unset. |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-StageRule"></a>
+
+### StageRule
+StageRule binds the pipeline to one lifecycle stage of the targeted Group
+and declares that stage&#39;s retention plugin chain. The rule fires at the
+stage&#39;s migration-out boundary (i.e. when a segment migrates from this 
stage
+to the next stage); routine compaction is governed by PIPELINE_EVENT_MERGE on
+TracePipelineConfig, not by StageRule.
+
+Per-stage retention uses the SAME chain mechanism as gating: each stage&#39;s
+`plugins` chain owns the keep/drop verdict for traces leaving that stage. A
+StageRule with an empty `plugins` chain has no filtering effect — every trace
+at this stage migrates unchanged. The &#34;rising bar&#34; across stages (Hot 
keeps
+more, Cold keeps less) is expressed by each stage&#39;s plugin config (see §4.2
+of the design doc), not by a fixed predicate vocabulary.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| stage | [string](#string) |  | Stage name from the Group&#39;s 
ResourceOpts.stages (e.g. &#34;hot&#34;, &#34;warm&#34;, &#34;cold&#34;). Must 
be non-empty; an empty stage name cannot match any lifecycle stage. |
+| plugins | [Plugin](#banyandb-pipeline-v1-Plugin) | repeated | Per-stage 
retention chain: the ordered plugins (a sequential pipe) that decide keep/drop 
for traces leaving this stage at its migration-out boundary. Empty means no 
per-stage drop (every trace migrates). Same contract and composition as the 
gating chain (see Plugin). |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineConfig"></a>
+
+### TracePipelineConfig
+TracePipelineConfig is the root configuration for a storage-node trace 
pipeline.
+It reuses existing catalog identifiers (group via metadata, stage names, schema
+names) for targeting instead of declaring a parallel metadata model.
+
+The pipeline has up to three filter points:
+  1. PIPELINE_EVENT_MERGE — in-merge filter during Hot-phase LSM compaction 
(default).
+  2. PIPELINE_EVENT_FINALIZE — tail-sampling gate at Hot-phase finalization.
+  3. Per-stage retention via StageRule.plugins, applied at the stage&#39;s
+     migration-out boundary (when the segment migrates to the next stage).
+     Always implicit when any StageRule carries a plugin chain.
+Events 1 and 2 are toggleable via `enabled_events`. The gating policy those
+events evaluate is the `plugins` chain (a sequential pipe of native Go
+plugins; today each link is a sampler).
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  | 
Identity and revision tracking; metadata.group is the Group this pipeline lives 
in and applies to, consistent with every other schema resource. Required: every 
config needs a name/group for registry handling. |
+| enabled | [bool](#bool) |  | Active status of the pipeline. |
+| stages | [StageRule](#banyandb-pipeline-v1-StageRule) | repeated | Per-stage 
retention rules: which lifecycle stages this pipeline acts on, with the 
retention plugin chain for each. Each rule fires at its stage&#39;s 
migration-out boundary regardless of `enabled_events`. Empty means the only 
filters are the `enabled_events` events (no per-stage drop). |
+| schema_names | [string](#string) | repeated | Explicit schema names to 
target within the Group (exact match on Metadata.name). Each entry must be 
non-empty; cross-element uniqueness is enforced server-side. |
+| schema_name_regex | [string](#string) |  | RE2 regular expression matched 
against schema names. A schema is targeted if it is listed in schema_names OR 
matches this pattern. When both are empty, every schema in the Group is 
targeted. |
+| plugins | [Plugin](#banyandb-pipeline-v1-Plugin) | repeated | Gating policy: 
an ordered chain of plugins (a sequential pipe) evaluated by any enabled event 
(PIPELINE_EVENT_MERGE and/or PIPELINE_EVENT_FINALIZE). Links run in declared 
order, each processing the traces the previous link kept; a link that fails is 
bypassed (fail-open). Today every link is a sampler, so the chain is the 
conjunction of the links&#39; keep/drop verdicts. Empty means the only 
retention is the per-stage StageRule plugin chain(s) at migration-out. |
+| enabled_events | [PipelineEvent](#banyandb-pipeline-v1-PipelineEvent) | 
repeated | Pipeline-wide events to run. Empty defaults to 
[PIPELINE_EVENT_MERGE] — the in-merge filter is on, the finalization gate is 
off. To enable the finalization gate, include PIPELINE_EVENT_FINALIZE; to 
disable the merge filter, list only [PIPELINE_EVENT_FINALIZE]; the explicit 
empty default value is also acceptable to mean &#34;merge only&#34;. Each 
element must be a defined, non-UNSPECIFIED value; duplicates are normalized to a set server-side. |
+| merge_grace | [google.protobuf.Duration](#google-protobuf-Duration) |  | 
Per-trace maturity window for the in-merge filter (§7.1). A trace is eligible 
for dropping during an LSM compaction merge only once its latest span timestamp 
is older than `now - merge_grace`; younger traces pass through the merge 
unchanged. Bounds the expected intra-trace span arrival spread (typically 
seconds). Used iff `enabled_events` contains PIPELINE_EVENT_MERGE. Strictly 
positive if set; engine default 30s if unset. |
+| finalize_grace | [google.protobuf.Duration](#google-protobuf-Duration) |  | 
Per-segment settling window for the scheduled finalization pass (§7.3). A 
segment is treated as settled, and the authoritative final filter runs, once 
the event-time watermark exceeds `segment.End &#43; finalize_grace`. Bounds 
segment-wide late arrival (typically minutes). Used iff `enabled_events` 
contains PIPELINE_EVENT_FINALIZE. Strictly positive if set; engine default 5m 
if unset. |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceCreateRequest"></a>
+
+### TracePipelineRegistryServiceCreateRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| trace_pipeline_config | 
[TracePipelineConfig](#banyandb-pipeline-v1-TracePipelineConfig) |  |  |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceCreateResponse"></a>
+
+### TracePipelineRegistryServiceCreateResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) |  |  |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteRequest"></a>
+
+### TracePipelineRegistryServiceDeleteRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  
|
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteResponse"></a>
+
+### TracePipelineRegistryServiceDeleteResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| deleted | [bool](#bool) |  |  |
+| delete_time | [int64](#int64) |  | delete_time is the server-assigned 
tombstone timestamp in unix nanos. |
+| mod_revision | [int64](#int64) |  | mod_revision is the etcd revision of the 
tombstone; zero if the server did not record one. |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceExistRequest"></a>
+
+### TracePipelineRegistryServiceExistRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  
|
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceExistResponse"></a>
+
+### TracePipelineRegistryServiceExistResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| has_group | [bool](#bool) |  |  |
+| has_trace_pipeline_config | [bool](#bool) |  |  |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceGetRequest"></a>
+
+### TracePipelineRegistryServiceGetRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  
|
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceGetResponse"></a>
+
+### TracePipelineRegistryServiceGetResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| trace_pipeline_config | 
[TracePipelineConfig](#banyandb-pipeline-v1-TracePipelineConfig) |  |  |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceListRequest"></a>
+
+### TracePipelineRegistryServiceListRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| group | [string](#string) |  |  |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceListResponse"></a>
+
+### TracePipelineRegistryServiceListResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| trace_pipeline_config | 
[TracePipelineConfig](#banyandb-pipeline-v1-TracePipelineConfig) | repeated |  |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateRequest"></a>
+
+### TracePipelineRegistryServiceUpdateRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| trace_pipeline_config | 
[TracePipelineConfig](#banyandb-pipeline-v1-TracePipelineConfig) |  |  |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateResponse"></a>
+
+### TracePipelineRegistryServiceUpdateResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) |  |  |
+
+
+
+
+
+ 
+
+
+<a name="banyandb-pipeline-v1-PipelineEvent"></a>
+
+### PipelineEvent
+PipelineEvent identifies a pipeline-wide event that can be independently
+enabled. Per-stage retention (StageRule.plugins) fires implicitly at the
+stage&#39;s migration-out boundary and is not toggleable via this enum.
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| PIPELINE_EVENT_UNSPECIFIED | 0 |  |
+| PIPELINE_EVENT_MERGE | 1 | In-merge filter during Hot-phase LSM compaction 
merges (Warm/Cold compactions stay lossless). Per-trace drops are gated by 
`merge_grace` so partial traces are not destroyed prematurely. Cheap, runs 
often; verdicts wait for trace maturity (see §7.1). |
+| PIPELINE_EVENT_FINALIZE | 2 | Tail-sampling gate at Hot-phase segment 
finalization, after the segment has settled (event-time watermark past 
`segment.End &#43; finalize_grace`). Heavy but authoritative; sees the complete 
trace (see §7.3). |
+
+
+ 
+
+ 
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryService"></a>
+
+### TracePipelineRegistryService
+TracePipelineRegistryService manages TracePipelineConfig resources, mirroring
+the registry services of every other schema resource. Create/Update run the
+admission and conflict checks of §2.3/§2.4 of the design.
+
+| Method Name | Request Type | Response Type | Description |
+| ----------- | ------------ | ------------- | ------------|
+| Create | 
[TracePipelineRegistryServiceCreateRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceCreateRequest)
 | 
[TracePipelineRegistryServiceCreateResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceCreateResponse)
 |  |
+| Update | 
[TracePipelineRegistryServiceUpdateRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateRequest)
 | 
[TracePipelineRegistryServiceUpdateResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateResponse)
 |  |
+| Delete | 
[TracePipelineRegistryServiceDeleteRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteRequest)
 | 
[TracePipelineRegistryServiceDeleteResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteResponse)
 |  |
+| Get | 
[TracePipelineRegistryServiceGetRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceGetRequest)
 | 
[TracePipelineRegistryServiceGetResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceGetResponse)
 |  |
+| List | 
[TracePipelineRegistryServiceListRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceListRequest)
 | 
[TracePipelineRegistryServiceListResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceListResponse)
 |  |
+| Exist | 
[TracePipelineRegistryServiceExistRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceExistRequest)
 | 
[TracePipelineRegistryServiceExistResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceExistResponse)
 | Exist doesn&#39;t expose an HTTP endpoint. Please use HEAD method to touch 
Get instead. |
+
+ 
+
+
+
 <a name="banyandb_property_v1_gossip-proto"></a>
 <p align="right"><a href="#top">Top</a></p>
 
diff --git a/docs/design/post-trace-pipeline.md 
b/docs/design/post-trace-pipeline.md
new file mode 100644
index 000000000..663510622
--- /dev/null
+++ b/docs/design/post-trace-pipeline.md
@@ -0,0 +1,608 @@
+# BanyanDB Storage-Node Post-Trace Pipeline & Storage Tier Mapping 
Specification
+
+This document presents the complete technical design for the **BanyanDB 
Storage-Node Post-Trace Pipeline** and its integration with physical hardware 
storage tiers and time-aging schedulers.
+
+Unlike traditional streaming-based telemetry engines that perform span 
assembly inside active, memory-heavy windowing pipelines, BanyanDB relies on a 
**native trace model**. Spans are stored, sorted, and indexed by `trace_id` 
directly within the storage engine layout on individual data nodes.
+
+By decoupling trace assembly from the active ingestion path, the post-trace 
pipeline executes asynchronous tail-sampling, per-stage retention filtering, 
and data reduction natively during storage lifecycle events on the data nodes. 
This design is grounded in recent academic advancements in post-hoc retroactive 
tracing and storage-level file merge analysis.
+
+## Architectural Concept: Decoupled Gating and Per-Stage Retention
+
+To maximize storage efficiency and compute performance on the BanyanDB data 
nodes, trace evaluation is separated into two logical phases, which 
subsequently feed into an automated **time-aging system** for partition-level 
migration:
+
+```mermaid
+flowchart TD
+    GA["Grouped Trace Assembly"]
+    GA --> G["1. Gating (plugins chain, at merge/finalize)<br/>Goal: decide 
whether a trace is retained at all<br/>Criteria: native Go plugin-chain verdict 
(operator-defined)"]
+    G -->|"Retain"| R["2. Per-Stage Retention (StageRule.plugins)<br/>Goal: at 
each stage's migration-out event, keep/drop per the stage's plugin-chain 
verdict<br/>Criteria: per-stage native Go plugin-chain verdict 
(operator-defined)"]
+    G -->|"Drop / Purge"| D["Discard Block<br/>Reclaim Space"]
+    R --> T["3. Time-Aging System (Stage-Stepped Migration Engine)<br/>Goal: 
each stage's StageRule decides which traces migrate to the next 
stage;<br/>routine Hot-phase compaction may also drop traces when 
PIPELINE_EVENT_MERGE is on (default)<br/>Stage Migration: Hot → Warm → Cold 
(LifecycleStage order)<br/>RULE: medium is a node-group concern (may be 
heterogeneous); the per-stage plugin governs retention, not medium selection"]
+```
+
+### 1.1 Gating (the `plugins` chain)
+
+- **Operation Type**: An ordered chain of native Go plugins (`plugins`) 
delivers the keep/drop verdict. Each link is a `Plugin` (today the only kind is 
a sampler); the chain is a sequential pipe — links run in declared order, each 
evaluating the traces the previous link kept, so an all-sampler chain is the 
conjunction of the links' verdicts (§2.5). It is the sole gating mechanism, 
invoked at whichever of the toggleable events is enabled.
+
+- **Responsibility**: Gating runs **only in the Hot phase**, at whichever of 
the toggleable events is enabled — `PIPELINE_EVENT_MERGE` (in-merge on Hot 
compactions, §7.1; default-on) and/or `PIPELINE_EVENT_FINALIZE` (on settled Hot 
segments, §7.3); Warm and Cold compactions are byte-for-byte lossless. It 
determines whether a freshly-assembled trace block **survives** at all — 
passing into the stage lifecycle — or is **purged** to reclaim storage space.
+
+- **Compute Profile**: Operator-defined. The plugin receives a vectorized 
batch of traces and, via its `Project()` declaration, materializes only the 
columns it needs — the named tag columns and, only when requested, the heavy 
span bodies (§2.5). Verdicts are expected to be pure in their inputs (the 
`TraceBatch` plus the frozen `config`), so they are deterministic in `trace_id` 
and stable across re-evaluation once the trace is mature per `merge_grace` or 
settled per `finalize_grace`.
+
+- **Contract**: An ordered chain of native Go plugins (`.so`s loaded 
in-process via the Go `plugin` package); each link owns a keep/drop verdict and 
the chain composes them as a sequential pipe, invoked by the same enabled 
events at the same maturity/settling gates. Each link's contract — a vectorized 
columnar batch in, a boolean keep-mask out, with up-front column projection — 
is specified in §2.5.
+
+### 1.2 Per-Stage Retention (`StageRule.plugins`)
+
+- **Operation Type**: Per-stage keep/drop verdict by a native Go plugin chain. 
Each `StageRule` carries its own `plugins` chain; a trace it drops is omitted 
from the part written for the next stage at the stage's migration-out boundary.
+
+- **Responsibility**: Decides **which traces survive each stage** as data 
ages. Each `StageRule` fires once per segment lifetime, at the stage's 
**migration-out** boundary (when the segment migrates to the next stage). The 
"rising bar" effect — Hot keeps more, Cold keeps less — is expressed by 
tightening each stage plugin's config at successive stages. The pipeline does 
**not** choose the physical storage medium; that is a node-group / 
`LifecycleStage` placement concern (§4.1). Routine H [...]
+
+- **Compute Profile**: Operator-defined — the same vectorized contract as the 
gating plugin (§2.5): the stage plugin receives a batch of traces, projects 
only the columns it declares via `Project()`, and returns a boolean keep-mask. 
`MinTS`/`MaxTS` are free from block metadata; tag columns are decoded only when 
the plugin's projection requests them.
+
+## Protobuf Message Design
+
+The pipeline configuration is a single trace-typed message, 
`TracePipelineConfig`. Rather than a parallel abstract metadata layer, it 
reuses the existing catalog identifiers — Group (via `metadata`), lifecycle 
stage names, and schema names — and adds only the trace-specific gating (a 
native plugin chain) and per-stage retention rules. General stream parsing and 
metrics aggregation configurations are excluded to maintain a strict focus on 
trace-centric analytical operations.
+
+### 2.1 Targeting Model: Reuse Existing Catalog Identifiers
+
+This design deliberately does **not** introduce an abstract `Pipeline` 
resource or an `ExecutionTrigger` enum. Both would re-declare targeting that 
the storage model already owns and let the two drift. A trace pipeline is 
instead expressed entirely with identifiers that already exist:
+
+- **Group** — named by the config's own `metadata.group` 
(`common.v1.Metadata`). A `TracePipelineConfig` lives in, and applies to, that 
Group, exactly as every other schema resource does. The Group already fixes the 
`catalog`, so catalog is never repeated.
+- **Lifecycle stages** — a `StageRule` per targeted stage, each naming a stage 
from the Group's `ResourceOpts.stages` (e.g. `"hot"`, `"warm"`, `"cold"`) and 
carrying that stage's **retention `plugins` chain** (the same native-plugin 
mechanism as gating). Each rule fires at the stage's migration-out boundary. 
Stage names are the same vocabulary queries already accept 
(`trace/v1/query.proto`'s `stages`). All retention policy is configured here, 
per stage and per pipeline — there is no hard [...]
+- **Schema selector** — an explicit `schema_names` list (exact match on 
`common.v1.Metadata.name`) plus a `schema_name_regex` (RE2). A schema matches 
if it is listed OR matches the regex; both empty targets every schema in the 
Group.
+
+The former `ExecutionTrigger` (COMPACTION / MIGRATION / SCHEDULED) is replaced 
by three anchored events: the **in-merge filter at LSM compaction** (§7.1, 
toggleable via `PIPELINE_EVENT_MERGE` — on by default), the **plugin gating 
pass at finalization** (§7.3, toggleable via `PIPELINE_EVENT_FINALIZE`), and 
the **per-stage retention pass at migration-out** (§7.2, always-on when any 
`StageRule` carries a `plugins` chain). The first two run the gating chain; the 
third runs the per-stage `Sta [...]
+
+Multiple `TracePipelineConfig`s may coexist in a Group only when their 
effective coverage is disjoint; the schema registry enforces this with a 
per-tuple uniqueness rule (§2.3).
+
+### 2.2 Trace Pipeline Specification (`trace_pipeline.proto`)
+
+The full message definitions live in the proto source at 
`api/proto/banyandb/pipeline/v1/trace_pipeline.proto` (package 
`banyandb.pipeline.v1`); they are not duplicated here. `TracePipelineConfig` is 
the single root resource: it carries its own identity (`metadata`), the 
targeting fields from §2.1, and the trace-specific gating (a native plugin 
chain) and per-stage retention rules. There is no embedded abstract `Pipeline` 
and no `ExecutionTrigger`; the anchored filter points are the plug [...]
+
+The message set is:
+
+- **`TracePipelineConfig`** — root resource: `metadata`, `enabled`, the 
per-stage `stages` rules, the `schema_names` / `schema_name_regex` selector, 
the gating policy as a native-plugin `plugins` chain, the `enabled_events` list 
(defaults to `[PIPELINE_EVENT_MERGE]`), and the two grace windows `merge_grace` 
(§5.1 / §7.1) and `finalize_grace` (§7.3) — each consulted only when its 
corresponding event is enabled.
+- **`PipelineEvent`** — enum of pipeline-wide events: `PIPELINE_EVENT_MERGE` 
(in-merge filter at LSM compaction, §7.1) and `PIPELINE_EVENT_FINALIZE` (plugin 
gating pass at segment finalization, §7.3). Per-stage retention (`StageRule`) 
fires implicitly at migration-out and is not in this enum.
+- **`StageRule`** — binds the pipeline to one lifecycle stage and carries that 
stage's retention `plugins` chain: a `stage` name plus a `repeated Plugin`. The 
rule fires at the stage's migration-out boundary, where the chain's keep-mask 
decides which traces migrate; a `StageRule` with an empty `plugins` chain has 
no filtering effect (see §4.2).
+- **`Plugin`** — one link in a chain: a `name` plus a `oneof kind` whose set 
arm selects the kind. The set arm is the discriminator; today the only arm is 
`sampler` (a `SamplerPlugin`). Adding a kind is purely additive — a new payload 
message and a new `oneof` arm — so existing configs and field numbers are 
untouched (§2.6).
+- **`SamplerPlugin`** — the sampler kind of `Plugin`: `path` (the `.so` within 
the trusted plugin dir), `symbol` (constructor, default `NewSampler`), 
`abi_version` (checked against the host at load), and a structured `config` 
(`google.protobuf.Struct`) set directly in the pipeline config — the engine 
serializes it to JSON and hands it to the plugin, which unmarshals it into its 
own typed config. The vectorized-batch / projection / verdict contract is the 
Go SDK, not the proto (§2.5).
+- **`TracePipelineRegistryService`** — the CRUD registry surface (`Create` / 
`Update` / `Delete` / `Get` / `List` / `Exist`, with HTTP mappings under 
`/v1/trace-pipeline/schema`), mirroring every other schema resource's 
`*RegistryService`. `Create`/`Update` are where the admission and conflict 
checks of §2.3 / §2.4 run. Without this service a `TracePipelineConfig` would 
be an orphaned, un-writable resource.
+
+### 2.3 Uniqueness and Conflict Policy
+
+Multiple `TracePipelineConfig` resources can coexist in a Group, but their 
**effective coverage must be disjoint on two independent keys**, so the 
behavior at every point is deterministic — there is no implicit ordering, 
priority, or composition of overlapping pipelines:
+
+- **Gating key `(Group, Schema, Event)`** — at most one active pipeline may 
gate a given schema at a given `PipelineEvent` (`PIPELINE_EVENT_MERGE` / 
`PIPELINE_EVENT_FINALIZE`). Gating runs only in the Hot phase (§1.1), 
independent of `stages`.
+- **Retention key `(Group, Schema, Source-Stage)`** — at most one active 
pipeline may carry a `StageRule` for a given schema at a given source stage's 
migration-out boundary.
+
+**Effective coverage of a pipeline.** A `TracePipelineConfig` P with 
`enabled=true` covers:
+
+- **Gating tuples** `(P.metadata.group, schema, event)` for each `schema` 
selected by `P.schema_names`/`P.schema_name_regex` (both empty selectors target 
every schema in the Group) and each `event` in `P.enabled_events` (default 
`[PIPELINE_EVENT_MERGE]`).
+- **Retention tuples** `(P.metadata.group, schema, stage)` for each selected 
`schema` and each `stage` named by a `StageRule` in `P.stages`. An empty 
`P.stages` covers **no** retention tuples — it declares no per-stage retention, 
consistent with the proto semantics that empty `stages` means no per-stage drop.
+
+**Conflict detection on write.** When a `TracePipelineConfig` is created or 
updated, the registry expands its gating and retention tuples against the 
current schemas and stages of its Group and rejects the write if **any** tuple 
on either key is already covered by another `enabled` pipeline in the same 
Group. The error names the conflicting pipeline and the overlapping tuple(s).
+
+**Schema additions.** When a new `Trace` schema is created in a Group, the 
registry re-validates every `enabled` `TracePipelineConfig` in that Group 
against the new schema. If the new schema would cause two pipelines to overlap 
on any gating or retention tuple, the schema creation is rejected; the operator 
must narrow the pipelines (e.g. add explicit `schema_names`) before adding the 
schema.
+
+**Disabling and replacement.** Setting `enabled=false` removes a pipeline from 
active coverage immediately, freeing its tuples for another pipeline to claim. 
This is the safe path for swapping in a new pipeline without overlapping coverage: disable the 
old, enable or create the new, then delete the old. There is no implicit 
precedence between two `enabled` pipelines — the registry will not pick a 
winner.
+
+**Runtime defense-in-depth.** At each enabled Hot-phase event (§7.1/§7.3) the 
engine selects the active pipeline by gating key `(group, schema, event)`; at a 
migration-out boundary (§7.2) it selects by retention key `(group, schema, 
source_stage)`. If a registry inconsistency ever exposes more than one active 
match for a key, the engine logs an error and applies **none** of them — 
failing safe to retain rather than risk a nondeterministic destructive drop.
+
+**Recommended pattern.** For most workloads, a single pipeline per Group with 
a broad `schema_name_regex` is the simplest configuration. Multiple pipelines 
in a Group are useful only when their schema selectors are mutually exclusive 
(for example `schema_names: ["segment"]` and `schema_names: ["zipkin_span"]` in 
a hypothetical mixed group) so no tuple is doubly covered.
+
+### 2.4 Admission Validation
+
+The proto pins bounds with PGV so a malformed config is rejected at parse time 
rather than producing undefined behavior. The schema registry's admission 
control adds the cross-field and cross-element rules that PGV cannot express.
+
+**PGV-enforced bounds** (rejected at proto parse / `Write` time):
+
+- `TracePipelineConfig.merge_grace`, `TracePipelineConfig.finalize_grace` — 
strictly positive when set; unset means engine default (30s and 5m 
respectively). Each is consulted only when its corresponding `PipelineEvent` is 
in `enabled_events`.
+- `TracePipelineConfig.enabled_events` — each element must be a defined, 
non-`UNSPECIFIED` enum value (`repeated.items.enum = {defined_only: true, 
not_in: [0]}`).
+- `TracePipelineConfig.schema_names` — each element non-empty 
(`repeated.items.string.min_len = 1`).
+- `StageRule.stage` — non-empty (`min_len: 1`); `Plugin.name` — non-empty 
(`min_len: 1`); `SamplerPlugin.path` — non-empty (`min_len: 1`); 
`SamplerPlugin.abi_version` — `gte: 1`.
+
+**Admission rules (server-side; not expressible in PGV):**
+
+- **Required rule blocks when enabled.** When `TracePipelineConfig.enabled = 
true`, the config must declare at least one of: (a) a non-empty gating 
`plugins` chain paired with a non-empty **effective event set** — that is, 
after applying the `[PIPELINE_EVENT_MERGE]` default for an unset/empty 
`enabled_events`, at least one of `PIPELINE_EVENT_MERGE` or 
`PIPELINE_EVENT_FINALIZE` actually fires (this check must be evaluated against 
the defaulted set, not a raw `len(enabled_events) > 0`, so  [...]
+- **Well-formedness checks** (PGV cannot express these cross-element rules): 
`enabled_events` is normalized to a duplicate-free set (duplicates are 
de-duped, not an error); every `StageRule.stage` and every `schema_names` entry 
is unique within the config (duplicates rejected); every `StageRule.stage` 
names a real stage in the Group's `ResourceOpts.stages` and is **non-terminal** 
— a `StageRule` on the last (Cold) stage is rejected, since the terminal stage 
has no migration-out boundary  [...]
+- **Plugin admission (for every link in every chain — gating and per-stage).** 
Validation is split by topology, because plugins live on the **data nodes** 
while the registry write path runs on the **liaison/metadata** layer — the 
liaison cannot itself `dlopen` a data-node `.so`. (1) At write time the liaison 
validates only the **static** fields it can check without the binary: every 
`Plugin` has a set `kind` arm and a non-empty `name` (unique within its chain), 
and for the sampler kind n [...]
+
+Both the liaison write path and the per-node runtime apply their respective 
checks. If a registry inconsistency ever lets a malformed config reach a data 
node, the node logs an error and falls back to **retain** for every tuple the 
malformed config would have governed — never a destructive drop (consistent 
with the §2.3 runtime fail-safe).
+
+### 2.5 Native Plugin Contract & Chain Composition
+
+A pipeline's gating policy and each stage's retention are an **ordered chain 
of native Go plugins** (`plugins`). A `Plugin` is one link — a generic, 
kind-tagged envelope whose set `oneof` arm selects the kind; today the only 
kind is a **sampler**, a **user-supplied `.so` loaded in-process via the 
standard Go `plugin` package** that owns a keep/drop verdict. The chain is 
invoked by the enabled events (§7.1 / §7.3) at the same maturity 
(`merge_grace`) and settling (`finalize_grace`) gates. [...]
+
+This is an **operator-only, trusted** extension point: loading a `.so` runs 
arbitrary code inside the data node, so it is gated behind a server flag and a 
fixed trusted plugin directory; the proto config only *references* a plugin 
already vetted on disk (§2.4). It is not tenant-facing.
+
+The contract is designed around three hard requirements.
+
+**(2) Vectorized parameter — the engine's native trace columns.** The batch is 
**not** a parallel columnar form invented for the plugin; it is the engine's 
own native trace block (`banyand/trace/block.go`). The merger already streams 
and assembles one columnar `block` per `trace_id` — `spans [][]byte` (the span 
column), `tags []tag` (tag columns, each a row-aligned `values [][]byte` + a 
`valueType`), `spanIDs []string`, and `minTS`/`maxTS` — so the plugin receives 
exactly the projected c [...]
+
+**(1) Strong compatibility of the parameter and return types.** Go's `plugin` 
package is strict by design: the [official docs](https://pkg.go.dev/plugin) 
state host and plugin must be built with "exactly the same version of the 
toolchain, the same build tags, and the same values of certain flags and 
environment variables," and that "all common dependencies … [must be] built 
from exactly the same source code" — in practice "built together by a single … 
component." This lock is *irreducibl [...]
+
+- The boundary types live in a single pinned SDK module, `pkg/pipeline/sdk`, 
that mirrors the native column layout with stdlib slices (`[][]byte`, 
`[]string`, `int64`) plus the engine's already-public, byte-sized value-type 
enum `pbv1.ValueType` (`pkg/pb/v1`). Tag values cross as the raw marshaled 
`[][]byte` exactly as the block holds them; the SDK re-exports the `pbv1` 
decode helper so a plugin decodes a value by its `ValueType` without importing 
engine internals directly.
+- Because the batch *is* the native trace layout, the plugin is by design 
**version-locked to the BanyanDB release** — which is consistent with Go 
plugin's irreducible toolchain lock, not an additional cost. The SDK module is 
the single version-tagged surface operators build against; `pkg/pb/v1` is a 
stable `byte` enum, and no `banyand/trace`-internal struct is exported across 
the boundary.
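+- The plugin exports an `ABIVersion` variable set to `sdk.ABIVersion` (Go's 
`plugin` package can look up only exported variables and functions, not 
constants); the engine refuses to load on mismatch with its own compiled 
`sdk.ABIVersion`, turning a silent miscompile 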
into a clear, fail-fast error. Configuration is a structured 
`google.protobuf.Struct` (`SamplerPlugin.config`) set directly in the pipeline 
config; the engine serializes it to canonical JSON and the plugin unmarshals 
those `[]byte` into its own typed config — so the wire form is structured and 
inspectable while the `.so` boundary sta [...]
+- **Distribution:** operators build plugins against the released, 
version-tagged `pkg/pipeline/sdk` using the **same CI image / Go version / 
`-trimpath` / CGO flags** as the data node. (Background constraints, 
well-documented but outside this design's verified scope: Go plugins are 
**Linux/FreeBSD/macOS only**, **cannot be unloaded** — so changing a plugin requires a 
node restart, there is no hot-reload — and a plugin **panic crashes the host** 
unless contained; see fail-open below.)
+
+**(3) Projection / column selection — spans optional, more than tags.** The 
plugin declares the columns it needs up front via `Project()`, which returns a 
`Projection{ Tags []string; SpanIDs bool; Spans bool }`. The engine turns 
`Tags` into the **same `model.TagProjection`** the block reader already honors 
(`blockMetadata.tagProjection`), so only those tag columns are decoded into 
`block.tags` — literally the query engine's tag-projection path 
(`trace/v1/query.proto`), not a new mechanis [...]
+
+**Go SDK (`pkg/pipeline/sdk`).** The batch types mirror the native trace 
`block`/`tag` (`banyand/trace/block.go`); the engine fills them with the 
block's own slices, shared **read-only** (not copied) — see the read-only 
contract below:
+
+```go
+package sdk
+
+// ABIVersion is compiled into the host and re-exported by every plugin; the
+// engine refuses to load a plugin whose ABIVersion differs.
+const ABIVersion = 1
+
+// Exported symbols the .so MUST provide (the constructor name is the kind's
+// convention — NewSampler for the sampler kind):
+//   var  ABIVersion int                          // == sdk.ABIVersion
+//   func NewSampler(config []byte) (Sampler, error)
+
+// Kind identifies a Plugin's role in a chain; it mirrors the set arm of the
+// proto Plugin.kind oneof. New kinds are added here in lockstep with new arms.
+type Kind uint8
+
+const (
+    KindUnspecified Kind = iota // zero value; a real plugin never reports it
+    KindSampler                 // keep/drop verdict (see Sampler)
+)
+
+// Plugin is the common interface every kind satisfies — the generic link type
+// the engine handles uniformly. The engine constructs a Plugin, checks Kind
+// against the proto oneof arm that named it, then type-asserts to that kind's
+// interface (e.g. Sampler). Project and Close are shared by every kind.
+type Plugin interface {
+    // Kind reports the plugin kind, for bookkeeping and as a cross-check
+    // against the proto oneof arm. Constant for the plugin's lifetime.
+    Kind() Kind
+
+    // Project is the column-selection handshake, called ONCE at load. The
+    // engine honors it for the plugin's lifetime: Tags drives the native
+    // model.TagProjection (only those tag columns are decoded); SpanIDs and
+    // Spans gate the spans stream. Intrinsic columns are always present.
+    Project() Projection
+
+    // Close releases any resources the plugin holds (called once at unload).
+    Close() error
+}
+
+// Sampler is the keep/drop kind of Plugin (Kind reports KindSampler). In a
+// chain it is a conjunction link: each Sampler narrows the traces the next
+// link sees.
+type Sampler interface {
+    Plugin
+
+    // Decide receives a vectorized batch of assembled per-trace blocks and
+    // returns a keep-mask aligned to batch.Traces. The batch is READ-ONLY:
+    // Decide must not mutate any slice it receives (see the read-only 
contract).
+    Decide(batch *TraceBatch) (Verdict, error)
+}
+
+// Projection is the plugin's up-front column request — one handshake covering
+// every optional column. Intrinsic columns (TraceID, MinTS/MaxTS) are always
+// materialized and are NOT listed here.
+type Projection struct {
+    // Tags names the tag columns to decode; the engine builds a
+    // model.TagProjection from them. Empty => no tag columns decoded.
+    Tags []string
+    // SpanIDs opts IN to the span-id column. Default false. span ids and span
+    // bodies share one encoded data block in the native layout, so requesting
+    // span ids forces a read of the spans stream (it is NOT free metadata).
+    SpanIDs bool
+    // Spans opts IN to the heavy span-body column. Default false: the engine
+    // leaves TraceBlock.Spans nil and, on the merge fast path (mustReadRaw),
+    // never decodes span bodies. Set true only when the verdict reads them.
+    Spans bool
+}
+
+// TraceBatch is a vectorized batch of assembled per-trace blocks — the 
engine's
+// native columnar trace layout, not a parallel form invented for the plugin.
+type TraceBatch struct {
+    Traces []TraceBlock
+}
+
+// TraceBlock mirrors the native trace `block`: all columns are indexed in
+// lockstep by span row i in [0,Len). The engine shares the block's slices
+// READ-ONLY. Intrinsic columns are always populated; SpanIDs, Tags, and Spans
+// appear only as requested by Project().
+type TraceBlock struct {
+    // Intrinsic — always present (from blockMetadata, no spans-stream decode):
+    TraceID string
+    MinTS   int64       // earliest span start (unix nanos)
+    MaxTS   int64       // latest span end; Duration = MaxTS - MinTS
+    // Projected — per Project():
+    Tags    []TagColumn // projected tag columns (per Projection.Tags)
+    SpanIDs []string    // span-id column, row-aligned; nil unless 
Projection.SpanIDs
+    Spans   [][]byte    // span bodies (heaviest column); nil unless 
Projection.Spans
+}
+
+// TagColumn mirrors the native `tag`: a row-aligned column of marshaled values
+// plus the value type needed to decode them via the SDK's pbv1 helper.
+type TagColumn struct {
+    Name      string
+    ValueType pbv1.ValueType // pkg/pb/v1 — the engine's stable byte-sized enum
+    Values    [][]byte       // one marshaled value per span row; nil == absent
+}
+
+// Verdict is the per-trace decision, aligned to batch.Traces.
+type Verdict struct {
+    Keep []bool // len(Keep) MUST == len(batch.Traces); Keep[i] retains 
Traces[i]
+}
+```
+
+These types are real and live in [`pkg/pipeline/sdk`](../../pkg/pipeline/sdk); 
the block above is the conceptual layout, and the canonical definitions plus 
the value-decode helpers (`TagColumn.At`, `DecodeTagValue`) are in that 
package. A complete reference plugin implementing the Scenario 6.1 sampler — 
config parsing, `Project()`, and tag/span extraction — lives at 
[`pkg/pipeline/sdk/_example/segment-tail-sampler`](../../pkg/pipeline/sdk/_example/segment-tail-sampler).
+
+**Verdict shape — boolean keep-mask.** `Decide` returns a `Verdict` whose 
`Keep` field is a `[]bool` aligned to `batch.Traces`: `Keep[i]` retains trace 
`i`. This is the simplest 
fully-vectorized contract and makes the alignment invariant trivial to check 
(the engine rejects a verdict whose length ≠ `len(batch.Traces)`). The 
keep/drop is per `trace_id`, matching the merger's per-`trace_id` write 
granularity (§7.1).
+
+**Chain composition — sequential pipe.** `TracePipelineConfig.plugins` 
(gating) and each `StageRule.plugins` (per-stage retention) are *ordered 
chains*. The engine materializes the **union** of every link's `Project()` 
once, then runs the links in declared order, handing each link the traces the 
previous link kept — a read-only sub-batch view, not a copy. The chain's 
survivors are what the event retains. For an all-sampler chain this is exactly 
the conjunction (AND) of the links' keep-ma [...]
+
+**Read-only batch contract.** The plugin receives the native block's slices 
**shared, not copied** — so `Decide` MUST treat `TraceBatch` (and every 
`[]byte`/`[]string`/`TagColumn` inside it) as **read-only**. The keep-mask is 
the plugin's **only** output channel. Crucially, persistence is structurally 
immune to a misbehaving plugin: the engine applies the returned mask to its 
**own** untouched block data when writing the reduced part — it never 
serializes the plugin-visible slices back t [...]
+
+**Failure handling — fail-open.** The engine wraps every `NewSampler`/`Decide` 
call in `recover()`. On panic, returned error, or a length-mismatched verdict, 
the engine **retains every trace in the batch** and emits a log + metric. A 
retention filter must never *drop* data because of a plugin bug; a misbehaving 
plugin degrades to "keep everything," not "drop everything." (A 
consecutive-failure counter may later disable the sampler entirely and alert, 
but the per-batch default is always k [...]
+
+**Determinism.** `Decide` is expected to be **pure** in its inputs 
(`TraceBatch` + the frozen `config`). A plugin that consults wall-clock or 
mutable external state breaks the "re-evaluation is stable across 
merge/finalize" property the timing model (§3.1) relies on; the SDK documents 
this expectation.
+
+**Where it runs.** The plugin runs at each enabled event: at 
`PIPELINE_EVENT_MERGE` the batch collects the mature (`merge_grace`-passed) 
traces the merger has assembled (§7.1); at `PIPELINE_EVENT_FINALIZE` it 
collects the settled segment's traces (§7.3). In both cases the engine wraps 
the native assembled `block` it already holds as a `TraceBlock` (sharing the 
column slices read-only) and buffers several before calling `Decide` once, 
applying the same metadata-vs-decode cost split as §7. [...]
+
+### 2.6 Extending with a New Plugin Kind
+
+The sampler is the only kind this design ships, but `Plugin` is a generic 
envelope so new kinds are additive — no change to `TracePipelineConfig`, 
`StageRule`, the chain semantics, or existing `.so`s. Adding a hypothetical 
**transformer** kind (one that *rewrites* batch data — drop or redact tags, 
enrich, collapse spans — rather than only filtering) would touch three places, 
in lockstep:
+
+1. **Proto** — add a payload message and a new arm to the `oneof`: `oneof kind 
{ SamplerPlugin sampler = 2; TransformerPlugin transformer = 3; }`. Existing 
arms and field numbers are untouched, so old configs still parse.
+2. **SDK** — add a `Transformer` sub-interface that embeds `Plugin` and adds 
its own processing method (e.g. `Transform(*TraceBatch) (*TraceBatch, error)`), 
plus a `KindTransformer` value and a `NewTransformer` constructor convention. A 
transform that emits a *new* batch needs a mutable output channel, which is 
exactly why this design keeps the batch strictly read-only and defers the 
transform kind: the read-only contract (above) holds precisely because the only 
shipped kind is a filter.
+3. **Engine** — when the chain executor meets a `transformer` link it 
type-asserts to `Transformer` and feeds its output to the next link instead of 
applying a keep-mask. The sequential-pipe model (§2.5) already hands each link 
the previous link's output, so a transform link slots in without changing how 
the chain is wired — it simply rewrites the sub-batch the next link sees.
+
+The same three-step pattern (proto arm → SDK sub-interface + constructor → 
engine dispatch) covers any future kind (exporter, router, …); the 
discriminator is always the set `oneof` arm, cross-checked by `Plugin.Kind()`.
+
+## Retention-Decision Timing & Trace Completeness
+
+Both the plugin gating policy (§1.1) and the per-stage retention plugin (§1.2) 
operate on the **whole** trace, not on individual spans as they arrive: 
`D_total` needs the earliest start and latest end, `has_error` scans all spans, 
and tag matchers scan all spans. Spans of one trace, however, arrive at the 
storage node anywhere from milliseconds to hours apart, and BanyanDB writes 
each span into the segment selected by its **event time** (the `start_time` 
tag), not its arrival time (`bany [...]
+
+### 3.1 When Retention Decisions Are Safe
+
+Retention decisions are therefore **not** made at write time. They are made by 
the post-trace passes that re-assemble a trace by `trace_id`:
+
+- **In-merge filter, during Hot-phase compaction** — when 
`PIPELINE_EVENT_MERGE` is enabled (the default), the plugin gating policy is 
evaluated in-line during Hot-phase LSM compaction (Warm/Cold compactions stay 
lossless). Compaction is part-count-driven, so it can fire while a trace is 
still growing; per-trace drops are deferred until `traceMaxTs < now − 
merge_grace` (§7.1).
+- **Plugin gating, at finalization** — when `PIPELINE_EVENT_FINALIZE` is 
enabled, the plugin gating policy is evaluated once per **Hot** segment after 
it has **settled**: its event-time window has closed *and* the event-time 
watermark has advanced past it by the `finalize_grace` period (§7.3). BanyanDB 
has no hard "segment sealed" event — `create()` pre-creates the next segment up 
to an hour *before* the current window ends, and late data keeps routing to an 
old segment by event time — s [...]
+- **Per-stage retention, at migration-out** — by the time a segment is 
migrated to the next stage (≥ its stage's TTL, orders of magnitude longer than 
`finalize_grace`), every trace it holds has settled, so per-stage plugin 
evaluation is final.
+
+Two completeness caveats follow from event-time segmentation:
+
+1. **Spans arriving after finalization.** A span whose `start_time` falls in 
an already-finalized segment is still accepted (while the segment is within 
retention) but is missed by the finalization-time evaluation. The 
`finalize_grace` period bounds how often this happens; a late span that arrives 
after the gate has run does not retroactively change the verdict for 
already-dropped traces.
+
+2. **Traces spanning multiple segments.** A trace longer than 
`segment_interval`, or one that straddles a boundary, is physically split 
across two or more segments, and each segment's pass evaluates only its own fragment 
(`D_total` and the indicators are fragment-local). BanyanDB performs no 
cross-segment trace assembly at the storage layer. Where trace duration is far 
below `segment_interval` (e.g. the showcase's ≈2.8 s max trace against 1-day 
segments) the fragment equals the whole trace and t [...]
+
+## Integration of the Retention System & Time-Aging System
+
+Distributed telemetry's diagnostic utility naturally decays as time passes. 
The pipeline must therefore decide, per stage, what to keep — but it must 
**not** try to pick the physical storage medium, because in BanyanDB the medium 
is already determined by where a `LifecycleStage` is placed.
+
+### 4.1 Stage Placement vs. Rule-Governed Retention
+
+The physical medium is **not** fixed per tier by this design. Each 
`LifecycleStage` (`common.v1.LifecycleStage`) is routed to a node group through 
its `node_selector`; the medium is whatever hardware that node group runs, and 
operators may deploy heterogeneous backings (for example two warm node groups 
on different SSD classes). A common placement is Hot → local NVMe, Warm → SATA 
SSD/HDD, Cold → object storage, but that mapping is an operator deployment 
choice, not a rule this pipeline e [...]
+
+Because medium selection already belongs to stage placement, per-stage 
`StageRule` retention **does not route traces to a medium**. Within whatever 
stage currently holds a trace, the stage plugin only acts as a **retention 
gate** — dictating whether a trace is retained when a partition is rewritten or 
migrated, or discarded/pruned to reduce storage footprint.
+
+### 4.2 Stage-Stepped Retention via Rising Plugin Config
+
+"Aging" is expressed structurally: each `StageRule` carries its own retention 
`plugins` chain, and the bar rises as data moves to colder stages by tightening 
each chain's plugin **config**. **Semantics at a stage's migration-out 
boundary:** the stage chain returns a keep/drop verdict per `trace_id` over the 
vectorized batch (§2.5); a trace it drops is omitted from the part written for 
the next stage. A `StageRule` with an empty `plugins` chain has no filtering 
effect (every trace at that [...]
+
+A typical profile rises across **three points** — Hot-phase gating, then each 
migration-out boundary — tightening the config at each step; for example, with 
one shared retention plugin the operator configures per stage:
+
+- **Gating (Hot phase):** keep if duration ≥ threshold, OR error, OR a key tag 
matches; probabilistically sample the healthy rest.
+- **Hot → Warm:** keep if duration ≥ 100 ms, OR error, OR a key tag matches 
(PostgreSQL, ActiveMQ, etc.).
+- **Warm → Cold:** keep errors only — slow-but-healthy traces are dropped at 
this boundary, so only errors enter Cold and persist for the full 30-day 
retention. Cold itself does no sampling (Cold compactions are lossless).
+
+The plugin's verdict is deterministic in its inputs, so retention stays 
self-explanatory: a trace is retained at Warm because the stage plugin's config 
matched `db.type=PostgreSQL`, even though it was only 3 ms. Time enters only 
through *when* a partition reaches each stage, which is governed by 
`LifecycleStage.ttl` / `segment_interval`.
+
+## Downstream Lifecycle Actions Governed by Per-Stage Retention
+
+The time-aging engine has up to three retention points: LSM-merge filtering 
during routine compaction (§5.1, gated by `PIPELINE_EVENT_MERGE` — on by 
default), per-stage retention at the migration-out boundary (§5.2, always-on 
when any `StageRule` carries a `plugins` chain), and segment-granularity 
eviction (§5.3, unchanged).
+
+### 5.1 Compaction Rewrites (when `PIPELINE_EVENT_MERGE` is enabled)
+
+When `PIPELINE_EVENT_MERGE` is in `enabled_events` (the default), the data 
node hooks the **Hot-phase** LSM merge stream so that traces failing the plugin 
gating policy are omitted from the consolidated output (Warm and Cold 
compactions are never filtered — they stay byte-for-byte lossless). Hot 
compaction is then both a space-reclamation pass and an early retention pass — 
drops happen incrementally as Hot parts age, rather than once-per-segment at 
finalization. Per-trace drops are gated [...]
+
+When the event is disabled (`enabled_events` omits `PIPELINE_EVENT_MERGE`), 
routine LSM compaction stays byte-for-byte lossless; retention falls entirely 
to the finalization gate (§7.3, when `PIPELINE_EVENT_FINALIZE` is enabled) and 
the per-stage retention plugin (§5.2).
+
+### 5.2 Partition-Level Tier Migration
+
+Both showcase trace groups (`sw_trace`, `sw_zipkinTrace`) declare a Hot → Warm 
→ Cold lifecycle in `ResourceOpts.stages`: Hot holds the active window (1-day 
ttl), Warm runs on `node_selector type=warm` (7-day ttl), and Cold runs on 
`node_selector type=cold` with `close=true` (30-day ttl). When a Hot segment 
matures past its 1-day ttl, the migration worker copies it to the Warm node 
group; the medium behind each node group (e.g. NVMe → SATA SSD → object 
storage) is the operator's deployme [...]
+
+- The migration engine runs the source stage's retention `plugins` chain 
against every trace in the partition.
+
+- **No dynamic splitting is performed:** all retained data is written to the 
next stage's node group. Traces the source stage's plugin drops are omitted 
from the target write stream, reducing the physical size of the migrated 
partition. With Scenario 6.1's hot retention plugin (config `min_duration: 
100ms`) a healthy `/homepage` trace (2802 ms) is kept and migrates to Warm; a 
PostgreSQL-touching trace is kept by the config's tag rule and migrates too; a 
healthy fast trace (6 ms) matches  [...]
+
+- When the partition matures past the Warm 7-day ttl, the Warm stage's 
retention plugin runs at this Warm→Cold boundary — the gate that decides what 
enters Cold (typically the strictest config; in Scenario 6.1 it keeps errors 
only). Only the traces it keeps are written into the Cold parts and then 
retained for the full Cold TTL; everything else is dropped here. Cold itself 
does no further sampling — Cold compactions are lossless.
+
+### 5.3 Eviction: Per-Trace Drop During Rewrites, Segment-Granularity GC
+
+BanyanDB does not tombstone individual traces, and this design does not add 
per-trace tombstones. The trace part layout is column-oriented (separate 
`primary`, `spans`, and `tags` streams), so there is no per-trace delete 
primitive. Eviction therefore happens at two distinct granularities:
+
+- **Per-trace removal is a side effect of the pre-migration rewrite (§7.2), 
not GC.** A trace the current stage's retention plugin drops is simply omitted 
from the reduced part produced for the next stage. No tombstone is written; the 
trace's columns are not copied into the new part, and the space is reclaimed 
when the old part is retired with the source segment.
+
+- **Whole-segment eviction stays the existing mechanism.** Reclaiming an 
entire time segment remains governed by the current retention path — TTL expiry 
plus the disk high/low watermarks (`banyand/trace/svc_standalone.go`, 
`TopicDeleteExpiredTraceSegments`). Per-stage plugins do not place tombstones 
and do not trigger segment deletion; they only change how much of a segment 
migrates to the next stage.
+
+## Operational Scenario Configurations
+
+Below are two operational scenarios represented as complete 
`TracePipelineConfig` instances, one per trace group found in the 
skywalking-showcase cluster (`sw_trace` and `sw_zipkinTrace`). Group names, 
schema names, tags, latencies, and lifecycle stages are taken from real data in 
that cluster; the retention outcomes quoted are derived by evaluating the 
per-stage plugins against real sampled traces.
+
+### 6.1 Scenario 1: SkyWalking-Native Segment Retention (`sw_trace`)
+
+- **Objective**: On the showcase `sw_trace` group (schema `segment`), keep 
every error trace all the way through migrations for incident forensics, keep 
genuinely slow requests through Warm, always keep traces that touch PostgreSQL 
or the ActiveMQ `queue-songs-ping` queue at the Hot→Warm boundary, and 
probabilistically sample the healthy remainder at gating. The group's real Hot 
→ Warm → Cold stages (ttl 1d / 7d / 30d) carry rising retention strictness at 
each migration-out boundary. The [...]
+
+- **Configuration JSON**:
+
+```json
+{
+  "metadata": { "group": "sw_trace", "name": "segment-tail-sampler" },
+  "enabled": true,
+  "stages": [
+    {
+      "stage": "hot",
+      "plugins": [
+        {
+          "name": "hot-retention",
+          "sampler": {
+            "path": "segment-stage-retention.so",
+            "abi_version": 1,
+            "config": {
+              "min_duration": "0.100s",
+              "keep_errors": true,
+              "keep_tag_rules": [
+                { "tag_key": "db.type",  "equals": "PostgreSQL" },
+                { "tag_key": "mq.queue", "equals": "queue-songs-ping" }
+              ]
+            }
+          }
+        }
+      ]
+    },
+    {
+      "stage": "warm",
+      "plugins": [
+        {
+          "name": "warm-retention",
+          "sampler": {
+            "path": "segment-stage-retention.so",
+            "abi_version": 1,
+            "config": {
+              "keep_errors": true
+            }
+          }
+        }
+      ]
+    }
+  ],
+  "schema_names": ["segment"],
+  "plugins": [
+    {
+      "name": "segment-tail-sampler",
+      "sampler": {
+        "path": "segment-tail-sampler.so",
+        "abi_version": 1,
+        "config": {
+          "duration_threshold": "0.500s",
+          "keep_errors": true,
+          "healthy_sample_rate": 0.1,
+          "keep_tag_rules": [
+            { "tag_key": "db.type",  "equals": "PostgreSQL" },
+            { "tag_key": "mq.queue", "equals": "queue-songs-ping" }
+          ]
+        }
+      }
+    }
+  ],
+  "enabled_events": ["PIPELINE_EVENT_MERGE", "PIPELINE_EVENT_FINALIZE"],
+  "merge_grace": "30s",
+  "finalize_grace": "300s"
+}
+```
+
+> Each plugin link's `config` is a structured `google.protobuf.Struct` set 
directly in the pipeline config (not an opaque blob); the engine does not 
interpret its keys — it serializes the object to JSON and hands the bytes to 
the plugin's constructor, which unmarshals them into its own typed config. The 
gating chain here is a single `sampler` link whose `Project()` returns 
`Projection{ Tags: ["is_error", "db.type", "mq.queue"], SpanIDs: false, Spans: 
false }`, so only those three tag col [...]
+
+- **Retention Dynamics** (real `sw_trace` traces; gating runs in Hot, owned by 
the gating chain; per-stage retention owned by each stage's `plugins` chain):
+
+    - The error trace `5fcdb353-…` (`POST /test`, `agent::app`, `is_error=1`, 
4 ms) is a sure-keep at gating: the gating plugin keeps it because its config 
sets `keep_errors`. At Hot→Warm, the hot retention plugin keeps it (config 
`keep_errors`) → migrated. At Warm→Cold, the warm retention plugin 
(errors-only) keeps it → migrated into Cold, where it is retained for the full 
30-day Cold TTL (Cold does no further sampling).
+
+    - The slow healthy trace `b03bb932-…` (`/homepage`, `agent::ui` → 
`agent::frontend`, 2802 ms) is a sure-keep at gating: the gating plugin keeps 
it because 2802 ms > 500 ms `duration_threshold` in its config (from `MaxTS - 
MinTS`, no span decode). At Hot→Warm: the hot plugin keeps it (2802 ms ≥ its 
100 ms `min_duration`) → kept through Warm. At Warm→Cold: the warm plugin 
(errors-only) drops it (no error) → **dropped at Warm→Cold; never enters Cold**.
+
+    - A PostgreSQL-touching trace (e.g. `b31e4be8-…`, `agent::songs` 
`UndertowDispatch`, 3 ms, `db.type=PostgreSQL`) is sure-kept at gating via the 
gating plugin's `db.type` keep-tag-rule. At Hot→Warm: the hot plugin keeps it 
(its config's `db.type` rule) → kept through Warm. At Warm→Cold: the warm 
plugin (errors-only) drops it (no error) → **dropped at Warm→Cold; never enters 
Cold**.
+
+    - A healthy fast trace such as `GET:/songs` at 6 ms (`agent::songs`, 
`http.status_code=200`) is only kept at gating if the gating plugin's 
`healthy_sample_rate` (`0.1`) hash retains it (deterministic `hash(trace_id) < 
0.1`, since it matches no sure-keep rule). If kept, at Hot→Warm the hot plugin 
drops it (6 ms < 100 ms, no error, no tag match) → **dropped at Hot→Warm 
migration**.
+
+### 6.2 Scenario 2: Istio / Zipkin Mesh Edge Sampling (`sw_zipkinTrace`)
+
+- **Objective**: On the showcase `sw_zipkinTrace` group (schema 
`zipkin_span`), apply a lower-cost edge sampler to the Istio service-mesh 
spans. The Zipkin schema has no first-class `is_error` column, so server errors 
are caught with a tag rule on the flattened `query` attributes rather than 
`keep_errors`; mesh gateway spans are kept by tag. Targets the group's Warm and 
Cold stages. As in §6.1 the gating verdict is owned by a native Go plugin chain 
(a single sampler link, §2.5); per-stag [...]
+
+- **Configuration JSON**:
+
+```json
+{
+  "metadata": { "group": "sw_zipkinTrace", "name": "zipkin-edge-sampler" },
+  "enabled": true,
+  "stages": [
+    {
+      "stage": "warm",
+      "plugins": [
+        {
+          "name": "warm-retention",
+          "sampler": {
+            "path": "zipkin-stage-retention.so",
+            "abi_version": 1,
+            "config": {
+              "min_duration": "1s",
+              "keep_tag_rules": [
+                { "tag_key": "query", "regex": "http\\.status_code=5\\d\\d" },
+                { "tag_key": "local_endpoint_service_name", "equals": 
"gateway.sample-services" }
+              ]
+            }
+          }
+        }
+      ]
+    }
+  ],
+  "schema_names": ["zipkin_span"],
+  "plugins": [
+    {
+      "name": "zipkin-edge-sampler",
+      "sampler": {
+        "path": "zipkin-edge-sampler.so",
+        "abi_version": 1,
+        "config": {
+          "duration_threshold": "1.000s",
+          "keep_errors": false,
+          "healthy_sample_rate": 0.05,
+          "keep_tag_rules": [
+            { "tag_key": "query", "regex": "http\\.status_code=5\\d\\d" }
+          ]
+        }
+      }
+    }
+  ],
+  "merge_grace": "30s"
+}
+```
+
+> Each link's `config` is a structured `google.protobuf.Struct` set directly 
in the pipeline config (not an opaque blob); the engine does not interpret its 
keys — it serializes the object to JSON and hands the bytes to the plugin's 
constructor, which unmarshals them into its own typed config. The gating 
chain's single `sampler` link `Project()` returns `Projection{ Tags: ["query"], 
SpanIDs: false, Spans: false }`, so only the `query` tag column (plus the 
intrinsic `trace_id` / `MinTS` /  [...]
+
+- **Retention Dynamics** (real `sw_zipkinTrace` spans; gating runs in Hot, 
owned by the gating chain; per-stage retention owned by the Warm `plugins` 
chain):
+
+    - The slowest mesh call observed — `trace_id 0961e077…`, a 30.7 s 
`istio.skywalking-showcase` client span to Grafana's live-WS endpoint 
(`http.status_code=101`) — is a sure-keep at gating: the gating plugin keeps it 
because 30.7 s > 1 s `duration_threshold` in its config (from `MaxTS - MinTS`, 
no span decode). At Warm→Cold: the warm retention plugin keeps it (30.7 s ≥ its 
1 s `min_duration`) → migrated into Cold, where it is retained for the full 
Cold TTL (Cold does no further sampling).
+
+    - A gateway span on `gateway.sample-services` at the mesh p90 (~19 ms): 
the gating plugin passes it only via the `0.05` `healthy_sample_rate` hash (19 
ms < 1 s, no 5xx, so it matches no sure-keep rule). If kept, at Warm→Cold the 
warm plugin keeps it (19 ms < 1 s, but its `local_endpoint_service_name = 
gateway.sample-services` rule matches) → migrated into Cold, where it is 
retained for the full Cold TTL.
+
+    - A typical p50 mesh span (~2 ms) is kept at gating only via the gating 
plugin's `0.05` sample. At Warm→Cold the warm plugin drops it (2 ms < 1 s, not 
a gateway span, no 5xx) → **dropped at Warm→Cold**.
+
+    - Any span carrying a `5xx` in its `query` attributes is sure-kept at 
gating (Hot) by the gating plugin's `query` keep-tag-rule, and kept at 
Warm→Cold by the warm plugin's `query` rule → migrated into Cold and preserved 
for the full Cold TTL (Cold does no further sampling). Whole Warm/Cold segments 
are still reclaimed on their own schedule by `LifecycleStage.ttl`.
+
+## Post-Trace Data Flow Architecture
+
+The execution of the post-trace pipeline occurs natively on BanyanDB Data 
Nodes via three distinct pathways, matching storage lifecycle transitions.
+
+```mermaid
+flowchart TD
+    RI["Raw Ingestion"] --> LN["Liaison Node"] --> FW["Fast Local Write"] --> MB["Data Node Memory Buffer"]
+    MB --> PL["Storage Node Post-Trace Processing Loops"]
+    PL --> H71["7.1 LSM Compaction Merge Filter Hook<br/>gated by PIPELINE_EVENT_MERGE; per-trace merge_grace defers drops"]
+    PL --> H72["7.2 Pre-Migration Filter Rewrite<br/>rewrite segment to a reduced part, then migrate it unchanged"]
+    PL --> H73["7.3 Scheduled Final Filter on a Settled Segment<br/>gated by PIPELINE_EVENT_FINALIZE; watermark + finalize_grace"]
+```
+
+### 7.1 LSM Compaction Merge Phase Loop (when `PIPELINE_EVENT_MERGE` is enabled)
+
+During **Hot-phase** LSM file compaction, immature part files containing 
segmented trace blocks are merged into unified, consolidated parts. The merge 
loop already streams blocks grouped by `trace_id`, so it is the ideal place to 
evaluate traces without a separate read pass — *provided* drops are deferred 
until each trace is mature. (Warm and Cold compactions run the same merger with 
the gating hook disabled, so they stay lossless.)
+
+> **Design decision (accepted):** The trace LSM merger is refactored to expose 
a **trace-filter hook**. When `PIPELINE_EVENT_MERGE` is in `enabled_events` and 
a `TracePipelineConfig` targets the schema being compacted, blocks belonging to 
traces failing the plugin gating policy are omitted from the consolidated 
output: mature traces are accumulated into a projected `TraceBatch` (§2.5) and 
the plugin's boolean keep-mask drives the per-`trace_id` drop. With 
`PIPELINE_EVENT_MERGE` disabled  [...]
+
+#### Integration point in the real merger
+
+The hook is injected into `mergeBlocks` (`banyand/trace/merger.go`), which is 
the single point where every emitted block flows to the block writer. Both 
write paths are wrapped by the filter:
+
+- `blockWriter.mustWriteRawBlock` — the fast path that copies a single-block 
trace as raw bytes without decoding.
+- `blockWriter.mustWriteBlock` — the slow path that emits a decoded, 
accumulated block for a `trace_id`.
+
+```mermaid
+flowchart TD
+    IP["Immature Parts"] --> BR["blockReader.nextBlockMetadata<br/>streams blocks ordered by trace_id"]
+    BR --> PA["Per-trace assembly<br/>existing pending-block accumulation in mergeBlocks"]
+    PA --> M{"Mature?<br/>traceMaxTs (timestampsMetadata.max) older than now − merge_grace"}
+    M -->|"NO (trace may still grow)"| RET["RETAIN unchanged"]
+    M -->|"YES (stopped growing)"| CHK["plugin gating policy check"]
+    CHK -->|"KEEP"| WB["mustWriteBlock / mustWriteRawBlock"]
+    CHK -->|"DROP"| SK["blocks skipped; spans, tags, primary entries never written<br/>(space reclaimed in-stream, no delete mutation)"]
+```
+
+#### Filter contract and what the merge already gives us for free
+
+1. **Duration is free; projected columns cost a decode.** Block metadata 
carries `timestampsMetadata{min,max}` (`block_metadata.go`), so `D_total` is 
derivable on the raw fast path without unmarshaling — duration checks are free. 
A plugin that projects tag columns (`Project().Tags`) — or opts into span 
bodies (`Project().Spans`) — forces the decoded slow path (`loadBlockData`) for 
those columns; a plugin that needs neither stays on the raw fast path.
+
+2. **Decisions are per `trace_id`, not per block.** A single trace may be 
emitted as multiple blocks when its accumulated span size crosses 
`maxUncompressedSpanSize`. The filter computes one keep/drop verdict per 
`trace_id` and applies it to every block carrying that id, so a trace is never 
partially written.
+
+3. **A trace is dropped only after it stops growing (per-trace 
`merge_grace`).** Compaction is part-count-driven (`getPartsToMerge`), not 
time-driven — so a merge routinely runs on the active write window while a 
trace's remaining spans are still seconds away. Dropping such a trace on its 
partial spans would orphan the late ones. The filter therefore evaluates a 
trace only once its latest span timestamp is older than `now − merge_grace` 
(the trace's max timestamp is read for free from `t [...]
+
+4. **Derived part state reflects only retained traces.** When a trace is 
dropped, its entries are excluded from `partMetadata` counts, the 
`traceIDFilter` bloom filter (`mustWriteTraceIDFilter`), and the `tagType` set 
written at `Flush`. The filter runs before these are finalized so the 
consolidated part stays self-consistent.
+
+5. **Secondary indexes must be pruned in lockstep — this is net-new engine 
work.** Today `mergePartsThenSendIntroduction` merges sidx parts via 
`sidxInstance.Merge(closeCh, partIDMap, newPartID)` — a **part-ID-based** merge 
with no per-`trace_id` predicate, so a core trace drop would leave **dangling 
sidx entries** pointing at traces no longer in the reduced part. Closing this 
gap is a required, not-yet-existing contract: the trace filter must surface the 
set of dropped `trace_id`s, and  [...]
+
+6. **Drops are final and the merge is crash-safe.** Because a trace is dropped 
only after `merge_grace` has elapsed since its last span (point 3), the verdict 
acts on a trace that has stopped growing, so the drop is final rather than 
premature. The merge writes the new part atomically and only retires source 
parts after the introduction is applied, so a crash mid-merge leaves the 
immature parts intact for retry; re-running compaction on an already-filtered 
part is a no-op.
+
+### 7.2 Hot-to-Warm Tier Migration (Pre-Migration Filter Rewrite)
+
+When the lifecycle agent migrates a segment to the next stage, the existing 
transfer is an opaque byte stream of part directories — it is not a per-trace 
filter. Today's path walks shards via `traceMigrationVisitor.VisitShard` 
(`banyand/backup/lifecycle/trace_migration_visitor.go`), reads source parts 
with `generateAllPartData` (which opens part files via 
`trace.CreatePartFileReaderFromPath`, `banyand/trace/part.go`), and ships them 
with `streamPartToTargetShard` over a chunked sync clie [...]
+
+> **Design decision (accepted):** Do **not** filter inside the byte-copy 
transfer. Instead, run a **pre-migration filter pass** on the source segment 
that reads the existing parts through the pipeline and writes a **new, reduced 
part**; the unchanged migration then streams that new part to the next stage. 
This keeps "migration" a faithful byte copy and isolates all lossy behaviour in 
a dedicated rewrite step.
+
+```mermaid
+flowchart TD
+    HT["Hot Tier segment (settled; at least stage TTL old)"]
+    HT --> PR["Pre-migration rewrite pass — new visitor over the segment's parts<br/>read each part: blockReader over CreatePartFileReaderFromPath<br/>apply pipeline filter (same trace-filter contract as §7.1)<br/>write retained traces to a NEW reduced part via blockWriter"]
+    PR --> RP["Reduced part on Hot tier"]
+    RP --> EM["Existing migration (unchanged)<br/>VisitShard → generateAllPartData → streamPartToTargetShard<br/>opaque chunked byte copy of the reduced part"]
+    EM --> WT["Warm tier segment"]
+```
+
+1. The pre-migration pass attaches between `generateAllPartData` and 
`streamPartToTargetShard` in `VisitShard`: each source part is read with 
`blockReader`, the source stage's retention `plugins` chain runs per 
`trace_id`, and the retained traces are written through `blockWriter` into a 
reduced part that is produced in place of the source.
+
+2. The filter contract is per-`trace_id` retain/drop verdicts, with dropped 
traces excluded from `partMetadata`, the `traceIDFilter` bloom filter, 
`tagType`, and the parallel sidx parts — the latter via the same trace-ID-aware 
sidx rewrite required for the merge path (§7.1, point 5), which is net-new 
engine work. Because a segment only migrates once it is at least its stage TTL 
old (e.g. a Hot segment after ~1 day), its event-time window closed long ago 
and any late arrivals have settled [...]
+
+3. The reduced part — not the original — is what `streamPartToTargetShard` 
transfers. The migration protocol, replica fan-out, and destination 
registration are untouched, so the considerable savings come purely from 
shipping fewer bytes to the next tier.
+
+### 7.3 Scheduled Final Filter on a Settled Segment
+
+The plugin gating pass must run only once a Hot segment is unlikely to gain 
more spans — otherwise a trace whose remaining spans are still arriving could 
be judged prematurely. (Finalization, like the in-merge filter, runs only in 
the Hot phase.) The catch: **BanyanDB has no "segment sealed" signal** to 
trigger this, so the pass is driven by an event-time watermark and a grace 
period instead.
+
+Why there is no seal event (grounding in 
`banyand/internal/storage/rotation.go`):
+
+- **Writes route by event time, so old segments keep receiving data.** A span 
is written into the segment whose window contains its `start_time` 
(`banyand/trace/write_standalone.go` → `CreateSegmentIfNotExist(time.Unix(0, 
ts))`), not its arrival time. A span that arrives hours late still lands in its 
old event-time segment for as long as that segment exists.
+- **Segment creation is look-ahead, not rollover.** The rotation loop 
pre-creates the *next* segment up to `creationGap` (1 hour) **before** the 
current window ends: it fires `segmentController.create` only while `0 < 
latest.End - eventTime <= newSegmentTimeGap`. At that moment the current 
segment is still the active write target, so `create()` cannot mean "the 
previous segment is sealed."
+- **`closeIdleSegments` is not a seal.** It only releases idle in-memory 
handles after an idle timeout (`banyand/internal/storage/segment.go`) and 
reopens them on the next access; it says nothing about window completeness.
+- **The only definitive boundary is TTL removal** (`retentionTask`, cron `5 
0`), which deletes the whole segment — far too late to be a finalization 
trigger.
+
+> **Design decision (accepted):** Drive the gating pass from an **event-time 
watermark plus a settling grace period**, not from a (non-existent) seal event. 
The rotation path already tracks the maximum observed event time 
(`latestTickTime`, fed by `Tick`, `rotation.go`). A segment is treated as 
**settled** once `watermark > segment.End + finalize_grace`, where 
`finalize_grace` is `TracePipelineConfig.finalize_grace` — a configured 
expected-late-arrival lag (engine default `5m` if unset). [...]
+
+```mermaid
+flowchart TD
+    WM["Event-time watermark (latestTickTime) advances"]
+    WM -->|"post-trace scheduler cron tick"| SEG["For each segment with 
watermark past segment.End + finalize_grace, not yet finalized<br/>walk each 
(segment, shard) tsTable's parts via the trace Visitor<br/>apply pipeline 
filter once: trace treated as complete, verdict final"]
+    SEG --> RW["Rewrite to reduced parts + mark segment finalized"]
+```
+
+1. The scheduler runs the pipeline's plugin gating policy across the settled 
segment's per-shard `tsTable` parts (a `tsTable` is scoped per `(segment, 
shard)`).
+
+2. Because the watermark is past `segment.End + finalize_grace`, the segment 
is very unlikely to gain more spans, so the verdict is stable: this pass 
decides whether a trace survives at all. Surviving traces continue through the 
lifecycle and are filtered again at each migration-out boundary by the relevant 
`StageRule` (§7.2).
+
+3. The `finalize_grace` period trades latency for completeness: a larger 
`finalize_grace` catches more late spans before finalizing but delays space 
reclamation. The cron tick doubles as a catch-up sweep — after a node restart 
it still finds settled-but-unfinalized segments and applies the filter, after 
which normal retention/GC proceeds.
+
+## Step-by-Step Execution Sequence
+
+To illustrate how the three filter points (in-merge, finalization, and 
migration-out) relate, here is the complete processing loop executed by the 
post-trace engine on a BanyanDB Data Node across a trace's lifetime (example 
trace from Scenario 6.1: `5fcdb353-…`, `POST /test` on `agent::app`, 
`is_error=1`, with `enabled_events = [PIPELINE_EVENT_MERGE, 
PIPELINE_EVENT_FINALIZE]`):
+
+```mermaid
+flowchart TD
+    subgraph A["A. In-merge filter at HOT-PHASE LSM COMPACTION 
(PIPELINE_EVENT_MERGE enabled)"]
+        A1["1. mergeBlocks streams blocks ordered by trace_id 
(banyand/trace/merger.go).<br/>Maturity check: traceMaxTs (= 
timestampsMetadata.max) older than now − merge_grace (30s)?"]
+        A1 -->|"NO"| A2["Pass blocks through unchanged; defer the verdict"]
+        A1 -->|"YES (stopped growing)"| A3["Engine builds a projected 
TraceBatch; plugin Decide returns a keep-mask.<br/>The §6.1 
segment-tail-sampler keeps this trace because is_error is set (config keeps 
errors);<br/>a healthy trace below the 0.500s threshold with no matching tag 
would fall to the 0.1 sample.<br/>Verdict for 5fcdb353-…: KEEP (drops here 
reclaim space during routine compaction)"]
+    end
+    subgraph B["B. Plugin gating pass at HOT FINALIZATION 
(PIPELINE_EVENT_FINALIZE enabled, once per settled Hot segment)"]
+        B2["2. Post-trace scheduler tick observes a segment with watermark 
past segment.End + finalize_grace (300s), not yet finalized"]
+        B2 --> B3["3. Pick the active TracePipelineConfig for (group, schema, 
stage); match targeting fields (schema_names / schema_name_regex). No match? 
Skip."]
+        B3 --> B4["4. For each surviving trace_id (not already dropped at A), 
evaluate the plugin gating policy — same as step 1.<br/>Surviving traces 
continue into the stage lifecycle; traces the policy rejects are dropped from 
the segment."]
+    end
+    subgraph C["C. Per-stage retention at MIGRATION-OUT (once per stage 
boundary, when a StageRule has a plugin)"]
+        C5["5. Source stage migrates to the next stage (e.g. Hot → Warm after 
the 1-day TTL). Pre-migration rewrite reads each source part with blockReader 
and runs the source stage's retention plugin, returning a keep-mask per 
trace_id:<br/>Hot → Warm (hot StageRule.plugins, config min_duration 0.100s): 
duration ≥ 0.100s? (no, only 4ms); keep_errors AND is_error? YES → Keep, 
migrate<br/>Warm → Cold (warm StageRule.plugins, config keep_errors only): 
keep_errors AND is_error? YES → Keep,  [...]
+        C5 --> C6["6. RETENTION OUTCOME: the error trace survives the gating 
plugin at A and B, then the stage retention plugin keeps it (keep_errors) at 
every migration boundary in C.<br/>Retained Hot → Warm → Cold; written into the 
Cold (type=cold) parts. Cold stage TTL = 30 days (per the group's 
ResourceOpts.stages)."]
+    end
+    A3 --> B2
+    B4 --> C5
+```
+
+## References
+
+- OpenTelemetry Collector — Tail Sampling Processor (`tailsamplingprocessor`): 
the keep-all-errors / latency / probabilistic-policy model a typical sampler 
plugin implements.
+- L. Zhang, Z. Xie, V. Anand, Y. Vigfusson, J. Mace. "The Benefit of 
Hindsight: Tracing Edge-Cases in Distributed Systems." NSDI '23 — motivation 
for post-hoc / retroactive trace retention.
+- BanyanDB schema and query model: 
`api/proto/banyandb/common/v1/common.proto`, 
`api/proto/banyandb/trace/v1/query.proto`.
+- Example data source: the `skywalking-showcase` BanyanDB cluster — groups 
`sw_trace` (schema `segment`) and `sw_zipkinTrace` (schema `zipkin_span`). All 
scenario group/schema/tag names, lifecycle stages, latencies, and retention 
outcomes are derived from real traces sampled from this cluster via the 
`/v1/trace/data` API.
+- Native plugin sampler (§2.5): the vectorized batch reuses BanyanDB's own 
native trace columnar block (`banyand/trace/block.go` — `block`/`tag`) and the 
existing tag projection (`blockMetadata.tagProjection`, 
`pkg/query/model.TagProjection`), not a new columnar form. External grounding: 
Go [`plugin` package docs](https://pkg.go.dev/plugin) (toolchain/build 
version-lock); Apache Arrow [columnar 
format](https://arrow.apache.org/docs/format/Columnar.html) and Apache 
DataFusion [scalar UDFs [...]
diff --git a/pkg/pipeline/sdk/_example/segment-tail-sampler/README.md 
b/pkg/pipeline/sdk/_example/segment-tail-sampler/README.md
new file mode 100644
index 000000000..337746726
--- /dev/null
+++ b/pkg/pipeline/sdk/_example/segment-tail-sampler/README.md
@@ -0,0 +1,67 @@
+# segment-tail-sampler (reference plugin)
+
+A worked example of a BanyanDB post-trace sampler plugin built on
+[`pkg/pipeline/sdk`](../../). It implements the Scenario 6.1 (`sw_trace`) 
sampler
+from 
[`docs/design/post-trace-pipeline.md`](../../../../../docs/design/post-trace-pipeline.md)
+and exists to show plugin authors three things:
+
+1. **Read the proto config.** `SamplerPlugin.config` is a
+   `google.protobuf.Struct`. The engine serializes it to canonical JSON and
+   passes it to `NewSampler([]byte)`, which unmarshals it into a typed 
`config`.
+2. **Declare a projection.** `Project()` returns only the tag columns the
+   verdict reads (the error tag + each rule's tag key), and opts into the
+   span-id column only when a span-count rule is configured. Span bodies are
+   never read, so the merge raw fast path is preserved.
+3. **Extract tags and spans.** `Decide` decodes tag values by their
+   `pbv1.ValueType` (via `TagColumn.At`) and reads the span-id column
+   (`TraceBlock.Len` / `SpanIDs`).
+
+It is the `sampler` kind of the generic `Plugin`: because `sdk.Sampler` embeds
+`sdk.Plugin`, the sampler also implements `Kind() sdk.Kind` (returning
+`sdk.KindSampler`). In a `TracePipelineConfig` or `StageRule` it is one link in
+an ordered `plugins` chain (a sequential pipe); a chain of samplers keeps a
+trace only if every link keeps it.
+
+## Keep logic
+
+A trace is kept when **any** sure-keep rule matches; otherwise a deterministic
+hash of `trace_id` admits a fraction of the healthy remainder:
+
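+1. `duration ≥ duration_threshold`, where duration is `MaxTS − MinTS` (both 
are intrinsic to the trace block, so no tag column is decoded).
+2. `keep_errors` is set and the error tag is truthy on any span (a non-zero 
integer, or the string `true` or `1`).
+3. any `keep_tag_rules` entry matches any span (`exists` / `equals` / `in` / 
`regex`).
+4. `min_span_count` is configured and the trace has at least that many spans.
+5. otherwise, the FNV-1a hash of `trace_id`, mapped to a fraction in `[0, 1)`, 
is below `healthy_sample_rate`.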
+
+## Config
+
+Set as the `config` `Struct` of a `sampler` plugin link in the
+`TracePipelineConfig`'s `plugins` chain; shown here as the JSON the engine 
hands
+to the plugin:
+
+```json
+{
+  "duration_threshold": "0.500s",
+  "keep_errors": true,
+  "error_tag": "is_error",
+  "healthy_sample_rate": 0.1,
+  "keep_tag_rules": [
+    { "tag_key": "db.type",  "equals": "PostgreSQL" },
+    { "tag_key": "mq.queue", "equals": "queue-songs-ping" }
+  ]
+}
+```
+
+## Build
+
+Build with the **same Go toolchain and the same pinned `pkg/pipeline/sdk`** as
+the running data node (Go's `plugin` package requires it):
+
+```sh
+go build -buildmode=plugin -trimpath \
+  -o segment-tail-sampler.so \
+  ./pkg/pipeline/sdk/_example/segment-tail-sampler
+```
+
+Then place `segment-tail-sampler.so` in the data node's trusted plugin
+directory and reference it by `path` in `SamplerPlugin`.
diff --git a/pkg/pipeline/sdk/_example/segment-tail-sampler/main.go 
b/pkg/pipeline/sdk/_example/segment-tail-sampler/main.go
new file mode 100644
index 000000000..05f4748bc
--- /dev/null
+++ b/pkg/pipeline/sdk/_example/segment-tail-sampler/main.go
@@ -0,0 +1,321 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Command segment-tail-sampler is the reference post-trace sampler plugin from
+// docs/design/post-trace-pipeline.md §6.1. It is a worked example of the
+// pkg/pipeline/sdk contract and illustrates the three things a real plugin
+// must do:
+//
+//  1. Parse the operator-supplied config. SamplerPlugin.config is a
+//     google.protobuf.Struct; the engine serializes it to canonical JSON and
+//     hands it to NewSampler as bytes, which this plugin unmarshals into its
+//     own typed config.
+//  2. Declare a projection (Project) so the engine materializes only the
+//     columns the verdict reads — here a handful of tag columns, plus the
+//     span-id column only when a span-count rule is configured.
+//  3. Extract tags and spans from the vectorized batch in Decide — decoding 
tag
+//     values by their ValueType and reading the span-id column.
+//
+// Build it as a Go plugin (it is deliberately under an _example directory so
+// `go build ./...` and the linters skip it):
+//
+//     go build -buildmode=plugin -trimpath \
+//       -o segment-tail-sampler.so \
+//       ./pkg/pipeline/sdk/_example/segment-tail-sampler
+//
+// It must be built with the same Go toolchain and the same pinned
+// pkg/pipeline/sdk as the running data node (see §2.5).
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "hash/fnv"
+       "regexp"
+       "sort"
+       "strconv"
+       "time"
+
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pipeline/sdk"
+)
+
+// ABIVersion re-exports the SDK ABI version. The engine refuses to load the
+// plugin unless this equals its own compiled sdk.ABIVersion.
+var ABIVersion = sdk.ABIVersion
+
+// tagRule is one sure-keep tag predicate. Only the first matcher field set,
+// in the order exists, equals, in, regex, is honored; the others are ignored.
+type tagRule struct {
+       re     *regexp.Regexp
+       In     []string `json:"in"`
+       Regex  string   `json:"regex"`
+       TagKey string   `json:"tag_key"`
+       Equals string   `json:"equals"`
+       Exists bool     `json:"exists"`
+}
+
+// config is the JSON shape the operator sets in SamplerPlugin.config.
+type config struct {
+       DurationThreshold string    `json:"duration_threshold"`
+       ErrorTag          string    `json:"error_tag"`
+       KeepTagRules      []tagRule `json:"keep_tag_rules"`
+       HealthySampleRate float64   `json:"healthy_sample_rate"`
+       MinSpanCount      int       `json:"min_span_count"`
+       KeepErrors        bool      `json:"keep_errors"`
+}
+
+// segmentTailSampler keeps a trace when any sure-keep rule matches, and
+// otherwise admits a deterministic fraction of the healthy remainder.
+type segmentTailSampler struct {
+       errorTag          string
+       rules             []tagRule
+       requiredTags      []string
+       durationThreshold time.Duration
+       healthySampleRate float64
+       minSpanCount      int
+       keepErrors        bool
+       wantSpanIDs       bool
+}
+
+// NewSampler is the constructor symbol the engine looks up. It parses and
+// validates the operator config, compiles any regex matchers, and computes the
+// projection. A returned error rejects the plugin at admission.
+func NewSampler(configJSON []byte) (sdk.Sampler, error) {
+       var c config
+       if len(configJSON) > 0 {
+               if err := json.Unmarshal(configJSON, &c); err != nil {
+                       return nil, fmt.Errorf("segment-tail-sampler: invalid 
config JSON: %w", err)
+               }
+       }
+       if c.HealthySampleRate < 0 || c.HealthySampleRate > 1 {
+               return nil, fmt.Errorf("segment-tail-sampler: 
healthy_sample_rate %v out of [0,1]", c.HealthySampleRate)
+       }
+       s := &segmentTailSampler{
+               rules:             c.KeepTagRules,
+               healthySampleRate: c.HealthySampleRate,
+               keepErrors:        c.KeepErrors,
+               errorTag:          c.ErrorTag,
+               minSpanCount:      c.MinSpanCount,
+       }
+       if s.errorTag == "" {
+               s.errorTag = "is_error"
+       }
+       if c.DurationThreshold != "" {
+               d, err := time.ParseDuration(c.DurationThreshold)
+               if err != nil {
+                       return nil, fmt.Errorf("segment-tail-sampler: invalid 
duration_threshold %q: %w", c.DurationThreshold, err)
+               }
+               if d <= 0 {
+                       return nil, fmt.Errorf("segment-tail-sampler: 
duration_threshold must be positive, got %v", d)
+               }
+               s.durationThreshold = d
+       }
+
+       // Build the projection: the error tag (when keep_errors is set) and 
every
+       // rule's tag key. Compile regex matchers once, here, not per batch.
+       tagSet := make(map[string]struct{})
+       if s.keepErrors {
+               tagSet[s.errorTag] = struct{}{}
+       }
+       for i := range s.rules {
+               if s.rules[i].TagKey == "" {
+                       return nil, fmt.Errorf("segment-tail-sampler: 
keep_tag_rules[%d] has empty tag_key", i)
+               }
+               if s.rules[i].Regex != "" {
+                       re, err := regexp.Compile(s.rules[i].Regex)
+                       if err != nil {
+                               return nil, fmt.Errorf("segment-tail-sampler: 
keep_tag_rules[%d] bad regex %q: %w", i, s.rules[i].Regex, err)
+                       }
+                       s.rules[i].re = re
+               }
+               tagSet[s.rules[i].TagKey] = struct{}{}
+       }
+       s.requiredTags = make([]string, 0, len(tagSet))
+       for k := range tagSet {
+               s.requiredTags = append(s.requiredTags, k)
+       }
+       // Stable order keeps Project() reproducible across runs (Go map 
iteration is
+       // randomized); the engine treats Tags as a set, but logs, tests, and 
caches
+       // benefit from determinism.
+       sort.Strings(s.requiredTags)
+       // A span-count rule reads the span-id column, so project it on demand.
+       s.wantSpanIDs = s.minSpanCount > 0
+       return s, nil
+}
+
+// Kind reports the sampler kind, satisfying the generic sdk.Plugin interface
+// that sdk.Sampler embeds.
+func (s *segmentTailSampler) Kind() sdk.Kind { return sdk.KindSampler }
+
+// Project declares the columns the verdict reads: the rule/error tag columns,
+// plus the span-id column only when a span-count rule is configured. Span
+// bodies are never read, so Spans stays false and the merge raw fast path is
+// preserved.
+func (s *segmentTailSampler) Project() sdk.Projection {
+       return sdk.Projection{Tags: s.requiredTags, SpanIDs: s.wantSpanIDs}
+}
+
+// Decide returns a keep-mask aligned to batch.Traces. The batch is read-only.
+func (s *segmentTailSampler) Decide(batch *sdk.TraceBatch) (sdk.Verdict, 
error) {
+       keep := make([]bool, len(batch.Traces))
+       for i := range batch.Traces {
+               k, err := s.keepTrace(&batch.Traces[i])
+               if err != nil {
+                       return sdk.Verdict{}, err
+               }
+               keep[i] = k
+       }
+       return sdk.Verdict{Keep: keep}, nil
+}
+
+// Close releases resources; this sampler holds none.
+func (s *segmentTailSampler) Close() error { return nil }
+
+// keepTrace applies the sure-keep rules, then the deterministic healthy 
sample.
+func (s *segmentTailSampler) keepTrace(b *sdk.TraceBlock) (bool, error) {
+       // Duration is free from the intrinsic MinTS/MaxTS — no decode.
+       if s.durationThreshold > 0 && time.Duration(b.MaxTS-b.MinTS) >= 
s.durationThreshold {
+               return true, nil
+       }
+       // Error keep: decode the error tag column and look for any truthy row.
+       if s.keepErrors {
+               hit, err := s.hasError(b)
+               if err != nil {
+                       return false, err
+               }
+               if hit {
+                       return true, nil
+               }
+       }
+       // Sure-keep tag rules.
+       for i := range s.rules {
+               hit, err := matchRule(b, &s.rules[i])
+               if err != nil {
+                       return false, err
+               }
+               if hit {
+                       return true, nil
+               }
+       }
+       // Span-count rule: read the span-id column the projection requested.
+       if s.minSpanCount > 0 && b.Len() >= s.minSpanCount {
+               return true, nil
+       }
+       // Healthy remainder: deterministic hash(trace_id) < rate, stable across
+       // re-evaluation at merge and finalization.
+       if s.healthySampleRate > 0 && sampleFraction(b.TraceID) < 
s.healthySampleRate {
+               return true, nil
+       }
+       return false, nil
+}
+
+// hasError reports whether the error tag is truthy on any span row.
+func (s *segmentTailSampler) hasError(b *sdk.TraceBlock) (bool, error) {
+       col := b.Tag(s.errorTag)
+       if col == nil {
+               return false, nil
+       }
+       for row := range col.Values {
+               v, err := col.At(row)
+               if err != nil {
+                       return false, err
+               }
+               if v.IsNull() {
+                       continue
+               }
+               switch v.ValueType() {
+               case pbv1.ValueTypeInt64:
+                       if v.Int64() != 0 {
+                               return true, nil
+                       }
+               case pbv1.ValueTypeStr:
+                       if str := v.Str(); str == "true" || str == "1" {
+                               return true, nil
+                       }
+               default:
+               }
+       }
+       return false, nil
+}
+
+// matchRule reports whether the rule matches any span row of the trace.
+func matchRule(b *sdk.TraceBlock, r *tagRule) (bool, error) {
+       col := b.Tag(r.TagKey)
+       if col == nil {
+               return false, nil
+       }
+       for row := range col.Values {
+               v, err := col.At(row)
+               if err != nil {
+                       return false, err
+               }
+               if v.IsNull() {
+                       continue
+               }
+               if r.Exists {
+                       return true, nil
+               }
+               str := stringOf(v)
+               switch {
+               case r.Equals != "":
+                       if str == r.Equals {
+                               return true, nil
+                       }
+               case len(r.In) > 0:
+                       for _, candidate := range r.In {
+                               if str == candidate {
+                                       return true, nil
+                               }
+                       }
+               case r.re != nil:
+                       if r.re.MatchString(str) {
+                               return true, nil
+                       }
+               }
+       }
+       return false, nil
+}
+
+// stringOf renders a decoded value as a string for matching against the rule's
+// string predicates.
+func stringOf(v sdk.Value) string {
+       switch v.ValueType() {
+       case pbv1.ValueTypeStr:
+               return v.Str()
+       case pbv1.ValueTypeInt64, pbv1.ValueTypeTimestamp:
+               return strconv.FormatInt(v.Int64(), 10)
+       case pbv1.ValueTypeFloat64:
+               return strconv.FormatFloat(v.Float64(), 'g', -1, 64)
+       case pbv1.ValueTypeBinaryData:
+               return string(v.Bytes())
+       default:
+               return ""
+       }
+}
+
+// sampleFraction maps a trace_id to a stable fraction in [0,1) via FNV-1a, so
+// the keep decision is deterministic and reproducible across passes. The top 
53
+// bits fill a float64 mantissa exactly (the technique math/rand uses), so the
+// result is strictly below 1 and a healthy_sample_rate of 1.0 keeps every 
trace.
+func sampleFraction(traceID string) float64 {
+       h := fnv.New64a()
+       _, _ = h.Write([]byte(traceID))
+       return float64(h.Sum64()>>11) / (1 << 53)
+}
+
+func main() {}
diff --git a/pkg/pipeline/sdk/decode.go b/pkg/pipeline/sdk/decode.go
new file mode 100644
index 000000000..101cf695f
--- /dev/null
+++ b/pkg/pipeline/sdk/decode.go
@@ -0,0 +1,129 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sdk
+
+import (
+       "fmt"
+
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// Value is a single decoded tag value. The accessor matching ValueType returns
+// the decoded datum; the others return their zero value. A nil raw value
+// decodes to a null Value (IsNull reports true).
+type Value struct {
+       str       string
+       bytes     []byte
+       strArr    []string
+       intArr    []int64
+       int64Val  int64
+       floatVal  float64
+       valueType pbv1.ValueType
+       null      bool
+}
+
+// IsNull reports whether the value is null (tag absent or type unknown).
+func (v Value) IsNull() bool { return v.null }
+
+// ValueType returns the type tag of the decoded value.
+func (v Value) ValueType() pbv1.ValueType { return v.valueType }
+
+// Str returns the string value (valid for ValueTypeStr).
+func (v Value) Str() string { return v.str }
+
+// Int64 returns the integer value (valid for ValueTypeInt64 and, as unix
+// nanoseconds, ValueTypeTimestamp).
+func (v Value) Int64() int64 { return v.int64Val }
+
+// Float64 returns the float value (valid for ValueTypeFloat64).
+func (v Value) Float64() float64 { return v.floatVal }
+
+// Bytes returns the raw bytes, uncopied (valid for ValueTypeBinaryData).
+func (v Value) Bytes() []byte { return v.bytes }
+
+// StrArr returns the string-array value (valid for ValueTypeStrArr).
+func (v Value) StrArr() []string { return v.strArr }
+
+// Int64Arr returns the integer-array value (valid for ValueTypeInt64Arr).
+func (v Value) Int64Arr() []int64 { return v.intArr }
+
+// At decodes the value at the given span row. A nil element decodes to a null
+// Value. It returns an error if row is out of range.
+func (c *TagColumn) At(row int) (Value, error) {
+       if row < 0 || row >= len(c.Values) {
+               return Value{}, fmt.Errorf("tag %q: row %d out of range 
[0,%d)", c.Name, row, len(c.Values))
+       }
+       return DecodeTagValue(c.ValueType, c.Values[row])
+}
+
+// DecodeTagValue decodes one marshaled tag value, as stored in the native 
trace
+// block, into a typed Value. It mirrors the engine's own per-row decode so a
+// plugin never needs to import banyand/trace internals. A nil raw value yields
+// a null Value.
+func DecodeTagValue(valueType pbv1.ValueType, raw []byte) (Value, error) {
+       if raw == nil {
+               return Value{valueType: valueType, null: true}, nil
+       }
+       switch valueType {
+       case pbv1.ValueTypeStr:
+               return Value{valueType: valueType, str: string(raw)}, nil
+       case pbv1.ValueTypeInt64:
+               if len(raw) != 8 {
+                       return Value{}, fmt.Errorf("int64: expected 8 bytes, 
got %d", len(raw))
+               }
+               return Value{valueType: valueType, int64Val: 
convert.BytesToInt64(raw)}, nil
+       case pbv1.ValueTypeFloat64:
+               if len(raw) != 8 {
+                       return Value{}, fmt.Errorf("float64: expected 8 bytes, 
got %d", len(raw))
+               }
+               return Value{valueType: valueType, floatVal: 
convert.BytesToFloat64(raw)}, nil
+       case pbv1.ValueTypeBinaryData:
+               return Value{valueType: valueType, bytes: raw}, nil
+       case pbv1.ValueTypeTimestamp:
+               if len(raw) != 8 {
+                       return Value{}, fmt.Errorf("timestamp: expected 8 
bytes, got %d", len(raw))
+               }
+               return Value{valueType: valueType, int64Val: 
convert.BytesToInt64(raw)}, nil
+       case pbv1.ValueTypeInt64Arr:
+               if len(raw)%8 != 0 {
+                       return Value{}, fmt.Errorf("int64 array: length %d is 
not a multiple of 8", len(raw))
+               }
+               values := make([]int64, 0, len(raw)/8)
+               for i := 0; i < len(raw); i += 8 {
+                       values = append(values, 
convert.BytesToInt64(raw[i:i+8]))
+               }
+               return Value{valueType: valueType, intArr: values}, nil
+       case pbv1.ValueTypeStrArr:
+               var values []string
+               for idx := 0; idx < len(raw); {
+                       end, next, err := encoding.UnmarshalVarArray(raw, idx)
+                       if err != nil {
+                               return Value{}, fmt.Errorf("str array: %w", err)
+                       }
+                       values = append(values, string(raw[idx:end]))
+                       idx = next
+               }
+               return Value{valueType: valueType, strArr: values}, nil
+       case pbv1.ValueTypeUnknown:
+               return Value{valueType: valueType, null: true}, nil
+       default:
+               return Value{}, fmt.Errorf("unsupported value type: %d", 
valueType)
+       }
+}
diff --git a/pkg/pipeline/sdk/sdk.go b/pkg/pipeline/sdk/sdk.go
new file mode 100644
index 000000000..0871e0da8
--- /dev/null
+++ b/pkg/pipeline/sdk/sdk.go
@@ -0,0 +1,207 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package sdk defines the contract between the BanyanDB post-trace pipeline
+// engine and user-supplied native Go plugins (each a .so loaded in-process via
+// the standard Go plugin package). It is the single pinned surface plugin
+// authors build against; see docs/design/post-trace-pipeline.md §2.5.
+//
+// A plugin is one kind of the generic Plugin interface. Every kind shares
+// Kind, Project, and Close; each kind adds its own processing method (the
+// sampler kind adds Decide — see Sampler). Today the only kind is the
+// sampler. A plugin is a package main built with `-buildmode=plugin` that
+// exports exactly two symbols (the constructor name is the kind's convention
+// — NewSampler for the sampler kind):
+//
+//     var  ABIVersion int                          // == sdk.ABIVersion
+//     func NewSampler(config []byte) (sdk.Sampler, error)
+//
+// The engine refuses to load a plugin whose ABIVersion differs from its own
+// compiled sdk.ABIVersion, turning a silent miscompile into a fail-fast error.
+// The config bytes are the canonical JSON serialization of the
+// google.protobuf.Struct set in the plugin's proto payload (e.g.
+// SamplerPlugin.config); the plugin unmarshals them into its own typed config.
+//
+// In a TracePipelineConfig or StageRule, plugins are wired as an ordered chain
+// (a sequential pipe): links run in declared order, each processing what the
+// previous link kept, and a link that fails is bypassed. For an all-sampler
+// chain this is the conjunction of the links' keep/drop verdicts.
+//
+// The boundary deliberately crosses only stdlib slices plus the engine's
+// stable, byte-sized pbv1.ValueType enum, so no banyand/trace-internal struct
+// is shared across the .so boundary.
+package sdk
+
+import (
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// ABIVersion is compiled into the host and must be re-exported, unchanged, by
+// every plugin. The engine refuses to load a plugin whose ABIVersion differs.
+// It is declared here as an untyped constant; the package comment shows the
+// plugin side re-exporting it as a package-level `var ABIVersion int`.
+const ABIVersion = 1
+
+// Kind identifies a Plugin's role in a chain. It mirrors the set arm of the
+// proto Plugin.kind oneof and lets the engine label and cross-check a plugin
+// before type-asserting it to the matching kind interface. New kinds are added
+// here in lockstep with new oneof arms.
+type Kind uint8
+
+// Plugin kinds. Values come from iota in declaration order, so KindUnspecified
+// is 0 and KindSampler is 1.
+const (
+       // KindUnspecified is the zero value; a real plugin never reports it.
+       KindUnspecified Kind = iota
+       // KindSampler is a plugin that owns a keep/drop verdict (see Sampler).
+       KindSampler
+)
+
+// Plugin is the common interface every plugin kind satisfies — the generic
+// link type the engine handles uniformly. The engine constructs a Plugin,
+// checks Kind against the proto oneof arm that named it, then type-asserts to
+// that kind's interface (e.g. Sampler) for the kind-specific call. Project
+// and Close are shared by every kind; the per-kind processing method lives on
+// the kind interface. Each kind keeps its own constructor symbol convention
+// (the sampler kind defaults to NewSampler; a future kind would use its own,
+// e.g. NewTransformer).
+type Plugin interface {
+       // Kind reports which plugin kind this is, for engine bookkeeping and
+       // as a cross-check against the proto oneof arm. It must be constant
+       // for the plugin's lifetime.
+       Kind() Kind
+
+       // Project is the column-selection handshake, called once at load. The
+       // engine honors it for the plugin's lifetime: Tags drives the native
+       // tag projection (only those tag columns are decoded); SpanIDs and
+       // Spans gate the spans stream. Intrinsic columns (TraceID, MinTS,
+       // MaxTS) are always present regardless of the projection.
+       Project() Projection
+
+       // Close releases any resources the plugin holds. It is called once
+       // when the pipeline config is removed; because Go plugins cannot be
+       // unloaded, the .so itself stays mapped until the process restarts.
+       Close() error
+}
+
+// Sampler is the keep/drop kind of Plugin (Kind reports KindSampler). The
+// engine calls Project once at load, then Decide once per batch, then Close
+// at unload. In a chain it is a conjunction link: each Sampler narrows the
+// traces the next link sees.
+type Sampler interface {
+       Plugin
+
+       // Decide receives a vectorized batch of assembled per-trace blocks and
+       // returns a keep-mask aligned to batch.Traces. The batch is READ-ONLY:
+       // Decide must not mutate any slice it receives. The keep-mask is the
+       // only output channel; the engine writes retained traces from its own
+       // untouched block data, so a returned error or a length-mismatched
+       // verdict makes the engine fail open (retain the whole batch).
+       Decide(batch *TraceBatch) (Verdict, error)
+}
+
+// Projection is the plugin's up-front column request — one handshake covering
+// every optional column. Intrinsic columns (TraceID, MinTS, MaxTS) are always
+// materialized and are not listed here.
+type Projection struct {
+       // Tags names the tag columns to decode into TraceBlock.Tags. Empty
+       // means no tag columns are decoded.
+       Tags []string
+       // SpanIDs opts in to the span-id column. Default false. Span ids and
+       // span bodies share one encoded data block in the native layout, so
+       // requesting span ids forces a read of the spans stream — it is not
+       // free metadata.
+       SpanIDs bool
+       // Spans opts in to the heavy span-body column. Default false: the
+       // engine leaves TraceBlock.Spans nil and, on the merge raw fast path,
+       // never decodes span bodies. Set true only when the verdict reads
+       // them.
+       Spans bool
+}
+
+// TraceBatch is a vectorized batch of assembled per-trace blocks. It is the
+// engine's native columnar trace layout, shared read-only with the plugin.
+type TraceBatch struct {
+       // Traces holds one block per trace_id. The verdict's keep-mask is
+       // aligned to this slice.
+       Traces []TraceBlock
+}
+
+// TraceBlock mirrors the native trace block: every populated column is indexed
+// in lockstep by span row i in [0, Len). Intrinsic columns are always set;
+// Tags, SpanIDs, and Spans appear only as requested by Project. Slices are
+// shared with the engine and must be treated as read-only.
+type TraceBlock struct {
+       // TraceID identifies the trace; the keep/drop verdict is per trace_id.
+       TraceID string
+       // Tags holds the projected tag columns, one per Projection.Tags entry
+       // that the trace actually carries.
+       Tags []TagColumn
+       // SpanIDs is the row-aligned span-id column; nil unless
+       // Projection.SpanIDs is set.
+       SpanIDs []string
+       // Spans is the row-aligned span-body column (opaque marshaled bytes);
+       // nil unless Projection.Spans is set.
+       Spans [][]byte
+       // MinTS is the earliest span start in unix nanoseconds.
+       MinTS int64
+       // MaxTS is the latest span end in unix nanoseconds; trace duration is
+       // MaxTS - MinTS.
+       MaxTS int64
+}
+
+// Len returns how many span rows the block holds, taken from the first
+// non-empty row-indexed column: SpanIDs, then Spans, then the projected tag
+// columns in slice order. A metadata-only block has no such column, so the
+// result is 0.
+func (b *TraceBlock) Len() int {
+       switch {
+       case len(b.SpanIDs) > 0:
+               return len(b.SpanIDs)
+       case len(b.Spans) > 0:
+               return len(b.Spans)
+       }
+       for idx := range b.Tags {
+               rows := len(b.Tags[idx].Values)
+               if rows > 0 {
+                       return rows
+               }
+       }
+       return 0
+}
+
+// Tag looks up a projected tag column by name. It returns a pointer into
+// b.Tags for the first column whose Name matches, or nil when none matches
+// (the trace did not carry the tag, or the plugin did not project it).
+func (b *TraceBlock) Tag(name string) *TagColumn {
+       for idx := range b.Tags {
+               column := &b.Tags[idx]
+               if column.Name != name {
+                       continue
+               }
+               return column
+       }
+       return nil
+}
+
+// TagColumn mirrors the native tag: a row-aligned column of marshaled values
+// plus the value type needed to decode them via At.
+type TagColumn struct {
+       // Name is the tag key.
+       Name string
+       // Values holds one marshaled value per span row; a nil element means
+       // the tag is absent on that row.
+       Values [][]byte
+       // ValueType is the engine's stable, byte-sized type tag for every
+       // value in the column.
+       ValueType pbv1.ValueType
+}
+
+// Verdict is the per-trace decision, aligned to TraceBatch.Traces.
+type Verdict struct {
+       // Keep must have the same length as TraceBatch.Traces; Keep[i] true
+       // retains Traces[i]. A length mismatch makes the engine fail open
+       // (retain the whole batch).
+       Keep []bool
+}
diff --git a/pkg/pipeline/sdk/sdk_test.go b/pkg/pipeline/sdk/sdk_test.go
new file mode 100644
index 000000000..7cff3a0bc
--- /dev/null
+++ b/pkg/pipeline/sdk/sdk_test.go
@@ -0,0 +1,164 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sdk_test
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pipeline/sdk"
+)
+
+// strArr builds a string-array payload for the tests by appending each value
+// to one buffer with encoding.MarshalVarArray. With no values it returns nil.
+func strArr(values ...string) []byte {
+       var buf []byte
+       for i := range values {
+               buf = encoding.MarshalVarArray(buf, []byte(values[i]))
+       }
+       return buf
+}
+
+// int64Arr builds an int64-array payload for the tests by concatenating the
+// convert.Int64ToBytes encoding of each value. With no values it returns nil.
+func int64Arr(values ...int64) []byte {
+       var buf []byte
+       for i := range values {
+               buf = append(buf, convert.Int64ToBytes(values[i])...)
+       }
+       return buf
+}
+
+func TestDecodeTagValue(t *testing.T) {
+       tests := []struct {
+               check func(*testing.T, sdk.Value)
+               name  string
+               raw   []byte
+               vt    pbv1.ValueType
+       }{
+               {
+                       name: "str", vt: pbv1.ValueTypeStr, raw: 
[]byte("PostgreSQL"),
+                       check: func(t *testing.T, v sdk.Value) { 
assert.Equal(t, "PostgreSQL", v.Str()) },
+               },
+               {
+                       name: "int64", vt: pbv1.ValueTypeInt64, raw: 
convert.Int64ToBytes(-42),
+                       check: func(t *testing.T, v sdk.Value) { 
assert.Equal(t, int64(-42), v.Int64()) },
+               },
+               {
+                       name: "float64", vt: pbv1.ValueTypeFloat64, raw: 
convert.Float64ToBytes(3.5),
+                       check: func(t *testing.T, v sdk.Value) { 
assert.InDelta(t, 3.5, v.Float64(), 1e-9) },
+               },
+               {
+                       name: "binary", vt: pbv1.ValueTypeBinaryData, raw: 
[]byte{0x01, 0x02, 0x03},
+                       check: func(t *testing.T, v sdk.Value) { 
assert.Equal(t, []byte{0x01, 0x02, 0x03}, v.Bytes()) },
+               },
+               {
+                       name: "timestamp", vt: pbv1.ValueTypeTimestamp, raw: 
convert.Int64ToBytes(1_700_000_000_000_000_000),
+                       check: func(t *testing.T, v sdk.Value) { 
assert.Equal(t, int64(1_700_000_000_000_000_000), v.Int64()) },
+               },
+               {
+                       name: "int64_arr", vt: pbv1.ValueTypeInt64Arr, raw: 
int64Arr(1, 2, 3),
+                       check: func(t *testing.T, v sdk.Value) { 
assert.Equal(t, []int64{1, 2, 3}, v.Int64Arr()) },
+               },
+               {
+                       name: "str_arr", vt: pbv1.ValueTypeStrArr, raw: 
strArr("a", "bb", "ccc"),
+                       check: func(t *testing.T, v sdk.Value) { 
assert.Equal(t, []string{"a", "bb", "ccc"}, v.StrArr()) },
+               },
+               {
+                       name: "nil_is_null", vt: pbv1.ValueTypeStr, raw: nil,
+                       check: func(t *testing.T, v sdk.Value) { assert.True(t, 
v.IsNull()) },
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       v, err := sdk.DecodeTagValue(tt.vt, tt.raw)
+                       require.NoError(t, err)
+                       assert.Equal(t, tt.vt, v.ValueType())
+                       tt.check(t, v)
+               })
+       }
+}
+
+func TestDecodeTagValueErrors(t *testing.T) {
+       _, err := sdk.DecodeTagValue(pbv1.ValueTypeInt64Arr, []byte{0x01, 0x02})
+       require.Error(t, err, "int64 array length not a multiple of 8 must 
error")
+
+       _, err = sdk.DecodeTagValue(pbv1.ValueType(255), []byte("x"))
+       require.Error(t, err, "unknown value type must error")
+
+       for _, vt := range []pbv1.ValueType{pbv1.ValueTypeInt64, 
pbv1.ValueTypeFloat64, pbv1.ValueTypeTimestamp} {
+               _, err = sdk.DecodeTagValue(vt, []byte{0x01, 0x02, 0x03})
+               require.Errorf(t, err, "fixed-width type %d with fewer than 8 
bytes must error", vt)
+       }
+}
+
+// TestTagColumnAt exercises TagColumn.At on a three-row string column: a
+// populated row decodes to its string, a nil row decodes to a null Value, and
+// an index past the last row is an error.
+func TestTagColumnAt(t *testing.T) {
+       rows := [][]byte{[]byte("PostgreSQL"), nil, []byte("MySQL")}
+       column := sdk.TagColumn{Name: "db.type", ValueType: pbv1.ValueTypeStr, Values: rows}
+
+       populated, err := column.At(0)
+       require.NoError(t, err)
+       assert.Equal(t, "PostgreSQL", populated.Str())
+
+       absent, err := column.At(1)
+       require.NoError(t, err)
+       assert.True(t, absent.IsNull())
+
+       _, err = column.At(3)
+       require.Error(t, err, "row out of range must error")
+}
+
+// TestTraceBlockHelpers covers TraceBlock.Len and TraceBlock.Tag: Len prefers
+// the span-id column, falls back to a tag column, and is 0 for a block with
+// only intrinsic columns; Tag finds a present column and returns nil for a
+// missing one.
+func TestTraceBlockHelpers(t *testing.T) {
+       dbType := sdk.TagColumn{
+               Name:      "db.type",
+               ValueType: pbv1.ValueTypeStr,
+               Values:    [][]byte{[]byte("PostgreSQL"), nil},
+       }
+       full := sdk.TraceBlock{
+               TraceID: "t-1",
+               MinTS:   100,
+               MaxTS:   2_802_000_000, // 2802ms in nanos
+               Tags:    []sdk.TagColumn{dbType},
+               SpanIDs: []string{"s-1", "s-2"},
+       }
+       assert.Equal(t, 2, full.Len(), "Len comes from the span-id column")
+       require.NotNil(t, full.Tag("db.type"))
+       assert.Nil(t, full.Tag("missing"))
+
+       // With no span-id or span-body column, Len falls back to a tag column.
+       threeRows := sdk.TagColumn{Name: "x", Values: [][]byte{nil, nil, nil}}
+       tagsOnly := sdk.TraceBlock{Tags: []sdk.TagColumn{threeRows}}
+       assert.Equal(t, 3, tagsOnly.Len())
+
+       // A block carrying only intrinsic columns reports zero rows.
+       metaOnly := sdk.TraceBlock{TraceID: "t-2", MinTS: 1, MaxTS: 2}
+       assert.Equal(t, 0, metaOnly.Len())
+}
+
+// fakeSampler is a stateless stub whose methods return fixed values; the test
+// below assigns it to sdk.Sampler and sdk.Plugin.
+type fakeSampler struct{}
+
+// Kind always reports sdk.KindSampler.
+func (fakeSampler) Kind() sdk.Kind {
+       return sdk.KindSampler
+}
+
+// Project returns the zero Projection.
+func (fakeSampler) Project() sdk.Projection {
+       return sdk.Projection{}
+}
+
+// Decide ignores the batch and returns the zero Verdict with a nil error.
+func (fakeSampler) Decide(*sdk.TraceBatch) (sdk.Verdict, error) {
+       return sdk.Verdict{}, nil
+}
+
+// Close does nothing and returns nil.
+func (fakeSampler) Close() error {
+       return nil
+}
+
+// TestSamplerIsPlugin checks at compile time that a Sampler value is
+// assignable to Plugin, and at run time that the stub reports KindSampler
+// rather than KindUnspecified.
+func TestSamplerIsPlugin(t *testing.T) {
+       var sampler sdk.Sampler = fakeSampler{}
+       var plugin sdk.Plugin = sampler // every Sampler is also a Plugin
+
+       assert.Equal(t, sdk.KindSampler, plugin.Kind())
+       assert.NotEqual(t, sdk.KindUnspecified, plugin.Kind())
+}

Reply via email to