This is an automated email from the ASF dual-hosted git repository.

wankai123 pushed a commit to branch tagfamily-update
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 1e0af210393f7ed1832183a33d1297131a57250d
Author: wankai123 <[email protected]>
AuthorDate: Fri May 22 09:17:44 2026 +0800

    banyandb support update tag family
---
 docs/en/changes/changes.md                         |   2 +-
 .../runtime-rule-hot-update.md                     |  39 ++++-
 .../plugin/banyandb/BanyanDBIndexInstaller.java    | 163 +++++++++++++++------
 .../server/storage/plugin/banyandb/BanyanDBIT.java | 137 +++++++++++++++++
 4 files changed, 290 insertions(+), 51 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 322ada7e4d..ef2c809d90 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -279,7 +279,7 @@
 * LAL: support full arithmetic (`+`, `-`, `*`, `/`) on numeric operands and fix the original bug where `(tag("x") as Integer) + (tag("y") as Integer)` was treated as string concatenation — expressions like `input_tokens + output_tokens < 10000` produced the concatenated string `"2589115"` rather than the integer sum `2704`, so token-threshold conditions never triggered `abort {}`. Operand types are now inferred from explicit casts (`as Integer` / `as Long` / `as Float` / `as Double`), ty [...]
 * Fix: `avgHistogramPercentile` / `sumHistogramPercentile` meter functions reported the smallest finite bucket boundary (e.g. `10` for OTel `gen_ai_server_request_duration` whose `le` is rewritten from `0.01s` → `10ms`) for every rank when no samples were observed in any bucket. The percentile loop's `count >= roof` check matched on the first sorted bucket because both sides were `0`. `calculate()` now short-circuits to `0` for every rank when the windowed total is `0`.
 * Fix: MAL `expPrefix` now applies to every metric source in `exp`, not just the leading one. Previously the prefix was spliced after the first `.`, so secondary metrics inside arguments (e.g. the divisor in `a.sum(['s']).safeDiv(b.sum(['s']))`) silently skipped the prefix — a rule like envoy-ai-gateway's `request_latency_avg` (`sum / count`) would tag-rewrite only the dividend.
The injection is now AST-aware: every bare-IDENTIFIER metric source is wrapped, while downsampling-type consta [...] -* Add `@Stream(allowBootReshape = true)` opt-in for additive boot-time reshape of BanyanDB streams / measures. Code-defined stream classes (e.g. `AlarmRecord`) can now annotate their schema as eligible for in-place additive update at OAP boot — a new `@Column` is appended to the live tag-family / fields via `client.update` instead of being silently rejected with `SKIPPED_SHAPE_MISMATCH` (which previously forced operators to drop the measure / stream and lose historical rows). The opt-in [...] +* Add `@Stream(allowBootReshape = true)` opt-in for additive boot-time reshape of BanyanDB streams / measures. Code-defined stream classes (e.g. `AlarmRecord`) can now annotate their schema as eligible for in-place additive update at OAP boot — a new `@Column` is appended to the live tag-family / fields via `client.update` instead of being silently rejected with `SKIPPED_SHAPE_MISMATCH` (which previously forced operators to drop the measure / stream and lose historical rows). Additive in [...] #### UI * Add mobile menu icon and i18n labels for the iOS layer. diff --git a/docs/en/concepts-and-designs/runtime-rule-hot-update.md b/docs/en/concepts-and-designs/runtime-rule-hot-update.md index d20fc5ed86..78ee77f6d4 100644 --- a/docs/en/concepts-and-designs/runtime-rule-hot-update.md +++ b/docs/en/concepts-and-designs/runtime-rule-hot-update.md @@ -169,18 +169,43 @@ backing schema needs to change. Streams whose schema lives in OAP source code (e.g. `AlarmRecord`) can opt in to **additive** boot-time reshape via `@Stream(allowBootReshape = true)`. 
When the flag is on and the diff is -purely additive (new tag / new field; no type changes, no drops, no entity / -interval / sharding-key flips), the installer calls `client.update` at boot -to append the new column to the live measure / stream; non-additive -divergences still record `SKIPPED_SHAPE_MISMATCH` and require an operator -drop+recreate. Only the init / standalone OAP performs the reshape; non-init -peers continue through the existing poll-and-wait loop so a single node -drives DDL during a rolling restart. +purely additive, the installer calls `client.update` at boot to extend the +live measure / stream; non-additive divergences still record +`SKIPPED_SHAPE_MISMATCH` and require an operator drop+recreate. Only the +init / standalone OAP performs the reshape; non-init peers continue through +the existing poll-and-wait loop so a single node drives DDL during a rolling +restart. + +"Additive" includes two cases: + +1. **New tag / new field** — a brand-new `@Column` is appended to the live + tag family (or fields list, for measures). +2. **Tag relocation between families** — a `@Column`'s `storageOnly` flag + flips, moving the tag between the `storage-only` and `searchable` + families. The tag identity and type are preserved; only its on-disk + family location changes. + +Drops, tag-type changes, kind flips (tag↔field), and entity / interval / +sharding-key changes are still rejected with `SKIPPED_SHAPE_MISMATCH`. + +When the primary `check*` records `SKIPPED_SHAPE_MISMATCH`, the dependent +`IndexRule` and `IndexRuleBinding` reconciliation is **also skipped**. This +preserves coherence between the stream / measure tag layout and the binding +that points into it — without the gate, the binding would silently update to +reference the new declared tag list while the live tag families still carry +the old shape, leaving operators with a binding routing to tags that don't +exist in the live family layout. This opt-in is **BanyanDB-only**. 
JDBC and Elasticsearch are append-only on the data path and already accept additive column / mapping additions at boot without operator intervention, so the flag is unread on those backends. +> **Operator caveat:** BanyanDB does not physically migrate existing rows +> when a tag's family changes. Pre-existing data for the relocated tag stays +> in its original on-disk family location; new writes go to the declared +> family. Queries that route through a new IndexRule on the relocated tag +> will only see post-reshape rows until historical data ages out via TTL. + ## On-demand workflow Triggered by an HTTP call to one of the admin endpoints. A request arriving at any diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java index 092214919e..b4206c011c 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java @@ -160,37 +160,46 @@ public class BanyanDBIndexInstaller extends ModelInstaller { return installInfo; } if (runShapeChecks) { - checkTrace(traceModel.getTrace(), c, opt); - checkIndexRules(model.getName(), traceModel.getIndexRules(), c, opt); - checkIndexRuleBinding( - traceModel.getIndexRules(), metadata.getGroup(), metadata.name(), - BanyandbCommon.Catalog.CATALOG_TRACE, c, opt - ); + if (checkTrace(traceModel.getTrace(), c, opt)) { + checkIndexRules(model.getName(), traceModel.getIndexRules(), c, opt); + checkIndexRuleBinding( + traceModel.getIndexRules(), metadata.getGroup(), metadata.name(), + 
BanyandbCommon.Catalog.CATALOG_TRACE, c, opt + ); + } else { + skipDependentReconcile(opt, "trace", metadata.name()); + } } } else { // stream StreamModel streamModel = MetadataRegistry.INSTANCE.registerStreamModel( model, config); if (runShapeChecks) { - checkStream(model, streamModel.getStream(), c, opt); - checkIndexRules(model.getName(), streamModel.getIndexRules(), c, opt); - checkIndexRuleBinding( - streamModel.getIndexRules(), metadata.getGroup(), metadata.name(), - BanyandbCommon.Catalog.CATALOG_STREAM, c, opt - ); - // Stream not support server side TopN pre-aggregation + if (checkStream(model, streamModel.getStream(), c, opt)) { + checkIndexRules(model.getName(), streamModel.getIndexRules(), c, opt); + checkIndexRuleBinding( + streamModel.getIndexRules(), metadata.getGroup(), metadata.name(), + BanyandbCommon.Catalog.CATALOG_STREAM, c, opt + ); + // Stream not support server side TopN pre-aggregation + } else { + skipDependentReconcile(opt, "stream", metadata.name()); + } } } } else { // measure MeasureModel measureModel = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, downSamplingConfigService); if (runShapeChecks) { - checkMeasure(model, measureModel.getMeasure(), c, opt); - checkIndexRules(model.getName(), measureModel.getIndexRules(), c, opt); - checkIndexRuleBinding( - measureModel.getIndexRules(), metadata.getGroup(), metadata.name(), - BanyandbCommon.Catalog.CATALOG_MEASURE, c, opt - ); - checkTopNAggregation(model, c, opt); + if (checkMeasure(model, measureModel.getMeasure(), c, opt)) { + checkIndexRules(model.getName(), measureModel.getIndexRules(), c, opt); + checkIndexRuleBinding( + measureModel.getIndexRules(), metadata.getGroup(), metadata.name(), + BanyandbCommon.Catalog.CATALOG_MEASURE, c, opt + ); + checkTopNAggregation(model, c, opt); + } else { + skipDependentReconcile(opt, "measure", metadata.name()); + } } } } else { @@ -776,7 +785,16 @@ public class BanyanDBIndexInstaller extends ModelInstaller { * {@link 
org.apache.skywalking.oap.server.core.storage.model.ModelInstaller#whenCreating} * so only one node races on the DDL. */ - private void checkMeasure(Model model, Measure measure, BanyanDBClient client, StorageManipulationOpt opt) throws BanyanDBException { + /** + * @return {@code true} when the live measure is now aligned with the declared shape + * (either it already matched, or the installer successfully applied an update); + * {@code false} when the shape diverged and the installer recorded + * {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}. Callers use the + * return value to skip dependent resources (index rules, binding, TopN) so a + * non-additive divergence doesn't leave the binding pointing at a stream/measure + * that no longer agrees with it. + */ + private boolean checkMeasure(Model model, Measure measure, BanyanDBClient client, StorageManipulationOpt opt) throws BanyanDBException { Measure hisMeasure = client.findMeasure(measure.getMetadata().getGroup(), measure.getMetadata().getName()); if (hisMeasure == null) { throw new IllegalStateException("Measure: " + measure.getMetadata().getName() + " exist but can't find it from BanyanDB server"); @@ -796,7 +814,7 @@ public class BanyanDBIndexInstaller extends ModelInstaller { opt.recordOutcome("measure", hisMeasure.getMetadata().getName(), StorageManipulationOpt.Outcome.UPDATED, "additive boot reshape: new tag / field added"); - return; + return true; } log.error("BanyanDB measure {} shape mismatch at boot — backend holds a " + "different shape than the declared rule. SKIPPING metric; operator " @@ -806,13 +824,14 @@ public class BanyanDBIndexInstaller extends ModelInstaller { opt.recordOutcome("measure", hisMeasure.getMetadata().getName(), StorageManipulationOpt.Outcome.SKIPPED_SHAPE_MISMATCH, "backend shape differs from declared shape; use /runtime/rule/addOrUpdate to reshape"); - return; + return false; } // banyanDB server can not delete or update Tags. 
opt.recordModRevision(client.update(measure)); log.info("update Measure: {} from: {} to: {}", hisMeasure.getMetadata().getName(), hisMeasure, measure); } } + return true; } /** @@ -820,7 +839,14 @@ public class BanyanDBIndexInstaller extends ModelInstaller { * See {@link #checkMeasure} for the create-if-absent vs full-install contract, * including the {@link Model#isAllowBootReshape()} additive opt-in. */ - private void checkStream(Model model, Stream stream, BanyanDBClient client, StorageManipulationOpt opt) throws BanyanDBException { + /** + * @return {@code true} when the live stream is now aligned with the declared shape + * (already matched or successfully updated); {@code false} when the installer + * recorded {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}. See + * {@link #checkMeasure} for why callers must gate dependent index-rule / + * binding reconciliation on this signal. + */ + private boolean checkStream(Model model, Stream stream, BanyanDBClient client, StorageManipulationOpt opt) throws BanyanDBException { Stream hisStream = client.findStream(stream.getMetadata().getGroup(), stream.getMetadata().getName()); if (hisStream == null) { throw new IllegalStateException("Stream: " + stream.getMetadata().getName() + " exist but can't find it from BanyanDB server"); @@ -840,7 +866,7 @@ public class BanyanDBIndexInstaller extends ModelInstaller { opt.recordOutcome("stream", hisStream.getMetadata().getName(), StorageManipulationOpt.Outcome.UPDATED, "additive boot reshape: new tag added"); - return; + return true; } log.error("BanyanDB stream {} shape mismatch at boot — backend holds a " + "different shape than the declared rule. 
SKIPPING; operator must " @@ -849,12 +875,13 @@ public class BanyanDBIndexInstaller extends ModelInstaller { opt.recordOutcome("stream", hisStream.getMetadata().getName(), StorageManipulationOpt.Outcome.SKIPPED_SHAPE_MISMATCH, "backend shape differs from declared shape; use /runtime/rule/addOrUpdate to reshape"); - return; + return false; } opt.recordModRevision(client.update(stream)); log.info("update Stream: {} from: {} to: {}", hisStream.getMetadata().getName(), hisStream, stream); } } + return true; } /** @@ -878,12 +905,42 @@ public class BanyanDBIndexInstaller extends ModelInstaller { } /** - * Purely-additive diff for a BanyanDB {@link Stream}: declared may add tag families or - * tags, but every tag-family / tag that already exists on the backend must be present - * with the same name and {@link BanyandbDatabase.TagType type}, the {@link BanyandbDatabase.Entity entity} - * column list must match exactly (reshape can't change shard / series-id semantics), - * and no tag may be dropped. Returns false for any non-additive divergence so the caller - * falls back to {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}. + * Record a parallel {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH} for the + * index-rule + binding (+ TopN, for measures) resources of a stream / measure / trace + * whose primary {@code check*} just skipped. Calling + * {@code checkIndexRules} / {@code checkIndexRuleBinding} unconditionally after a primary + * skip would silently update the binding to reference the new declared rule list while + * the underlying schema still carries the old shape — operators end up with a binding + * pointing at tags / fields that don't agree with the live tag family layout (e.g. a tag + * was dropped from the declared model but kept on the backend, the binding loses its + * reference, and the orphan IndexRule becomes unqueryable). 
+ * + * <p>Skipping the dependent reconcile keeps live state coherent: either everything + * matches the declared shape, or nothing on this resource is touched until the operator + * drops + recreates. + */ + private void skipDependentReconcile(StorageManipulationOpt opt, String resourceType, String resourceName) { + log.warn("BanyanDB {} {} shape mismatch — skipping dependent IndexRule / IndexRuleBinding " + + "reconciliation to avoid partial reshape (binding would point at the new tag " + + "list while the live tag families still carry the old shape).", + resourceType, resourceName); + opt.recordOutcome("indexRules", resourceName, + StorageManipulationOpt.Outcome.SKIPPED_SHAPE_MISMATCH, + resourceType + " shape mismatch; index-rule reconcile skipped"); + opt.recordOutcome("indexRuleBinding", resourceName, + StorageManipulationOpt.Outcome.SKIPPED_SHAPE_MISMATCH, + resourceType + " shape mismatch; binding reconcile skipped"); + } + + /** + * Purely-additive diff for a BanyanDB {@link Stream}: declared may add tags or relocate + * existing tags between families (a {@code storageOnly} toggle on a {@code @Column} + * moves a tag between {@code storage-only} and {@code searchable}; the tag identity is + * preserved, only its on-disk family location changes). The {@link BanyandbDatabase.Entity entity} + * column list must still match exactly (reshape can't change shard / series-id semantics), + * existing tag types may not change, and no tag may be dropped. Returns false for any + * non-additive divergence so the caller falls back to + * {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}. */ private boolean isPurelyAdditiveStream(Stream declared, Stream live) { if (!declared.getEntity().equals(live.getEntity())) { @@ -928,23 +985,37 @@ public class BanyanDBIndexInstaller extends ModelInstaller { return true; } + /** + * Tag-family compatibility check used by {@link #isPurelyAdditiveStream} / + * {@link #isPurelyAdditiveMeasure}. 
The check is name+type oriented, not family-position + * oriented — a tag may move between families (e.g. a {@code @Column} flips from + * {@code storageOnly = true} → {@code false}, which relocates it from the + * {@code storage-only} family to {@code searchable}) and is still considered safe to + * apply at boot. Drops (tag missing from declared entirely) and type changes still + * return false. + * + * <p><strong>Operator caveat:</strong> BanyanDB does NOT physically migrate existing + * rows when a tag's family changes. Pre-existing data for that tag stays in the old + * family's on-disk segment; new writes go to the declared family. Queries that route + * through a new IndexRule on the relocated tag will only see post-reshape rows. + * Operators should expect a backfill window after a storageOnly toggle. + */ private boolean isPurelyAdditiveTagFamilies(List<BanyandbDatabase.TagFamilySpec> declared, List<BanyandbDatabase.TagFamilySpec> live) { - final Map<String, BanyandbDatabase.TagFamilySpec> declaredByName = declared.stream() - .collect(Collectors.toMap(BanyandbDatabase.TagFamilySpec::getName, f -> f, (a, b) -> a)); + // Collapse declared tags across all families: (name -> TagSpec). A tag is allowed + // to move between families, so a per-family lookup would falsely reject the move. 
+ final Map<String, BanyandbDatabase.TagSpec> declaredTagsByName = declared.stream() + .flatMap(f -> f.getTagsList().stream()) + .collect(Collectors.toMap(BanyandbDatabase.TagSpec::getName, t -> t, (a, b) -> a)); for (BanyandbDatabase.TagFamilySpec liveFamily : live) { - BanyandbDatabase.TagFamilySpec declaredFamily = declaredByName.get(liveFamily.getName()); - if (declaredFamily == null) { - return false; - } - final Map<String, BanyandbDatabase.TagSpec> declaredTags = declaredFamily.getTagsList().stream() - .collect(Collectors.toMap(BanyandbDatabase.TagSpec::getName, t -> t, (a, b) -> a)); for (BanyandbDatabase.TagSpec liveTag : liveFamily.getTagsList()) { - BanyandbDatabase.TagSpec declaredTag = declaredTags.get(liveTag.getName()); + BanyandbDatabase.TagSpec declaredTag = declaredTagsByName.get(liveTag.getName()); if (declaredTag == null) { + // Tag dropped entirely from the declared model — non-additive. return false; } if (declaredTag.getType() != liveTag.getType()) { + // Type changed — non-additive, requires drop+recreate. return false; } } @@ -952,7 +1023,12 @@ public class BanyanDBIndexInstaller extends ModelInstaller { return true; } - private void checkTrace(Trace trace, BanyanDBClient client, StorageManipulationOpt opt) throws BanyanDBException { + /** + * @return {@code true} when the live trace is now aligned with the declared shape; + * {@code false} on {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}. + * See {@link #checkMeasure} for the dependent-resource gating rationale. 
+ */ + private boolean checkTrace(Trace trace, BanyanDBClient client, StorageManipulationOpt opt) throws BanyanDBException { Trace hisTrace = client.findTrace(trace.getMetadata().getGroup(), trace.getMetadata().getName()); if (hisTrace == null) { throw new IllegalStateException("Trace: " + trace.getMetadata().getName() + " exist but can't find it from BanyanDB server"); @@ -972,12 +1048,13 @@ public class BanyanDBIndexInstaller extends ModelInstaller { opt.recordOutcome("trace", hisTrace.getMetadata().getName(), StorageManipulationOpt.Outcome.SKIPPED_SHAPE_MISMATCH, "backend shape differs from declared shape; use /runtime/rule/addOrUpdate to reshape"); - return; + return false; } opt.recordModRevision(client.update(trace)); log.info("update Trace: {} from: {} to: {}", hisTrace.getMetadata().getName(), hisTrace, trace); } } + return true; } /** diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java index 58e19d4f0a..da06f7cf82 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java @@ -608,6 +608,143 @@ public class BanyanDBIT { "expected SKIPPED_SHAPE_MISMATCH for non-additive (type-change) diff even with opt-in, got " + bootOpt.getOutcomes()); } + /** + * Toggling {@code storageOnly} on an existing {@code @Column} moves the tag from + * {@code storage-only} → {@code searchable} (or vice versa). 
Although the live tag + * family no longer contains the tag at its old position, the tag identity + type are + * preserved, so {@link BanyanDBIndexInstaller#isPurelyAdditiveStream} (via + * {@code isPurelyAdditiveTagFamilies}) should accept the relocation when + * {@code allowBootReshape = true} and the OAP is in the init / standalone path. The + * dependent IndexRule for the now-indexed tag should also be created. + */ + @Test + public void testStreamStorageOnlyTogglePathBootReshape() throws Exception { + DownSamplingConfigService downSamplingConfigService = new DownSamplingConfigService(Arrays.asList("minute")); + ModuleManager moduleManager = mock(ModuleManager.class); + ModuleProviderHolder moduleProviderHolder = mock(ModuleProviderHolder.class); + ModuleServiceHolder moduleServiceHolder = mock(ModuleServiceHolder.class); + when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder); + when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder); + when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService); + + StorageModels models = new StorageModels(); + Model baseModel = models.add(TestStreamStorageOnly.class, DefaultScopeDefine.SERVICE, + new Storage("relocStream", true, DownSampling.Second), + StorageManipulationOpt.withSchemaChange()); + BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, moduleManager, config); + installer.isExists(baseModel, StorageManipulationOpt.withSchemaChange()); + installer.createTable(baseModel); + + String groupName = MetadataRegistry.convertGroupName( + config.getGlobal().getNamespace(), BanyanDB.StreamGroup.RECORDS_LOG.getName()); + BanyandbDatabase.Stream initial = client.client.findStream(groupName, "relocStream"); + // payload starts in storage-only family + assertTrue(initial.getTagFamiliesList().stream() + .filter(f -> "storage-only".equals(f.getName())) + .flatMap(f -> f.getTagsList().stream()) + .anyMatch(t -> 
"payload".equals(t.getName())), + "expected payload tag in storage-only family initially, got " + initial); + + models.remove(TestStreamStorageOnly.class, StorageManipulationOpt.withSchemaChange()); + Model reshapedModel = models.add(TestStreamStorageOnlyOff.class, DefaultScopeDefine.SERVICE, + new Storage("relocStream", true, DownSampling.Second), + StorageManipulationOpt.withSchemaChange()); + assertTrue(reshapedModel.isAllowBootReshape()); + + StorageManipulationOpt bootOpt = StorageManipulationOpt.schemaCreateIfAbsent(); + new BanyanDBIndexInstaller(client, moduleManager, config).isExists(reshapedModel, bootOpt); + + BanyandbDatabase.Stream reshaped = client.client.findStream(groupName, "relocStream"); + // payload is now in searchable family + assertTrue(reshaped.getTagFamiliesList().stream() + .filter(f -> "searchable".equals(f.getName())) + .flatMap(f -> f.getTagsList().stream()) + .anyMatch(t -> "payload".equals(t.getName())), + "expected payload tag relocated to searchable family after reshape, got " + reshaped); + assertFalse(reshaped.getTagFamiliesList().stream() + .filter(f -> "storage-only".equals(f.getName())) + .flatMap(f -> f.getTagsList().stream()) + .anyMatch(t -> "payload".equals(t.getName())), + "expected payload tag no longer in storage-only family after reshape, got " + reshaped); + + boolean updatedRecorded = bootOpt.getOutcomes().stream() + .anyMatch(o -> "stream".equals(o.getResourceType()) + && "relocStream".equals(o.getResourceName()) + && o.getStatus() == StorageManipulationOpt.Outcome.UPDATED); + assertTrue(updatedRecorded, "expected UPDATED outcome for storageOnly relocation, got " + bootOpt.getOutcomes()); + } + + /** + * Initial state for {@link #testStreamStorageOnlyTogglePathBootReshape}: {@code payload} + * declared with {@code storageOnly = true}, so it lands in the {@code storage-only} + * tag family. 
+ */ + @Stream(name = "relocStream", scopeId = DefaultScopeDefine.SERVICE, + builder = TestStreamStorageOnly.Builder.class, processor = RecordStreamProcessor.class) + @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG) + @BanyanDB.TimestampColumn("timestamp") + private static class TestStreamStorageOnly extends Record { + @Column(name = "service_id") + @BanyanDB.SeriesID(index = 0) + private String serviceId; + @Column(name = "payload", storageOnly = true) + private String payload; + @Column(name = "timestamp") + private long timestamp; + + @Override + public StorageID id() { + return new StorageID(); + } + + static class Builder implements StorageBuilder<StorageData> { + @Override + public StorageData storage2Entity(final Convert2Entity converter) { + return null; + } + + @Override + public void entity2Storage(final StorageData entity, final Convert2Storage converter) { + } + } + } + + /** + * Reshape target: same {@code payload} column, but {@code storageOnly} is gone so the + * tag relocates to the {@code searchable} family. Opted in via + * {@code allowBootReshape = true}. 
+ */ + @Stream(name = "relocStream", scopeId = DefaultScopeDefine.SERVICE, + builder = TestStreamStorageOnlyOff.Builder.class, processor = RecordStreamProcessor.class, + allowBootReshape = true) + @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG) + @BanyanDB.TimestampColumn("timestamp") + private static class TestStreamStorageOnlyOff extends Record { + @Column(name = "service_id") + @BanyanDB.SeriesID(index = 0) + private String serviceId; + @Column(name = "payload") + private String payload; + @Column(name = "timestamp") + private long timestamp; + + @Override + public StorageID id() { + return new StorageID(); + } + + static class Builder implements StorageBuilder<StorageData> { + @Override + public StorageData storage2Entity(final Convert2Entity converter) { + return null; + } + + @Override + public void entity2Storage(final StorageData entity, final Convert2Storage converter) { + } + } + } + /** * Non-additive variant: {@code tag} is now {@code long} (TAG_TYPE_INT) where the live * stream has it as {@code String} (TAG_TYPE_STRING). Boot must refuse to reshape even
