This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 5604682e0 Add storage-node post-trace pipeline: design, pipeline.v1
proto, Go SDK (#1189)
5604682e0 is described below
commit 5604682e06cb20b0032898947ec9ae78eb514796
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Jun 24 09:53:03 2026 +0800
Add storage-node post-trace pipeline: design, pipeline.v1 proto, Go SDK
(#1189)
---
.../banyandb/pipeline/v1/trace_pipeline.proto | 300 ++++++++++
docs/api-reference.md | 387 +++++++++++++
docs/design/post-trace-pipeline.md | 608 +++++++++++++++++++++
.../sdk/_example/segment-tail-sampler/README.md | 67 +++
.../sdk/_example/segment-tail-sampler/main.go | 321 +++++++++++
pkg/pipeline/sdk/decode.go | 129 +++++
pkg/pipeline/sdk/sdk.go | 207 +++++++
pkg/pipeline/sdk/sdk_test.go | 164 ++++++
8 files changed, 2183 insertions(+)
diff --git a/api/proto/banyandb/pipeline/v1/trace_pipeline.proto
b/api/proto/banyandb/pipeline/v1/trace_pipeline.proto
new file mode 100644
index 000000000..aae2159a3
--- /dev/null
+++ b/api/proto/banyandb/pipeline/v1/trace_pipeline.proto
@@ -0,0 +1,300 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+package banyandb.pipeline.v1;
+
+import "banyandb/common/v1/common.proto";
+import "google/api/annotations.proto";
+import "google/protobuf/duration.proto";
+import "google/protobuf/struct.proto";
+import "protoc-gen-openapiv2/options/annotations.proto";
+import "validate/validate.proto";
+
+option go_package =
"github.com/apache/skywalking-banyandb/api/proto/banyandb/pipeline/v1";
+option java_package = "org.apache.skywalking.banyandb.pipeline.v1";
+option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) =
{base_path: "/api"};
+
+// PipelineEvent identifies a pipeline-wide event that can be independently
+// enabled. Per-stage retention (StageRule.plugins) fires implicitly at the
+// stage's migration-out boundary and is not toggleable via this enum.
+enum PipelineEvent {
+ PIPELINE_EVENT_UNSPECIFIED = 0;
+ // In-merge filter during Hot-phase LSM compaction merges (Warm/Cold
+ // compactions stay lossless). Per-trace drops are gated by `merge_grace` so
+ // partial traces are not destroyed prematurely. Cheap, runs often; verdicts
+ // wait for trace maturity (see §7.1).
+ PIPELINE_EVENT_MERGE = 1;
+ // Tail-sampling gate at Hot-phase segment finalization, after the segment
+ // has settled (event-time watermark past `segment.End + finalize_grace`).
+ // Heavy but authoritative; sees the complete trace (see §7.3).
+ PIPELINE_EVENT_FINALIZE = 2;
+}
+
+// TracePipelineConfig is the root configuration for a storage-node trace
pipeline.
+// It reuses existing catalog identifiers (group via metadata, stage names,
schema
+// names) for targeting instead of declaring a parallel metadata model.
+//
+// The pipeline has up to three filter points:
+// 1. PIPELINE_EVENT_MERGE — in-merge filter during Hot-phase LSM compaction
(default).
+// 2. PIPELINE_EVENT_FINALIZE — tail-sampling gate at Hot-phase finalization.
+// 3. Per-stage retention via StageRule.plugins, applied at the stage's
+// migration-out boundary (when the segment migrates to the next stage).
+// Always implicit when any StageRule carries a plugin chain.
+// Filter points 1 and 2 are events toggleable via `enabled_events`. The gating
+// policy those events evaluate is the `plugins` chain (a sequential pipe of
+// native Go plugins; today each link is a sampler).
+message TracePipelineConfig {
+ // Identity and revision tracking; metadata.group is the Group this pipeline
+ // lives in and applies to, consistent with every other schema resource.
+ // Required: every config needs a name/group for registry handling.
+ common.v1.Metadata metadata = 1 [(validate.rules).message = {required:
true}];
+ // Active status of the pipeline.
+ bool enabled = 2;
+ // Per-stage retention rules: which lifecycle stages this pipeline acts on,
+ // with the retention plugin chain for each. Each rule fires at its stage's
+ // migration-out boundary regardless of `enabled_events`. Empty means the
+ // only filters are the `enabled_events` events (no per-stage drop).
+ repeated StageRule stages = 3;
+ // Explicit schema names to target within the Group (exact match on
Metadata.name).
+ // Each entry must be non-empty; cross-element uniqueness is enforced
server-side.
+ repeated string schema_names = 4
[(validate.rules).repeated.items.string.min_len = 1];
+ // RE2 regular expression matched against schema names. A schema is targeted
if it
+ // is listed in schema_names OR matches this pattern. When both are empty,
every
+ // schema in the Group is targeted.
+ string schema_name_regex = 5;
+ // Gating policy: an ordered chain of plugins (a sequential pipe) evaluated
by
+ // any enabled event (PIPELINE_EVENT_MERGE and/or PIPELINE_EVENT_FINALIZE).
+ // Links run in declared order, each processing the traces the previous link
+ // kept; a link that fails is bypassed (fail-open). Today every link is a
+ // sampler, so the chain is the conjunction of the links' keep/drop verdicts.
+ // Empty means the only retention is the per-stage StageRule plugin chain(s)
+ // at migration-out.
+ repeated Plugin plugins = 6;
+ // Pipeline-wide events to run. Empty defaults to [PIPELINE_EVENT_MERGE]:
+ // the in-merge filter is on and the finalization gate is off. To enable the
+ // finalization gate, include PIPELINE_EVENT_FINALIZE; to disable the merge
+ // filter, list only [PIPELINE_EVENT_FINALIZE]. Leaving the list empty is
+ // equivalent to listing only [PIPELINE_EVENT_MERGE]. Each element must be a
+ // defined, non-UNSPECIFIED value; duplicates are normalized to a set
+ // server-side.
+ repeated PipelineEvent enabled_events = 7
[(validate.rules).repeated.items.enum = {
+ defined_only: true
+ not_in: [0]
+ }];
+ // Per-trace maturity window for the in-merge filter (§7.1). A trace is
+ // eligible for dropping during an LSM compaction merge only once its latest
+ // span timestamp is older than `now - merge_grace`; younger traces pass
+ // through the merge unchanged. Bounds the expected intra-trace span arrival
+ // spread (typically seconds). Used iff `enabled_events` contains
+ // PIPELINE_EVENT_MERGE. Strictly positive if set; engine default 30s if
unset.
+ google.protobuf.Duration merge_grace = 8 [(validate.rules).duration = {
+ gt: {seconds: 0}
+ }];
+ // Per-segment settling window for the scheduled finalization pass (§7.3). A
+ // segment is treated as settled, and the authoritative final filter runs,
+ // once the event-time watermark exceeds `segment.End + finalize_grace`.
+ // Bounds segment-wide late arrival (typically minutes). Used iff
+ // `enabled_events` contains PIPELINE_EVENT_FINALIZE. Strictly positive if
+ // set; engine default 5m if unset.
+ google.protobuf.Duration finalize_grace = 9 [(validate.rules).duration = {
+ gt: {seconds: 0}
+ }];
+}
+
+// StageRule binds the pipeline to one lifecycle stage of the targeted Group
+// and declares that stage's retention plugin chain. The rule fires at the
+// stage's migration-out boundary (i.e. when a segment migrates from this stage
+// to the next stage); routine compaction is governed by PIPELINE_EVENT_MERGE
on
+// TracePipelineConfig, not by StageRule.
+//
+// Per-stage retention uses the SAME chain mechanism as gating: each stage's
+// `plugins` chain owns the keep/drop verdict for traces leaving that stage. A
+// StageRule with an empty `plugins` chain has no filtering effect — every
trace
+// at this stage migrates unchanged. The "rising bar" across stages (Hot keeps
+// more, Cold keeps less) is expressed by each stage's plugin config (see §4.2
+// of the design doc), not by a fixed predicate vocabulary.
+message StageRule {
+ // Stage name from the Group's ResourceOpts.stages (e.g. "hot", "warm",
"cold").
+ // Must be non-empty; an empty stage name cannot match any lifecycle stage.
+ string stage = 1 [(validate.rules).string.min_len = 1];
+ // Per-stage retention chain: the ordered plugins (a sequential pipe) that
+ // decide keep/drop for traces leaving this stage at its migration-out
+ // boundary. Empty means no per-stage drop (every trace migrates). Same
+ // contract and composition as the gating chain (see Plugin).
+ repeated Plugin plugins = 2;
+}
+
+// Plugin is one link in a pipeline's processing chain — a generic, kind-tagged
+// envelope around a user-supplied native Go plugin. The set oneof arm selects
+// the kind; today the only kind is a sampler (SamplerPlugin). Adding a new
kind
+// is purely additive: define its payload message and add a new arm to `kind`,
+// leaving existing arms and field numbers untouched.
+//
+// A chain of Plugin (TracePipelineConfig.plugins, StageRule.plugins) is a
+// sequential pipe: links run in declared order, each link processes the traces
+// the previous link kept, and a link that fails is bypassed (its input passes
+// through unchanged). The plugin↔engine contract for each kind (the vectorized
+// batch, the projection handshake, the verdict) lives in the pinned Go SDK
+// module `pkg/pipeline/sdk`, not in this proto.
+message Plugin {
+ // Operator-facing identity for this link, used in diagnostics and admission
+ // errors. Must be non-empty; cross-element uniqueness within a chain is
+ // enforced server-side.
+ string name = 1 [(validate.rules).string.min_len = 1];
+ // Plugin kind. Exactly one arm must be set; the set arm is the kind
+ // discriminator. An unset kind is rejected at admission; the check is not
+ // expressed in this schema (the oneof declares no `(validate.required)`).
+ oneof kind {
+ // Sampler kind: owns a keep/drop verdict over a vectorized batch of
traces.
+ SamplerPlugin sampler = 2;
+ }
+}
+
+// SamplerPlugin configures a user-supplied native Go plugin (a .so loaded
+// in-process via the Go `plugin` package) that owns a keep/drop verdict over a
+// vectorized batch of traces. It is the sampler kind of Plugin
(Plugin.sampler)
+// and is the keep/drop link wherever a chain runs: in a TracePipelineConfig
+// chain it gates at the enabled PipelineEvent(s) (merge / finalization); in a
+// StageRule chain it is per-stage retention at a stage's migration-out
boundary.
+//
+// The full plugin↔engine contract (the vectorized batch type, the projection
+// handshake, and the verdict shape) lives in the pinned Go SDK module
+// `pkg/pipeline/sdk`, not in this proto; this message only locates and admits
+// the plugin. Three properties of that contract:
+// - Strong compatibility: the boundary exchanges only stdlib/primitive types
+// defined in the pinned SDK, so no third-party struct version is pinned
+// across the .so boundary. The plugin must be built with the SAME Go
+// toolchain, build tags, and flags (-trimpath, CGO) and the SAME pinned
SDK
+// as the running data node; `abi_version` is checked at load.
+// - Vectorized input: the plugin's Decide is called once per columnar batch
of
+// traces, not once per trace.
+// - Projection: the plugin declares the columns it needs (SDK Project →
+// Projection{Tags, SpanIDs, Spans}); the engine materializes only those
tag
+// columns and, only when requested, the spans stream — like a query
+// projection.
+//
+// Operational constraints (Go `plugin`): Linux/macOS only; plugins cannot be
+// unloaded, so changing one requires a node restart (no hot-reload); a plugin
+// panic is contained with recover() and fails open (the whole batch is
+// retained). Loading arbitrary code is operator-only and gated behind a server
+// flag plus a trusted plugin directory.
+message SamplerPlugin {
+ // Plugin .so filename, resolved within the data node's trusted plugin
+ // directory. The engine rejects any path that escapes that directory.
+ string path = 1 [(validate.rules).string.min_len = 1];
+ // Constructor symbol the engine looks up; defaults to "NewSampler" if empty.
+ string symbol = 2;
+ // ABI version the plugin was built against. The engine refuses to load the
+ // plugin unless this equals its own compiled sdk.ABIVersion.
+ uint32 abi_version = 3 [(validate.rules).uint32 = {gte: 1}];
+ // Plugin-defined configuration, set directly in the pipeline config as a
+ // structured object. The engine does not interpret its keys: it serializes
+ // the Struct to canonical JSON and hands the bytes to the plugin's
+ // constructor (SDK NewSampler([]byte)), which unmarshals them into the
+ // plugin's own typed config and validates them — a malformed config fails
+ // the load. Optional: a plugin that needs no configuration leaves it unset.
+ google.protobuf.Struct config = 4;
+}
+
+message TracePipelineRegistryServiceCreateRequest {
+ TracePipelineConfig trace_pipeline_config = 1 [(validate.rules).message =
{required: true}];
+}
+
+message TracePipelineRegistryServiceCreateResponse {
+ int64 mod_revision = 1;
+}
+
+message TracePipelineRegistryServiceUpdateRequest {
+ TracePipelineConfig trace_pipeline_config = 1 [(validate.rules).message =
{required: true}];
+}
+
+message TracePipelineRegistryServiceUpdateResponse {
+ int64 mod_revision = 1;
+}
+
+message TracePipelineRegistryServiceDeleteRequest {
+ common.v1.Metadata metadata = 1 [(validate.rules).message = {required:
true}];
+}
+
+message TracePipelineRegistryServiceDeleteResponse {
+ bool deleted = 1;
+ // delete_time is the server-assigned tombstone timestamp in unix nanos.
+ int64 delete_time = 2;
+ // mod_revision is the etcd revision of the tombstone; zero if the server
did not record one.
+ int64 mod_revision = 3;
+}
+
+message TracePipelineRegistryServiceGetRequest {
+ common.v1.Metadata metadata = 1 [(validate.rules).message = {required:
true}];
+}
+
+message TracePipelineRegistryServiceGetResponse {
+ TracePipelineConfig trace_pipeline_config = 1;
+}
+
+message TracePipelineRegistryServiceExistRequest {
+ common.v1.Metadata metadata = 1 [(validate.rules).message = {required:
true}];
+}
+
+message TracePipelineRegistryServiceExistResponse {
+ bool has_group = 1;
+ bool has_trace_pipeline_config = 2;
+}
+
+message TracePipelineRegistryServiceListRequest {
+ string group = 1 [(validate.rules).string.min_len = 1];
+}
+
+message TracePipelineRegistryServiceListResponse {
+ repeated TracePipelineConfig trace_pipeline_config = 1;
+}
+
+// TracePipelineRegistryService manages TracePipelineConfig resources,
mirroring
+// the registry services of every other schema resource. Create/Update run the
+// admission and conflict checks of §2.3/§2.4 of the design.
+service TracePipelineRegistryService {
+ rpc Create(TracePipelineRegistryServiceCreateRequest) returns
(TracePipelineRegistryServiceCreateResponse) {
+ option (google.api.http) = {
+ post: "/v1/trace-pipeline/schema"
+ body: "*"
+ };
+ }
+
+ rpc Update(TracePipelineRegistryServiceUpdateRequest) returns
(TracePipelineRegistryServiceUpdateResponse) {
+ option (google.api.http) = {
+ put:
"/v1/trace-pipeline/schema/{trace_pipeline_config.metadata.group}/{trace_pipeline_config.metadata.name}"
+ body: "*"
+ };
+ }
+
+ rpc Delete(TracePipelineRegistryServiceDeleteRequest) returns
(TracePipelineRegistryServiceDeleteResponse) {
+ option (google.api.http) = {delete:
"/v1/trace-pipeline/schema/{metadata.group}/{metadata.name}"};
+ }
+
+ rpc Get(TracePipelineRegistryServiceGetRequest) returns
(TracePipelineRegistryServiceGetResponse) {
+ option (google.api.http) = {get:
"/v1/trace-pipeline/schema/{metadata.group}/{metadata.name}"};
+ }
+
+ rpc List(TracePipelineRegistryServiceListRequest) returns
(TracePipelineRegistryServiceListResponse) {
+ option (google.api.http) = {get:
"/v1/trace-pipeline/schema/lists/{group}"};
+ }
+
+ // Exist does not expose an HTTP endpoint. To check existence over HTTP,
+ // send a HEAD request to the Get endpoint instead.
+ rpc Exist(TracePipelineRegistryServiceExistRequest) returns
(TracePipelineRegistryServiceExistResponse);
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index ff5fd5760..4026f3260 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -378,6 +378,28 @@
- [MeasureService](#banyandb-measure-v1-MeasureService)
+-
[banyandb/pipeline/v1/trace_pipeline.proto](#banyandb_pipeline_v1_trace_pipeline-proto)
+ - [Plugin](#banyandb-pipeline-v1-Plugin)
+ - [SamplerPlugin](#banyandb-pipeline-v1-SamplerPlugin)
+ - [StageRule](#banyandb-pipeline-v1-StageRule)
+ - [TracePipelineConfig](#banyandb-pipeline-v1-TracePipelineConfig)
+ -
[TracePipelineRegistryServiceCreateRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceCreateRequest)
+ -
[TracePipelineRegistryServiceCreateResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceCreateResponse)
+ -
[TracePipelineRegistryServiceDeleteRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteRequest)
+ -
[TracePipelineRegistryServiceDeleteResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteResponse)
+ -
[TracePipelineRegistryServiceExistRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceExistRequest)
+ -
[TracePipelineRegistryServiceExistResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceExistResponse)
+ -
[TracePipelineRegistryServiceGetRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceGetRequest)
+ -
[TracePipelineRegistryServiceGetResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceGetResponse)
+ -
[TracePipelineRegistryServiceListRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceListRequest)
+ -
[TracePipelineRegistryServiceListResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceListResponse)
+ -
[TracePipelineRegistryServiceUpdateRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateRequest)
+ -
[TracePipelineRegistryServiceUpdateResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateResponse)
+
+ - [PipelineEvent](#banyandb-pipeline-v1-PipelineEvent)
+
+ -
[TracePipelineRegistryService](#banyandb-pipeline-v1-TracePipelineRegistryService)
+
- [banyandb/property/v1/gossip.proto](#banyandb_property_v1_gossip-proto)
- [PropagationContext](#banyandb-property-v1-PropagationContext)
- [PropagationRequest](#banyandb-property-v1-PropagationRequest)
@@ -5899,6 +5921,371 @@ WriteResponse is the response contract for write
+<a name="banyandb_pipeline_v1_trace_pipeline-proto"></a>
+<p align="right"><a href="#top">Top</a></p>
+
+## banyandb/pipeline/v1/trace_pipeline.proto
+
+
+
+<a name="banyandb-pipeline-v1-Plugin"></a>
+
+### Plugin
+Plugin is one link in a pipeline's processing chain — a generic,
kind-tagged
+envelope around a user-supplied native Go plugin. The set oneof arm selects
+the kind; today the only kind is a sampler (SamplerPlugin). Adding a new kind
+is purely additive: define its payload message and add a new arm to `kind`,
+leaving existing arms and field numbers untouched.
+
+A chain of Plugin (TracePipelineConfig.plugins, StageRule.plugins) is a
+sequential pipe: links run in declared order, each link processes the traces
+the previous link kept, and a link that fails is bypassed (its input passes
+through unchanged). The plugin↔engine contract for each kind (the vectorized
+batch, the projection handshake, the verdict) lives in the pinned Go SDK
+module `pkg/pipeline/sdk`, not in this proto.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| name | [string](#string) | | Operator-facing identity for this link, used
in diagnostics and admission errors. Must be non-empty; cross-element
uniqueness within a chain is enforced server-side. |
+| sampler | [SamplerPlugin](#banyandb-pipeline-v1-SamplerPlugin) | | Sampler
kind: owns a keep/drop verdict over a vectorized batch of traces. |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-SamplerPlugin"></a>
+
+### SamplerPlugin
+SamplerPlugin configures a user-supplied native Go plugin (a .so loaded
+in-process via the Go `plugin` package) that owns a keep/drop verdict over a
+vectorized batch of traces. It is the sampler kind of Plugin (Plugin.sampler)
+and is the keep/drop link wherever a chain runs: in a TracePipelineConfig
+chain it gates at the enabled PipelineEvent(s) (merge / finalization); in a
+StageRule chain it is per-stage retention at a stage's migration-out
boundary.
+
+The full plugin↔engine contract (the vectorized batch type, the projection
+handshake, and the verdict shape) lives in the pinned Go SDK module
+`pkg/pipeline/sdk`, not in this proto; this message only locates and admits
+the plugin. Three properties of that contract:
+ - Strong compatibility: the boundary exchanges only stdlib/primitive types
+ defined in the pinned SDK, so no third-party struct version is pinned
+ across the .so boundary. The plugin must be built with the SAME Go
+ toolchain, build tags, and flags (-trimpath, CGO) and the SAME pinned SDK
+ as the running data node; `abi_version` is checked at load.
+ - Vectorized input: the plugin's Decide is called once per columnar
batch of
+ traces, not once per trace.
+ - Projection: the plugin declares the columns it needs (SDK Project →
+ Projection{Tags, SpanIDs, Spans}); the engine materializes only those tag
+ columns and, only when requested, the spans stream — like a query
+ projection.
+
+Operational constraints (Go `plugin`): Linux/macOS only; plugins cannot be
+unloaded, so changing one requires a node restart (no hot-reload); a plugin
+panic is contained with recover() and fails open (the whole batch is
+retained). Loading arbitrary code is operator-only and gated behind a server
+flag plus a trusted plugin directory.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| path | [string](#string) | | Plugin .so filename, resolved within the data
node's trusted plugin directory. The engine rejects any path that escapes
that directory. |
+| symbol | [string](#string) | | Constructor symbol the engine looks up;
defaults to "NewSampler" if empty. |
+| abi_version | [uint32](#uint32) | | ABI version the plugin was built
against. The engine refuses to load the plugin unless this equals its own
compiled sdk.ABIVersion. |
+| config | [google.protobuf.Struct](#google-protobuf-Struct) | |
Plugin-defined configuration, set directly in the pipeline config as a
structured object. The engine does not interpret its keys: it serializes the
Struct to canonical JSON and hands the bytes to the plugin's constructor
(SDK NewSampler([]byte)), which unmarshals them into the plugin's own typed
config and validates them — a malformed config fails the load. Optional: a
plugin that needs no configuration leaves it unset. |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-StageRule"></a>
+
+### StageRule
+StageRule binds the pipeline to one lifecycle stage of the targeted Group
+and declares that stage's retention plugin chain. The rule fires at the
+stage's migration-out boundary (i.e. when a segment migrates from this
stage
+to the next stage); routine compaction is governed by PIPELINE_EVENT_MERGE on
+TracePipelineConfig, not by StageRule.
+
+Per-stage retention uses the SAME chain mechanism as gating: each stage's
+`plugins` chain owns the keep/drop verdict for traces leaving that stage. A
+StageRule with an empty `plugins` chain has no filtering effect — every trace
+at this stage migrates unchanged. The "rising bar" across stages (Hot
keeps
+more, Cold keeps less) is expressed by each stage's plugin config (see §4.2
+of the design doc), not by a fixed predicate vocabulary.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| stage | [string](#string) | | Stage name from the Group's
ResourceOpts.stages (e.g. "hot", "warm", "cold"). Must
be non-empty; an empty stage name cannot match any lifecycle stage. |
+| plugins | [Plugin](#banyandb-pipeline-v1-Plugin) | repeated | Per-stage
retention chain: the ordered plugins (a sequential pipe) that decide keep/drop
for traces leaving this stage at its migration-out boundary. Empty means no
per-stage drop (every trace migrates). Same contract and composition as the
gating chain (see Plugin). |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineConfig"></a>
+
+### TracePipelineConfig
+TracePipelineConfig is the root configuration for a storage-node trace
pipeline.
+It reuses existing catalog identifiers (group via metadata, stage names, schema
+names) for targeting instead of declaring a parallel metadata model.
+
+The pipeline has up to three filter points:
+ 1. PIPELINE_EVENT_MERGE — in-merge filter during Hot-phase LSM compaction
(default).
+ 2. PIPELINE_EVENT_FINALIZE — tail-sampling gate at Hot-phase finalization.
+ 3. Per-stage retention via StageRule.plugins, applied at the stage's
+ migration-out boundary (when the segment migrates to the next stage).
+ Always implicit when any StageRule carries a plugin chain.
+Events 1 and 2 are toggleable via `enabled_events`. The gating policy those
+events evaluate is the `plugins` chain (a sequential pipe of native Go
+plugins; today each link is a sampler).
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | |
Identity and revision tracking; metadata.group is the Group this pipeline lives
in and applies to, consistent with every other schema resource. Required: every
config needs a name/group for registry handling. |
+| enabled | [bool](#bool) | | Active status of the pipeline. |
+| stages | [StageRule](#banyandb-pipeline-v1-StageRule) | repeated | Per-stage
retention rules: which lifecycle stages this pipeline acts on, with the
retention plugin chain for each. Each rule fires at its stage's
migration-out boundary regardless of `enabled_events`. Empty means the only
filters are the `enabled_events` events (no per-stage drop). |
+| schema_names | [string](#string) | repeated | Explicit schema names to
target within the Group (exact match on Metadata.name). Each entry must be
non-empty; cross-element uniqueness is enforced server-side. |
+| schema_name_regex | [string](#string) | | RE2 regular expression matched
against schema names. A schema is targeted if it is listed in schema_names OR
matches this pattern. When both are empty, every schema in the Group is
targeted. |
+| plugins | [Plugin](#banyandb-pipeline-v1-Plugin) | repeated | Gating policy:
an ordered chain of plugins (a sequential pipe) evaluated by any enabled event
(PIPELINE_EVENT_MERGE and/or PIPELINE_EVENT_FINALIZE). Links run in declared
order, each processing the traces the previous link kept; a link that fails is
bypassed (fail-open). Today every link is a sampler, so the chain is the
conjunction of the links' keep/drop verdicts. Empty means the only
retention is the per-stage StageRule plugin chain(s) at migration-out. |
+| enabled_events | [PipelineEvent](#banyandb-pipeline-v1-PipelineEvent) |
repeated | Pipeline-wide events to run. Empty defaults to
[PIPELINE_EVENT_MERGE] — the in-merge filter is on, the finalization gate is
off. To enable the finalization gate, include PIPELINE_EVENT_FINALIZE; to
disable the merge filter, list only [PIPELINE_EVENT_FINALIZE]; the explicit
empty default value is also acceptable to mean "merge only". Each
element must be a defined, non-UNSPECIFIED value; duplicates are normalized to a set server-side. |
+| merge_grace | [google.protobuf.Duration](#google-protobuf-Duration) | |
Per-trace maturity window for the in-merge filter (§7.1). A trace is eligible
for dropping during an LSM compaction merge only once its latest span timestamp
is older than `now - merge_grace`; younger traces pass through the merge
unchanged. Bounds the expected intra-trace span arrival spread (typically
seconds). Used iff `enabled_events` contains PIPELINE_EVENT_MERGE. Strictly
positive if set; engine default 30s if unset. |
+| finalize_grace | [google.protobuf.Duration](#google-protobuf-Duration) | |
Per-segment settling window for the scheduled finalization pass (§7.3). A
segment is treated as settled, and the authoritative final filter runs, once
the event-time watermark exceeds `segment.End + finalize_grace`. Bounds
segment-wide late arrival (typically minutes). Used iff `enabled_events`
contains PIPELINE_EVENT_FINALIZE. Strictly positive if set; engine default 5m
if unset. |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceCreateRequest"></a>
+
+### TracePipelineRegistryServiceCreateRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| trace_pipeline_config |
[TracePipelineConfig](#banyandb-pipeline-v1-TracePipelineConfig) | | |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceCreateResponse"></a>
+
+### TracePipelineRegistryServiceCreateResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) | | |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteRequest"></a>
+
+### TracePipelineRegistryServiceDeleteRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | |
|
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteResponse"></a>
+
+### TracePipelineRegistryServiceDeleteResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| deleted | [bool](#bool) | | |
+| delete_time | [int64](#int64) | | delete_time is the server-assigned
tombstone timestamp in unix nanos. |
+| mod_revision | [int64](#int64) | | mod_revision is the etcd revision of the
tombstone; zero if the server did not record one. |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceExistRequest"></a>
+
+### TracePipelineRegistryServiceExistRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | |
|
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceExistResponse"></a>
+
+### TracePipelineRegistryServiceExistResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| has_group | [bool](#bool) | | |
+| has_trace_pipeline_config | [bool](#bool) | | |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceGetRequest"></a>
+
+### TracePipelineRegistryServiceGetRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | |
|
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceGetResponse"></a>
+
+### TracePipelineRegistryServiceGetResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| trace_pipeline_config |
[TracePipelineConfig](#banyandb-pipeline-v1-TracePipelineConfig) | | |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceListRequest"></a>
+
+### TracePipelineRegistryServiceListRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| group | [string](#string) | | |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceListResponse"></a>
+
+### TracePipelineRegistryServiceListResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| trace_pipeline_config |
[TracePipelineConfig](#banyandb-pipeline-v1-TracePipelineConfig) | repeated | |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateRequest"></a>
+
+### TracePipelineRegistryServiceUpdateRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| trace_pipeline_config |
[TracePipelineConfig](#banyandb-pipeline-v1-TracePipelineConfig) | | |
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateResponse"></a>
+
+### TracePipelineRegistryServiceUpdateResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) | | |
+
+
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-PipelineEvent"></a>
+
+### PipelineEvent
+PipelineEvent identifies a pipeline-wide event that can be independently
+enabled. Per-stage retention (StageRule.plugins) fires implicitly at the
+stage's migration-out boundary and is not toggleable via this enum.
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| PIPELINE_EVENT_UNSPECIFIED | 0 | |
+| PIPELINE_EVENT_MERGE | 1 | In-merge filter during Hot-phase LSM compaction
merges (Warm/Cold compactions stay lossless). Per-trace drops are gated by
`merge_grace` so partial traces are not destroyed prematurely. Cheap, runs
often; verdicts wait for trace maturity (see §7.1). |
+| PIPELINE_EVENT_FINALIZE | 2 | Tail-sampling gate at Hot-phase segment
finalization, after the segment has settled (event-time watermark past
`segment.End + finalize_grace`). Heavy but authoritative; sees the complete
trace (see §7.3). |
+
+
+
+
+
+
+
+<a name="banyandb-pipeline-v1-TracePipelineRegistryService"></a>
+
+### TracePipelineRegistryService
+TracePipelineRegistryService manages TracePipelineConfig resources, mirroring
+the registry services of every other schema resource. Create/Update run the
+admission and conflict checks of §2.3/§2.4 of the design.
+
+| Method Name | Request Type | Response Type | Description |
+| ----------- | ------------ | ------------- | ------------|
+| Create |
[TracePipelineRegistryServiceCreateRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceCreateRequest)
|
[TracePipelineRegistryServiceCreateResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceCreateResponse)
| |
+| Update |
[TracePipelineRegistryServiceUpdateRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateRequest)
|
[TracePipelineRegistryServiceUpdateResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceUpdateResponse)
| |
+| Delete |
[TracePipelineRegistryServiceDeleteRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteRequest)
|
[TracePipelineRegistryServiceDeleteResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceDeleteResponse)
| |
+| Get |
[TracePipelineRegistryServiceGetRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceGetRequest)
|
[TracePipelineRegistryServiceGetResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceGetResponse)
| |
+| List |
[TracePipelineRegistryServiceListRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceListRequest)
|
[TracePipelineRegistryServiceListResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceListResponse)
| |
+| Exist |
[TracePipelineRegistryServiceExistRequest](#banyandb-pipeline-v1-TracePipelineRegistryServiceExistRequest)
|
[TracePipelineRegistryServiceExistResponse](#banyandb-pipeline-v1-TracePipelineRegistryServiceExistResponse)
| Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch
Get instead. |
+
+
+
+
+
<a name="banyandb_property_v1_gossip-proto"></a>
<p align="right"><a href="#top">Top</a></p>
diff --git a/docs/design/post-trace-pipeline.md
b/docs/design/post-trace-pipeline.md
new file mode 100644
index 000000000..663510622
--- /dev/null
+++ b/docs/design/post-trace-pipeline.md
@@ -0,0 +1,608 @@
+# BanyanDB Storage-Node Post-Trace Pipeline & Storage Tier Mapping
Specification
+
+This document presents the complete technical design for the **BanyanDB
Storage-Node Post-Trace Pipeline** and its integration with physical hardware
storage tiers and time-aging schedulers.
+
+Unlike traditional streaming-based telemetry engines that perform span
assembly inside active, memory-heavy windowing pipelines, BanyanDB relies on a
**native trace model**. Spans are stored, sorted, and indexed by `trace_id`
directly within the storage engine layout on individual data nodes.
+
+By decoupling trace assembly from the active ingestion path, the post-trace
pipeline executes asynchronous tail-sampling, per-stage retention filtering,
and data reduction natively during storage lifecycle events on the data nodes.
This design is grounded in recent academic advancements in post-hoc retroactive
tracing and storage-level file merge analysis.
+
+## Architectural Concept: Decoupled Gating and Per-Stage Retention
+
+To maximize storage efficiency and compute performance on the BanyanDB data
nodes, trace evaluation is separated into two logical phases, which
subsequently feed into an automated **time-aging system** for partition-level
migration:
+
+```mermaid
+flowchart TD
+ GA["Grouped Trace Assembly"]
+ GA --> G["1. Gating (plugins chain, at merge/finalize)<br/>Goal: decide
whether a trace is retained at all<br/>Criteria: native Go plugin-chain verdict
(operator-defined)"]
+ G -->|"Retain"| R["2. Per-Stage Retention (StageRule.plugins)<br/>Goal: at
each stage's migration-out event, keep/drop per the stage's plugin-chain
verdict<br/>Criteria: per-stage native Go plugin-chain verdict
(operator-defined)"]
+ G -->|"Drop / Purge"| D["Discard Block<br/>Reclaim Space"]
+ R --> T["3. Time-Aging System (Stage-Stepped Migration Engine)<br/>Goal:
each stage's StageRule decides which traces migrate to the next
stage;<br/>routine Hot-phase compaction may also drop traces when
PIPELINE_EVENT_MERGE is on (default)<br/>Stage Migration: Hot → Warm → Cold
(LifecycleStage order)<br/>RULE: medium is a node-group concern (may be
heterogeneous); the per-stage plugin governs retention, not medium selection"]
+```
+
+### 1.1 Gating (the `plugins` chain)
+
+- **Operation Type**: An ordered chain of native Go plugins (`plugins`)
delivers the keep/drop verdict. Each link is a `Plugin` (today the only kind is
a sampler); the chain is a sequential pipe — links run in declared order, each
evaluating the traces the previous link kept, so an all-sampler chain is the
conjunction of the links' verdicts (§2.5). It is the sole gating mechanism,
invoked at whichever of the toggleable events is enabled.
+
+- **Responsibility**: Gating runs **only in the Hot phase**, at whichever of
the toggleable events is enabled — `PIPELINE_EVENT_MERGE` (in-merge on Hot
compactions, §7.1; default-on) and/or `PIPELINE_EVENT_FINALIZE` (on settled Hot
segments, §7.3); Warm and Cold compactions are byte-for-byte lossless. It
determines whether a freshly-assembled trace block **survives** at all —
passing into the stage lifecycle — or is **purged** to reclaim storage space.
+
+- **Compute Profile**: Operator-defined. The plugin receives a vectorized
batch of traces and, via its `Project()` declaration, materializes only the
columns it needs — the named tag columns and, only when requested, the heavy
span bodies (§2.5). Verdicts are expected to be pure in their inputs (the
`TraceBatch` plus the frozen `config`), so they are deterministic in `trace_id`
and stable across re-evaluation once the trace is mature per `merge_grace` or
settled per `finalize_grace`.
+
+- **Contract**: An ordered chain of native Go plugins (`.so`s loaded
in-process via the Go `plugin` package); each link owns a keep/drop verdict and
the chain composes them as a sequential pipe, invoked by the enabled events
(`PIPELINE_EVENT_MERGE` / `PIPELINE_EVENT_FINALIZE`) at their respective
maturity (`merge_grace`) and settling (`finalize_grace`) gates. Each link's
contract — a vectorized columnar batch in, a boolean keep-mask out, with
up-front column projection — is specified in §2.5.
+
+### 1.2 Per-Stage Retention (`StageRule.plugins`)
+
+- **Operation Type**: Per-stage keep/drop verdict by a native Go plugin chain.
Each `StageRule` carries its own `plugins` chain; a trace it drops is omitted
from the part written for the next stage at the stage's migration-out boundary.
+
+- **Responsibility**: Decides **which traces survive each stage** as data
ages. Each `StageRule` fires once per segment lifetime, at the stage's
**migration-out** boundary (when the segment migrates to the next stage). The
"rising bar" effect — Hot keeps more, Cold keeps less — is expressed by
tightening each stage plugin's config at successive stages. The pipeline does
**not** choose the physical storage medium; that is a node-group /
`LifecycleStage` placement concern (§4.1). Routine H [...]
+
+- **Compute Profile**: Operator-defined — the same vectorized contract as the
gating plugin (§2.5): the stage plugin receives a batch of traces, projects
only the columns it declares via `Project()`, and returns a boolean keep-mask.
`MinTS`/`MaxTS` are free from block metadata; tag columns are decoded only when
the plugin's projection requests them.
+
+## Protobuf Message Design
+
+The pipeline configuration is a single trace-typed message,
`TracePipelineConfig`. Rather than a parallel abstract metadata layer, it
reuses the existing catalog identifiers — Group (via `metadata`), lifecycle
stage names, and schema names — and adds only the trace-specific gating (a
native plugin chain) and per-stage retention rules. General stream parsing and
metrics aggregation configurations are excluded to maintain a strict focus on
trace-centric analytical operations.
+
+### 2.1 Targeting Model: Reuse Existing Catalog Identifiers
+
+This design deliberately does **not** introduce an abstract `Pipeline`
resource or an `ExecutionTrigger` enum. Both would re-declare targeting that
the storage model already owns and let the two drift. A trace pipeline is
instead expressed entirely with identifiers that already exist:
+
+- **Group** — named by the config's own `metadata.group`
(`common.v1.Metadata`). A `TracePipelineConfig` lives in, and applies to, that
Group, exactly as every other schema resource does. The Group already fixes the
`catalog`, so catalog is never repeated.
+- **Lifecycle stages** — a `StageRule` per targeted stage, each naming a stage
from the Group's `ResourceOpts.stages` (e.g. `"hot"`, `"warm"`, `"cold"`) and
carrying that stage's **retention `plugins` chain** (the same native-plugin
mechanism as gating). Each rule fires at the stage's migration-out boundary.
Stage names are the same vocabulary queries already accept
(`trace/v1/query.proto`'s `stages`). All retention policy is configured here,
per stage and per pipeline — there is no hard [...]
+- **Schema selector** — an explicit `schema_names` list (exact match on
`common.v1.Metadata.name`) plus a `schema_name_regex` (RE2). A schema matches
if it is listed OR matches the regex; both empty targets every schema in the
Group.
+
+The former `ExecutionTrigger` (COMPACTION / MIGRATION / SCHEDULED) is replaced
by three anchored events: the **in-merge filter at LSM compaction** (§7.1,
toggleable via `PIPELINE_EVENT_MERGE` — on by default), the **plugin gating
pass at finalization** (§7.3, toggleable via `PIPELINE_EVENT_FINALIZE`), and
the **per-stage retention pass at migration-out** (§7.2, always-on when any
`StageRule` carries a `plugins` chain). The first two run the gating chain; the
third runs the per-stage `Sta [...]
+
+Multiple `TracePipelineConfig`s may coexist in a Group only when their
effective coverage is disjoint; the schema registry enforces this with a
per-tuple uniqueness rule (§2.3).
+
+### 2.2 Trace Pipeline Specification (`trace_pipeline.proto`)
+
+The full message definitions live in the proto source at
`api/proto/banyandb/pipeline/v1/trace_pipeline.proto` (package
`banyandb.pipeline.v1`); they are not duplicated here. `TracePipelineConfig` is
the single root resource: it carries its own identity (`metadata`), the
targeting fields from §2.1, and the trace-specific gating (a native plugin
chain) and per-stage retention rules. There is no embedded abstract `Pipeline`
and no `ExecutionTrigger`; the anchored filter points are the plug [...]
+
+The message set is:
+
+- **`TracePipelineConfig`** — root resource: `metadata`, `enabled`, the
per-stage `stages` rules, the `schema_names` / `schema_name_regex` selector,
the gating policy as a native-plugin `plugins` chain, the `enabled_events` list
(defaults to `[PIPELINE_EVENT_MERGE]`), and the two grace windows `merge_grace`
(§5.1 / §7.1) and `finalize_grace` (§7.3) — each consulted only when its
corresponding event is enabled.
+- **`PipelineEvent`** — enum of pipeline-wide events: `PIPELINE_EVENT_MERGE`
(in-merge filter at LSM compaction, §7.1) and `PIPELINE_EVENT_FINALIZE` (plugin
gating pass at segment finalization, §7.3). Per-stage retention (`StageRule`)
fires implicitly at migration-out and is not in this enum.
+- **`StageRule`** — binds the pipeline to one lifecycle stage and carries that
stage's retention `plugins` chain: a `stage` name plus a `repeated Plugin`. The
rule fires at the stage's migration-out boundary, where the chain's keep-mask
decides which traces migrate; a `StageRule` with an empty `plugins` chain has
no filtering effect (see §4.2).
+- **`Plugin`** — one link in a chain: a `name` plus a `oneof kind` whose set
arm selects the kind. The set arm is the discriminator; today the only arm is
`sampler` (a `SamplerPlugin`). Adding a kind is purely additive — a new payload
message and a new `oneof` arm — so existing configs and field numbers are
untouched (§2.6).
+- **`SamplerPlugin`** — the sampler kind of `Plugin`: `path` (the `.so` within
the trusted plugin dir), `symbol` (constructor, default `NewSampler`),
`abi_version` (checked against the host at load), and a structured `config`
(`google.protobuf.Struct`) set directly in the pipeline config — the engine
serializes it to JSON and hands it to the plugin, which unmarshals it into its
own typed config. The vectorized-batch / projection / verdict contract is the
Go SDK, not the proto (§2.5).
+- **`TracePipelineRegistryService`** — the CRUD registry surface (`Create` /
`Update` / `Delete` / `Get` / `List` / `Exist`, with HTTP mappings under
`/v1/trace-pipeline/schema`), mirroring every other schema resource's
`*RegistryService`. `Create`/`Update` are where the admission and conflict
checks of §2.3 / §2.4 run. Without this service a `TracePipelineConfig` would
be an orphaned, un-writable resource.
+
+### 2.3 Uniqueness and Conflict Policy
+
+Multiple `TracePipelineConfig` resources can coexist in a Group, but their
**effective coverage must be disjoint on two independent keys**, so the
behavior at every point is deterministic — there is no implicit ordering,
priority, or composition of overlapping pipelines:
+
+- **Gating key `(Group, Schema, Event)`** — at most one active pipeline may
gate a given schema at a given `PipelineEvent` (`PIPELINE_EVENT_MERGE` /
`PIPELINE_EVENT_FINALIZE`). Gating runs only in the Hot phase (§1.1),
independent of `stages`.
+- **Retention key `(Group, Schema, Source-Stage)`** — at most one active
pipeline may carry a `StageRule` for a given schema at a given source stage's
migration-out boundary.
+
+**Effective coverage of a pipeline.** A `TracePipelineConfig` P with
`enabled=true` covers:
+
+- **Gating tuples** `(P.metadata.group, schema, event)` for each `schema`
selected by `P.schema_names`/`P.schema_name_regex` (both empty selectors target
every schema in the Group) and each `event` in `P.enabled_events` (default
`[PIPELINE_EVENT_MERGE]`).
+- **Retention tuples** `(P.metadata.group, schema, stage)` for each selected
`schema` and each `stage` named by a `StageRule` in `P.stages`. An empty
`P.stages` covers **no** retention tuples — it declares no per-stage retention,
consistent with the proto semantics that empty `stages` means no per-stage drop.
+
+**Conflict detection on write.** When a `TracePipelineConfig` is created or
updated, the registry expands its gating and retention tuples against the
current schemas and stages of its Group and rejects the write if **any** tuple
on either key is already covered by another `enabled` pipeline in the same
Group. The error names the conflicting pipeline and the overlapping tuple(s).
+
+**Schema additions.** When a new `Trace` schema is created in a Group, the
registry re-validates every `enabled` `TracePipelineConfig` in that Group
against the new schema. If the new schema would cause two pipelines to overlap
on any gating or retention tuple, the schema creation is rejected; the operator
must narrow the pipelines (e.g. add explicit `schema_names`) before adding the
schema.
+
+**Disabling and replacement.** Setting `enabled=false` removes a pipeline from
active coverage immediately, freeing its tuples for another pipeline to claim.
This is the safe path for swapping in a new pipeline without ever overlapping
coverage: disable the old, enable or create the new, then delete the old. The
swap is not atomic — between the first two steps no pipeline covers the freed
tuples, so nothing is filtered on them in the interim. There is no implicit
precedence between two `enabled` pipelines — the registry will not pick a
winner.
+
+**Runtime defense-in-depth.** At each enabled Hot-phase event (§7.1/§7.3) the
engine selects the active pipeline by gating key `(group, schema, event)`; at a
migration-out boundary (§7.2) it selects by retention key `(group, schema,
source_stage)`. If a registry inconsistency ever exposes more than one active
match for a key, the engine logs an error and applies **none** of them —
failing safe to retain rather than risk a nondeterministic destructive drop.
+
+**Recommended pattern.** For most workloads, a single pipeline per Group with
a broad `schema_name_regex` is the simplest configuration. Multiple pipelines
in a Group are useful only when their schema selectors are mutually exclusive
(for example `schema_names: ["segment"]` and `schema_names: ["zipkin_span"]` in
a hypothetical mixed group) so no tuple is doubly covered.
+
+### 2.4 Admission Validation
+
+The proto pins bounds with PGV so a malformed config is rejected at parse time
rather than producing undefined behavior. The schema registry's admission
control adds the cross-field and cross-element rules that PGV cannot express.
+
+**PGV-enforced bounds** (rejected at proto parse / `Write` time):
+
+- `TracePipelineConfig.merge_grace`, `TracePipelineConfig.finalize_grace` —
strictly positive when set; unset means engine default (30s and 5m
respectively). Each is consulted only when its corresponding `PipelineEvent` is
in `enabled_events`.
+- `TracePipelineConfig.enabled_events` — each element must be a defined,
non-`UNSPECIFIED` enum value (`repeated.items.enum = {defined_only: true,
not_in: [0]}`).
+- `TracePipelineConfig.schema_names` — each element non-empty
(`repeated.items.string.min_len = 1`).
+- `StageRule.stage` — non-empty (`min_len: 1`); `Plugin.name` — non-empty
(`min_len: 1`); `SamplerPlugin.path` — non-empty (`min_len: 1`);
`SamplerPlugin.abi_version` — `gte: 1`.
+
+**Admission rules (server-side; not expressible in PGV):**
+
+- **Required rule blocks when enabled.** When `TracePipelineConfig.enabled =
true`, the config must declare at least one of: (a) a non-empty gating
`plugins` chain paired with a non-empty **effective event set** — that is,
after applying the `[PIPELINE_EVENT_MERGE]` default for an unset/empty
`enabled_events`, at least one of `PIPELINE_EVENT_MERGE` or
`PIPELINE_EVENT_FINALIZE` actually fires (this check must be evaluated against
the defaulted set, not a raw `len(enabled_events) > 0`, so [...]
+- **Well-formedness checks** (PGV cannot express these cross-element rules):
`enabled_events` is normalized to a duplicate-free set (duplicates are
de-duped, not an error); every `StageRule.stage` and every `schema_names` entry
is unique within the config (duplicates rejected); every `StageRule.stage`
names a real stage in the Group's `ResourceOpts.stages` and is **non-terminal**
— a `StageRule` on the last (Cold) stage is rejected, since the terminal stage
has no migration-out boundary [...]
+- **Plugin admission (for every link in every chain — gating and per-stage).**
Validation is split by topology, because plugins live on the **data nodes**
while the registry write path runs on the **liaison/metadata** layer — the
liaison cannot itself `dlopen` a data-node `.so`. (1) At write time the liaison
validates only the **static** fields it can check without the binary: every
`Plugin` has a set `kind` arm and a non-empty `name` (unique within its chain),
and for the sampler kind n [...]
+
+Both the liaison write path and the per-node runtime apply their respective
checks. If a registry inconsistency ever lets a malformed config reach a data
node, the node logs an error and falls back to **retain** for every tuple the
malformed config would have governed — never a destructive drop (consistent
with the §2.3 runtime fail-safe).
+
+### 2.5 Native Plugin Contract & Chain Composition
+
+A pipeline's gating policy and each stage's retention are an **ordered chain
of native Go plugins** (`plugins`). A `Plugin` is one link — a generic,
kind-tagged envelope whose set `oneof` arm selects the kind; today the only
kind is a **sampler**, a **user-supplied `.so` loaded in-process via the
standard Go `plugin` package** that owns a keep/drop verdict. The chain is
invoked by the enabled events (§7.1 / §7.3) at the same maturity
(`merge_grace`) and settling (`finalize_grace`) gates. [...]
+
+This is an **operator-only, trusted** extension point: loading a `.so` runs
arbitrary code inside the data node, so it is gated behind a server flag and a
fixed trusted plugin directory; the proto config only *references* a plugin
already vetted on disk (§2.4). It is not tenant-facing.
+
+The contract is designed around three hard requirements.
+
+**(2) Vectorized parameter — the engine's native trace columns.** The batch is
**not** a parallel columnar form invented for the plugin; it is the engine's
own native trace block (`banyand/trace/block.go`). The merger already streams
and assembles one columnar `block` per `trace_id` — `spans [][]byte` (the span
column), `tags []tag` (tag columns, each a row-aligned `values [][]byte` + a
`valueType`), `spanIDs []string`, and `minTS`/`maxTS` — so the plugin receives
exactly the projected c [...]
+
+**(1) Strong compatibility of the parameter and return types.** Go's `plugin`
package is strict by design: the [official docs](https://pkg.go.dev/plugin)
state host and plugin must be built with "exactly the same version of the
toolchain, the same build tags, and the same values of certain flags and
environment variables," and that "all common dependencies … [must be] built
from exactly the same source code" — in practice "built together by a single …
component." This lock is *irreducibl [...]
+
+- The boundary types live in a single pinned SDK module, `pkg/pipeline/sdk`,
that mirrors the native column layout with stdlib slices (`[][]byte`,
`[]string`, `int64`) plus the engine's already-public, byte-sized value-type
enum `pbv1.ValueType` (`pkg/pb/v1`). Tag values cross as the raw marshaled
`[][]byte` exactly as the block holds them; the SDK re-exports the `pbv1`
decode helper so a plugin decodes a value by its `ValueType` without importing
engine internals directly.
+- Because the batch *is* the native trace layout, the plugin is by design
**version-locked to the BanyanDB release** — which is consistent with Go
plugin's irreducible toolchain lock, not an additional cost. The SDK module is
the single version-tagged surface operators build against; `pkg/pb/v1` is a
stable `byte` enum, and no `banyand/trace`-internal struct is exported across
the boundary.
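+- The plugin exports an `ABIVersion` variable set to `sdk.ABIVersion` (a
variable rather than a constant, because Go's `plugin.Lookup` resolves only
exported variables and functions); the engine refuses to load on mismatch with
its own compiled `sdk.ABIVersion`, turning a silent miscompile into a clear,
fail-fast error. Configuration is a structured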
`google.protobuf.Struct` (`SamplerPlugin.config`) set directly in the pipeline
config; the engine serializes it to canonical JSON and the plugin unmarshals
those `[]byte` into its own typed config — so the wire form is structured and
inspectable while the `.so` boundary sta [...]
+- **Distribution:** operators build plugins against the released,
version-tagged `pkg/pipeline/sdk` using the **same CI image / Go version /
`-trimpath` / CGO flags** as the data node. (Background constraints,
well-documented but outside this design's verified scope: Go plugins are
supported on **Linux, FreeBSD, and macOS only**, **cannot be unloaded** — so
changing a plugin requires a node restart, there is no hot-reload — and a
plugin **panic crashes the host** unless contained; see fail-open below.)
+
+**(3) Projection / column selection — spans optional, more than tags.** The
plugin declares the columns it needs up front via `Project()`, which returns a
`Projection{ Tags []string; SpanIDs bool; Spans bool }`. The engine turns
`Tags` into the **same `model.TagProjection`** the block reader already honors
(`blockMetadata.tagProjection`), so only those tag columns are decoded into
`block.tags` — literally the query engine's tag-projection path
(`trace/v1/query.proto`), not a new mechanis [...]
+
+**Go SDK (`pkg/pipeline/sdk`).** The batch types mirror the native trace
`block`/`tag` (`banyand/trace/block.go`); the engine fills them with the
block's own slices, shared **read-only** (not copied) — see the read-only
contract below:
+
+```go
+package sdk
+
+// ABIVersion is compiled into the host and re-exported by every plugin; the
+// engine refuses to load a plugin whose ABIVersion differs.
+const ABIVersion = 1
+
+// Exported symbols the .so MUST provide (the constructor name is the kind's
+// convention — NewSampler for the sampler kind):
+// var ABIVersion int // == sdk.ABIVersion
+// func NewSampler(config []byte) (Sampler, error)
+
+// Kind identifies a Plugin's role in a chain; it mirrors the set arm of the
+// proto Plugin.kind oneof. New kinds are added here in lockstep with new arms.
+type Kind uint8
+
+const (
+ KindUnspecified Kind = iota // zero value; a real plugin never reports it
+ KindSampler // keep/drop verdict (see Sampler)
+)
+
+// Plugin is the common interface every kind satisfies — the generic link type
+// the engine handles uniformly. The engine constructs a Plugin, checks Kind
+// against the proto oneof arm that named it, then type-asserts to that kind's
+// interface (e.g. Sampler). Project and Close are shared by every kind.
+type Plugin interface {
+ // Kind reports the plugin kind, for bookkeeping and as a cross-check
+ // against the proto oneof arm. Constant for the plugin's lifetime.
+ Kind() Kind
+
+ // Project is the column-selection handshake, called ONCE at load. The
+ // engine honors it for the plugin's lifetime: Tags drives the native
+ // model.TagProjection (only those tag columns are decoded); SpanIDs and
+ // Spans gate the spans stream. Intrinsic columns are always present.
+ Project() Projection
+
+ // Close releases any resources the plugin holds (called once at unload).
+ Close() error
+}
+
+// Sampler is the keep/drop kind of Plugin (Kind reports KindSampler). In a
+// chain it is a conjunction link: each Sampler narrows the traces the next
+// link sees.
+type Sampler interface {
+ Plugin
+
+ // Decide receives a vectorized batch of assembled per-trace blocks and
+ // returns a keep-mask aligned to batch.Traces. The batch is READ-ONLY:
+ // Decide must not mutate any slice it receives (see the read-only
contract).
+ Decide(batch *TraceBatch) (Verdict, error)
+}
+
+// Projection is the plugin's up-front column request — one handshake covering
+// every optional column. Intrinsic columns (TraceID, MinTS/MaxTS) are always
+// materialized and are NOT listed here.
+type Projection struct {
+ // Tags names the tag columns to decode; the engine builds a
+ // model.TagProjection from them. Empty => no tag columns decoded.
+ Tags []string
+ // SpanIDs opts IN to the span-id column. Default false. span ids and span
+ // bodies share one encoded data block in the native layout, so requesting
+ // span ids forces a read of the spans stream (it is NOT free metadata).
+ SpanIDs bool
+ // Spans opts IN to the heavy span-body column. Default false: the engine
+ // leaves TraceBlock.Spans nil and, on the merge fast path (mustReadRaw),
+ // never decodes span bodies. Set true only when the verdict reads them.
+ Spans bool
+}
+
+// TraceBatch is a vectorized batch of assembled per-trace blocks — the
engine's
+// native columnar trace layout, not a parallel form invented for the plugin.
+type TraceBatch struct {
+ Traces []TraceBlock
+}
+
+// TraceBlock mirrors the native trace `block`: all columns are indexed in
+// lockstep by span row i in [0,Len). The engine shares the block's slices
+// READ-ONLY. Intrinsic columns are always populated; SpanIDs, Tags, and Spans
+// appear only as requested by Project().
+type TraceBlock struct {
+ // Intrinsic — always present (from blockMetadata, no spans-stream decode):
+ TraceID string
+ MinTS int64 // earliest span start (unix nanos)
+ MaxTS int64 // latest span end; Duration = MaxTS - MinTS
+ // Projected — per Project():
+ Tags []TagColumn // projected tag columns (per Projection.Tags)
+ SpanIDs []string // span-id column, row-aligned; nil unless
Projection.SpanIDs
+ Spans [][]byte // span bodies (heaviest column); nil unless
Projection.Spans
+}
+
+// TagColumn mirrors the native `tag`: a row-aligned column of marshaled values
+// plus the value type needed to decode them via the SDK's pbv1 helper.
+type TagColumn struct {
+ Name string
+ ValueType pbv1.ValueType // pkg/pb/v1 — the engine's stable byte-sized enum
+ Values [][]byte // one marshaled value per span row; nil == absent
+}
+
+// Verdict is the per-trace decision, aligned to batch.Traces.
+type Verdict struct {
+ Keep []bool // len(Keep) MUST == len(batch.Traces); Keep[i] retains
Traces[i]
+}
+```
+
+These types are real and live in [`pkg/pipeline/sdk`](../../pkg/pipeline/sdk);
the block above is the conceptual layout, and the canonical definitions plus
the value-decode helpers (`TagColumn.At`, `DecodeTagValue`) are in that
package. A complete reference plugin implementing the Scenario 6.1 sampler —
config parsing, `Project()`, and tag/span extraction — lives at
[`pkg/pipeline/sdk/_example/segment-tail-sampler`](../../pkg/pipeline/sdk/_example/segment-tail-sampler).
+
+**Verdict shape — boolean keep-mask.** `Decide` returns a `Verdict` whose
`Keep` field is a `[]bool` aligned to `batch.Traces`: `Keep[i]` retains trace
`i`. This is the simplest fully-vectorized contract and makes the alignment
invariant trivial to check (the engine rejects a verdict whose `Keep` length ≠
`len(batch.Traces)`). The keep/drop is per `trace_id`, matching the merger's
per-`trace_id` write granularity (§7.1).
+
+**Chain composition — sequential pipe.** `TracePipelineConfig.plugins`
(gating) and each `StageRule.plugins` (per-stage retention) are *ordered
chains*. The engine materializes the **union** of every link's `Project()`
once, then runs the links in declared order, handing each link the traces the
previous link kept — a read-only sub-batch view, not a copy. The chain's
survivors are what the event retains. For an all-sampler chain this is exactly
the conjunction (AND) of the links' keep-ma [...]
+
+**Read-only batch contract.** The plugin receives the native block's slices
**shared, not copied** — so `Decide` MUST treat `TraceBatch` (and every
`[]byte`/`[]string`/`TagColumn` inside it) as **read-only**. The keep-mask is
the plugin's **only** output channel. Crucially, persistence is structurally
immune to a misbehaving plugin: the engine applies the returned mask to its
**own** untouched block data when writing the reduced part — it never
serializes the plugin-visible slices back t [...]
+
+**Failure handling — fail-open.** The engine wraps every `NewSampler`/`Decide`
call in `recover()`. On panic, returned error, or a length-mismatched verdict,
the engine **retains every trace in the batch** and emits a log + metric. A
retention filter must never *drop* data because of a plugin bug; a misbehaving
plugin degrades to "keep everything," not "drop everything." (A
consecutive-failure counter may later disable the sampler entirely and alert,
but the per-batch default is always k [...]
+
+**Determinism.** `Decide` is expected to be **pure** in its inputs
(`TraceBatch` + the frozen `config`). A plugin that consults wall-clock or
mutable external state breaks the "re-evaluation is stable across
merge/finalize" property the timing model (§3.1) relies on; the SDK documents
this expectation.
+
+**Where it runs.** The plugin runs at each enabled event: at
`PIPELINE_EVENT_MERGE` the batch collects the mature (`merge_grace`-passed)
traces the merger has assembled (§7.1); at `PIPELINE_EVENT_FINALIZE` it
collects the settled segment's traces (§7.3). In both cases the engine wraps
the native assembled `block` it already holds as a `TraceBlock` (sharing the
column slices read-only) and buffers several before calling `Decide` once,
applying the same metadata-vs-decode cost split as §7. [...]
+
+### 2.6 Extending with a New Plugin Kind
+
+The sampler is the only kind this design ships, but `Plugin` is a generic
envelope so new kinds are additive — no change to `TracePipelineConfig`,
`StageRule`, the chain semantics, or existing `.so`s. Adding a hypothetical
**transformer** kind (one that *rewrites* batch data — drop or redact tags,
enrich, collapse spans — rather than only filtering) would touch three places,
in lockstep:
+
+1. **Proto** — add a payload message and a new arm to the `oneof`: `oneof kind
{ SamplerPlugin sampler = 2; TransformerPlugin transformer = 3; }`. Existing
arms and field numbers are untouched, so old configs still parse.
+2. **SDK** — add a `Transformer` sub-interface that embeds `Plugin` and adds
its own processing method (e.g. `Transform(*TraceBatch) (*TraceBatch, error)`),
plus a `KindTransformer` value and a `NewTransformer` constructor convention. A
transform that emits a *new* batch needs a mutable output channel, which is
exactly why this design keeps the batch strictly read-only and defers the
transform kind: the read-only contract (above) holds precisely because the only
shipped kind is a filter.
+3. **Engine** — when the chain executor meets a `transformer` link it
type-asserts to `Transformer` and feeds its output to the next link instead of
applying a keep-mask. The sequential-pipe model (§2.5) already hands each link
the previous link's output, so a transform link slots in without changing how
the chain is wired — it simply rewrites the sub-batch the next link sees.
+
+The same three-step pattern (proto arm → SDK sub-interface + constructor →
engine dispatch) covers any future kind (exporter, router, …); the
discriminator is always the set `oneof` arm, cross-checked by `Plugin.Kind()`.
+
+## Retention-Decision Timing & Trace Completeness
+
+Both the plugin gating policy (§1.1) and the per-stage retention plugin (§1.2)
operate on the **whole** trace, not on individual spans as they arrive:
`D_total` needs the earliest start and latest end, `has_error` scans all spans,
and tag matchers scan all spans. Spans of one trace, however, arrive at the
storage node anywhere from milliseconds to hours apart, and BanyanDB writes
each span into the segment selected by its **event time** (the `start_time`
tag), not its arrival time (`bany [...]
+
+### 3.1 When Retention Decisions Are Safe
+
+Retention decisions are therefore **not** made at write time. They are made by
the post-trace passes that re-assemble a trace by `trace_id`:
+
+- **In-merge filter, during Hot-phase compaction** — when
`PIPELINE_EVENT_MERGE` is enabled (the default), the plugin gating policy is
evaluated in-line during Hot-phase LSM compaction (Warm/Cold compactions stay
lossless). Compaction is part-count-driven, so it can fire while a trace is
still growing; per-trace drops are deferred until `traceMaxTs < now −
merge_grace` (§7.1).
+- **Plugin gating, at finalization** — when `PIPELINE_EVENT_FINALIZE` is
enabled, the plugin gating policy is evaluated once per **Hot** segment after
it has **settled**: its event-time window has closed *and* the event-time
watermark has advanced past it by the `finalize_grace` period (§7.3). BanyanDB
has no hard "segment sealed" event — `create()` pre-creates the next segment up
to an hour *before* the current window ends, and late data keeps routing to an
old segment by event time — s [...]
+- **Per-stage retention, at migration-out** — by the time a segment is
migrated to the next stage (≥ its stage's TTL, orders of magnitude longer than
`finalize_grace`), every trace it holds has settled, so per-stage plugin
evaluation is final.
+
+Two completeness caveats follow from event-time segmentation:
+
+1. **Spans arriving after finalization.** A span whose `start_time` falls in
an already-finalized segment is still accepted (while the segment is within
retention) but is missed by the finalization-time evaluation. The
`finalize_grace` period bounds how often this happens; a late span that arrives
after the gate has run does not retroactively change the verdict for
already-dropped traces.
+
+2. **Traces spanning multiple segments.** A trace longer than
`segment_interval`, or one that straddles a boundary, is physically split
across two or more segments, and each segment's pass evaluates only its own fragment
(`D_total` and the indicators are fragment-local). BanyanDB performs no
cross-segment trace assembly at the storage layer. Where trace duration is far
below `segment_interval` (e.g. the showcase's ≈2.8 s max trace against 1-day
segments) the fragment equals the whole trace and t [...]
+
+## Integration of the Retention System & Time-Aging System
+
+Distributed telemetry's diagnostic utility naturally decays as time passes.
The pipeline must therefore decide, per stage, what to keep — but it must
**not** try to pick the physical storage medium, because in BanyanDB the medium
is already determined by where a `LifecycleStage` is placed.
+
+### 4.1 Stage Placement vs. Rule-Governed Retention
+
+The physical medium is **not** fixed per tier by this design. Each
`LifecycleStage` (`common.v1.LifecycleStage`) is routed to a node group through
its `node_selector`; the medium is whatever hardware that node group runs, and
operators may deploy heterogeneous backings (for example two warm node groups
on different SSD classes). A common placement is Hot → local NVMe, Warm → SATA
SSD/HDD, Cold → object storage, but that mapping is an operator deployment
choice, not a rule this pipeline e [...]
+
+Because medium selection already belongs to stage placement, per-stage
`StageRule` retention **does not route traces to a medium**. Within whatever
stage currently holds a trace, the stage plugin only acts as a **retention
gate** — dictating whether a trace is retained when a partition is rewritten or
migrated, or discarded/pruned to reduce storage footprint.
+
+### 4.2 Stage-Stepped Retention via Rising Plugin Config
+
+"Aging" is expressed structurally: each `StageRule` carries its own retention
`plugins` chain, and the bar rises as data moves to colder stages by tightening
each chain's plugin **config**. **Semantics at a stage's migration-out
boundary:** the stage chain returns a keep/drop verdict per `trace_id` over the
vectorized batch (§2.5); a trace it drops is omitted from the part written for
the next stage. A `StageRule` with an empty `plugins` chain has no filtering
effect (every trace at that [...]
+
+A typical profile rises across **three points** — Hot-phase gating, then each
migration-out boundary — tightening the config at each step; for example, with
one shared retention plugin the operator configures per stage:
+
+- **Gating (Hot phase):** keep if duration ≥ threshold, OR error, OR a key tag
matches; probabilistically sample the healthy rest.
+- **Hot → Warm:** keep if duration ≥ 100 ms, OR error, OR a key tag matches
(PostgreSQL, ActiveMQ, etc.).
+- **Warm → Cold:** keep errors only — slow-but-healthy traces are dropped at
this boundary, so only errors enter Cold and persist for the full 30-day
retention. Cold itself does no sampling (Cold compactions are lossless).
+
+The plugin's verdict is deterministic in its inputs, so retention stays
self-explanatory: a trace is retained at Warm because the stage plugin's config
matched `db.type=PostgreSQL`, even though it was only 3 ms. Time enters only
through *when* a partition reaches each stage, which is governed by
`LifecycleStage.ttl` / `segment_interval`.
+
+## Downstream Lifecycle Actions Governed by Per-Stage Retention
+
+The time-aging engine has up to three retention points: LSM-merge filtering
during routine compaction (§5.1, gated by `PIPELINE_EVENT_MERGE` — on by
default), per-stage retention at the migration-out boundary (§5.2, always-on
when any `StageRule` carries a `plugins` chain), and segment-granularity
eviction (§5.3, unchanged).
+
+### 5.1 Compaction Rewrites (when `PIPELINE_EVENT_MERGE` is enabled)
+
+When `PIPELINE_EVENT_MERGE` is in `enabled_events` (the default), the data
node hooks the **Hot-phase** LSM merge stream so that traces failing the plugin
gating policy are omitted from the consolidated output (Warm and Cold
compactions are never filtered — they stay byte-for-byte lossless). Hot
compaction is then both a space-reclamation pass and an early retention pass —
drops happen incrementally as Hot parts age, rather than once-per-segment at
finalization. Per-trace drops are gated [...]
+
+When the event is disabled (`enabled_events` omits `PIPELINE_EVENT_MERGE`),
routine LSM compaction stays byte-for-byte lossless; retention falls entirely
to the finalization gate (§7.3, when `PIPELINE_EVENT_FINALIZE` is enabled) and
the per-stage retention plugin (§5.2).
+
+### 5.2 Partition-Level Tier Migration
+
+Both showcase trace groups (`sw_trace`, `sw_zipkinTrace`) declare a Hot → Warm
→ Cold lifecycle in `ResourceOpts.stages`: Hot holds the active window (1-day
ttl), Warm runs on `node_selector type=warm` (7-day ttl), and Cold runs on
`node_selector type=cold` with `close=true` (30-day ttl). When a Hot segment
matures past its 1-day ttl, the migration worker copies it to the Warm node
group; the medium behind each node group (e.g. NVMe → SATA SSD → object
storage) is the operator's deployme [...]
+
+- The migration engine runs the source stage's retention `plugins` chain
against every trace in the partition.
+
+- **No dynamic splitting is performed:** all retained data is written to the
next stage's node group. Traces the source stage's plugin drops are omitted
from the target write stream, reducing the physical size of the migrated
partition. With Scenario 6.1's hot retention plugin (config `min_duration:
100ms`) a healthy `/homepage` trace (2802 ms) is kept and migrates to Warm; a
PostgreSQL-touching trace is kept by the config's tag rule and migrates too; a
healthy fast trace (6 ms) matches [...]
+
+- When the partition matures past the Warm 7-day ttl, the Warm stage's
retention plugin runs at this Warm→Cold boundary — the gate that decides what
enters Cold (typically the strictest config; in Scenario 6.1 it keeps errors
only). Only the traces it keeps are written into the Cold parts and then
retained for the full Cold TTL; everything else is dropped here. Cold itself
does no further sampling — Cold compactions are lossless.
+
+### 5.3 Eviction: Per-Trace Drop During Rewrites, Segment-Granularity GC
+
+BanyanDB does not tombstone individual traces, and this design does not add
per-trace tombstones. The trace part layout is column-oriented (separate
`primary`, `spans`, and `tags` streams), so there is no per-trace delete
primitive. Eviction therefore happens at two distinct granularities:
+
+- **Per-trace removal is a side effect of the pre-migration rewrite (§7.2),
not GC.** A trace the current stage's retention plugin drops is simply omitted
from the reduced part produced for the next stage. No tombstone is written; the
trace's columns are not copied into the new part, and the space is reclaimed
when the old part is retired with the source segment.
+
+- **Whole-segment eviction stays the existing mechanism.** Reclaiming an
entire time segment remains governed by the current retention path — TTL expiry
plus the disk high/low watermarks (`banyand/trace/svc_standalone.go`,
`TopicDeleteExpiredTraceSegments`). Per-stage plugins do not place tombstones
and do not trigger segment deletion; they only change how much of a segment
migrates to the next stage.
+
+## Operational Scenario Configurations
+
+Below are two operational scenarios represented as complete
`TracePipelineConfig` instances, one per trace group found in the
skywalking-showcase cluster (`sw_trace` and `sw_zipkinTrace`). Group names,
schema names, tags, latencies, and lifecycle stages are taken from real data in
that cluster; the retention outcomes quoted are derived by evaluating the
per-stage plugins against real sampled traces.
+
+### 6.1 Scenario 1: SkyWalking-Native Segment Retention (`sw_trace`)
+
+- **Objective**: On the showcase `sw_trace` group (schema `segment`), keep
every error trace all the way through migrations for incident forensics, keep
genuinely slow requests through Warm, always keep traces that touch PostgreSQL
or the ActiveMQ `queue-songs-ping` queue at the Hot→Warm boundary, and
probabilistically sample the healthy remainder at gating. The group's real Hot
→ Warm → Cold stages (ttl 1d / 7d / 30d) carry rising retention strictness at
each migration-out boundary. The [...]
+
+- **Configuration JSON**:
+
+```json
+{
+ "metadata": { "group": "sw_trace", "name": "segment-tail-sampler" },
+ "enabled": true,
+ "stages": [
+ {
+ "stage": "hot",
+ "plugins": [
+ {
+ "name": "hot-retention",
+ "sampler": {
+ "path": "segment-stage-retention.so",
+ "abi_version": 1,
+ "config": {
+ "min_duration": "0.100s",
+ "keep_errors": true,
+ "keep_tag_rules": [
+ { "tag_key": "db.type", "equals": "PostgreSQL" },
+ { "tag_key": "mq.queue", "equals": "queue-songs-ping" }
+ ]
+ }
+ }
+ }
+ ]
+ },
+ {
+ "stage": "warm",
+ "plugins": [
+ {
+ "name": "warm-retention",
+ "sampler": {
+ "path": "segment-stage-retention.so",
+ "abi_version": 1,
+ "config": {
+ "keep_errors": true
+ }
+ }
+ }
+ ]
+ }
+ ],
+ "schema_names": ["segment"],
+ "plugins": [
+ {
+ "name": "segment-tail-sampler",
+ "sampler": {
+ "path": "segment-tail-sampler.so",
+ "abi_version": 1,
+ "config": {
+ "duration_threshold": "0.500s",
+ "keep_errors": true,
+ "healthy_sample_rate": 0.1,
+ "keep_tag_rules": [
+ { "tag_key": "db.type", "equals": "PostgreSQL" },
+ { "tag_key": "mq.queue", "equals": "queue-songs-ping" }
+ ]
+ }
+ }
+ }
+ ],
+ "enabled_events": ["PIPELINE_EVENT_MERGE", "PIPELINE_EVENT_FINALIZE"],
+ "merge_grace": "30s",
+ "finalize_grace": "300s"
+}
+```
+
+> Each plugin link's `config` is a structured `google.protobuf.Struct` set
directly in the pipeline config (not an opaque blob); the engine does not
interpret its keys — it serializes the object to JSON and hands the bytes to
the plugin's constructor, which unmarshals them into its own typed config. The
gating chain here is a single `sampler` link whose `Project()` returns
`Projection{ Tags: ["is_error", "db.type", "mq.queue"], SpanIDs: false, Spans:
false }`, so only those three tag col [...]
+
+- **Retention Dynamics** (real `sw_trace` traces; gating runs in Hot, owned by
the gating chain; per-stage retention owned by each stage's `plugins` chain):
+
+ - The error trace `5fcdb353-…` (`POST /test`, `agent::app`, `is_error=1`,
4 ms) is a sure-keep at gating: the gating plugin keeps it because its config
sets `keep_errors`. At Hot→Warm, the hot retention plugin keeps it (config
`keep_errors`) → migrated. At Warm→Cold, the warm retention plugin
(errors-only) keeps it → migrated into Cold, where it is retained for the full
30-day Cold TTL (Cold does no further sampling).
+
+ - The slow healthy trace `b03bb932-…` (`/homepage`, `agent::ui` →
`agent::frontend`, 2802 ms) is a sure-keep at gating: the gating plugin keeps
it because 2802 ms > 500 ms `duration_threshold` in its config (from `MaxTS -
MinTS`, no span decode). At Hot→Warm: the hot plugin keeps it (2802 ms ≥ its
100 ms `min_duration`) → kept through Warm. At Warm→Cold: the warm plugin
(errors-only) drops it (no error) → **dropped at Warm→Cold; never enters Cold**.
+
+ - A PostgreSQL-touching trace (e.g. `b31e4be8-…`, `agent::songs`
`UndertowDispatch`, 3 ms, `db.type=PostgreSQL`) is sure-kept at gating via the
gating plugin's `db.type` keep-tag-rule. At Hot→Warm: the hot plugin keeps it
(its config's `db.type` rule) → kept through Warm. At Warm→Cold: the warm
plugin (errors-only) drops it (no error) → **dropped at Warm→Cold; never enters
Cold**.
+
+ - A healthy fast trace such as `GET:/songs` at 6 ms (`agent::songs`,
`http.status_code=200`) is only kept at gating if the gating plugin's
`healthy_sample_rate` (`0.1`) hash retains it (deterministic `hash(trace_id) <
0.1`, since it matches no sure-keep rule). If kept, at Hot→Warm the hot plugin
drops it (6 ms < 100 ms, no error, no tag match) → **dropped at Hot→Warm
migration**.
+
+### 6.2 Scenario 2: Istio / Zipkin Mesh Edge Sampling (`sw_zipkinTrace`)
+
+- **Objective**: On the showcase `sw_zipkinTrace` group (schema
`zipkin_span`), apply a lower-cost edge sampler to the Istio service-mesh
spans. The Zipkin schema has no first-class `is_error` column, so server errors
are caught with a tag rule on the flattened `query` attributes rather than
`keep_errors`; mesh gateway spans are kept by tag. Targets the group's Warm and
Cold stages. As in §6.1 the gating verdict is owned by a native Go plugin chain
(a single sampler link, §2.5); per-stag [...]
+
+- **Configuration JSON**:
+
+```json
+{
+ "metadata": { "group": "sw_zipkinTrace", "name": "zipkin-edge-sampler" },
+ "enabled": true,
+ "stages": [
+ {
+ "stage": "warm",
+ "plugins": [
+ {
+ "name": "warm-retention",
+ "sampler": {
+ "path": "zipkin-stage-retention.so",
+ "abi_version": 1,
+ "config": {
+ "min_duration": "1s",
+ "keep_tag_rules": [
+ { "tag_key": "query", "regex": "http\\.status_code=5\\d\\d" },
+ { "tag_key": "local_endpoint_service_name", "equals":
"gateway.sample-services" }
+ ]
+ }
+ }
+ }
+ ]
+ }
+ ],
+ "schema_names": ["zipkin_span"],
+ "plugins": [
+ {
+ "name": "zipkin-edge-sampler",
+ "sampler": {
+ "path": "zipkin-edge-sampler.so",
+ "abi_version": 1,
+ "config": {
+ "duration_threshold": "1.000s",
+ "keep_errors": false,
+ "healthy_sample_rate": 0.05,
+ "keep_tag_rules": [
+ { "tag_key": "query", "regex": "http\\.status_code=5\\d\\d" }
+ ]
+ }
+ }
+ }
+ ],
+ "merge_grace": "30s"
+}
+```
+
+> Each link's `config` is a structured `google.protobuf.Struct` set directly
in the pipeline config (not an opaque blob); the engine does not interpret its
keys — it serializes the object to JSON and hands the bytes to the plugin's
constructor, which unmarshals them into its own typed config. The gating
chain's single `sampler` link `Project()` returns `Projection{ Tags: ["query"],
SpanIDs: false, Spans: false }`, so only the `query` tag column (plus the
intrinsic `trace_id` / `MinTS` / [...]
+
+- **Retention Dynamics** (real `sw_zipkinTrace` spans; gating runs in Hot,
owned by the gating chain; per-stage retention owned by the Warm `plugins`
chain):
+
+ - The slowest mesh call observed — `trace_id 0961e077…`, a 30.7 s
`istio.skywalking-showcase` client span to Grafana's live-WS endpoint
(`http.status_code=101`) — is a sure-keep at gating: the gating plugin keeps it
because 30.7 s > 1 s `duration_threshold` in its config (from `MaxTS - MinTS`,
no span decode). At Warm→Cold: the warm retention plugin keeps it (30.7 s ≥ its
1 s `min_duration`) → migrated into Cold, where it is retained for the full
Cold TTL (Cold does no further sampling).
+
+ - A gateway span on `gateway.sample-services` at the mesh p90 (~19 ms):
the gating plugin passes it only via the `0.05` `healthy_sample_rate` hash (19
ms < 1 s, no 5xx, so it matches no sure-keep rule). If kept, at Warm→Cold the
warm plugin keeps it (19 ms < 1 s, but its `local_endpoint_service_name =
gateway.sample-services` rule matches) → migrated into Cold, where it is
retained for the full Cold TTL.
+
+ - A typical p50 mesh span (~2 ms) is kept at gating only via the gating
plugin's `0.05` sample. If kept, at Warm→Cold the warm plugin drops it (2 ms <
1 s, not a gateway span, no 5xx) → **dropped at Warm→Cold**.
+
+ - Any span carrying a `5xx` in its `query` attributes is sure-kept at
gating (Hot) by the gating plugin's `query` keep-tag-rule, and kept at
Warm→Cold by the warm plugin's `query` rule → migrated into Cold and preserved
for the full Cold TTL (Cold does no further sampling). Whole Warm/Cold segments
are still reclaimed on their own schedule by `LifecycleStage.ttl`.
+
+## Post-Trace Data Flow Architecture
+
+The execution of the post-trace pipeline occurs natively on BanyanDB Data
Nodes via three distinct pathways, matching storage lifecycle transitions.
+
+```mermaid
+flowchart TD
+ RI["Raw Ingestion"] --> LN["Liaison Node"] --> FW["Fast Local Write"] -->
MB["Data Node Memory Buffer"]
+ MB --> PL["Storage Node Post-Trace Processing Loops"]
+ PL --> H71["7.1 LSM Compaction Merge Filter Hook<br/>gated by
PIPELINE_EVENT_MERGE; per-trace merge_grace defers drops"]
+ PL --> H72["7.2 Pre-Migration Filter Rewrite<br/>rewrite segment to a
reduced part, then migrate it unchanged"]
+ PL --> H73["7.3 Scheduled Final Filter on a Settled Segment<br/>gated by
PIPELINE_EVENT_FINALIZE; watermark + finalize_grace"]
+```
+
+### 7.1 LSM Compaction Merge Phase Loop (when `PIPELINE_EVENT_MERGE` is
enabled)
+
+During **Hot-phase** LSM file compaction, immature part files containing
segmented trace blocks are merged into unified, consolidated parts. The merge
loop already streams blocks grouped by `trace_id`, so it is the ideal place to
evaluate traces without a separate read pass — *provided* drops are deferred
until each trace is mature. (Warm and Cold compactions run the same merger with
the gating hook disabled, so they stay lossless.)
+
+> **Design decision (accepted):** The trace LSM merger is refactored to expose
a **trace-filter hook**. When `PIPELINE_EVENT_MERGE` is in `enabled_events` and
a `TracePipelineConfig` targets the schema being compacted, blocks belonging to
traces failing the plugin gating policy are omitted from the consolidated
output: mature traces are accumulated into a projected `TraceBatch` (§2.5) and
the plugin's boolean keep-mask drives the per-`trace_id` drop. With
`PIPELINE_EVENT_MERGE` disabled [...]
+
+#### Integration point in the real merger
+
+The hook is injected into `mergeBlocks` (`banyand/trace/merger.go`), which is
the single point where every emitted block flows to the block writer. Both
write paths are wrapped by the filter:
+
+- `blockWriter.mustWriteRawBlock` — the fast path that copies a single-block
trace as raw bytes without decoding.
+- `blockWriter.mustWriteBlock` — the slow path that emits a decoded,
accumulated block for a `trace_id`.
+
+```mermaid
+flowchart TD
+ IP["Immature Parts"] --> BR["blockReader.nextBlockMetadata<br/>streams
blocks ordered by trace_id"]
+ BR --> PA["Per-trace assembly<br/>existing pending-block accumulation in
mergeBlocks"]
+ PA --> M{"Mature?<br/>traceMaxTs (timestampsMetadata.max) older than now −
merge_grace"}
+ M -->|"NO (trace may still grow)"| RET["RETAIN unchanged"]
+ M -->|"YES (stopped growing)"| CHK["plugin gating policy check"]
+ CHK -->|"KEEP"| WB["mustWriteBlock / mustWriteRawBlock"]
+ CHK -->|"DROP"| SK["blocks skipped; spans, tags, primary entries never
written<br/>(space reclaimed in-stream, no delete mutation)"]
+```
+
+#### Filter contract and what the merge already gives us for free
+
+1. **Duration is free; projected columns cost a decode.** Block metadata
carries `timestampsMetadata{min,max}` (`block_metadata.go`), so `D_total` is
derivable on the raw fast path without unmarshaling — duration checks are free.
A plugin that projects tag columns (`Project().Tags`) — or opts into span
bodies (`Project().Spans`) — forces the decoded slow path (`loadBlockData`) for
those columns; a plugin that needs neither stays on the raw fast path.
+
+2. **Decisions are per `trace_id`, not per block.** A single trace may be
emitted as multiple blocks when its accumulated span size crosses
`maxUncompressedSpanSize`. The filter computes one keep/drop verdict per
`trace_id` and applies it to every block carrying that id, so a trace is never
partially written.
+
+3. **A trace is dropped only after it stops growing (per-trace
`merge_grace`).** Compaction is part-count-driven (`getPartsToMerge`), not
time-driven — so a merge routinely runs on the active write window while a
trace's remaining spans are still seconds away. Dropping such a trace on its
partial spans would orphan the late ones. The filter therefore evaluates a
trace only once its latest span timestamp is older than `now − merge_grace`
(the trace's max timestamp is read for free from `t [...]
+
+4. **Derived part state reflects only retained traces.** When a trace is
dropped, its entries are excluded from `partMetadata` counts, the
`traceIDFilter` bloom filter (`mustWriteTraceIDFilter`), and the `tagType` set
written at `Flush`. The filter runs before these are finalized so the
consolidated part stays self-consistent.
+
+5. **Secondary indexes must be pruned in lockstep — this is net-new engine
work.** Today `mergePartsThenSendIntroduction` merges sidx parts via
`sidxInstance.Merge(closeCh, partIDMap, newPartID)` — a **part-ID-based** merge
with no per-`trace_id` predicate, so a core trace drop would leave **dangling
sidx entries** pointing at traces no longer in the reduced part. Closing this
gap is a required, not-yet-existing contract: the trace filter must surface the
set of dropped `trace_id`s, and [...]
+
+6. **Drops are final and the merge is crash-safe.** Because a trace is dropped
only after `merge_grace` has elapsed since its last span (point 3), the verdict
acts on a trace that has stopped growing, so the drop is final rather than
premature. The merge writes the new part atomically and only retires source
parts after the introduction is applied, so a crash mid-merge leaves the
immature parts intact for retry; re-running compaction on an already-filtered
part is a no-op.
+
+### 7.2 Hot-to-Warm Tier Migration (Pre-Migration Filter Rewrite)
+
+When the lifecycle agent migrates a segment to the next stage, the existing
transfer is an opaque byte stream of part directories — it is not a per-trace
filter. Today's path walks shards via `traceMigrationVisitor.VisitShard`
(`banyand/backup/lifecycle/trace_migration_visitor.go`), reads source parts
with `generateAllPartData` (which opens part files via
`trace.CreatePartFileReaderFromPath`, `banyand/trace/part.go`), and ships them
with `streamPartToTargetShard` over a chunked sync clie [...]
+
+> **Design decision (accepted):** Do **not** filter inside the byte-copy
transfer. Instead, run a **pre-migration filter pass** on the source segment
that reads the existing parts through the pipeline and writes a **new, reduced
part**; the unchanged migration then streams that new part to the next stage.
This keeps "migration" a faithful byte copy and isolates all lossy behaviour in
a dedicated rewrite step.
+
+```mermaid
+flowchart TD
+ HT["Hot Tier segment (settled; at least stage TTL old)"]
+ HT --> PR["Pre-migration rewrite pass — new visitor over the segment's
parts<br/>read each part: blockReader over
CreatePartFileReaderFromPath<br/>apply pipeline filter (same trace-filter
contract as §7.1)<br/>write retained traces to a NEW reduced part via
blockWriter"]
+ PR --> RP["Reduced part on Hot tier"]
+ RP --> EM["Existing migration (unchanged)<br/>VisitShard →
generateAllPartData → streamPartToTargetShard<br/>opaque chunked byte copy of
the reduced part"]
+ EM --> WT["Warm tier segment"]
+```
+
+1. The pre-migration pass attaches between `generateAllPartData` and
`streamPartToTargetShard` in `VisitShard`: each source part is read with
`blockReader`, the source stage's retention `plugins` chain runs per
`trace_id`, and a reduced part is written with `blockWriter` in place of the
source.
+
+2. The filter contract is per-`trace_id` retain/drop verdicts, with dropped
traces excluded from `partMetadata`, the `traceIDFilter` bloom filter,
`tagType`, and the parallel sidx parts — the latter via the same trace-ID-aware
sidx rewrite required for the merge path (§7.1, point 5), which is net-new
engine work. Because a segment only migrates once it is at least its stage TTL
old (e.g. a Hot segment after ~1 day), its event-time window closed long ago
and any late arrivals have settled [...]
+
+3. The reduced part — not the original — is what `streamPartToTargetShard`
transfers. The migration protocol, replica fan-out, and destination
registration are untouched, so the considerable savings come purely from
shipping fewer bytes to the next tier.
+
+### 7.3 Scheduled Final Filter on a Settled Segment
+
+The plugin gating pass must run only once a Hot segment is unlikely to gain
more spans — otherwise a trace whose remaining spans are still arriving could
be judged prematurely. (Finalization, like the in-merge filter, runs only in
the Hot phase.) The catch: **BanyanDB has no "segment sealed" signal** to
trigger this, so the pass is driven by an event-time watermark and a grace
period instead.
+
+Why there is no seal event (grounding in
`banyand/internal/storage/rotation.go`):
+
+- **Writes route by event time, so old segments keep receiving data.** A span
is written into the segment whose window contains its `start_time`
(`banyand/trace/write_standalone.go` → `CreateSegmentIfNotExist(time.Unix(0,
ts))`), not its arrival time. A span that arrives hours late still lands in its
old event-time segment for as long as that segment exists.
+- **Segment creation is look-ahead, not rollover.** The rotation loop
pre-creates the *next* segment up to `newSegmentTimeGap` (1 hour) **before** the
current window ends: it fires `segmentController.create` only while `0 <
latest.End - eventTime <= newSegmentTimeGap`. At that moment the current
segment is still the active write target, so `create()` cannot mean "the
previous segment is sealed."
+- **`closeIdleSegments` is not a seal.** It only releases idle in-memory
handles after an idle timeout (`banyand/internal/storage/segment.go`) and
reopens them on the next access; it says nothing about window completeness.
+- **The only definitive boundary is TTL removal** (`retentionTask`, cron `5
0`), which deletes the whole segment — far too late to be a finalization
trigger.
+
+> **Design decision (accepted):** Drive the gating pass from an **event-time
watermark plus a settling grace period**, not from a (non-existent) seal event.
The rotation path already tracks the maximum observed event time
(`latestTickTime`, fed by `Tick`, `rotation.go`). A segment is treated as
**settled** once `watermark > segment.End + finalize_grace`, where
`finalize_grace` is `TracePipelineConfig.finalize_grace` — a configured
expected-late-arrival lag (engine default `5m` if unset). [...]
+
+```mermaid
+flowchart TD
+ WM["Event-time watermark (latestTickTime) advances"]
+ WM -->|"post-trace scheduler cron tick"| SEG["For each segment with
watermark past segment.End + finalize_grace, not yet finalized<br/>walk each
(segment, shard) tsTable's parts via the trace Visitor<br/>apply pipeline
filter once: trace treated as complete, verdict final"]
+ SEG --> RW["Rewrite to reduced parts + mark segment finalized"]
+```
+
+1. The scheduler runs the pipeline's plugin gating policy across the settled
segment's per-shard `tsTable` parts (a `tsTable` is scoped per `(segment,
shard)`).
+
+2. Because the watermark is past `segment.End + finalize_grace`, the segment
is very unlikely to gain more spans, so the verdict is stable: this pass
decides whether a trace survives at all. Surviving traces continue through the
lifecycle and are filtered again at each migration-out boundary by the relevant
`StageRule` (§7.2).
+
+3. The `finalize_grace` period trades latency for completeness: a larger
`finalize_grace` catches more late spans before finalizing but delays space
reclamation. The cron tick doubles as a catch-up sweep — after a node restart
it still finds settled-but-unfinalized segments and applies the filter, after
which normal retention/GC proceeds.
+
+## Step-by-Step Execution Sequence
+
+To illustrate how the three passes (§7.1–§7.3) fit together, here is the
complete processing loop executed by the post-trace engine on a BanyanDB Data
Node across a trace's lifetime
(Example trace from Scenario 6.1: `5fcdb353-…`, `POST /test` on `agent::app`,
`is_error=1`, with `enabled_events = [PIPELINE_EVENT_MERGE,
PIPELINE_EVENT_FINALIZE]`):
+
+```mermaid
+flowchart TD
+ subgraph A["A. In-merge filter at HOT-PHASE LSM COMPACTION
(PIPELINE_EVENT_MERGE enabled)"]
+ A1["1. mergeBlocks streams blocks ordered by trace_id
(banyand/trace/merger.go).<br/>Maturity check: traceMaxTs (=
timestampsMetadata.max) older than now − merge_grace (30s)?"]
+ A1 -->|"NO"| A2["Pass blocks through unchanged; defer the verdict"]
+ A1 -->|"YES (stopped growing)"| A3["Engine builds a projected
TraceBatch; plugin Decide returns a keep-mask.<br/>The §6.1
segment-tail-sampler keeps this trace because is_error is set (config keeps
errors);<br/>a healthy trace below the 0.500s threshold with no matching tag
would fall to the 0.1 sample.<br/>Verdict for 5fcdb353-…: KEEP (drops here
reclaim space during routine compaction)"]
+ end
+ subgraph B["B. Plugin gating pass at HOT FINALIZATION
(PIPELINE_EVENT_FINALIZE enabled, once per settled Hot segment)"]
+ B2["2. Post-trace scheduler tick observes a segment with watermark
past segment.End + finalize_grace (300s), not yet finalized"]
+ B2 --> B3["3. Pick the active TracePipelineConfig for (group, schema,
stage); match targeting fields (schema_names / schema_name_regex). No match?
Skip."]
+ B3 --> B4["4. For each surviving trace_id (not already dropped at A),
evaluate the plugin gating policy — same as step 1.<br/>Surviving traces
continue into the stage lifecycle; failures are dropped from the segment."]
+ end
+ subgraph C["C. Per-stage retention at MIGRATION-OUT (once per stage
boundary, when a StageRule has a plugin)"]
+ C5["5. Source stage migrates to the next stage (e.g. Hot → Warm after
the 1-day TTL). Pre-migration rewrite reads each source part with blockReader
and runs the source stage's retention plugin, returning a keep-mask per
trace_id:<br/>Hot → Warm (hot StageRule.plugins, config min_duration 0.100s):
duration ≥ 0.100s? (no, only 4ms); keep_errors AND is_error? YES → Keep,
migrate<br/>Warm → Cold (warm StageRule.plugins, config keep_errors only):
keep_errors AND is_error? YES → Keep, [...]
+ C5 --> C6["6. RETENTION OUTCOME: the error trace survives the gating
plugin at A and B, then the stage retention plugin keeps it (keep_errors) at
every migration boundary in C.<br/>Retained Hot → Warm → Cold; written into the
Cold (type=cold) parts. Cold stage TTL = 30 days (per the group's
ResourceOpts.stages)."]
+ end
+ A3 --> B2
+ B4 --> C5
+```
+
+## References
+
+- OpenTelemetry Collector — Tail Sampling Processor (`tailsamplingprocessor`):
the keep-all-errors / latency / probabilistic-policy model a typical sampler
plugin implements.
+- L. Zhang, Z. Xie, V. Anand, Y. Vigfusson, J. Mace. "The Benefit of
Hindsight: Tracing Edge-Cases in Distributed Systems." NSDI '23 — motivation
for post-hoc / retroactive trace retention.
+- BanyanDB schema and query model:
`api/proto/banyandb/common/v1/common.proto`,
`api/proto/banyandb/trace/v1/query.proto`.
+- Example data source: the `skywalking-showcase` BanyanDB cluster — groups
`sw_trace` (schema `segment`) and `sw_zipkinTrace` (schema `zipkin_span`). All
scenario group/schema/tag names, lifecycle stages, latencies, and retention
outcomes are derived from real traces sampled from this cluster via the
`/v1/trace/data` API.
+- Native plugin sampler (§2.5): the vectorized batch reuses BanyanDB's own
native trace columnar block (`banyand/trace/block.go` — `block`/`tag`) and the
existing tag projection (`blockMetadata.tagProjection`,
`pkg/query/model.TagProjection`), not a new columnar form. External grounding:
Go [`plugin` package docs](https://pkg.go.dev/plugin) (toolchain/build
version-lock); Apache Arrow [columnar
format](https://arrow.apache.org/docs/format/Columnar.html) and Apache
DataFusion [scalar UDFs [...]
diff --git a/pkg/pipeline/sdk/_example/segment-tail-sampler/README.md
b/pkg/pipeline/sdk/_example/segment-tail-sampler/README.md
new file mode 100644
index 000000000..337746726
--- /dev/null
+++ b/pkg/pipeline/sdk/_example/segment-tail-sampler/README.md
@@ -0,0 +1,67 @@
+# segment-tail-sampler (reference plugin)
+
+A worked example of a BanyanDB post-trace sampler plugin built on
+[`pkg/pipeline/sdk`](../../). It implements the Scenario 6.1 (`sw_trace`) sampler
+from [`docs/design/post-trace-pipeline.md`](../../../../../docs/design/post-trace-pipeline.md)
+and exists to show plugin authors three things:
+
+1. **Read the proto config.** `SamplerPlugin.config` is a
+ `google.protobuf.Struct`. The engine serializes it to canonical JSON and
+ passes it to `NewSampler([]byte)`, which unmarshals it into a typed `config`.
+2. **Declare a projection.** `Project()` returns only the tag columns the
+ verdict reads (the error tag + each rule's tag key), and opts into the
+ span-id column only when a span-count rule is configured. Span bodies are
+ never read, so the merge raw fast path is preserved.
+3. **Extract tags and spans.** `Decide` decodes tag values by their
+ `pbv1.ValueType` (via `TagColumn.At`) and reads the span-id column
+ (`TraceBlock.Len` / `SpanIDs`).
+
+It is the `sampler` kind of the generic `Plugin`: because `sdk.Sampler` embeds
+`sdk.Plugin`, the sampler also implements `Kind() sdk.Kind` (returning
+`sdk.KindSampler`). In a `TracePipelineConfig` or `StageRule` it is one link in
+an ordered `plugins` chain (a sequential pipe); a chain of samplers keeps a
+trace only if every link keeps it.
+
+## Keep logic
+
+A trace is kept when **any** sure-keep rule matches; otherwise a deterministic
+hash of `trace_id` admits a fraction of the healthy remainder:
+
+1. `duration ≥ duration_threshold` (computed from the intrinsic `MinTS`/`MaxTS`, so no column is decoded).
+2. `keep_errors` is set and the error tag is truthy on any span.
+3. any `keep_tag_rules` entry matches any span (`exists` / `equals` / `in` / `regex`).
+4. `min_span_count` is configured and the trace has at least that many spans.
+5. else `hash(trace_id) < healthy_sample_rate`.
+
+## Config
+
+Set as the `config` `Struct` of a `sampler` plugin link in the
+`TracePipelineConfig`'s `plugins` chain; shown here as the JSON the engine hands
+to the plugin:
+
+```json
+{
+ "duration_threshold": "0.500s",
+ "keep_errors": true,
+ "error_tag": "is_error",
+ "healthy_sample_rate": 0.1,
+ "keep_tag_rules": [
+ { "tag_key": "db.type", "equals": "PostgreSQL" },
+ { "tag_key": "mq.queue", "equals": "queue-songs-ping" }
+ ]
+}
+```
+
+## Build
+
+Build with the **same Go toolchain and the same pinned `pkg/pipeline/sdk`** as
+the running data node (Go's `plugin` package requires it):
+
+```sh
+go build -buildmode=plugin -trimpath \
+ -o segment-tail-sampler.so \
+ ./pkg/pipeline/sdk/_example/segment-tail-sampler
+```
+
+Then place `segment-tail-sampler.so` in the data node's trusted plugin
+directory and reference it by `path` in `SamplerPlugin`.
diff --git a/pkg/pipeline/sdk/_example/segment-tail-sampler/main.go
b/pkg/pipeline/sdk/_example/segment-tail-sampler/main.go
new file mode 100644
index 000000000..05f4748bc
--- /dev/null
+++ b/pkg/pipeline/sdk/_example/segment-tail-sampler/main.go
@@ -0,0 +1,321 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Command segment-tail-sampler is the reference post-trace sampler plugin from
+// docs/design/post-trace-pipeline.md §6.1. It is a worked example of the
+// pkg/pipeline/sdk contract and illustrates the three things a real plugin
+// must do:
+//
+// 1. Parse the operator-supplied config. SamplerPlugin.config is a
+// google.protobuf.Struct; the engine serializes it to canonical JSON and
+// hands it to NewSampler as bytes, which this plugin unmarshals into its
+// own typed config.
+// 2. Declare a projection (Project) so the engine materializes only the
+// columns the verdict reads — here a handful of tag columns, plus the
+// span-id column only when a span-count rule is configured.
+// 3. Extract tags and spans from the vectorized batch in Decide — decoding tag
+// values by their ValueType and reading the span-id column.
+//
+// Build it as a Go plugin (it is deliberately under an _example directory so
+// `go build ./...` and the linters skip it):
+//
+// go build -buildmode=plugin -trimpath \
+// -o segment-tail-sampler.so \
+// ./pkg/pipeline/sdk/_example/segment-tail-sampler
+//
+// It must be built with the same Go toolchain and the same pinned
+// pkg/pipeline/sdk as the running data node (see §2.5).
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "hash/fnv"
+ "regexp"
+ "sort"
+ "strconv"
+ "time"
+
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/pipeline/sdk"
+)
+
+// ABIVersion re-exports the SDK ABI version this plugin was compiled against.
+// The engine refuses to load the plugin unless this equals its own compiled
+// sdk.ABIVersion.
+var ABIVersion = sdk.ABIVersion
+
+// tagRule is one sure-keep tag predicate. Exactly one matcher field is honored,
+// checked in the order exists, equals, in, regex.
+type tagRule struct {
+	// re is the compiled form of Regex; NewSampler sets it once at load so
+	// matching never compiles a pattern per batch.
+	re *regexp.Regexp
+	// In matches when the rendered tag value equals any listed candidate.
+	In []string `json:"in"`
+	// Regex is the pattern source that NewSampler compiles into re.
+	Regex string `json:"regex"`
+	// TagKey names the tag column the rule reads; NewSampler rejects an empty key.
+	TagKey string `json:"tag_key"`
+	// Equals matches when the rendered tag value equals this string.
+	Equals string `json:"equals"`
+	// Exists matches any row on which the tag is non-null.
+	Exists bool `json:"exists"`
+}
+
+// config is the JSON shape the operator sets in SamplerPlugin.config.
+type config struct {
+	// DurationThreshold is a time.ParseDuration string; when set it must be
+	// positive. Empty disables the duration rule.
+	DurationThreshold string `json:"duration_threshold"`
+	// ErrorTag names the error tag column; NewSampler defaults it to "is_error".
+	ErrorTag string `json:"error_tag"`
+	// KeepTagRules lists the sure-keep tag predicates.
+	KeepTagRules []tagRule `json:"keep_tag_rules"`
+	// HealthySampleRate is the fraction of the healthy remainder to keep; it
+	// must lie in [0,1].
+	HealthySampleRate float64 `json:"healthy_sample_rate"`
+	// MinSpanCount enables the span-count rule when greater than zero.
+	MinSpanCount int `json:"min_span_count"`
+	// KeepErrors enables the error-tag rule.
+	KeepErrors bool `json:"keep_errors"`
+}
+
+// segmentTailSampler keeps a trace when any sure-keep rule matches, and
+// otherwise admits a deterministic fraction of the healthy remainder. Every
+// field is populated once by NewSampler.
+type segmentTailSampler struct {
+	// errorTag is the tag column inspected by the error rule.
+	errorTag string
+	// rules are the sure-keep tag predicates, with regex matchers compiled.
+	rules []tagRule
+	// requiredTags is the sorted set of tag columns returned by Project.
+	requiredTags []string
+	// durationThreshold enables the duration rule when greater than zero.
+	durationThreshold time.Duration
+	// healthySampleRate is the keep fraction applied to the healthy remainder.
+	healthySampleRate float64
+	// minSpanCount enables the span-count rule when greater than zero.
+	minSpanCount int
+	// keepErrors enables the error rule.
+	keepErrors bool
+	// wantSpanIDs requests the span-id column; set when minSpanCount > 0.
+	wantSpanIDs bool
+}
+
+// NewSampler is the constructor symbol the engine looks up. It parses the
+// operator config (canonical JSON), validates every field, compiles regex
+// matchers once, and computes the column projection.
+//
+// An empty configJSON yields a sampler with every rule disabled. A non-nil
+// error rejects the plugin at admission; it is returned when the JSON is
+// malformed, healthy_sample_rate is outside [0,1], min_span_count is negative,
+// duration_threshold is unparsable or not positive, or a keep_tag_rules entry
+// has an empty tag_key, an invalid regex, or no matcher at all.
+func NewSampler(configJSON []byte) (sdk.Sampler, error) {
+	var c config
+	if len(configJSON) > 0 {
+		if err := json.Unmarshal(configJSON, &c); err != nil {
+			return nil, fmt.Errorf("segment-tail-sampler: invalid config JSON: %w", err)
+		}
+	}
+	if c.HealthySampleRate < 0 || c.HealthySampleRate > 1 {
+		return nil, fmt.Errorf("segment-tail-sampler: healthy_sample_rate %v out of [0,1]", c.HealthySampleRate)
+	}
+	// A negative count would otherwise disable the span-count rule without any
+	// signal to the operator, so reject it like every other malformed field.
+	if c.MinSpanCount < 0 {
+		return nil, fmt.Errorf("segment-tail-sampler: min_span_count must not be negative, got %d", c.MinSpanCount)
+	}
+	s := &segmentTailSampler{
+		rules:             c.KeepTagRules,
+		healthySampleRate: c.HealthySampleRate,
+		keepErrors:        c.KeepErrors,
+		errorTag:          c.ErrorTag,
+		minSpanCount:      c.MinSpanCount,
+	}
+	if s.errorTag == "" {
+		s.errorTag = "is_error"
+	}
+	if c.DurationThreshold != "" {
+		d, err := time.ParseDuration(c.DurationThreshold)
+		if err != nil {
+			return nil, fmt.Errorf("segment-tail-sampler: invalid duration_threshold %q: %w", c.DurationThreshold, err)
+		}
+		if d <= 0 {
+			return nil, fmt.Errorf("segment-tail-sampler: duration_threshold must be positive, got %v", d)
+		}
+		s.durationThreshold = d
+	}
+
+	// Build the projection: the error tag (when keep_errors is set) and every
+	// rule's tag key. Compile regex matchers once, here, not per batch.
+	tagSet := make(map[string]struct{}, len(s.rules)+1)
+	if s.keepErrors {
+		tagSet[s.errorTag] = struct{}{}
+	}
+	for i := range s.rules {
+		r := &s.rules[i]
+		if r.TagKey == "" {
+			return nil, fmt.Errorf("segment-tail-sampler: keep_tag_rules[%d] has empty tag_key", i)
+		}
+		// A rule without any matcher can never match; fail fast instead of
+		// projecting a column whose rule is a silent no-op.
+		if !r.Exists && r.Equals == "" && len(r.In) == 0 && r.Regex == "" {
+			return nil, fmt.Errorf("segment-tail-sampler: keep_tag_rules[%d] (tag %q) sets none of exists/equals/in/regex", i, r.TagKey)
+		}
+		if r.Regex != "" {
+			re, err := regexp.Compile(r.Regex)
+			if err != nil {
+				return nil, fmt.Errorf("segment-tail-sampler: keep_tag_rules[%d] bad regex %q: %w", i, r.Regex, err)
+			}
+			r.re = re
+		}
+		tagSet[r.TagKey] = struct{}{}
+	}
+	s.requiredTags = make([]string, 0, len(tagSet))
+	for k := range tagSet {
+		s.requiredTags = append(s.requiredTags, k)
+	}
+	// Stable order keeps Project() reproducible across runs (Go map iteration is
+	// randomized); the engine treats Tags as a set, but logs, tests, and caches
+	// benefit from determinism.
+	sort.Strings(s.requiredTags)
+	// A span-count rule reads the span-id column, so project it on demand.
+	s.wantSpanIDs = s.minSpanCount > 0
+	return s, nil
+}
+
+// Kind identifies this plugin as a sampler. It is the method of the generic
+// sdk.Plugin interface that sdk.Sampler embeds.
+func (s *segmentTailSampler) Kind() sdk.Kind {
+	return sdk.KindSampler
+}
+
+// Project declares the columns the verdict reads: the rule/error tag columns,
+// plus the span-id column only when a span-count rule is configured. Span
+// bodies are never read, so Spans stays false and the merge raw fast path is
+// preserved.
+func (s *segmentTailSampler) Project() sdk.Projection {
+ return sdk.Projection{Tags: s.requiredTags, SpanIDs: s.wantSpanIDs}
+}
+
+// Decide evaluates every trace in the batch and returns a keep-mask aligned to
+// batch.Traces. The batch is only read. The first decode error aborts the
+// whole batch and is returned with an empty verdict.
+func (s *segmentTailSampler) Decide(batch *sdk.TraceBatch) (sdk.Verdict, error) {
+	mask := make([]bool, len(batch.Traces))
+	for idx := range mask {
+		retained, err := s.keepTrace(&batch.Traces[idx])
+		if err != nil {
+			return sdk.Verdict{}, err
+		}
+		mask[idx] = retained
+	}
+	return sdk.Verdict{Keep: mask}, nil
+}
+
+// Close is a no-op: this sampler holds no resources to release.
+func (s *segmentTailSampler) Close() error {
+	return nil
+}
+
+// keepTrace decides one trace. It tries the sure-keep rules in order —
+// duration, error tag, tag rules, span count — and returns true at the first
+// hit; only when none matches does it fall back to the deterministic healthy
+// sample. A decode error from a tag column is returned with a false verdict.
+func (s *segmentTailSampler) keepTrace(b *sdk.TraceBlock) (bool, error) {
+	// Duration comes from the intrinsic MinTS/MaxTS, so nothing is decoded.
+	if s.durationThreshold > 0 {
+		if elapsed := time.Duration(b.MaxTS - b.MinTS); elapsed >= s.durationThreshold {
+			return true, nil
+		}
+	}
+	// Error rule: any truthy row in the error tag column keeps the trace.
+	if s.keepErrors {
+		errored, err := s.hasError(b)
+		switch {
+		case err != nil:
+			return false, err
+		case errored:
+			return true, nil
+		}
+	}
+	// Sure-keep tag rules, in configured order.
+	for idx := range s.rules {
+		matched, err := matchRule(b, &s.rules[idx])
+		switch {
+		case err != nil:
+			return false, err
+		case matched:
+			return true, nil
+		}
+	}
+	// Span-count rule: relies on the span-id column the projection requested.
+	if s.minSpanCount > 0 {
+		if b.Len() >= s.minSpanCount {
+			return true, nil
+		}
+	}
+	// Healthy remainder: hash(trace_id) < rate is deterministic, so the verdict
+	// is stable across re-evaluation at merge and finalization.
+	sampled := s.healthySampleRate > 0 && sampleFraction(b.TraceID) < s.healthySampleRate
+	return sampled, nil
+}
+
+// hasError scans the error tag column and reports whether any span row is
+// truthy: a non-zero int64, or the string "true" or "1". Null rows and values
+// of any other type are ignored. A missing column means no error.
+func (s *segmentTailSampler) hasError(b *sdk.TraceBlock) (bool, error) {
+	errCol := b.Tag(s.errorTag)
+	if errCol == nil {
+		return false, nil
+	}
+	for idx := 0; idx < len(errCol.Values); idx++ {
+		val, err := errCol.At(idx)
+		if err != nil {
+			return false, err
+		}
+		if val.IsNull() {
+			continue
+		}
+		truthy := false
+		switch val.ValueType() {
+		case pbv1.ValueTypeInt64:
+			truthy = val.Int64() != 0
+		case pbv1.ValueTypeStr:
+			text := val.Str()
+			truthy = text == "true" || text == "1"
+		}
+		if truthy {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+// matchRule reports whether rule r matches at least one span row of trace b.
+// Null rows never match. On a non-null row an exists rule matches at once;
+// otherwise the value is rendered with stringOf and tested against the first
+// configured matcher in the order equals, in, regex. A missing column never
+// matches; a decode error is returned as-is.
+func matchRule(b *sdk.TraceBlock, r *tagRule) (bool, error) {
+	column := b.Tag(r.TagKey)
+	if column == nil {
+		return false, nil
+	}
+	for idx := 0; idx < len(column.Values); idx++ {
+		val, err := column.At(idx)
+		if err != nil {
+			return false, err
+		}
+		if val.IsNull() {
+			continue
+		}
+		if r.Exists {
+			return true, nil
+		}
+		text := stringOf(val)
+		matched := false
+		switch {
+		case r.Equals != "":
+			matched = text == r.Equals
+		case len(r.In) > 0:
+			for _, candidate := range r.In {
+				if text == candidate {
+					matched = true
+					break
+				}
+			}
+		case r.re != nil:
+			matched = r.re.MatchString(text)
+		}
+		if matched {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+// stringOf renders a decoded value as a string for matching against the rule's
+// string predicates.
+func stringOf(v sdk.Value) string {
+ switch v.ValueType() {
+ case pbv1.ValueTypeStr:
+ return v.Str()
+ case pbv1.ValueTypeInt64, pbv1.ValueTypeTimestamp:
+ return strconv.FormatInt(v.Int64(), 10)
+ case pbv1.ValueTypeFloat64:
+ return strconv.FormatFloat(v.Float64(), 'g', -1, 64)
+ case pbv1.ValueTypeBinaryData:
+ return string(v.Bytes())
+ default:
+ return ""
+ }
+}
+
+// sampleFraction maps a trace_id to a stable fraction in [0,1) via FNV-1a, so
+// the keep decision is deterministic and reproducible across passes. The top
53
+// bits fill a float64 mantissa exactly (the technique math/rand uses), so the
+// result is strictly below 1 and a healthy_sample_rate of 1.0 keeps every
trace.
+func sampleFraction(traceID string) float64 {
+ h := fnv.New64a()
+ _, _ = h.Write([]byte(traceID))
+ return float64(h.Sum64()>>11) / (1 << 53)
+}
+
+// main is intentionally empty: the plugin is built as package main with
+// -buildmode=plugin and exposes its behavior through the exported ABIVersion
+// and NewSampler symbols.
+func main() {}
diff --git a/pkg/pipeline/sdk/decode.go b/pkg/pipeline/sdk/decode.go
new file mode 100644
index 000000000..101cf695f
--- /dev/null
+++ b/pkg/pipeline/sdk/decode.go
@@ -0,0 +1,129 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sdk
+
+import (
+ "fmt"
+
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// Value is a single decoded tag value. The accessor matching ValueType returns
+// the decoded datum; the others return their zero value. A nil raw value
+// decodes to a null Value (IsNull reports true).
+type Value struct {
+	// str holds the datum for ValueTypeStr.
+	str string
+	// bytes holds the datum for ValueTypeBinaryData.
+	bytes []byte
+	// strArr holds the datum for ValueTypeStrArr.
+	strArr []string
+	// intArr holds the datum for ValueTypeInt64Arr.
+	intArr []int64
+	// int64Val holds the datum for ValueTypeInt64 and ValueTypeTimestamp.
+	int64Val int64
+	// floatVal holds the datum for ValueTypeFloat64.
+	floatVal float64
+	// valueType is the type tag the value was decoded with.
+	valueType pbv1.ValueType
+	// null marks a value decoded from a nil raw element or an unknown type.
+	null bool
+}
+
+// IsNull reports whether the tag was absent on this row.
+func (v Value) IsNull() bool { return v.null }
+
+// ValueType returns the type tag of the decoded value.
+func (v Value) ValueType() pbv1.ValueType { return v.valueType }
+
+// Str returns the string value (valid for ValueTypeStr).
+func (v Value) Str() string { return v.str }
+
+// Int64 returns the integer value (valid for ValueTypeInt64 and, as unix
+// nanoseconds, ValueTypeTimestamp).
+func (v Value) Int64() int64 { return v.int64Val }
+
+// Float64 returns the float value (valid for ValueTypeFloat64).
+func (v Value) Float64() float64 { return v.floatVal }
+
+// Bytes returns the raw value (valid for ValueTypeBinaryData).
+func (v Value) Bytes() []byte { return v.bytes }
+
+// StrArr returns the string-array value (valid for ValueTypeStrArr).
+func (v Value) StrArr() []string { return v.strArr }
+
+// Int64Arr returns the integer-array value (valid for ValueTypeInt64Arr).
+func (v Value) Int64Arr() []int64 { return v.intArr }
+
+// At decodes the value at the given span row. A nil element decodes to a null
+// Value. It returns an error if row is out of range.
+func (c *TagColumn) At(row int) (Value, error) {
+ if row < 0 || row >= len(c.Values) {
+ return Value{}, fmt.Errorf("tag %q: row %d out of range
[0,%d)", c.Name, row, len(c.Values))
+ }
+ return DecodeTagValue(c.ValueType, c.Values[row])
+}
+
+// DecodeTagValue turns one marshaled tag value, as stored in the native trace
+// block, into a typed Value, so a plugin never needs to import banyand/trace
+// internals.
+//
+// A nil raw slice and ValueTypeUnknown both yield a null Value. Fixed-width
+// types (int64, float64, timestamp) must be exactly 8 bytes and an int64 array
+// must be a multiple of 8 bytes; otherwise an error is returned. Binary data
+// is returned without copying, so it aliases raw. Any other value type is an
+// error.
+func DecodeTagValue(valueType pbv1.ValueType, raw []byte) (Value, error) {
+	decoded := Value{valueType: valueType}
+	if raw == nil {
+		decoded.null = true
+		return decoded, nil
+	}
+	switch valueType {
+	case pbv1.ValueTypeStr:
+		decoded.str = string(raw)
+	case pbv1.ValueTypeInt64:
+		if len(raw) != 8 {
+			return Value{}, fmt.Errorf("int64: expected 8 bytes, got %d", len(raw))
+		}
+		decoded.int64Val = convert.BytesToInt64(raw)
+	case pbv1.ValueTypeFloat64:
+		if len(raw) != 8 {
+			return Value{}, fmt.Errorf("float64: expected 8 bytes, got %d", len(raw))
+		}
+		decoded.floatVal = convert.BytesToFloat64(raw)
+	case pbv1.ValueTypeBinaryData:
+		decoded.bytes = raw
+	case pbv1.ValueTypeTimestamp:
+		if len(raw) != 8 {
+			return Value{}, fmt.Errorf("timestamp: expected 8 bytes, got %d", len(raw))
+		}
+		decoded.int64Val = convert.BytesToInt64(raw)
+	case pbv1.ValueTypeInt64Arr:
+		if len(raw)%8 != 0 {
+			return Value{}, fmt.Errorf("int64 array: length %d is not a multiple of 8", len(raw))
+		}
+		elems := make([]int64, 0, len(raw)/8)
+		for off := 0; off < len(raw); off += 8 {
+			elems = append(elems, convert.BytesToInt64(raw[off:off+8]))
+		}
+		decoded.intArr = elems
+	case pbv1.ValueTypeStrArr:
+		var elems []string
+		for pos := 0; pos < len(raw); {
+			end, next, err := encoding.UnmarshalVarArray(raw, pos)
+			if err != nil {
+				return Value{}, fmt.Errorf("str array: %w", err)
+			}
+			elems = append(elems, string(raw[pos:end]))
+			pos = next
+		}
+		decoded.strArr = elems
+	case pbv1.ValueTypeUnknown:
+		decoded.null = true
+	default:
+		return Value{}, fmt.Errorf("unsupported value type: %d", valueType)
+	}
+	return decoded, nil
+}
diff --git a/pkg/pipeline/sdk/sdk.go b/pkg/pipeline/sdk/sdk.go
new file mode 100644
index 000000000..0871e0da8
--- /dev/null
+++ b/pkg/pipeline/sdk/sdk.go
@@ -0,0 +1,207 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package sdk defines the contract between the BanyanDB post-trace pipeline
+// engine and user-supplied native Go plugins (each a .so loaded in-process via
+// the standard Go plugin package). It is the single pinned surface plugin
+// authors build against; see docs/design/post-trace-pipeline.md §2.5.
+//
+// A plugin is one kind of the generic Plugin interface. Every kind shares Kind,
+// Project, and Close; each kind adds its own processing method (the sampler kind
+// adds Decide — see Sampler). Today the only kind is the sampler. A plugin is a
+// package main built with `-buildmode=plugin` that exports exactly two symbols
+// (the constructor name is the kind's convention — NewSampler for the sampler
+// kind):
+//
+// var ABIVersion int // == sdk.ABIVersion
+// func NewSampler(config []byte) (sdk.Sampler, error)
+//
+// The engine refuses to load a plugin whose ABIVersion differs from its own
+// compiled sdk.ABIVersion, turning a silent miscompile into a fail-fast error.
+// The config bytes are the canonical JSON serialization of the
+// google.protobuf.Struct set in the plugin's proto payload (e.g.
+// SamplerPlugin.config); the plugin unmarshals them into its own typed config.
+//
+// In a TracePipelineConfig or StageRule, plugins are wired as an ordered chain
+// (a sequential pipe): links run in declared order, each processing what the
+// previous link kept, and a link that fails is bypassed. For an all-sampler
+// chain this is the conjunction of the links' keep/drop verdicts.
+//
+// The boundary deliberately crosses only stdlib slices plus the engine's
+// stable, byte-sized pbv1.ValueType enum, so no banyand/trace-internal struct
+// is shared across the .so boundary.
+package sdk
+
+import (
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// ABIVersion is the SDK contract version. It is compiled into the host and must
+// be re-exported, unchanged, by every plugin. The engine refuses to load a
+// plugin whose ABIVersion differs.
+const ABIVersion = 1
+
+// Kind identifies a Plugin's role in a chain. It mirrors the set arm of the
+// proto Plugin.kind oneof and lets the engine label and cross-check a plugin
+// before type-asserting it to the matching kind interface. New kinds are added
+// here in lockstep with new oneof arms.
+type Kind uint8
+
+const (
+	// KindUnspecified is the zero value; a real plugin never reports it.
+	KindUnspecified Kind = iota
+	// KindSampler is a plugin that owns a keep/drop verdict (see Sampler).
+	KindSampler
+)
+
+// Plugin is the common interface every plugin kind satisfies — the generic link
+// type the engine handles uniformly. The engine constructs a Plugin, checks
+// Kind against the proto oneof arm that named it, then type-asserts to that
+// kind's interface (e.g. Sampler) for the kind-specific call. Project and Close
+// are shared by every kind; the per-kind processing method lives on the kind
+// interface. Each kind keeps its own constructor symbol convention (the sampler
+// kind defaults to NewSampler; a future kind would use its own, e.g.
+// NewTransformer).
+type Plugin interface {
+	// Kind reports which plugin kind this is, for engine bookkeeping and as a
+	// cross-check against the proto oneof arm. It must be constant for the
+	// plugin's lifetime.
+	Kind() Kind
+
+	// Project is the column-selection handshake, called once at load. The
+	// engine honors it for the plugin's lifetime: Tags drives the native tag
+	// projection (only those tag columns are decoded); SpanIDs and Spans gate
+	// the spans stream. Intrinsic columns (TraceID, MinTS, MaxTS) are always
+	// present regardless of the projection.
+	Project() Projection
+
+	// Close releases any resources the plugin holds. It is called once when the
+	// pipeline config is removed; because Go plugins cannot be unloaded, the .so
+	// itself stays mapped until the process restarts.
+	Close() error
+}
+
+// Sampler is the keep/drop kind of Plugin (Kind reports KindSampler). The engine
+// calls Project once at load, then Decide once per batch, then Close at unload.
+// In a chain it is a conjunction link: each Sampler narrows the traces the next
+// link sees.
+type Sampler interface {
+	Plugin
+
+	// Decide receives a vectorized batch of assembled per-trace blocks and
+	// returns a keep-mask aligned to batch.Traces. The batch is READ-ONLY:
+	// Decide must not mutate any slice it receives. The keep-mask is the only
+	// output channel; the engine writes retained traces from its own untouched
+	// block data, so a returned error or a length-mismatched verdict makes the
+	// engine fail open (retain the whole batch).
+	Decide(batch *TraceBatch) (Verdict, error)
+}
+
+// Projection is the plugin's up-front column request — one handshake covering
+// every optional column. Intrinsic columns (TraceID, MinTS, MaxTS) are always
+// materialized and are not listed here.
+type Projection struct {
+	// Tags names the tag columns to decode into TraceBlock.Tags. Empty means no
+	// tag columns are decoded.
+	Tags []string
+	// SpanIDs opts in to the span-id column. Default false. Span ids and span
+	// bodies share one encoded data block in the native layout, so requesting
+	// span ids forces a read of the spans stream — it is not free metadata.
+	SpanIDs bool
+	// Spans opts in to the heavy span-body column. Default false: the engine
+	// leaves TraceBlock.Spans nil and, on the merge raw fast path, never decodes
+	// span bodies. Set true only when the verdict reads them.
+	Spans bool
+}
+
+// TraceBatch is a vectorized batch of assembled per-trace blocks. It is the
+// engine's native columnar trace layout, shared read-only with the plugin.
+type TraceBatch struct {
+	// Traces holds one block per trace_id. The verdict's keep-mask is aligned
+	// to this slice.
+	Traces []TraceBlock
+}
+
+// TraceBlock mirrors the native trace block: every populated column is indexed
+// in lockstep by span row i in [0, Len). Intrinsic columns are always set;
+// Tags, SpanIDs, and Spans appear only as requested by Project. Slices are
+// shared with the engine and must be treated as read-only.
+type TraceBlock struct {
+	// TraceID identifies the trace; the keep/drop verdict is per trace_id.
+	TraceID string
+	// Tags holds the projected tag columns, one per Projection.Tags entry that
+	// the trace actually carries.
+	Tags []TagColumn
+	// SpanIDs is the row-aligned span-id column; nil unless Projection.SpanIDs.
+	SpanIDs []string
+	// Spans is the row-aligned span-body column (opaque marshaled bytes); nil
+	// unless Projection.Spans.
+	Spans [][]byte
+	// MinTS is the earliest span start in unix nanoseconds.
+	MinTS int64
+	// MaxTS is the latest span end in unix nanoseconds; trace duration is
+	// MaxTS - MinTS.
+	MaxTS int64
+}
+
+// Len reports the number of span rows in the block, taken from the first
+// non-empty row-indexed column in the order SpanIDs, Spans, then each projected
+// tag. With a metadata-only projection no such column is materialized and Len
+// returns 0.
+func (b *TraceBlock) Len() int {
+	switch {
+	case len(b.SpanIDs) > 0:
+		return len(b.SpanIDs)
+	case len(b.Spans) > 0:
+		return len(b.Spans)
+	}
+	for idx := range b.Tags {
+		rows := len(b.Tags[idx].Values)
+		if rows == 0 {
+			continue
+		}
+		return rows
+	}
+	return 0
+}
+
+// Tag looks up the projected tag column called name and returns a pointer into
+// b.Tags. It returns nil when the trace did not carry the tag or the plugin did
+// not project it.
+func (b *TraceBlock) Tag(name string) *TagColumn {
+	for idx := range b.Tags {
+		col := &b.Tags[idx]
+		if col.Name != name {
+			continue
+		}
+		return col
+	}
+	return nil
+}
+
+// TagColumn mirrors the native tag: a row-aligned column of marshaled values
+// plus the value type needed to decode them via At.
+type TagColumn struct {
+	// Name is the tag key.
+	Name string
+	// Values holds one marshaled value per span row; a nil element means the
+	// tag is absent on that row.
+	Values [][]byte
+	// ValueType is the engine's stable, byte-sized type tag for every value in
+	// the column.
+	ValueType pbv1.ValueType
+}
+
+// Verdict is the per-trace decision, aligned to TraceBatch.Traces.
+type Verdict struct {
+	// Keep must have the same length as the batch; Keep[i] true retains
+	// Traces[i]. A length mismatch makes the engine fail open.
+	Keep []bool
+}
diff --git a/pkg/pipeline/sdk/sdk_test.go b/pkg/pipeline/sdk/sdk_test.go
new file mode 100644
index 000000000..7cff3a0bc
--- /dev/null
+++ b/pkg/pipeline/sdk/sdk_test.go
@@ -0,0 +1,164 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sdk_test
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/pipeline/sdk"
+)
+
+// strArr builds the marshaled payload of a string-array tag value by running
+// each string, in order, through encoding.MarshalVarArray.
+func strArr(values ...string) []byte {
+	var buf []byte
+	for i := range values {
+		buf = encoding.MarshalVarArray(buf, []byte(values[i]))
+	}
+	return buf
+}
+
+// int64Arr builds the marshaled payload of an int64-array tag value by
+// concatenating convert.Int64ToBytes of each element, in order.
+func int64Arr(values ...int64) []byte {
+	var buf []byte
+	for i := range values {
+		buf = append(buf, convert.Int64ToBytes(values[i])...)
+	}
+	return buf
+}
+
+// TestDecodeTagValue checks that sdk.DecodeTagValue decodes each listed value
+// type, and a nil payload, without error, and that the decoded value reports
+// the value type it was asked to decode.
+func TestDecodeTagValue(t *testing.T) {
+	cases := []struct {
+		check func(*testing.T, sdk.Value)
+		name  string
+		raw   []byte
+		vt    pbv1.ValueType
+	}{
+		{
+			name: "str",
+			vt:   pbv1.ValueTypeStr,
+			raw:  []byte("PostgreSQL"),
+			check: func(t *testing.T, v sdk.Value) {
+				assert.Equal(t, "PostgreSQL", v.Str())
+			},
+		},
+		{
+			name: "int64",
+			vt:   pbv1.ValueTypeInt64,
+			raw:  convert.Int64ToBytes(-42),
+			check: func(t *testing.T, v sdk.Value) {
+				assert.Equal(t, int64(-42), v.Int64())
+			},
+		},
+		{
+			name: "float64",
+			vt:   pbv1.ValueTypeFloat64,
+			raw:  convert.Float64ToBytes(3.5),
+			check: func(t *testing.T, v sdk.Value) {
+				assert.InDelta(t, 3.5, v.Float64(), 1e-9)
+			},
+		},
+		{
+			name: "binary",
+			vt:   pbv1.ValueTypeBinaryData,
+			raw:  []byte{0x01, 0x02, 0x03},
+			check: func(t *testing.T, v sdk.Value) {
+				assert.Equal(t, []byte{0x01, 0x02, 0x03}, v.Bytes())
+			},
+		},
+		{
+			name: "timestamp",
+			vt:   pbv1.ValueTypeTimestamp,
+			raw:  convert.Int64ToBytes(1_700_000_000_000_000_000),
+			check: func(t *testing.T, v sdk.Value) {
+				assert.Equal(t, int64(1_700_000_000_000_000_000), v.Int64())
+			},
+		},
+		{
+			name: "int64_arr",
+			vt:   pbv1.ValueTypeInt64Arr,
+			raw:  int64Arr(1, 2, 3),
+			check: func(t *testing.T, v sdk.Value) {
+				assert.Equal(t, []int64{1, 2, 3}, v.Int64Arr())
+			},
+		},
+		{
+			name: "str_arr",
+			vt:   pbv1.ValueTypeStrArr,
+			raw:  strArr("a", "bb", "ccc"),
+			check: func(t *testing.T, v sdk.Value) {
+				assert.Equal(t, []string{"a", "bb", "ccc"}, v.StrArr())
+			},
+		},
+		{
+			name: "nil_is_null",
+			vt:   pbv1.ValueTypeStr,
+			raw:  nil,
+			check: func(t *testing.T, v sdk.Value) {
+				assert.True(t, v.IsNull())
+			},
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			got, err := sdk.DecodeTagValue(tc.vt, tc.raw)
+			require.NoError(t, err)
+			assert.Equal(t, tc.vt, got.ValueType())
+			tc.check(t, got)
+		})
+	}
+}
+
+func TestDecodeTagValueErrors(t *testing.T) {
+ _, err := sdk.DecodeTagValue(pbv1.ValueTypeInt64Arr, []byte{0x01, 0x02})
+ require.Error(t, err, "int64 array length not a multiple of 8 must
error")
+
+ _, err = sdk.DecodeTagValue(pbv1.ValueType(255), []byte("x"))
+ require.Error(t, err, "unknown value type must error")
+
+ for _, vt := range []pbv1.ValueType{pbv1.ValueTypeInt64,
pbv1.ValueTypeFloat64, pbv1.ValueTypeTimestamp} {
+ _, err = sdk.DecodeTagValue(vt, []byte{0x01, 0x02, 0x03})
+ require.Errorf(t, err, "fixed-width type %d with fewer than 8
bytes must error", vt)
+ }
+}
+
+// TestTagColumnAt checks TagColumn.At on a three-row string column: a
+// populated row decodes to its string, a nil row decodes to a null value, and
+// an index past the last row is an error.
+func TestTagColumnAt(t *testing.T) {
+	column := sdk.TagColumn{
+		Name:      "db.type",
+		ValueType: pbv1.ValueTypeStr,
+		Values:    [][]byte{[]byte("PostgreSQL"), nil, []byte("MySQL")},
+	}
+
+	first, err := column.At(0)
+	require.NoError(t, err)
+	assert.Equal(t, "PostgreSQL", first.Str())
+
+	second, err := column.At(1)
+	require.NoError(t, err)
+	assert.True(t, second.IsNull())
+
+	_, err = column.At(3)
+	require.Error(t, err, "row out of range must error")
+}
+
+// TestTraceBlockHelpers checks TraceBlock.Len and TraceBlock.Tag: Len reads
+// the span-id column when present, falls back to a tag column otherwise, and
+// is 0 for a block with no row-indexed column; Tag finds a column by name and
+// returns nil for an unknown name.
+func TestTraceBlockHelpers(t *testing.T) {
+	dbType := sdk.TagColumn{
+		Name:      "db.type",
+		ValueType: pbv1.ValueTypeStr,
+		Values:    [][]byte{[]byte("PostgreSQL"), nil},
+	}
+	full := sdk.TraceBlock{
+		TraceID: "t-1",
+		MinTS:   100,
+		MaxTS:   2_802_000_000, // 2802ms in nanos
+		Tags:    []sdk.TagColumn{dbType},
+		SpanIDs: []string{"s-1", "s-2"},
+	}
+	assert.Equal(t, 2, full.Len(), "Len comes from the span-id column")
+	require.NotNil(t, full.Tag("db.type"))
+	assert.Nil(t, full.Tag("missing"))
+
+	// With tags as the only row-indexed column, Len falls back to a tag column.
+	tagsOnly := sdk.TraceBlock{
+		Tags: []sdk.TagColumn{{Name: "x", Values: [][]byte{nil, nil, nil}}},
+	}
+	assert.Equal(t, 3, tagsOnly.Len())
+
+	// A block carrying only metadata has no rows.
+	metaOnly := sdk.TraceBlock{TraceID: "t-2", MinTS: 1, MaxTS: 2}
+	assert.Equal(t, 0, metaOnly.Len())
+}
+
+// fakeSampler is a stateless stub whose methods return fixed values; the test
+// below assigns it to sdk.Sampler.
+type fakeSampler struct{}
+
+// Kind reports sdk.KindSampler.
+func (fakeSampler) Kind() sdk.Kind {
+	return sdk.KindSampler
+}
+
+// Project returns the zero sdk.Projection.
+func (fakeSampler) Project() sdk.Projection {
+	return sdk.Projection{}
+}
+
+// Decide ignores the batch and returns the zero sdk.Verdict with no error.
+func (fakeSampler) Decide(*sdk.TraceBatch) (sdk.Verdict, error) {
+	return sdk.Verdict{}, nil
+}
+
+// Close does nothing and returns nil.
+func (fakeSampler) Close() error {
+	return nil
+}
+
+// TestSamplerIsPlugin checks, at compile time, that an sdk.Sampler value is
+// assignable to sdk.Plugin, and at run time that the stub reports
+// sdk.KindSampler rather than sdk.KindUnspecified.
+func TestSamplerIsPlugin(t *testing.T) {
+	// The conversion chain only compiles if a Sampler is a Plugin.
+	var plugin sdk.Plugin = sdk.Sampler(fakeSampler{})
+	assert.Equal(t, sdk.KindSampler, plugin.Kind())
+	assert.NotEqual(t, sdk.KindUnspecified, plugin.Kind())
+}