This is an automated email from the ASF dual-hosted git repository.

wankai123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e9db8fba4 Add `@Stream(allowBootReshape = true)` opt-in for additive 
boot-time reshape of BanyanDB streams / measures. (#13878)
5e9db8fba4 is described below

commit 5e9db8fba4c6b11620137afcbf9ccbd788c1214d
Author: Wan Kai <[email protected]>
AuthorDate: Thu May 21 14:23:51 2026 +0800

    Add `@Stream(allowBootReshape = true)` opt-in for additive boot-time 
reshape of BanyanDB streams / measures. (#13878)
---
 CLAUDE.md                                          |   4 +
 NOTICE                                             |   2 +-
 dist-material/release-docs/NOTICE                  |   2 +-
 docs/en/changes/changes.md                         |   1 +
 .../runtime-rule-hot-update.md                     |  21 +
 .../oap/server/core/alarm/AlarmRecord.java         |   2 +-
 .../oap/server/core/analysis/Stream.java           |  26 +
 .../oap/server/core/storage/model/Model.java       |  10 +
 .../server/core/storage/model/StorageModels.java   |   5 +
 .../plugin/banyandb/BanyanDBIndexInstaller.java    | 133 +++-
 .../server/storage/plugin/banyandb/BanyanDBIT.java | 882 +++++++++++++++++++++
 .../src/test/resources/bydb.yml                    |   4 +
 .../elasticsearch/base/TimeSeriesUtilsTest.java    |   6 +-
 .../jdbc/common/dao/JDBCHistoryDeleteDAOIT.java    |   2 +-
 14 files changed, 1087 insertions(+), 13 deletions(-)

diff --git a/CLAUDE.md b/CLAUDE.md
index dacd9a41ba..1608fda657 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -246,6 +246,10 @@ Always use `--recurse-submodules` when cloning or update 
submodules manually.
 - `changes/changes.md` - Changelog (update when making changes)
 - `swip/` - SkyWalking Improvement Proposals
 
+## Handling Existing Tests
+
+If existing test code (UT, IT, or E2E) fails to pass, try to fix it. If any 
test cases need to be modified, or test code needs to be deleted, the change 
must be double-checked by a human before it is applied — surface the proposed 
modification or deletion and the reasoning, and wait for explicit approval 
rather than changing or removing the test yourself.
+
 ## Submitting Pull Requests
 
 Use the `/gh-pull-request` skill for committing and pushing to a PR branch. It 
runs pre-flight checks (compile, checkstyle, license headers) before every 
push, and creates the PR if one doesn't exist yet.
diff --git a/NOTICE b/NOTICE
index 7e684bf85d..0fef61473b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
 Apache SkyWalking
-Copyright 2017-2024 The Apache Software Foundation
+Copyright 2017-2026 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
diff --git a/dist-material/release-docs/NOTICE 
b/dist-material/release-docs/NOTICE
index 6c5c72afe1..90fdf3a8b5 100755
--- a/dist-material/release-docs/NOTICE
+++ b/dist-material/release-docs/NOTICE
@@ -1,5 +1,5 @@
 Apache SkyWalking
-Copyright 2017-2024 The Apache Software Foundation
+Copyright 2017-2026 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index ddf89306e5..322ada7e4d 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -279,6 +279,7 @@
 * LAL: support full arithmetic (`+`, `-`, `*`, `/`) on numeric operands and 
fix the original bug where `(tag("x") as Integer) + (tag("y") as Integer)` was 
treated as string concatenation — expressions like `input_tokens + 
output_tokens < 10000` produced the concatenated string `"2589115"` rather than 
the integer sum `2704`, so token-threshold conditions never triggered `abort 
{}`. Operand types are now inferred from explicit casts (`as Integer` / `as 
Long` / `as Float` / `as Double`), ty [...]
 * Fix: `avgHistogramPercentile` / `sumHistogramPercentile` meter functions 
reported the smallest finite bucket boundary (e.g. `10` for OTel 
`gen_ai_server_request_duration` whose `le` is rewritten from `0.01s` → `10ms`) 
for every rank when no samples were observed in any bucket. The percentile 
loop's `count >= roof` check matched on the first sorted bucket because both 
sides were `0`. `calculate()` now short-circuits to `0` for every rank when the 
windowed total is `0`.
 * Fix: MAL `expPrefix` now applies to every metric source in `exp`, not just 
the leading one. Previously the prefix was spliced after the first `.`, so 
secondary metrics inside arguments (e.g. the divisor in 
`a.sum(['s']).safeDiv(b.sum(['s']))`) silently skipped the prefix — a rule like 
envoy-ai-gateway's `request_latency_avg` (`sum / count`) would tag-rewrite only 
the dividend. The injection is now AST-aware: every bare-IDENTIFIER metric 
source is wrapped, while downsampling-type consta [...]
+* Add `@Stream(allowBootReshape = true)` opt-in for additive boot-time reshape 
of BanyanDB streams / measures. Code-defined stream classes (e.g. 
`AlarmRecord`) can now annotate their schema as eligible for in-place additive 
update at OAP boot — a new `@Column` is appended to the live tag-family / 
fields via `client.update` instead of being silently rejected with 
`SKIPPED_SHAPE_MISMATCH` (which previously forced operators to drop the measure 
/ stream and lose historical rows). The opt-in  [...]
 
 #### UI
 * Add mobile menu icon and i18n labels for the iOS layer.
diff --git a/docs/en/concepts-and-designs/runtime-rule-hot-update.md 
b/docs/en/concepts-and-designs/runtime-rule-hot-update.md
index f1901cc6e9..d20fc5ed86 100644
--- a/docs/en/concepts-and-designs/runtime-rule-hot-update.md
+++ b/docs/en/concepts-and-designs/runtime-rule-hot-update.md
@@ -160,6 +160,27 @@ explicit ERROR, boot always continues, the affected metric 
is disabled, and the
 operator reconciles explicitly through the on-demand workflow. The same shape 
is
 visible across every backend.
 
+### Code-defined stream opt-in (narrow exception)
+
+The "boot never reshapes" rule above applies to **runtime-rule (MAL / LAL)**
+registration — those rules ride the `/addOrUpdate` REST workflow when their
+backing schema needs to change.
+
+Streams whose schema lives in OAP source code (e.g. `AlarmRecord`) can opt in
+to **additive** boot-time reshape via
+`@Stream(allowBootReshape = true)`. When the flag is on and the diff is
+purely additive (new tag / new field; no type changes, no drops, no entity /
+interval / sharding-key flips), the installer calls `client.update` at boot
+to append the new column to the live measure / stream; non-additive
+divergences still record `SKIPPED_SHAPE_MISMATCH` and require an operator
+drop+recreate. Only the init / standalone OAP performs the reshape; non-init
+peers continue through the existing poll-and-wait loop so a single node
+drives DDL during a rolling restart.
+
+This opt-in is **BanyanDB-only**. JDBC and Elasticsearch are append-only on
+the data path and already accept additive column / mapping additions at boot
+without operator intervention, so the flag is unread on those backends.
+
 ## On-demand workflow
 
 Triggered by an HTTP call to one of the admin endpoints. A request arriving at 
any
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
index eafde843a7..a5a1e474b3 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
@@ -42,7 +42,7 @@ import static 
org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.AL
 @Getter
 @Setter
 @ScopeDeclaration(id = ALARM, name = "Alarm")
-@Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, 
builder = AlarmRecord.Builder.class, processor = RecordStreamProcessor.class)
+@Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, 
builder = AlarmRecord.Builder.class, processor = RecordStreamProcessor.class, 
allowBootReshape = true)
 @SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = 
AlarmRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
 @BanyanDB.TimestampColumn(AlarmRecord.START_TIME)
 @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS)
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java
index acab363060..ab00568eb0 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java
@@ -27,6 +27,8 @@ import 
org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamProcessor
 import 
org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
 import 
org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
 import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
+import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import 
org.apache.skywalking.oap.server.core.storage.model.StorageManipulationOpt;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 
 /**
@@ -59,4 +61,28 @@ public @interface Stream {
      * TopNStreamProcessor} and {@link NoneStreamProcessor} for more details.
      */
     Class<? extends StreamProcessor> processor();
+
+    /**
+     * Opt-in to additive boot-time reshape of the backend resource for this 
stream class.
+     * <strong>BanyanDB only</strong> — JDBC and Elasticsearch are append-only 
on the data
+     * path and don't refuse additive column / mapping additions at boot, so 
they don't
+     * need (and don't read) this flag.
+     *
+     * <p>Default is {@code false} — boot follows the standard {@code 
schemaCreateIfAbsent}
+     * policy and records {@link 
StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH} on
+     * any shape divergence, leaving reconciliation to the operator (the only 
workflow that
+     * may change backend schema).
+     *
+     * <p>When set to {@code true}, the BanyanDB installer is allowed to apply
+     * <strong>purely additive</strong> changes (new tag, new field) during 
boot via
+     * {@code client.update}. Type changes, drops, kind flips, entity / 
interval /
+     * sharding-key changes are still refused with {@code 
SKIPPED_SHAPE_MISMATCH} and
+     * require a manual drop+recreate (identity-breaking edits are explicit 
operator
+     * actions, not boot side-effects).
+     *
+     * <p>Only the init / standalone OAP performs the reshape; non-init peers 
continue
+     * through the existing poll-and-wait loop in {@link 
ModelInstaller#whenCreating} so a
+     * single node drives DDL during a rolling restart.
+     */
+    boolean allowBootReshape() default false;
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index 5ceb6294d2..ed139cc418 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -41,6 +41,16 @@ public class Model {
     private final boolean superDataset;
     private final Class<?> streamClass;
     private final boolean timeRelativeID;
+    /**
+     * BanyanDB only — when true, the BanyanDB installer is allowed to apply 
purely
+     * additive shape changes (new tag / new field) at boot. See
+     * {@link 
org.apache.skywalking.oap.server.core.analysis.Stream#allowBootReshape()}.
+     * JDBC and Elasticsearch ignore this flag (append-only data paths already 
accept
+     * additive column / mapping additions without operator intervention). 
Defaults to
+     * false for models registered without a {@code @Stream} annotation (e.g. 
runtime-rule
+     * MAL / LAL metrics, which reshape through the runtime-rule REST path 
instead).
+     */
+    private final boolean allowBootReshape;
     private final SQLDatabaseModelExtension sqlDBModelExtension;
     private final BanyanDBModelExtension banyanDBModelExtension;
     private final ElasticSearchModelExtension elasticSearchModelExtension;
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 558b47378f..72e8ce57e3 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -18,6 +18,7 @@
 package org.apache.skywalking.oap.server.core.storage.model;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
@@ -169,6 +170,9 @@ public class StorageModels implements IModelManager, 
ModelRegistry, ModelManipul
         seriesIDChecker.check(storage.getModelName());
         shardingKeyChecker.check(storage.getModelName());
 
+        final Stream streamAnnotation = aClass.getAnnotation(Stream.class);
+        final boolean allowBootReshape = streamAnnotation != null && 
streamAnnotation.allowBootReshape();
+
         Model model = new Model(
             storage.getModelName(),
             modelColumns,
@@ -177,6 +181,7 @@ public class StorageModels implements IModelManager, 
ModelRegistry, ModelManipul
             isSuperDatasetModel(aClass),
             aClass,
             storage.isTimeRelativeID(),
+            allowBootReshape,
             sqlDBModelExtension,
             banyanDBModelExtension,
             elasticSearchModelExtension
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index abf4f288e3..092214919e 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -172,7 +172,7 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
                             StreamModel streamModel = 
MetadataRegistry.INSTANCE.registerStreamModel(
                                 model, config);
                             if (runShapeChecks) {
-                                checkStream(streamModel.getStream(), c, opt);
+                                checkStream(model, streamModel.getStream(), c, 
opt);
                                 checkIndexRules(model.getName(), 
streamModel.getIndexRules(), c, opt);
                                 checkIndexRuleBinding(
                                     streamModel.getIndexRules(), 
metadata.getGroup(), metadata.name(),
@@ -184,7 +184,7 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
                     } else { // measure
                         MeasureModel measureModel = 
MetadataRegistry.INSTANCE.registerMeasureModel(model, config, 
downSamplingConfigService);
                         if (runShapeChecks) {
-                            checkMeasure(measureModel.getMeasure(), c, opt);
+                            checkMeasure(model, measureModel.getMeasure(), c, 
opt);
                             checkIndexRules(model.getName(), 
measureModel.getIndexRules(), c, opt);
                             checkIndexRuleBinding(
                                 measureModel.getIndexRules(), 
metadata.getGroup(), metadata.name(),
@@ -767,9 +767,16 @@ public class BanyanDBIndexInstaller extends ModelInstaller 
{
      * either update it (on-demand operator workflow — {@link 
StorageManipulationOpt#isWithSchemaChange()})
      * or skip the update and record {@link 
StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}
      * (static boot workflow — {@link 
StorageManipulationOpt#isSchemaCreateIfAbsent()}). Boot MUST
-     * NOT reshape the backend — reshape is an explicit operator action only.
+     * NOT reshape the backend by default — reshape is an explicit operator 
action.
+     *
+     * <p>Exception: when the model opts in via {@link 
Model#isAllowBootReshape()} and the diff
+     * is purely additive (new tag / new field, no type changes, no drops, 
identity preserved),
+     * the init OAP is allowed to apply the additive update during boot. 
Non-init OAPs continue
+     * through the poll-and-wait loop in
+     * {@link 
org.apache.skywalking.oap.server.core.storage.model.ModelInstaller#whenCreating}
+     * so only one node races on the DDL.
      */
-    private void checkMeasure(Measure measure, BanyanDBClient client, 
StorageManipulationOpt opt) throws BanyanDBException {
+    private void checkMeasure(Model model, Measure measure, BanyanDBClient 
client, StorageManipulationOpt opt) throws BanyanDBException {
         Measure hisMeasure = 
client.findMeasure(measure.getMetadata().getGroup(), 
measure.getMetadata().getName());
         if (hisMeasure == null) {
             throw new IllegalStateException("Measure: " + 
measure.getMetadata().getName() + " exist but can't find it from BanyanDB 
server");
@@ -782,6 +789,15 @@ public class BanyanDBIndexInstaller extends ModelInstaller 
{
                                        
.equals(measure.toBuilder().clearMetadata().build());
             if (!equals) {
                 if (!opt.getFlags().isUpdateOnMismatch()) {
+                    if (canBootReshape(model, opt) && 
isPurelyAdditiveMeasure(measure, hisMeasure)) {
+                        opt.recordModRevision(client.update(measure));
+                        log.info("boot reshape (additive) Measure: {} — 
applied @Stream(allowBootReshape=true). backend={}, declared={}",
+                            hisMeasure.getMetadata().getName(), hisMeasure, 
measure);
+                        opt.recordOutcome("measure", 
hisMeasure.getMetadata().getName(),
+                            StorageManipulationOpt.Outcome.UPDATED,
+                            "additive boot reshape: new tag / field added");
+                        return;
+                    }
                     log.error("BanyanDB measure {} shape mismatch at boot — 
backend holds a "
                         + "different shape than the declared rule. SKIPPING 
metric; operator "
                         + "must reshape via POST /runtime/rule/addOrUpdate or 
align the rule "
@@ -801,9 +817,10 @@ public class BanyanDBIndexInstaller extends ModelInstaller 
{
 
     /**
      * Check if the stream exists and update (or record shape mismatch) per 
mode.
-     * See {@link #checkMeasure} for the create-if-absent vs full-install 
contract.
+     * See {@link #checkMeasure} for the create-if-absent vs full-install 
contract,
+     * including the {@link Model#isAllowBootReshape()} additive opt-in.
      */
-    private void checkStream(Stream stream, BanyanDBClient client, 
StorageManipulationOpt opt) throws BanyanDBException {
+    private void checkStream(Model model, Stream stream, BanyanDBClient 
client, StorageManipulationOpt opt) throws BanyanDBException {
         Stream hisStream = client.findStream(stream.getMetadata().getGroup(), 
stream.getMetadata().getName());
         if (hisStream == null) {
             throw new IllegalStateException("Stream: " + 
stream.getMetadata().getName() + " exist but can't find it from BanyanDB 
server");
@@ -816,6 +833,15 @@ public class BanyanDBIndexInstaller extends ModelInstaller 
{
                                       
.equals(stream.toBuilder().clearUpdatedAt().clearCreatedAt().clearMetadata().build());
             if (!equals) {
                 if (!opt.getFlags().isUpdateOnMismatch()) {
+                    if (canBootReshape(model, opt) && 
isPurelyAdditiveStream(stream, hisStream)) {
+                        opt.recordModRevision(client.update(stream));
+                        log.info("boot reshape (additive) Stream: {} — applied 
@Stream(allowBootReshape=true). backend={}, declared={}",
+                            hisStream.getMetadata().getName(), hisStream, 
stream);
+                        opt.recordOutcome("stream", 
hisStream.getMetadata().getName(),
+                            StorageManipulationOpt.Outcome.UPDATED,
+                            "additive boot reshape: new tag added");
+                        return;
+                    }
                     log.error("BanyanDB stream {} shape mismatch at boot — 
backend holds a "
                         + "different shape than the declared rule. SKIPPING; 
operator must "
                         + "reshape via POST /runtime/rule/addOrUpdate. 
backend={}, declared={}",
@@ -831,6 +857,101 @@ public class BanyanDBIndexInstaller extends 
ModelInstaller {
         }
     }
 
+    /**
+     * Gate for boot-time reshape: three conditions, all required.
+     * <ul>
+     *   <li>The model opts in via {@link Model#isAllowBootReshape()}.</li>
+     *   <li>The opt is the static-boot policy ({@link 
StorageManipulationOpt#isSchemaCreateIfAbsent()}).
+     *       Restricting the reshape branch to this single mode keeps the 
policy boundary
+     *       explicit — {@code verifySchemaOnly()} must stay read-only even if 
a future
+     *       caller flips {@code updateOnMismatch} off, and {@code 
withSchemaChange()}
+     *       already takes the existing on-demand reshape path above.</li>
+     *   <li>This OAP must not be in {@code no-init} mode. Non-init OAPs leave 
DDL to the
+     *       init / standalone OAP and converge via the poll-wait loop in
+     *       {@link 
org.apache.skywalking.oap.server.core.storage.model.ModelInstaller#whenCreating}.</li>
+     * </ul>
+     */
+    private boolean canBootReshape(Model model, StorageManipulationOpt opt) {
+        return model.isAllowBootReshape()
+            && opt.isSchemaCreateIfAbsent()
+            && !RunningMode.isNoInitMode();
+    }
+
+    /**
+     * Purely-additive diff for a BanyanDB {@link Stream}: declared may add 
tag families or
+     * tags, but every tag-family / tag that already exists on the backend 
must be present
+     * with the same name and {@link BanyandbDatabase.TagType type}, the 
{@link BanyandbDatabase.Entity entity}
+     * column list must match exactly (reshape can't change shard / series-id 
semantics),
+     * and no tag may be dropped. Returns false for any non-additive 
divergence so the caller
+     * falls back to {@link 
StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}.
+     */
+    private boolean isPurelyAdditiveStream(Stream declared, Stream live) {
+        if (!declared.getEntity().equals(live.getEntity())) {
+            return false;
+        }
+        return isPurelyAdditiveTagFamilies(declared.getTagFamiliesList(), 
live.getTagFamiliesList());
+    }
+
+    /**
+     * Purely-additive diff for a BanyanDB {@link Measure}: same tag-family 
and entity rules as
+     * {@link #isPurelyAdditiveStream}, plus fields may be added but no field 
name / type /
+     * encoding / compression may change, and the scalar properties (interval, 
index_mode,
+     * sharding_key) must match exactly.
+     */
+    private boolean isPurelyAdditiveMeasure(Measure declared, Measure live) {
+        if (!declared.getEntity().equals(live.getEntity())) {
+            return false;
+        }
+        if (!declared.getInterval().equals(live.getInterval())) {
+            return false;
+        }
+        if (declared.getIndexMode() != live.getIndexMode()) {
+            return false;
+        }
+        if (!declared.getShardingKey().equals(live.getShardingKey())) {
+            return false;
+        }
+        if (!isPurelyAdditiveTagFamilies(declared.getTagFamiliesList(), 
live.getTagFamiliesList())) {
+            return false;
+        }
+        final Map<String, BanyandbDatabase.FieldSpec> declaredFields = 
declared.getFieldsList().stream()
+            .collect(Collectors.toMap(BanyandbDatabase.FieldSpec::getName, f 
-> f, (a, b) -> a));
+        for (BanyandbDatabase.FieldSpec liveField : live.getFieldsList()) {
+            BanyandbDatabase.FieldSpec declaredField = 
declaredFields.get(liveField.getName());
+            if (declaredField == null) {
+                return false;
+            }
+            if (!declaredField.equals(liveField)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean 
isPurelyAdditiveTagFamilies(List<BanyandbDatabase.TagFamilySpec> declared,
+                                                
List<BanyandbDatabase.TagFamilySpec> live) {
+        final Map<String, BanyandbDatabase.TagFamilySpec> declaredByName = 
declared.stream()
+            .collect(Collectors.toMap(BanyandbDatabase.TagFamilySpec::getName, 
f -> f, (a, b) -> a));
+        for (BanyandbDatabase.TagFamilySpec liveFamily : live) {
+            BanyandbDatabase.TagFamilySpec declaredFamily = 
declaredByName.get(liveFamily.getName());
+            if (declaredFamily == null) {
+                return false;
+            }
+            final Map<String, BanyandbDatabase.TagSpec> declaredTags = 
declaredFamily.getTagsList().stream()
+                .collect(Collectors.toMap(BanyandbDatabase.TagSpec::getName, t 
-> t, (a, b) -> a));
+            for (BanyandbDatabase.TagSpec liveTag : liveFamily.getTagsList()) {
+                BanyandbDatabase.TagSpec declaredTag = 
declaredTags.get(liveTag.getName());
+                if (declaredTag == null) {
+                    return false;
+                }
+                if (declaredTag.getType() != liveTag.getType()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
     private void checkTrace(Trace trace, BanyanDBClient client, 
StorageManipulationOpt opt) throws BanyanDBException {
         Trace hisTrace = client.findTrace(trace.getMetadata().getGroup(), 
trace.getMetadata().getName());
         if (hisTrace == null) {
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java
new file mode 100644
index 0000000000..58e19d4f0a
--- /dev/null
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java
@@ -0,0 +1,882 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.library.banyandb.v1.client.DataPoint;
+import org.apache.skywalking.library.banyandb.v1.client.MeasureQuery;
+import org.apache.skywalking.library.banyandb.v1.client.MeasureQueryResponse;
+import org.apache.skywalking.library.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.library.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.library.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
+import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageID;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import 
org.apache.skywalking.oap.server.core.storage.model.StorageManipulationOpt;
+import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.apache.skywalking.oap.server.library.it.ITVersions;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
+import org.apache.skywalking.oap.server.library.module.ModuleServiceHolder;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.MeasureBulkWriteProcessor;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
+import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.apache.skywalking.oap.server.testing.util.ReflectUtil;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@Slf4j
+@Testcontainers
+public class BanyanDBIT {
+    private static final String REGISTRY = "ghcr.io";
+    private static final String IMAGE_NAME = "apache/skywalking-banyandb";
+    private static final String TAG = ITVersions.get("SW_BANYANDB_COMMIT");
+
+    private static final String IMAGE = REGISTRY + "/" + IMAGE_NAME + ":" + 
TAG;
+    private static MockedStatic<DefaultScopeDefine> 
DEFAULT_SCOPE_DEFINE_MOCKED_STATIC;
+    protected static final int GRPC_PORT = 17912;
+    protected static final int HTTP_PORT = 17913;
+
+    @Container
+    public GenericContainer<?> banyanDB = new GenericContainer<>(
+        DockerImageName.parse(IMAGE))
+        .withCommand("standalone", "--stream-root-path", 
"/tmp/banyandb-stream-data",
+                     "--measure-root-path", "/tmp/banyand-measure-data"
+        )
+        .withExposedPorts(GRPC_PORT, HTTP_PORT)
+        .waitingFor(Wait.forHttp("/api/healthz").forPort(HTTP_PORT));
+
+    private BanyanDBStorageClient client;
+    private BanyanDBStorageConfig config;
+
+    protected void setUpConnection() throws Exception {
+        ModuleManager moduleManager = mock(ModuleManager.class);
+        ModuleDefine storageModule = mock(ModuleDefine.class);
+        BanyanDBStorageProvider provider = mock(BanyanDBStorageProvider.class);
+        Mockito.when(provider.getModule()).thenReturn(storageModule);
+
+        NoneTelemetryProvider telemetryProvider = 
mock(NoneTelemetryProvider.class);
+        Mockito.when(telemetryProvider.getService(MetricsCreator.class))
+               .thenReturn(new MetricsCreatorNoop());
+        TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
+        ReflectUtil.setInternalState(telemetryModule, "loadedProvider", 
telemetryProvider);
+        
Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
+        log.info("create BanyanDB client and try to connect");
+        config = new BanyanDBConfigLoader(provider).loadConfig();
+        config.getGlobal().setTargets(banyanDB.getHost() + ":" + 
banyanDB.getMappedPort(GRPC_PORT));
+        client = new BanyanDBStorageClient(moduleManager, config);
+        client.connect();
+    }
+
+    private MeasureBulkWriteProcessor processor;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        DEFAULT_SCOPE_DEFINE_MOCKED_STATIC = 
mockStatic(DefaultScopeDefine.class);
+        DEFAULT_SCOPE_DEFINE_MOCKED_STATIC.when(() -> 
DefaultScopeDefine.nameOf(1)).thenReturn("any");
+        setUpConnection();
+        processor = client.createMeasureBulkProcessor(1000, 1, 1);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (DEFAULT_SCOPE_DEFINE_MOCKED_STATIC != null) {
+            DEFAULT_SCOPE_DEFINE_MOCKED_STATIC.close();
+            DEFAULT_SCOPE_DEFINE_MOCKED_STATIC = null;
+        }
+    }
+
+    @Test
+    public void testInstall() throws Exception {
+        DownSamplingConfigService downSamplingConfigService = new 
DownSamplingConfigService(Arrays.asList("minute"));
+        ModuleManager moduleManager = mock(ModuleManager.class);
+        ModuleProviderHolder moduleProviderHolder = 
mock(ModuleProviderHolder.class);
+        ModuleServiceHolder moduleServiceHolder = 
mock(ModuleServiceHolder.class);
+        
when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+        when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+        
when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+        StorageModels models = new StorageModels();
+        Model model = models.add(TestMetric.class, DefaultScopeDefine.SERVICE,
+                                 new Storage("testMetric", true, 
DownSampling.Minute),
+                                 StorageManipulationOpt.withSchemaChange()
+        );
+        BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, 
moduleManager, config);
+        installer.isExists(model, StorageManipulationOpt.withSchemaChange());
+        //test Group install
+        String groupName = MetadataRegistry.convertGroupName(
+            config.getGlobal().getNamespace(),
+            BanyanDB.MeasureGroup.METRICS_MINUTE.getName()
+        );
+        BanyandbCommon.Group group = client.client.findGroup(groupName);
+        assertEquals(BanyandbCommon.Catalog.CATALOG_MEASURE, 
group.getCatalog());
+        assertEquals(config.getMetricsMin().getSegmentInterval(), 
group.getResourceOpts().getSegmentInterval().getNum());
+        assertEquals(config.getMetricsMin().getShardNum(), 
group.getResourceOpts().getShardNum());
+        assertEquals(BanyandbCommon.IntervalRule.Unit.UNIT_DAY, 
group.getResourceOpts().getSegmentInterval().getUnit());
+        assertEquals(config.getMetricsMin().getTtl(), 
group.getResourceOpts().getTtl().getNum());
+        assertEquals(BanyandbCommon.IntervalRule.Unit.UNIT_DAY, 
group.getResourceOpts().getTtl().getUnit());
+
+        installer.createTable(model);
+        //test Measure install
+        BanyandbDatabase.Measure measure = 
client.client.findMeasure(groupName, "testMetric_minute");
+        assertEquals("storage-only", measure.getTagFamilies(0).getName());
+        assertEquals("service_id", 
measure.getTagFamilies(0).getTags(0).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
measure.getTagFamilies(0).getTags(0).getType());
+        assertEquals("searchable", measure.getTagFamilies(1).getName());
+        assertEquals("tag", measure.getTagFamilies(1).getTags(0).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
measure.getTagFamilies(1).getTags(0).getType());
+        assertEquals("service_id", measure.getEntity().getTagNames(0));
+        assertEquals("value", measure.getFields(0).getName());
+        assertEquals(BanyandbDatabase.FieldType.FIELD_TYPE_INT, 
measure.getFields(0).getFieldType());
+        //test TopNAggregation install
+        BanyandbDatabase.TopNAggregation topNAggregation = 
client.client.findTopNAggregation(
+            groupName, "testMetric-service");
+        assertEquals("value", topNAggregation.getFieldName());
+        assertEquals("service_id", topNAggregation.getGroupByTagNames(0));
+        assertEquals(BanyandbModel.Sort.SORT_DESC, 
topNAggregation.getFieldValueSort());
+        assertEquals(10, topNAggregation.getLruSize());
+        assertEquals(1000, topNAggregation.getCountersNumber());
+        //test IndexRule install
+        BanyandbDatabase.IndexRule indexRuleTag = 
client.client.findIndexRule(groupName, "tag");
+        assertEquals("url", indexRuleTag.getAnalyzer());
+        assertTrue(indexRuleTag.getNoSort());
+        //test IndexRuleBinding install
+        BanyandbDatabase.IndexRuleBinding indexRuleBinding = 
client.client.findIndexRuleBinding(
+            groupName, "testMetric_minute");
+        assertEquals("tag", indexRuleBinding.getRules(0));
+        assertEquals("testMetric_minute", 
indexRuleBinding.getSubject().getName());
+        //test data query
+        Instant now = Instant.now();
+        Instant begin = now.minus(15, ChronoUnit.MINUTES);
+        MeasureWrite measureWrite = client.createMeasureWrite(groupName, 
"testMetric_minute", now.toEpochMilli());
+        measureWrite.tag("storage-only", "service_id", 
TagAndValue.stringTagValue("service1"))
+                    .tag("searchable", "tag", 
TagAndValue.stringTagValue("tag1"))
+                    .field("value", TagAndValue.longFieldValue(100));
+        CompletableFuture<Void> f = processor.add(measureWrite);
+        f.exceptionally(exp -> {
+            Assertions.fail(exp.getMessage());
+            return null;
+        });
+        f.get(10, TimeUnit.SECONDS);
+
+        MeasureQuery query = new MeasureQuery(Lists.newArrayList(groupName), 
"testMetric_minute",
+                                              new TimestampRange(
+                                                  begin.toEpochMilli(),
+                                                  now.plus(1, 
ChronoUnit.MINUTES).toEpochMilli()
+                                              ), ImmutableMap.of("service_id", 
"storage-only", "tag", "searchable"),
+                                              ImmutableSet.of("value")
+        );
+        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            MeasureQueryResponse resp = client.query(query);
+            assertNotNull(resp);
+            assertEquals(1, resp.getDataPoints().size());
+            assertEquals("service1", 
resp.getDataPoints().get(0).getTagValue("service_id"));
+            assertEquals("tag1", 
resp.getDataPoints().get(0).getTagValue("tag"));
+            assertEquals(100, (Long) 
resp.getDataPoints().get(0).getFieldValue("value"));
+        });
+
+        // StorageModels.add now dedupes by (name, downsampling); evict the 
original
+        // registration so the UpdateTestMetric registration takes effect.
+        models.remove(TestMetric.class, 
StorageManipulationOpt.withSchemaChange());
+        Model updatedModel = models.add(UpdateTestMetric.class, 
DefaultScopeDefine.SERVICE,
+                                        new Storage("testMetric", true, 
DownSampling.Minute),
+                                        
StorageManipulationOpt.withSchemaChange()
+        );
+        
config.getMetricsMin().setShardNum(config.getMetricsMin().getShardNum() + 1);
+        
config.getMetricsMin().setSegmentInterval(config.getMetricsMin().getSegmentInterval()
 + 2);
+        config.getMetricsMin().setTtl(config.getMetricsMin().getTtl() + 3);
+        BanyanDBIndexInstaller newInstaller = new 
BanyanDBIndexInstaller(client, moduleManager, config);
+        newInstaller.isExists(updatedModel, 
StorageManipulationOpt.withSchemaChange());
+        //test Group update — assert the live group now reflects the values we 
set on config,
+        //rather than hard-coding the post-mutation numbers (which would 
couple the test to the
+        //defaults shipped in bydb.yml).
+        BanyandbCommon.Group updatedGroup = client.client.findGroup(groupName);
+        assertEquals(config.getMetricsMin().getShardNum(), 
updatedGroup.getResourceOpts().getShardNum());
+        assertEquals(config.getMetricsMin().getSegmentInterval(), 
updatedGroup.getResourceOpts().getSegmentInterval().getNum());
+        assertEquals(config.getMetricsMin().getTtl(), 
updatedGroup.getResourceOpts().getTtl().getNum());
+        //test Measure update
+        BanyandbDatabase.Measure updatedMeasure = 
client.client.findMeasure(groupName, "testMetric_minute");
+        assertEquals("storage-only", 
updatedMeasure.getTagFamilies(0).getName());
+        assertEquals("service_id", 
updatedMeasure.getTagFamilies(0).getTags(0).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
updatedMeasure.getTagFamilies(0).getTags(0).getType());
+        assertEquals("searchable", updatedMeasure.getTagFamilies(1).getName());
+        assertEquals("tag", 
updatedMeasure.getTagFamilies(1).getTags(0).getName());
+        assertEquals("new_tag", 
updatedMeasure.getTagFamilies(1).getTags(1).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
updatedMeasure.getTagFamilies(1).getTags(0).getType());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
updatedMeasure.getTagFamilies(1).getTags(1).getType());
+        assertEquals("service_id", updatedMeasure.getEntity().getTagNames(0));
+        assertEquals("value", updatedMeasure.getFields(0).getName());
+        assertEquals(BanyandbDatabase.FieldType.FIELD_TYPE_INT, 
updatedMeasure.getFields(0).getFieldType());
+        assertEquals("new_value", updatedMeasure.getFields(1).getName());
+        assertEquals(BanyandbDatabase.FieldType.FIELD_TYPE_INT, 
updatedMeasure.getFields(1).getFieldType());
+        //test IndexRule update
+        BanyandbDatabase.IndexRule updatedIndexRuleTag = 
client.client.findIndexRule(groupName, "tag");
+        assertEquals("", updatedIndexRuleTag.getAnalyzer());
+        assertFalse(updatedIndexRuleTag.getNoSort());
+        BanyandbDatabase.IndexRule updatedIndexRuleNewTag = 
client.client.findIndexRule(groupName, "new_tag");
+        assertTrue(updatedIndexRuleNewTag.getNoSort());
+        //test IndexRuleBinding update
+        BanyandbDatabase.IndexRuleBinding updatedIndexRuleBinding = 
client.client.findIndexRuleBinding(
+            groupName, "testMetric_minute");
+        assertEquals("tag", updatedIndexRuleBinding.getRules(0));
+        assertEquals("new_tag", updatedIndexRuleBinding.getRules(1));
+        assertEquals("testMetric_minute", 
updatedIndexRuleBinding.getSubject().getName());
+        //test data
+        MeasureWrite updatedMeasureWrite = 
client.createMeasureWrite(groupName, "testMetric_minute", now.plus(10, 
ChronoUnit.MINUTES).toEpochMilli());
+        updatedMeasureWrite.tag("storage-only", "service_id", 
TagAndValue.stringTagValue("service2"))
+                           .tag("searchable", "tag", 
TagAndValue.stringTagValue("tag1"))
+                           .tag("searchable", "new_tag", 
TagAndValue.stringTagValue("new_tag1"))
+                           .field("value", TagAndValue.longFieldValue(101))
+                           .field("new_value", 
TagAndValue.longFieldValue(1000));
+        CompletableFuture<Void> cf = processor.add(updatedMeasureWrite);
+        cf.exceptionally(exp -> {
+            Assertions.fail(exp.getMessage());
+            return null;
+        });
+        cf.get(10, TimeUnit.SECONDS);
+        MeasureQuery updatedQuery = new MeasureQuery(
+            Lists.newArrayList(groupName), "testMetric_minute",
+            new TimestampRange(begin.toEpochMilli(), now.plus(15, 
ChronoUnit.MINUTES).toEpochMilli()),
+            ImmutableMap.of("service_id", "storage-only", "tag", "searchable", 
"new_tag", "searchable"),
+                                                     ImmutableSet.of("value", 
"new_value")
+        );
+        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            MeasureQueryResponse updatedResp = client.query(updatedQuery);
+            assertNotNull(updatedResp);
+            assertEquals(2, updatedResp.getDataPoints().size());
+            // Index by service_id so the assertions don't depend on 
server-side ordering
+            // (MeasureQuery doesn't set an orderBy and BanyanDB result order 
is not contractually stable).
+            Map<String, DataPoint> byService = 
updatedResp.getDataPoints().stream()
+                .collect(Collectors.toMap(dp -> (String) 
dp.getTagValue("service_id"), dp -> dp));
+            DataPoint dp1 = byService.get("service1");
+            assertNotNull(dp1);
+            assertEquals("tag1", dp1.getTagValue("tag"));
+            assertEquals(100, (Long) dp1.getFieldValue("value"));
+            DataPoint dp2 = byService.get("service2");
+            assertNotNull(dp2);
+            assertEquals("tag1", dp2.getTagValue("tag"));
+            assertEquals("new_tag1", dp2.getTagValue("new_tag"));
+            assertEquals(101, (Long) dp2.getFieldValue("value"));
+            assertEquals(1000, (Long) dp2.getFieldValue("new_value"));
+        });
+    }
+
+    @Test
+    public void testStreamInstallAndUpdate() throws Exception {
+        DownSamplingConfigService downSamplingConfigService = new 
DownSamplingConfigService(Arrays.asList("minute"));
+        ModuleManager moduleManager = mock(ModuleManager.class);
+        ModuleProviderHolder moduleProviderHolder = 
mock(ModuleProviderHolder.class);
+        ModuleServiceHolder moduleServiceHolder = 
mock(ModuleServiceHolder.class);
+        
when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+        when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+        
when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+        StorageModels models = new StorageModels();
+        Model model = models.add(TestStream.class, DefaultScopeDefine.SERVICE,
+                                 new Storage("testStream", true, 
DownSampling.Second),
+                                 StorageManipulationOpt.withSchemaChange()
+        );
+        BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, 
moduleManager, config);
+        installer.isExists(model, StorageManipulationOpt.withSchemaChange());
+        // test Group install
+        String groupName = MetadataRegistry.convertGroupName(
+            config.getGlobal().getNamespace(),
+            BanyanDB.StreamGroup.RECORDS_LOG.getName()
+        );
+        BanyandbCommon.Group group = client.client.findGroup(groupName);
+        assertEquals(BanyandbCommon.Catalog.CATALOG_STREAM, 
group.getCatalog());
+        assertEquals(config.getRecordsLog().getSegmentInterval(), 
group.getResourceOpts().getSegmentInterval().getNum());
+        assertEquals(config.getRecordsLog().getShardNum(), 
group.getResourceOpts().getShardNum());
+        assertEquals(BanyandbCommon.IntervalRule.Unit.UNIT_DAY, 
group.getResourceOpts().getSegmentInterval().getUnit());
+        assertEquals(config.getRecordsLog().getTtl(), 
group.getResourceOpts().getTtl().getNum());
+        assertEquals(BanyandbCommon.IntervalRule.Unit.UNIT_DAY, 
group.getResourceOpts().getTtl().getUnit());
+
+        installer.createTable(model);
+        // test Stream install
+        BanyandbDatabase.Stream stream = client.client.findStream(groupName, 
"testStream");
+        assertEquals("storage-only", stream.getTagFamilies(0).getName());
+        assertEquals("service_id", 
stream.getTagFamilies(0).getTags(0).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
stream.getTagFamilies(0).getTags(0).getType());
+        assertEquals("timestamp", 
stream.getTagFamilies(0).getTags(1).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_INT, 
stream.getTagFamilies(0).getTags(1).getType());
+        assertEquals("searchable", stream.getTagFamilies(1).getName());
+        assertEquals("tag", stream.getTagFamilies(1).getTags(0).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
stream.getTagFamilies(1).getTags(0).getType());
+        assertEquals("service_id", stream.getEntity().getTagNames(0));
+        // test IndexRule install
+        BanyandbDatabase.IndexRule indexRuleTag = 
client.client.findIndexRule(groupName, "tag");
+        assertEquals("url", indexRuleTag.getAnalyzer());
+        assertTrue(indexRuleTag.getNoSort());
+        // test IndexRuleBinding install
+        BanyandbDatabase.IndexRuleBinding indexRuleBinding = 
client.client.findIndexRuleBinding(
+            groupName, "testStream");
+        assertEquals("tag", indexRuleBinding.getRules(0));
+        assertEquals("testStream", indexRuleBinding.getSubject().getName());
+
+        // StorageModels.add now dedupes by (name, downsampling); evict the 
original
+        // registration so the UpdateTestStream registration takes effect.
+        models.remove(TestStream.class, 
StorageManipulationOpt.withSchemaChange());
+        Model updatedModel = models.add(UpdateTestStream.class, 
DefaultScopeDefine.SERVICE,
+                                        new Storage("testStream", true, 
DownSampling.Second),
+                                        
StorageManipulationOpt.withSchemaChange()
+        );
+        BanyanDBIndexInstaller newInstaller = new 
BanyanDBIndexInstaller(client, moduleManager, config);
+        newInstaller.isExists(updatedModel, 
StorageManipulationOpt.withSchemaChange());
+        // test Stream update
+        BanyandbDatabase.Stream updatedStream = 
client.client.findStream(groupName, "testStream");
+        assertEquals("storage-only", 
updatedStream.getTagFamilies(0).getName());
+        assertEquals("service_id", 
updatedStream.getTagFamilies(0).getTags(0).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
updatedStream.getTagFamilies(0).getTags(0).getType());
+        assertEquals("timestamp", 
updatedStream.getTagFamilies(0).getTags(1).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_INT, 
updatedStream.getTagFamilies(0).getTags(1).getType());
+        assertEquals("searchable", updatedStream.getTagFamilies(1).getName());
+        assertEquals("tag", 
updatedStream.getTagFamilies(1).getTags(0).getName());
+        assertEquals("new_tag", 
updatedStream.getTagFamilies(1).getTags(1).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
updatedStream.getTagFamilies(1).getTags(0).getType());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
updatedStream.getTagFamilies(1).getTags(1).getType());
+        assertEquals("service_id", updatedStream.getEntity().getTagNames(0));
+        // test IndexRule update
+        BanyandbDatabase.IndexRule updatedIndexRuleTag = 
client.client.findIndexRule(groupName, "tag");
+        assertEquals("", updatedIndexRuleTag.getAnalyzer());
+        assertFalse(updatedIndexRuleTag.getNoSort());
+        BanyandbDatabase.IndexRule updatedIndexRuleNewTag = 
client.client.findIndexRule(groupName, "new_tag");
+        assertTrue(updatedIndexRuleNewTag.getNoSort());
+        // test IndexRuleBinding update
+        BanyandbDatabase.IndexRuleBinding updatedIndexRuleBinding = 
client.client.findIndexRuleBinding(
+            groupName, "testStream");
+        assertEquals("tag", updatedIndexRuleBinding.getRules(0));
+        assertEquals("new_tag", updatedIndexRuleBinding.getRules(1));
+        assertEquals("testStream", 
updatedIndexRuleBinding.getSubject().getName());
+    }
+
+    /**
+     * Drive the allowBootReshape path: install {@link TestStream} first under
+     * {@link StorageManipulationOpt#withSchemaChange()} so the stream lives 
on the backend
+     * with the original shape, then re-install a class that adds {@code 
new_tag} and opts in
+     * via {@code allowBootReshape = true} under {@link 
StorageManipulationOpt#schemaCreateIfAbsent()}.
+     * The installer should apply the additive update during boot rather than 
recording
+     * {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}.
+     */
+    @Test
+    public void testStreamAdditiveBootReshape() throws Exception {
+        DownSamplingConfigService downSamplingConfigService = new 
DownSamplingConfigService(Arrays.asList("minute"));
+        ModuleManager moduleManager = mock(ModuleManager.class);
+        ModuleProviderHolder moduleProviderHolder = 
mock(ModuleProviderHolder.class);
+        ModuleServiceHolder moduleServiceHolder = 
mock(ModuleServiceHolder.class);
+        
when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+        when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+        
when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+        StorageModels models = new StorageModels();
+        Model baseModel = models.add(TestStream.class, 
DefaultScopeDefine.SERVICE,
+                                     new Storage("testStream", true, 
DownSampling.Second),
+                                     
StorageManipulationOpt.withSchemaChange());
+        BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, 
moduleManager, config);
+        installer.isExists(baseModel, 
StorageManipulationOpt.withSchemaChange());
+        installer.createTable(baseModel);
+
+        String groupName = MetadataRegistry.convertGroupName(
+            config.getGlobal().getNamespace(), 
BanyanDB.StreamGroup.RECORDS_LOG.getName());
+        BanyandbDatabase.Stream initial = client.client.findStream(groupName, 
"testStream");
+        assertEquals(1, initial.getTagFamilies(1).getTagsCount());
+
+        models.remove(TestStream.class, 
StorageManipulationOpt.withSchemaChange());
+        Model reshapedModel = models.add(TestStreamAdditiveReshapeOn.class, 
DefaultScopeDefine.SERVICE,
+                                         new Storage("testStream", true, 
DownSampling.Second),
+                                         
StorageManipulationOpt.withSchemaChange());
+        assertTrue(reshapedModel.isAllowBootReshape());
+
+        StorageManipulationOpt bootOpt = 
StorageManipulationOpt.schemaCreateIfAbsent();
+        new BanyanDBIndexInstaller(client, moduleManager, 
config).isExists(reshapedModel, bootOpt);
+
+        BanyandbDatabase.Stream reshaped = client.client.findStream(groupName, 
"testStream");
+        assertEquals(2, reshaped.getTagFamilies(1).getTagsCount());
+        assertEquals("new_tag", 
reshaped.getTagFamilies(1).getTags(1).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
reshaped.getTagFamilies(1).getTags(1).getType());
+
+        boolean updatedRecorded = bootOpt.getOutcomes().stream()
+            .anyMatch(o -> "stream".equals(o.getResourceType())
+                && "testStream".equals(o.getResourceName())
+                && o.getStatus() == StorageManipulationOpt.Outcome.UPDATED);
+        assertTrue(updatedRecorded, "expected UPDATED outcome for additive 
boot reshape, got " + bootOpt.getOutcomes());
+    }
+
+    /**
+     * Same setup as {@link #testStreamAdditiveBootReshape} but the re-install 
class leaves
+     * {@code allowBootReshape} at its default ({@code false}). Boot must 
refuse to reshape
+     * even though the diff is additive — record {@link 
StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}
+     * and leave the live stream untouched.
+     */
+    @Test
+    public void testStreamAdditiveBootReshape_optOutSkips() throws Exception {
+        DownSamplingConfigService downSamplingConfigService = new 
DownSamplingConfigService(Arrays.asList("minute"));
+        ModuleManager moduleManager = mock(ModuleManager.class);
+        ModuleProviderHolder moduleProviderHolder = 
mock(ModuleProviderHolder.class);
+        ModuleServiceHolder moduleServiceHolder = 
mock(ModuleServiceHolder.class);
+        
when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+        when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+        
when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+        StorageModels models = new StorageModels();
+        Model baseModel = models.add(TestStream.class, 
DefaultScopeDefine.SERVICE,
+                                     new Storage("testStream", true, 
DownSampling.Second),
+                                     
StorageManipulationOpt.withSchemaChange());
+        BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, 
moduleManager, config);
+        installer.isExists(baseModel, 
StorageManipulationOpt.withSchemaChange());
+        installer.createTable(baseModel);
+
+        models.remove(TestStream.class, 
StorageManipulationOpt.withSchemaChange());
+        Model reshapedModel = models.add(TestStreamAdditiveReshapeOff.class, 
DefaultScopeDefine.SERVICE,
+                                         new Storage("testStream", true, 
DownSampling.Second),
+                                         
StorageManipulationOpt.withSchemaChange());
+        assertFalse(reshapedModel.isAllowBootReshape());
+
+        StorageManipulationOpt bootOpt = 
StorageManipulationOpt.schemaCreateIfAbsent();
+        new BanyanDBIndexInstaller(client, moduleManager, 
config).isExists(reshapedModel, bootOpt);
+
+        String groupName = MetadataRegistry.convertGroupName(
+            config.getGlobal().getNamespace(), 
BanyanDB.StreamGroup.RECORDS_LOG.getName());
+        BanyandbDatabase.Stream live = client.client.findStream(groupName, 
"testStream");
+        assertEquals(1, live.getTagFamilies(1).getTagsCount());
+
+        boolean skipRecorded = bootOpt.getOutcomes().stream()
+            .anyMatch(o -> "stream".equals(o.getResourceType())
+                && "testStream".equals(o.getResourceName())
+                && o.getStatus() == 
StorageManipulationOpt.Outcome.SKIPPED_SHAPE_MISMATCH);
+        assertTrue(skipRecorded, "expected SKIPPED_SHAPE_MISMATCH without 
allowBootReshape opt-in, got " + bootOpt.getOutcomes());
+    }
+
+    /**
+     * Measure variant of {@link #testStreamAdditiveBootReshape}: install 
{@link TestMetric},
+     * then re-install {@link TestMetricAdditiveReshapeOn} which adds {@code 
new_tag} and
+     * {@code new_value} (a field) and opts in via {@code allowBootReshape = 
true}. The boot
+     * installer should apply the additive update and record
+     * {@link StorageManipulationOpt.Outcome#UPDATED}.
+     */
+    @Test
+    public void testMeasureAdditiveBootReshape() throws Exception {
+        DownSamplingConfigService downSamplingConfigService = new 
DownSamplingConfigService(Arrays.asList("minute"));
+        ModuleManager moduleManager = mock(ModuleManager.class);
+        ModuleProviderHolder moduleProviderHolder = 
mock(ModuleProviderHolder.class);
+        ModuleServiceHolder moduleServiceHolder = 
mock(ModuleServiceHolder.class);
+        
when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+        when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+        
when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+        StorageModels models = new StorageModels();
+        Model baseModel = models.add(TestMetric.class, 
DefaultScopeDefine.SERVICE,
+                                     new Storage("testMetric", true, 
DownSampling.Minute),
+                                     
StorageManipulationOpt.withSchemaChange());
+        BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, 
moduleManager, config);
+        installer.isExists(baseModel, 
StorageManipulationOpt.withSchemaChange());
+        installer.createTable(baseModel);
+
+        models.remove(TestMetric.class, 
StorageManipulationOpt.withSchemaChange());
+        Model reshapedModel = models.add(TestMetricAdditiveReshapeOn.class, 
DefaultScopeDefine.SERVICE,
+                                         new Storage("testMetric", true, 
DownSampling.Minute),
+                                         
StorageManipulationOpt.withSchemaChange());
+        assertTrue(reshapedModel.isAllowBootReshape());
+
+        StorageManipulationOpt bootOpt = 
StorageManipulationOpt.schemaCreateIfAbsent();
+        new BanyanDBIndexInstaller(client, moduleManager, 
config).isExists(reshapedModel, bootOpt);
+
+        String groupName = MetadataRegistry.convertGroupName(
+            config.getGlobal().getNamespace(), 
BanyanDB.MeasureGroup.METRICS_MINUTE.getName());
+        BanyandbDatabase.Measure reshaped = 
client.client.findMeasure(groupName, "testMetric_minute");
+        assertEquals(2, reshaped.getTagFamilies(1).getTagsCount());
+        assertEquals("new_tag", 
reshaped.getTagFamilies(1).getTags(1).getName());
+        assertEquals(2, reshaped.getFieldsCount());
+        assertEquals("new_value", reshaped.getFields(1).getName());
+
+        boolean updatedRecorded = bootOpt.getOutcomes().stream()
+            .anyMatch(o -> "measure".equals(o.getResourceType())
+                && "testMetric_minute".equals(o.getResourceName())
+                && o.getStatus() == StorageManipulationOpt.Outcome.UPDATED);
+        assertTrue(updatedRecorded, "expected UPDATED outcome for additive 
measure boot reshape, got " + bootOpt.getOutcomes());
+    }
+
+    /**
+     * Opt-in is necessary but not sufficient: even with {@code 
allowBootReshape = true} a
+     * diff that is not purely additive (here, the {@code tag} column flips 
from
+     * {@code String} → {@code long}, i.e. {@code TAG_TYPE_STRING} → {@code 
TAG_TYPE_INT})
+     * must be refused at boot. {@link 
StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}
+     * is recorded and the live stream stays unchanged, forcing the operator 
to do an
+     * explicit drop+recreate (the only safe path for identity-breaking 
changes).
+     */
+    @Test
+    public void testStreamNonAdditiveBootReshape_optInStillSkips() throws 
Exception {
+        DownSamplingConfigService downSamplingConfigService = new 
DownSamplingConfigService(Arrays.asList("minute"));
+        ModuleManager moduleManager = mock(ModuleManager.class);
+        ModuleProviderHolder moduleProviderHolder = 
mock(ModuleProviderHolder.class);
+        ModuleServiceHolder moduleServiceHolder = 
mock(ModuleServiceHolder.class);
+        
when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+        when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+        
when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+        StorageModels models = new StorageModels();
+        Model baseModel = models.add(TestStream.class, 
DefaultScopeDefine.SERVICE,
+                                     new Storage("testStream", true, 
DownSampling.Second),
+                                     
StorageManipulationOpt.withSchemaChange());
+        BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, 
moduleManager, config);
+        installer.isExists(baseModel, 
StorageManipulationOpt.withSchemaChange());
+        installer.createTable(baseModel);
+
+        models.remove(TestStream.class, 
StorageManipulationOpt.withSchemaChange());
+        Model reshapedModel = models.add(TestStreamNonAdditiveReshapeOn.class, 
DefaultScopeDefine.SERVICE,
+                                         new Storage("testStream", true, 
DownSampling.Second),
+                                         
StorageManipulationOpt.withSchemaChange());
+        assertTrue(reshapedModel.isAllowBootReshape());
+
+        StorageManipulationOpt bootOpt = 
StorageManipulationOpt.schemaCreateIfAbsent();
+        new BanyanDBIndexInstaller(client, moduleManager, 
config).isExists(reshapedModel, bootOpt);
+
+        String groupName = MetadataRegistry.convertGroupName(
+            config.getGlobal().getNamespace(), 
BanyanDB.StreamGroup.RECORDS_LOG.getName());
+        BanyandbDatabase.Stream live = client.client.findStream(groupName, 
"testStream");
+        assertEquals("tag", live.getTagFamilies(1).getTags(0).getName());
+        assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, 
live.getTagFamilies(1).getTags(0).getType());
+
+        boolean skipRecorded = bootOpt.getOutcomes().stream()
+            .anyMatch(o -> "stream".equals(o.getResourceType())
+                && "testStream".equals(o.getResourceName())
+                && o.getStatus() == 
StorageManipulationOpt.Outcome.SKIPPED_SHAPE_MISMATCH);
+        assertTrue(skipRecorded,
+            "expected SKIPPED_SHAPE_MISMATCH for non-additive (type-change) 
diff even with opt-in, got " + bootOpt.getOutcomes());
+    }
+
+    /**
+     * Non-additive variant: {@code tag} is now {@code long} (TAG_TYPE_INT) 
where the live
+     * stream has it as {@code String} (TAG_TYPE_STRING). Boot must refuse to 
reshape even
+     * with {@code allowBootReshape = true}.
+     */
+    @Stream(name = "testStream", scopeId = DefaultScopeDefine.SERVICE,
+        builder = TestStreamNonAdditiveReshapeOn.Builder.class, processor = 
RecordStreamProcessor.class,
+        allowBootReshape = true)
+    @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG)
+    @BanyanDB.TimestampColumn("timestamp")
+    private static class TestStreamNonAdditiveReshapeOn extends Record {
+        @Column(name = "service_id")
+        @BanyanDB.SeriesID(index = 0)
+        private String serviceId;
+        @Column(name = "tag")
+        private long tag;
+        @Column(name = "timestamp")
+        private long timestamp;
+
+        @Override
+        public StorageID id() {
+            return new StorageID();
+        }
+
+        static class Builder implements StorageBuilder<StorageData> {
+            @Override
+            public StorageData storage2Entity(final Convert2Entity converter) {
+                return null;
+            }
+
+            @Override
+            public void entity2Storage(final StorageData entity, final 
Convert2Storage converter) {
+            }
+        }
+    }
+
+    /**
+     * Mirror of {@link UpdateTestStream} but trimmed to a purely-additive 
shape change
+     * (only {@code new_tag} is added; {@code tag}'s index settings stay 
matched to
+     * {@link TestStream}) and with the new {@code allowBootReshape} opt-in 
flipped on.
+     */
+    @Stream(name = "testStream", scopeId = DefaultScopeDefine.SERVICE,
+        builder = TestStreamAdditiveReshapeOn.Builder.class, processor = 
RecordStreamProcessor.class,
+        allowBootReshape = true)
+    @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG)
+    @BanyanDB.TimestampColumn("timestamp")
+    private static class TestStreamAdditiveReshapeOn extends Record {
+        @Column(name = "service_id")
+        @BanyanDB.SeriesID(index = 0)
+        private String serviceId;
+        @Column(name = "tag")
+        @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)
+        private String tag;
+        @Column(name = "new_tag")
+        private String newTag;
+        @Column(name = "timestamp")
+        private long timestamp;
+
+        @Override
+        public StorageID id() {
+            return new StorageID();
+        }
+
+        static class Builder implements StorageBuilder<StorageData> {
+            @Override
+            public StorageData storage2Entity(final Convert2Entity converter) {
+                return null;
+            }
+
+            @Override
+            public void entity2Storage(final StorageData entity, final 
Convert2Storage converter) {
+            }
+        }
+    }
+
+    /**
+     * Same additive shape as {@link TestStreamAdditiveReshapeOn} but without 
the opt-in —
+     * boot must refuse to reshape and record SKIPPED_SHAPE_MISMATCH.
+     */
+    @Stream(name = "testStream", scopeId = DefaultScopeDefine.SERVICE,
+        builder = TestStreamAdditiveReshapeOff.Builder.class, processor = 
RecordStreamProcessor.class)
+    @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG)
+    @BanyanDB.TimestampColumn("timestamp")
+    private static class TestStreamAdditiveReshapeOff extends Record {
+        @Column(name = "service_id")
+        @BanyanDB.SeriesID(index = 0)
+        private String serviceId;
+        @Column(name = "tag")
+        @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)
+        private String tag;
+        @Column(name = "new_tag")
+        private String newTag;
+        @Column(name = "timestamp")
+        private long timestamp;
+
+        @Override
+        public StorageID id() {
+            return new StorageID();
+        }
+
+        static class Builder implements StorageBuilder<StorageData> {
+            @Override
+            public StorageData storage2Entity(final Convert2Entity converter) {
+                return null;
+            }
+
+            @Override
+            public void entity2Storage(final StorageData entity, final 
Convert2Storage converter) {
+            }
+        }
+    }
+
+    /**
+     * Mirror of {@link UpdateTestMetric} but trimmed to a purely-additive 
shape change
+     * (new tag, new field) and with {@code allowBootReshape = true}.
+     */
+    @Stream(name = "testMetric", scopeId = DefaultScopeDefine.SERVICE,
+        builder = TestMetricAdditiveReshapeOn.Builder.class, processor = 
MetricsStreamProcessor.class,
+        allowBootReshape = true)
+    private static class TestMetricAdditiveReshapeOn {
+        @Column(name = "service_id")
+        @BanyanDB.SeriesID(index = 0)
+        @BanyanDB.ShardingKey(index = 0)
+        private String serviceId;
+        @Column(name = "tag")
+        @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)
+        private String tag;
+        @Column(name = "new_tag")
+        private String newTag;
+        @Column(name = "value", dataType = Column.ValueDataType.COMMON_VALUE)
+        @BanyanDB.MeasureField
+        private long value;
+        @Column(name = "new_value", storageOnly = true)
+        @BanyanDB.MeasureField
+        private long newValue;
+
+        static class Builder implements StorageBuilder<StorageData> {
+            @Override
+            public StorageData storage2Entity(final Convert2Entity converter) {
+                return null;
+            }
+
+            @Override
+            public void entity2Storage(final StorageData entity, final 
Convert2Storage converter) {
+            }
+        }
+    }
+
+    @Stream(name = "testStream", scopeId = DefaultScopeDefine.SERVICE,
+        builder = TestStream.Builder.class, processor = 
RecordStreamProcessor.class)
+    @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG)
+    @BanyanDB.TimestampColumn("timestamp")
+    private static class TestStream extends Record {
+        @Column(name = "service_id")
+        @BanyanDB.SeriesID(index = 0)
+        private String serviceId;
+        @Column(name = "tag")
+        @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)
+        private String tag;
+        @Column(name = "timestamp")
+        private long timestamp;
+
+        @Override
+        public StorageID id() {
+            return new StorageID();
+        }
+
+        static class Builder implements StorageBuilder<StorageData> {
+            @Override
+            public StorageData storage2Entity(final Convert2Entity converter) {
+                return null;
+            }
+
+            @Override
+            public void entity2Storage(final StorageData entity, final 
Convert2Storage converter) {
+
+            }
+        }
+    }
+
+    @Stream(name = "testStream", scopeId = DefaultScopeDefine.SERVICE,
+        builder = UpdateTestStream.Builder.class, processor = 
RecordStreamProcessor.class)
+    @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG)
+    @BanyanDB.TimestampColumn("timestamp")
+    private static class UpdateTestStream extends Record {
+        @Column(name = "service_id")
+        @BanyanDB.SeriesID(index = 0)
+        private String serviceId;
+        @Column(name = "tag")
+        @BanyanDB.EnableSort
+        private String tag;
+        @Column(name = "new_tag")
+        private String newTag;
+        @Column(name = "timestamp")
+        private long timestamp;
+
+        @Override
+        public StorageID id() {
+            return new StorageID();
+        }
+
+        static class Builder implements StorageBuilder<StorageData> {
+            @Override
+            public StorageData storage2Entity(final Convert2Entity converter) {
+                return null;
+            }
+
+            @Override
+            public void entity2Storage(final StorageData entity, final 
Convert2Storage converter) {
+
+            }
+        }
+    }
+
+    @Stream(name = "testMetric", scopeId = DefaultScopeDefine.SERVICE,
+        builder = TestMetric.Builder.class, processor = 
MetricsStreamProcessor.class)
+    private static class TestMetric {
+        @Column(name = "service_id")
+        @BanyanDB.SeriesID(index = 0)
+        @BanyanDB.ShardingKey(index = 0)
+        private String serviceId;
+        @Column(name = "tag")
+        @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)
+        private String tag;
+        @Column(name = "value", dataType = Column.ValueDataType.COMMON_VALUE)
+        @BanyanDB.MeasureField
+        private long value;
+
+        static class Builder implements StorageBuilder<StorageData> {
+            @Override
+            public StorageData storage2Entity(final Convert2Entity converter) {
+                return null;
+            }
+
+            @Override
+            public void entity2Storage(final StorageData entity, final 
Convert2Storage converter) {
+
+            }
+        }
+    }
+
+    @Stream(name = "testMetric", scopeId = DefaultScopeDefine.SERVICE,
+        builder = UpdateTestMetric.Builder.class, processor = 
MetricsStreamProcessor.class)
+    private static class UpdateTestMetric {
+        @Column(name = "service_id")
+        @BanyanDB.SeriesID(index = 0)
+        private String serviceId;
+        @Column(name = "tag")
+        @BanyanDB.EnableSort
+        private String tag;
+        @Column(name = "new_tag")
+        private String newTag;
+        @Column(name = "value", dataType = Column.ValueDataType.COMMON_VALUE)
+        @BanyanDB.MeasureField
+        private long value;
+        @Column(name = "new_value", storageOnly = true)
+        @BanyanDB.MeasureField
+        private long newValue;
+
+        static class Builder implements StorageBuilder<StorageData> {
+            @Override
+            public StorageData storage2Entity(final Convert2Entity converter) {
+                return null;
+            }
+
+            @Override
+            public void entity2Storage(final StorageData entity, final 
Convert2Storage converter) {
+
+            }
+        }
+    }
+}
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.yml
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.yml
index 1260ffa34d..b1ce48367b 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.yml
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.yml
@@ -43,6 +43,7 @@ global:
   # The batch size for querying profile data.
   profileDataQueryBatchSize: 
${SW_STORAGE_BANYANDB_QUERY_PROFILE_DATA_BATCH_SIZE:100}
   asyncProfilerTaskQueryMaxSize: 
${SW_STORAGE_BANYANDB_ASYNC_PROFILER_TASK_QUERY_MAX_SIZE:200}
+  pprofTaskQueryMaxSize: ${SW_STORAGE_BANYANDB_PPROF_TASK_QUERY_MAX_SIZE:200}
   user: ${SW_STORAGE_BANYANDB_USER:""}
   password: ${SW_STORAGE_BANYANDB_PASSWORD:""}
   # If the BanyanDB server is configured with TLS, configure the TLS cert file 
path and enable TLS connection.
@@ -52,6 +53,9 @@ global:
   # The namespace in BanyanDB to store the data of OAP, if not set, the 
default is "sw".
   # OAP will create BanyanDB Groups using the format of "{namespace}_{group 
name}", such as "sw_records".
   namespace: ${SW_NAMESPACE:"sw"}
+  # The compatible server API versions of BanyanDB.
+  # The compatible BanyanDB Server version number can be found via the [API 
versions 
mapping](https://skywalking.apache.org/docs/skywalking-banyandb/latest/installation/versions/).
+  compatibleServerApiVersions: 
${SW_STORAGE_BANYANDB_COMPATIBLE_SERVER_API_VERSIONS:"0.10"}
 
 groups:
   # The group settings of record.
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
index 931edd6615..0002de624b 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
@@ -43,15 +43,15 @@ public class TimeSeriesUtilsTest {
     @BeforeEach
     public void prepare() {
         superDatasetModel = new Model("superDatasetModel", 
Lists.newArrayList(),
-                                      0, DownSampling.Second, true, 
Record.class, true,
+                                      0, DownSampling.Second, true, 
Record.class, true, false,
                                       new SQLDatabaseModelExtension(), new 
BanyanDBModelExtension(), new ElasticSearchModelExtension()
         );
         normalRecordModel = new Model("normalRecordModel", 
Lists.newArrayList(),
-                                      0, DownSampling.Second, false, 
Record.class, true,
+                                      0, DownSampling.Second, false, 
Record.class, true, false,
                                       new SQLDatabaseModelExtension(), new 
BanyanDBModelExtension(), new ElasticSearchModelExtension()
         );
         normalMetricsModel = new Model("normalMetricsModel", 
Lists.newArrayList(),
-                                       0, DownSampling.Minute, false, 
Metrics.class, true,
+                                       0, DownSampling.Minute, false, 
Metrics.class, true, false,
                                        new SQLDatabaseModelExtension(), new 
BanyanDBModelExtension(), new ElasticSearchModelExtension()
         );
         TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCHistoryDeleteDAOIT.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCHistoryDeleteDAOIT.java
index c0a729ba29..ecbc900913 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCHistoryDeleteDAOIT.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCHistoryDeleteDAOIT.java
@@ -107,7 +107,7 @@ class JDBCHistoryDeleteDAOIT {
                     new SQLDatabaseExtension(), null, null),
                 new ModelColumn(new ColumnName(timeBucketColumn), Long.class, 
Long.class, false, false, false, 0,
                     new SQLDatabaseExtension(), null, null)
-            ), 1, DownSampling.Minute, false, ServiceTraffic.class, false, new 
SQLDatabaseModelExtension(),
+            ), 1, DownSampling.Minute, false, ServiceTraffic.class, false, 
false, new SQLDatabaseModelExtension(),
             new BanyanDBModelExtension(), new ElasticSearchModelExtension());
 
         TableMetaInfo.addModel(model);

Reply via email to