This is an automated email from the ASF dual-hosted git repository.
wankai123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 5e9db8fba4 Add `@Stream(allowBootReshape = true)` opt-in for additive
boot-time reshape of BanyanDB streams / measures. (#13878)
5e9db8fba4 is described below
commit 5e9db8fba4c6b11620137afcbf9ccbd788c1214d
Author: Wan Kai <[email protected]>
AuthorDate: Thu May 21 14:23:51 2026 +0800
Add `@Stream(allowBootReshape = true)` opt-in for additive boot-time
reshape of BanyanDB streams / measures. (#13878)
---
CLAUDE.md | 4 +
NOTICE | 2 +-
dist-material/release-docs/NOTICE | 2 +-
docs/en/changes/changes.md | 1 +
.../runtime-rule-hot-update.md | 21 +
.../oap/server/core/alarm/AlarmRecord.java | 2 +-
.../oap/server/core/analysis/Stream.java | 26 +
.../oap/server/core/storage/model/Model.java | 10 +
.../server/core/storage/model/StorageModels.java | 5 +
.../plugin/banyandb/BanyanDBIndexInstaller.java | 133 +++-
.../server/storage/plugin/banyandb/BanyanDBIT.java | 882 +++++++++++++++++++++
.../src/test/resources/bydb.yml | 4 +
.../elasticsearch/base/TimeSeriesUtilsTest.java | 6 +-
.../jdbc/common/dao/JDBCHistoryDeleteDAOIT.java | 2 +-
14 files changed, 1087 insertions(+), 13 deletions(-)
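
Reviewer note: the decision this commit adds to the BanyanDB installer can be sketched in isolation. The class below is a simplified, hypothetical model, not the committed code. Tag families are plain maps (family name -> tag name -> type name) rather than BanyanDB protobuf specs, the tag names are invented for illustration, and entity / interval / sharding-key checks are left out. It only shows the two ideas described in the diff: a three-condition gate, and an "every live tag must survive with the same type" comparison.

```java
import java.util.Map;

/** Hypothetical, simplified model of the boot-reshape decision (not the committed code). */
final class BootReshapeSketch {
    private BootReshapeSketch() {
    }

    /** Gate: the model opts in, the policy is create-if-absent, and this node is not no-init. */
    static boolean canBootReshape(boolean modelOptsIn, boolean schemaCreateIfAbsent, boolean noInitMode) {
        return modelOptsIn && schemaCreateIfAbsent && !noInitMode;
    }

    /**
     * Returns true when every tag family and tag on the live schema is still present in the
     * declared schema with the same type. Extra declared families or tags are allowed.
     */
    static boolean isPurelyAdditive(Map<String, Map<String, String>> declared,
                                    Map<String, Map<String, String>> live) {
        for (Map.Entry<String, Map<String, String>> liveFamily : live.entrySet()) {
            Map<String, String> declaredTags = declared.get(liveFamily.getKey());
            if (declaredTags == null) {
                return false; // a live tag family was dropped
            }
            for (Map.Entry<String, String> liveTag : liveFamily.getValue().entrySet()) {
                String declaredType = declaredTags.get(liveTag.getKey());
                if (declaredType == null) {
                    return false; // a live tag was dropped
                }
                if (!declaredType.equals(liveTag.getValue())) {
                    return false; // a live tag changed type
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Illustrative tag names only; they are not taken from the real AlarmRecord schema.
        Map<String, Map<String, String>> live =
            Map.of("searchable", Map.of("scope", "INT", "name", "STRING"));
        Map<String, Map<String, String>> added =
            Map.of("searchable", Map.of("scope", "INT", "name", "STRING", "extra", "STRING"));
        Map<String, Map<String, String>> retyped =
            Map.of("searchable", Map.of("scope", "STRING", "name", "STRING"));

        System.out.println(isPurelyAdditive(added, live));   // true: one new tag, nothing else changed
        System.out.println(isPurelyAdditive(retyped, live)); // false: "scope" changed type
        System.out.println(canBootReshape(true, true, true)); // false: no-init nodes never reshape
    }
}
```

Comparing from the live side outward means a new declared tag never has to be enumerated: anything not on the backend yet is additive by construction, and only drops or type changes can fail the check.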
diff --git a/CLAUDE.md b/CLAUDE.md
index dacd9a41ba..1608fda657 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -246,6 +246,10 @@ Always use `--recurse-submodules` when cloning or update submodules manually.
- `changes/changes.md` - Changelog (update when making changes)
- `swip/` - SkyWalking Improvement Proposals
+## Handling Existing Tests
+
+If existing test code (UT, IT, or E2E) fails to pass, try to fix it. If any test cases need to be modified, or test code needs to be deleted, the change must be double-checked by a human before it is applied — surface the proposed modification or deletion and the reasoning, and wait for explicit approval rather than changing or removing the test yourself.
+
## Submitting Pull Requests
Use the `/gh-pull-request` skill for committing and pushing to a PR branch. It runs pre-flight checks (compile, checkstyle, license headers) before every push, and creates the PR if one doesn't exist yet.
diff --git a/NOTICE b/NOTICE
index 7e684bf85d..0fef61473b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
Apache SkyWalking
-Copyright 2017-2024 The Apache Software Foundation
+Copyright 2017-2026 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/dist-material/release-docs/NOTICE b/dist-material/release-docs/NOTICE
index 6c5c72afe1..90fdf3a8b5 100755
--- a/dist-material/release-docs/NOTICE
+++ b/dist-material/release-docs/NOTICE
@@ -1,5 +1,5 @@
Apache SkyWalking
-Copyright 2017-2024 The Apache Software Foundation
+Copyright 2017-2026 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index ddf89306e5..322ada7e4d 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -279,6 +279,7 @@
* LAL: support full arithmetic (`+`, `-`, `*`, `/`) on numeric operands and fix the original bug where `(tag("x") as Integer) + (tag("y") as Integer)` was treated as string concatenation — expressions like `input_tokens + output_tokens < 10000` produced the concatenated string `"2589115"` rather than the integer sum `2704`, so token-threshold conditions never triggered `abort {}`. Operand types are now inferred from explicit casts (`as Integer` / `as Long` / `as Float` / `as Double`), ty [...]
* Fix: `avgHistogramPercentile` / `sumHistogramPercentile` meter functions reported the smallest finite bucket boundary (e.g. `10` for OTel `gen_ai_server_request_duration` whose `le` is rewritten from `0.01s` → `10ms`) for every rank when no samples were observed in any bucket. The percentile loop's `count >= roof` check matched on the first sorted bucket because both sides were `0`. `calculate()` now short-circuits to `0` for every rank when the windowed total is `0`.
* Fix: MAL `expPrefix` now applies to every metric source in `exp`, not just the leading one. Previously the prefix was spliced after the first `.`, so secondary metrics inside arguments (e.g. the divisor in `a.sum(['s']).safeDiv(b.sum(['s']))`) silently skipped the prefix — a rule like envoy-ai-gateway's `request_latency_avg` (`sum / count`) would tag-rewrite only the dividend. The injection is now AST-aware: every bare-IDENTIFIER metric source is wrapped, while downsampling-type consta [...]
+* Add `@Stream(allowBootReshape = true)` opt-in for additive boot-time reshape of BanyanDB streams / measures. Code-defined stream classes (e.g. `AlarmRecord`) can now annotate their schema as eligible for in-place additive update at OAP boot — a new `@Column` is appended to the live tag-family / fields via `client.update` instead of being silently rejected with `SKIPPED_SHAPE_MISMATCH` (which previously forced operators to drop the measure / stream and lose historical rows). The opt-in [...]
#### UI
* Add mobile menu icon and i18n labels for the iOS layer.
diff --git a/docs/en/concepts-and-designs/runtime-rule-hot-update.md b/docs/en/concepts-and-designs/runtime-rule-hot-update.md
index f1901cc6e9..d20fc5ed86 100644
--- a/docs/en/concepts-and-designs/runtime-rule-hot-update.md
+++ b/docs/en/concepts-and-designs/runtime-rule-hot-update.md
@@ -160,6 +160,27 @@ explicit ERROR, boot always continues, the affected metric is disabled, and the
operator reconciles explicitly through the on-demand workflow. The same shape is
visible across every backend.
+### Code-defined stream opt-in (narrow exception)
+
+The "boot never reshapes" rule above applies to **runtime-rule (MAL / LAL)**
+registration — those rules ride the `/addOrUpdate` REST workflow when their
+backing schema needs to change.
+
+Streams whose schema lives in OAP source code (e.g. `AlarmRecord`) can opt in
+to **additive** boot-time reshape via
+`@Stream(allowBootReshape = true)`. When the flag is on and the diff is
+purely additive (new tag / new field; no type changes, no drops, no entity /
+interval / sharding-key flips), the installer calls `client.update` at boot
+to append the new column to the live measure / stream; non-additive
+divergences still record `SKIPPED_SHAPE_MISMATCH` and require an operator
+drop+recreate. Only the init / standalone OAP performs the reshape; non-init
+peers continue through the existing poll-and-wait loop so a single node
+drives DDL during a rolling restart.
+
+This opt-in is **BanyanDB-only**. JDBC and Elasticsearch are append-only on
+the data path and already accept additive column / mapping additions at boot
+without operator intervention, so the flag is unread on those backends.
+
## On-demand workflow
Triggered by an HTTP call to one of the admin endpoints. A request arriving at any
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
index eafde843a7..a5a1e474b3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
@@ -42,7 +42,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.AL
@Getter
@Setter
@ScopeDeclaration(id = ALARM, name = "Alarm")
-@Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, builder = AlarmRecord.Builder.class, processor = RecordStreamProcessor.class)
+@Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, builder = AlarmRecord.Builder.class, processor = RecordStreamProcessor.class, allowBootReshape = true)
@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = AlarmRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
@BanyanDB.TimestampColumn(AlarmRecord.START_TIME)
@BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java
index acab363060..ab00568eb0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java
@@ -27,6 +27,8 @@ import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamProcessor
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
+import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import org.apache.skywalking.oap.server.core.storage.model.StorageManipulationOpt;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
/**
@@ -59,4 +61,28 @@ public @interface Stream {
* TopNStreamProcessor} and {@link NoneStreamProcessor} for more details.
*/
Class<? extends StreamProcessor> processor();
+
+ /**
+ * Opt-in to additive boot-time reshape of the backend resource for this stream class.
+ * <strong>BanyanDB only</strong> — JDBC and Elasticsearch are append-only on the data
+ * path and don't refuse additive column / mapping additions at boot, so they don't
+ * need (and don't read) this flag.
+ *
+ * <p>Default is {@code false} — boot follows the standard {@code schemaCreateIfAbsent}
+ * policy and records {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH} on
+ * any shape divergence, leaving reconciliation to the operator (the only workflow that
+ * may change backend schema).
+ *
+ * <p>When set to {@code true}, the BanyanDB installer is allowed to apply
+ * <strong>purely additive</strong> changes (new tag, new field) during boot via
+ * {@code client.update}. Type changes, drops, kind flips, entity / interval /
+ * sharding-key changes are still refused with {@code SKIPPED_SHAPE_MISMATCH} and
+ * require a manual drop+recreate (identity-breaking edits are explicit operator
+ * actions, not boot side-effects).
+ *
+ * <p>Only the init / standalone OAP performs the reshape; non-init peers continue
+ * through the existing poll-and-wait loop in {@link ModelInstaller#whenCreating} so a
+ * single node drives DDL during a rolling restart.
+ */
+ boolean allowBootReshape() default false;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index 5ceb6294d2..ed139cc418 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -41,6 +41,16 @@ public class Model {
private final boolean superDataset;
private final Class<?> streamClass;
private final boolean timeRelativeID;
+ /**
+ * BanyanDB only — when true, the BanyanDB installer is allowed to apply purely
+ * additive shape changes (new tag / new field) at boot. See
+ * {@link org.apache.skywalking.oap.server.core.analysis.Stream#allowBootReshape()}.
+ * JDBC and Elasticsearch ignore this flag (append-only data paths already accept
+ * additive column / mapping additions without operator intervention). Defaults to
+ * false for models registered without a {@code @Stream} annotation (e.g. runtime-rule
+ * MAL / LAL metrics, which reshape through the runtime-rule REST path instead).
+ */
+ private final boolean allowBootReshape;
private final SQLDatabaseModelExtension sqlDBModelExtension;
private final BanyanDBModelExtension banyanDBModelExtension;
private final ElasticSearchModelExtension elasticSearchModelExtension;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 558b47378f..72e8ce57e3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.storage.model;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
@@ -169,6 +170,9 @@ public class StorageModels implements IModelManager, ModelRegistry, ModelManipul
seriesIDChecker.check(storage.getModelName());
shardingKeyChecker.check(storage.getModelName());
+ final Stream streamAnnotation = aClass.getAnnotation(Stream.class);
+ final boolean allowBootReshape = streamAnnotation != null && streamAnnotation.allowBootReshape();
+
Model model = new Model(
storage.getModelName(),
modelColumns,
@@ -177,6 +181,7 @@ public class StorageModels implements IModelManager, ModelRegistry, ModelManipul
isSuperDatasetModel(aClass),
aClass,
storage.isTimeRelativeID(),
+ allowBootReshape,
sqlDBModelExtension,
banyanDBModelExtension,
elasticSearchModelExtension
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index abf4f288e3..092214919e 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -172,7 +172,7 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
StreamModel streamModel =
MetadataRegistry.INSTANCE.registerStreamModel(
model, config);
if (runShapeChecks) {
- checkStream(streamModel.getStream(), c, opt);
+ checkStream(model, streamModel.getStream(), c, opt);
checkIndexRules(model.getName(),
streamModel.getIndexRules(), c, opt);
checkIndexRuleBinding(
streamModel.getIndexRules(),
metadata.getGroup(), metadata.name(),
@@ -184,7 +184,7 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
} else { // measure
MeasureModel measureModel =
MetadataRegistry.INSTANCE.registerMeasureModel(model, config,
downSamplingConfigService);
if (runShapeChecks) {
- checkMeasure(measureModel.getMeasure(), c, opt);
+ checkMeasure(model, measureModel.getMeasure(), c, opt);
checkIndexRules(model.getName(),
measureModel.getIndexRules(), c, opt);
checkIndexRuleBinding(
measureModel.getIndexRules(),
metadata.getGroup(), metadata.name(),
@@ -767,9 +767,16 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
* either update it (on-demand operator workflow — {@link StorageManipulationOpt#isWithSchemaChange()})
* or skip the update and record {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}
* (static boot workflow — {@link StorageManipulationOpt#isSchemaCreateIfAbsent()}). Boot MUST
- * NOT reshape the backend — reshape is an explicit operator action only.
+ * NOT reshape the backend by default — reshape is an explicit operator action.
+ *
+ * <p>Exception: when the model opts in via {@link Model#isAllowBootReshape()} and the diff
+ * is purely additive (new tag / new field, no type changes, no drops, identity preserved),
+ * the init OAP is allowed to apply the additive update during boot. Non-init OAPs continue
+ * through the poll-and-wait loop in
+ * {@link org.apache.skywalking.oap.server.core.storage.model.ModelInstaller#whenCreating}
+ * so only one node races on the DDL.
*/
- private void checkMeasure(Measure measure, BanyanDBClient client, StorageManipulationOpt opt) throws BanyanDBException {
+ private void checkMeasure(Model model, Measure measure, BanyanDBClient client, StorageManipulationOpt opt) throws BanyanDBException {
Measure hisMeasure =
client.findMeasure(measure.getMetadata().getGroup(),
measure.getMetadata().getName());
if (hisMeasure == null) {
throw new IllegalStateException("Measure: " +
measure.getMetadata().getName() + " exist but can't find it from BanyanDB
server");
@@ -782,6 +789,15 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
.equals(measure.toBuilder().clearMetadata().build());
if (!equals) {
if (!opt.getFlags().isUpdateOnMismatch()) {
+ if (canBootReshape(model, opt) && isPurelyAdditiveMeasure(measure, hisMeasure)) {
+ opt.recordModRevision(client.update(measure));
+ log.info("boot reshape (additive) Measure: {} — applied @Stream(allowBootReshape=true). backend={}, declared={}",
+ hisMeasure.getMetadata().getName(), hisMeasure, measure);
+ opt.recordOutcome("measure", hisMeasure.getMetadata().getName(),
+ StorageManipulationOpt.Outcome.UPDATED,
+ "additive boot reshape: new tag / field added");
+ return;
+ }
log.error("BanyanDB measure {} shape mismatch at boot —
backend holds a "
+ "different shape than the declared rule. SKIPPING
metric; operator "
+ "must reshape via POST /runtime/rule/addOrUpdate or
align the rule "
@@ -801,9 +817,10 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
/**
* Check if the stream exists and update (or record shape mismatch) per mode.
- * See {@link #checkMeasure} for the create-if-absent vs full-install contract.
+ * See {@link #checkMeasure} for the create-if-absent vs full-install contract,
+ * including the {@link Model#isAllowBootReshape()} additive opt-in.
*/
- private void checkStream(Stream stream, BanyanDBClient client, StorageManipulationOpt opt) throws BanyanDBException {
+ private void checkStream(Model model, Stream stream, BanyanDBClient client, StorageManipulationOpt opt) throws BanyanDBException {
Stream hisStream = client.findStream(stream.getMetadata().getGroup(),
stream.getMetadata().getName());
if (hisStream == null) {
throw new IllegalStateException("Stream: " +
stream.getMetadata().getName() + " exist but can't find it from BanyanDB
server");
@@ -816,6 +833,15 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
.equals(stream.toBuilder().clearUpdatedAt().clearCreatedAt().clearMetadata().build());
if (!equals) {
if (!opt.getFlags().isUpdateOnMismatch()) {
+ if (canBootReshape(model, opt) && isPurelyAdditiveStream(stream, hisStream)) {
+ opt.recordModRevision(client.update(stream));
+ log.info("boot reshape (additive) Stream: {} — applied @Stream(allowBootReshape=true). backend={}, declared={}",
+ hisStream.getMetadata().getName(), hisStream, stream);
+ opt.recordOutcome("stream", hisStream.getMetadata().getName(),
+ StorageManipulationOpt.Outcome.UPDATED,
+ "additive boot reshape: new tag added");
+ return;
+ }
log.error("BanyanDB stream {} shape mismatch at boot —
backend holds a "
+ "different shape than the declared rule. SKIPPING;
operator must "
+ "reshape via POST /runtime/rule/addOrUpdate.
backend={}, declared={}",
@@ -831,6 +857,101 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
}
}
+ /**
+ * Gate for boot-time reshape: three conditions, all required.
+ * <ul>
+ * <li>The model opts in via {@link Model#isAllowBootReshape()}.</li>
+ * <li>The opt is the static-boot policy ({@link StorageManipulationOpt#isSchemaCreateIfAbsent()}).
+ * Restricting the reshape branch to this single mode keeps the policy boundary
+ * explicit — {@code verifySchemaOnly()} must stay read-only even if a future
+ * caller flips {@code updateOnMismatch} off, and {@code withSchemaChange()}
+ * already takes the existing on-demand reshape path above.</li>
+ * <li>This OAP must not be in {@code no-init} mode. Non-init OAPs leave DDL to the
+ * init / standalone OAP and converge via the poll-wait loop in
+ * {@link org.apache.skywalking.oap.server.core.storage.model.ModelInstaller#whenCreating}.</li>
+ * </ul>
+ */
+ private boolean canBootReshape(Model model, StorageManipulationOpt opt) {
+ return model.isAllowBootReshape()
+ && opt.isSchemaCreateIfAbsent()
+ && !RunningMode.isNoInitMode();
+ }
+
+ /**
+ * Purely-additive diff for a BanyanDB {@link Stream}: declared may add tag families or
+ * tags, but every tag-family / tag that already exists on the backend must be present
+ * with the same name and {@link BanyandbDatabase.TagType type}, the {@link BanyandbDatabase.Entity entity}
+ * column list must match exactly (reshape can't change shard / series-id semantics),
+ * and no tag may be dropped. Returns false for any non-additive divergence so the caller
+ * falls back to {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}.
+ */
+ private boolean isPurelyAdditiveStream(Stream declared, Stream live) {
+ if (!declared.getEntity().equals(live.getEntity())) {
+ return false;
+ }
+ return isPurelyAdditiveTagFamilies(declared.getTagFamiliesList(), live.getTagFamiliesList());
+ }
+
+ /**
+ * Purely-additive diff for a BanyanDB {@link Measure}: same tag-family and entity rules as
+ * {@link #isPurelyAdditiveStream}, plus fields may be added but no field name / type /
+ * encoding / compression may change, and the scalar properties (interval, index_mode,
+ * sharding_key) must match exactly.
+ */
+ private boolean isPurelyAdditiveMeasure(Measure declared, Measure live) {
+ if (!declared.getEntity().equals(live.getEntity())) {
+ return false;
+ }
+ if (!declared.getInterval().equals(live.getInterval())) {
+ return false;
+ }
+ if (declared.getIndexMode() != live.getIndexMode()) {
+ return false;
+ }
+ if (!declared.getShardingKey().equals(live.getShardingKey())) {
+ return false;
+ }
+ if (!isPurelyAdditiveTagFamilies(declared.getTagFamiliesList(), live.getTagFamiliesList())) {
+ return false;
+ }
+ final Map<String, BanyandbDatabase.FieldSpec> declaredFields = declared.getFieldsList().stream()
+ .collect(Collectors.toMap(BanyandbDatabase.FieldSpec::getName, f -> f, (a, b) -> a));
+ for (BanyandbDatabase.FieldSpec liveField : live.getFieldsList()) {
+ BanyandbDatabase.FieldSpec declaredField = declaredFields.get(liveField.getName());
+ if (declaredField == null) {
+ return false;
+ }
+ if (!declaredField.equals(liveField)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isPurelyAdditiveTagFamilies(List<BanyandbDatabase.TagFamilySpec> declared,
+ List<BanyandbDatabase.TagFamilySpec> live) {
+ final Map<String, BanyandbDatabase.TagFamilySpec> declaredByName = declared.stream()
+ .collect(Collectors.toMap(BanyandbDatabase.TagFamilySpec::getName, f -> f, (a, b) -> a));
+ for (BanyandbDatabase.TagFamilySpec liveFamily : live) {
+ BanyandbDatabase.TagFamilySpec declaredFamily = declaredByName.get(liveFamily.getName());
+ if (declaredFamily == null) {
+ return false;
+ }
+ final Map<String, BanyandbDatabase.TagSpec> declaredTags = declaredFamily.getTagsList().stream()
+ .collect(Collectors.toMap(BanyandbDatabase.TagSpec::getName, t -> t, (a, b) -> a));
+ for (BanyandbDatabase.TagSpec liveTag : liveFamily.getTagsList()) {
+ BanyandbDatabase.TagSpec declaredTag = declaredTags.get(liveTag.getName());
+ if (declaredTag == null) {
+ return false;
+ }
+ if (declaredTag.getType() != liveTag.getType()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
private void checkTrace(Trace trace, BanyanDBClient client,
StorageManipulationOpt opt) throws BanyanDBException {
Trace hisTrace = client.findTrace(trace.getMetadata().getGroup(),
trace.getMetadata().getName());
if (hisTrace == null) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java
new file mode 100644
index 0000000000..58e19d4f0a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java
@@ -0,0 +1,882 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.library.banyandb.v1.client.DataPoint;
+import org.apache.skywalking.library.banyandb.v1.client.MeasureQuery;
+import org.apache.skywalking.library.banyandb.v1.client.MeasureQueryResponse;
+import org.apache.skywalking.library.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.library.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.library.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
+import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageID;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.StorageManipulationOpt;
+import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.apache.skywalking.oap.server.library.it.ITVersions;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
+import org.apache.skywalking.oap.server.library.module.ModuleServiceHolder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.MeasureBulkWriteProcessor;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
+import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.apache.skywalking.oap.server.testing.util.ReflectUtil;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@Slf4j
+@Testcontainers
+public class BanyanDBIT {
+ private static final String REGISTRY = "ghcr.io";
+ private static final String IMAGE_NAME = "apache/skywalking-banyandb";
+ private static final String TAG = ITVersions.get("SW_BANYANDB_COMMIT");
+
+ private static final String IMAGE = REGISTRY + "/" + IMAGE_NAME + ":" + TAG;
+ private static MockedStatic<DefaultScopeDefine> DEFAULT_SCOPE_DEFINE_MOCKED_STATIC;
+ protected static final int GRPC_PORT = 17912;
+ protected static final int HTTP_PORT = 17913;
+
+ @Container
+ public GenericContainer<?> banyanDB = new GenericContainer<>(
+ DockerImageName.parse(IMAGE))
+ .withCommand("standalone", "--stream-root-path", "/tmp/banyandb-stream-data",
+ "--measure-root-path", "/tmp/banyand-measure-data"
+ )
+ .withExposedPorts(GRPC_PORT, HTTP_PORT)
+ .waitingFor(Wait.forHttp("/api/healthz").forPort(HTTP_PORT));
+
+ private BanyanDBStorageClient client;
+ private BanyanDBStorageConfig config;
+
+ protected void setUpConnection() throws Exception {
+ ModuleManager moduleManager = mock(ModuleManager.class);
+ ModuleDefine storageModule = mock(ModuleDefine.class);
+ BanyanDBStorageProvider provider = mock(BanyanDBStorageProvider.class);
+ Mockito.when(provider.getModule()).thenReturn(storageModule);
+
+ NoneTelemetryProvider telemetryProvider = mock(NoneTelemetryProvider.class);
+ Mockito.when(telemetryProvider.getService(MetricsCreator.class))
+ .thenReturn(new MetricsCreatorNoop());
+ TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
+ ReflectUtil.setInternalState(telemetryModule, "loadedProvider", telemetryProvider);
+ Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
+ log.info("create BanyanDB client and try to connect");
+ config = new BanyanDBConfigLoader(provider).loadConfig();
+ config.getGlobal().setTargets(banyanDB.getHost() + ":" + banyanDB.getMappedPort(GRPC_PORT));
+ client = new BanyanDBStorageClient(moduleManager, config);
+ client.connect();
+ }
+
+ private MeasureBulkWriteProcessor processor;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ DEFAULT_SCOPE_DEFINE_MOCKED_STATIC = mockStatic(DefaultScopeDefine.class);
+ DEFAULT_SCOPE_DEFINE_MOCKED_STATIC.when(() -> DefaultScopeDefine.nameOf(1)).thenReturn("any");
+ setUpConnection();
+ processor = client.createMeasureBulkProcessor(1000, 1, 1);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (DEFAULT_SCOPE_DEFINE_MOCKED_STATIC != null) {
+ DEFAULT_SCOPE_DEFINE_MOCKED_STATIC.close();
+ DEFAULT_SCOPE_DEFINE_MOCKED_STATIC = null;
+ }
+ }
+
+ @Test
+ public void testInstall() throws Exception {
+ DownSamplingConfigService downSamplingConfigService = new DownSamplingConfigService(Arrays.asList("minute"));
+ ModuleManager moduleManager = mock(ModuleManager.class);
+ ModuleProviderHolder moduleProviderHolder = mock(ModuleProviderHolder.class);
+ ModuleServiceHolder moduleServiceHolder = mock(ModuleServiceHolder.class);
+ when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+ when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+ when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+ StorageModels models = new StorageModels();
+ Model model = models.add(TestMetric.class, DefaultScopeDefine.SERVICE,
+ new Storage("testMetric", true, DownSampling.Minute),
+ StorageManipulationOpt.withSchemaChange()
+ );
+ BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, moduleManager, config);
+ installer.isExists(model, StorageManipulationOpt.withSchemaChange());
+ //test Group install
+ String groupName = MetadataRegistry.convertGroupName(
+ config.getGlobal().getNamespace(),
+ BanyanDB.MeasureGroup.METRICS_MINUTE.getName()
+ );
+ BanyandbCommon.Group group = client.client.findGroup(groupName);
+ assertEquals(BanyandbCommon.Catalog.CATALOG_MEASURE, group.getCatalog());
+ assertEquals(config.getMetricsMin().getSegmentInterval(), group.getResourceOpts().getSegmentInterval().getNum());
+ assertEquals(config.getMetricsMin().getShardNum(), group.getResourceOpts().getShardNum());
+ assertEquals(BanyandbCommon.IntervalRule.Unit.UNIT_DAY, group.getResourceOpts().getSegmentInterval().getUnit());
+ assertEquals(config.getMetricsMin().getTtl(), group.getResourceOpts().getTtl().getNum());
+ assertEquals(BanyandbCommon.IntervalRule.Unit.UNIT_DAY, group.getResourceOpts().getTtl().getUnit());
+
+ installer.createTable(model);
+ //test Measure install
+ BanyandbDatabase.Measure measure = client.client.findMeasure(groupName, "testMetric_minute");
+ assertEquals("storage-only", measure.getTagFamilies(0).getName());
+ assertEquals("service_id", measure.getTagFamilies(0).getTags(0).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, measure.getTagFamilies(0).getTags(0).getType());
+ assertEquals("searchable", measure.getTagFamilies(1).getName());
+ assertEquals("tag", measure.getTagFamilies(1).getTags(0).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, measure.getTagFamilies(1).getTags(0).getType());
+ assertEquals("service_id", measure.getEntity().getTagNames(0));
+ assertEquals("value", measure.getFields(0).getName());
+ assertEquals(BanyandbDatabase.FieldType.FIELD_TYPE_INT, measure.getFields(0).getFieldType());
+ //test TopNAggregation install
+ BanyandbDatabase.TopNAggregation topNAggregation = client.client.findTopNAggregation(
+ groupName, "testMetric-service");
+ assertEquals("value", topNAggregation.getFieldName());
+ assertEquals("service_id", topNAggregation.getGroupByTagNames(0));
+ assertEquals(BanyandbModel.Sort.SORT_DESC, topNAggregation.getFieldValueSort());
+ assertEquals(10, topNAggregation.getLruSize());
+ assertEquals(1000, topNAggregation.getCountersNumber());
+ //test IndexRule install
+ BanyandbDatabase.IndexRule indexRuleTag = client.client.findIndexRule(groupName, "tag");
+ assertEquals("url", indexRuleTag.getAnalyzer());
+ assertTrue(indexRuleTag.getNoSort());
+ //test IndexRuleBinding install
+ BanyandbDatabase.IndexRuleBinding indexRuleBinding = client.client.findIndexRuleBinding(
+ groupName, "testMetric_minute");
+ assertEquals("tag", indexRuleBinding.getRules(0));
+ assertEquals("testMetric_minute", indexRuleBinding.getSubject().getName());
+ //test data query
+ Instant now = Instant.now();
+ Instant begin = now.minus(15, ChronoUnit.MINUTES);
+ MeasureWrite measureWrite = client.createMeasureWrite(groupName, "testMetric_minute", now.toEpochMilli());
+ measureWrite.tag("storage-only", "service_id", TagAndValue.stringTagValue("service1"))
+ .tag("searchable", "tag", TagAndValue.stringTagValue("tag1"))
+ .field("value", TagAndValue.longFieldValue(100));
+ CompletableFuture<Void> f = processor.add(measureWrite);
+ f.exceptionally(exp -> {
+ Assertions.fail(exp.getMessage());
+ return null;
+ });
+ f.get(10, TimeUnit.SECONDS);
+
+ MeasureQuery query = new MeasureQuery(Lists.newArrayList(groupName), "testMetric_minute",
+ new TimestampRange(
+ begin.toEpochMilli(),
+ now.plus(1, ChronoUnit.MINUTES).toEpochMilli()
+ ), ImmutableMap.of("service_id", "storage-only", "tag", "searchable"),
+ ImmutableSet.of("value")
+ );
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ MeasureQueryResponse resp = client.query(query);
+ assertNotNull(resp);
+ assertEquals(1, resp.getDataPoints().size());
+ assertEquals("service1", resp.getDataPoints().get(0).getTagValue("service_id"));
+ assertEquals("tag1", resp.getDataPoints().get(0).getTagValue("tag"));
+ assertEquals(100, (Long) resp.getDataPoints().get(0).getFieldValue("value"));
+ });
+
+ // StorageModels.add now dedupes by (name, downsampling); evict the original
+ // registration so the UpdateTestMetric registration takes effect.
+ models.remove(TestMetric.class, StorageManipulationOpt.withSchemaChange());
+ Model updatedModel = models.add(UpdateTestMetric.class, DefaultScopeDefine.SERVICE,
+ new Storage("testMetric", true, DownSampling.Minute),
+ StorageManipulationOpt.withSchemaChange()
+ );
+ config.getMetricsMin().setShardNum(config.getMetricsMin().getShardNum() + 1);
+ config.getMetricsMin().setSegmentInterval(config.getMetricsMin().getSegmentInterval() + 2);
+ config.getMetricsMin().setTtl(config.getMetricsMin().getTtl() + 3);
+ BanyanDBIndexInstaller newInstaller = new BanyanDBIndexInstaller(client, moduleManager, config);
+ newInstaller.isExists(updatedModel, StorageManipulationOpt.withSchemaChange());
+ //test Group update: assert the live group now reflects the values we set on config,
+ //rather than hard-coding the post-mutation numbers (which would couple the test to the
+ //defaults shipped in bydb.yml).
+ BanyandbCommon.Group updatedGroup = client.client.findGroup(groupName);
+ assertEquals(config.getMetricsMin().getShardNum(), updatedGroup.getResourceOpts().getShardNum());
+ assertEquals(config.getMetricsMin().getSegmentInterval(), updatedGroup.getResourceOpts().getSegmentInterval().getNum());
+ assertEquals(config.getMetricsMin().getTtl(), updatedGroup.getResourceOpts().getTtl().getNum());
+ //test Measure update
+ BanyandbDatabase.Measure updatedMeasure = client.client.findMeasure(groupName, "testMetric_minute");
+ assertEquals("storage-only", updatedMeasure.getTagFamilies(0).getName());
+ assertEquals("service_id", updatedMeasure.getTagFamilies(0).getTags(0).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, updatedMeasure.getTagFamilies(0).getTags(0).getType());
+ assertEquals("searchable", updatedMeasure.getTagFamilies(1).getName());
+ assertEquals("tag", updatedMeasure.getTagFamilies(1).getTags(0).getName());
+ assertEquals("new_tag", updatedMeasure.getTagFamilies(1).getTags(1).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, updatedMeasure.getTagFamilies(1).getTags(0).getType());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, updatedMeasure.getTagFamilies(1).getTags(1).getType());
+ assertEquals("service_id", updatedMeasure.getEntity().getTagNames(0));
+ assertEquals("value", updatedMeasure.getFields(0).getName());
+ assertEquals(BanyandbDatabase.FieldType.FIELD_TYPE_INT, updatedMeasure.getFields(0).getFieldType());
+ assertEquals("new_value", updatedMeasure.getFields(1).getName());
+ assertEquals(BanyandbDatabase.FieldType.FIELD_TYPE_INT, updatedMeasure.getFields(1).getFieldType());
+ //test IndexRule update
+ BanyandbDatabase.IndexRule updatedIndexRuleTag = client.client.findIndexRule(groupName, "tag");
+ assertEquals("", updatedIndexRuleTag.getAnalyzer());
+ assertFalse(updatedIndexRuleTag.getNoSort());
+ BanyandbDatabase.IndexRule updatedIndexRuleNewTag = client.client.findIndexRule(groupName, "new_tag");
+ assertTrue(updatedIndexRuleNewTag.getNoSort());
+ //test IndexRuleBinding update
+ BanyandbDatabase.IndexRuleBinding updatedIndexRuleBinding = client.client.findIndexRuleBinding(
+ groupName, "testMetric_minute");
+ assertEquals("tag", updatedIndexRuleBinding.getRules(0));
+ assertEquals("new_tag", updatedIndexRuleBinding.getRules(1));
+ assertEquals("testMetric_minute", updatedIndexRuleBinding.getSubject().getName());
+ //test data
+ MeasureWrite updatedMeasureWrite = client.createMeasureWrite(groupName, "testMetric_minute", now.plus(10, ChronoUnit.MINUTES).toEpochMilli());
+ updatedMeasureWrite.tag("storage-only", "service_id", TagAndValue.stringTagValue("service2"))
+ .tag("searchable", "tag", TagAndValue.stringTagValue("tag1"))
+ .tag("searchable", "new_tag", TagAndValue.stringTagValue("new_tag1"))
+ .field("value", TagAndValue.longFieldValue(101))
+ .field("new_value", TagAndValue.longFieldValue(1000));
+ CompletableFuture<Void> cf = processor.add(updatedMeasureWrite);
+ cf.exceptionally(exp -> {
+ Assertions.fail(exp.getMessage());
+ return null;
+ });
+ cf.get(10, TimeUnit.SECONDS);
+ MeasureQuery updatedQuery = new MeasureQuery(
+ Lists.newArrayList(groupName), "testMetric_minute",
+ new TimestampRange(begin.toEpochMilli(), now.plus(15, ChronoUnit.MINUTES).toEpochMilli()),
+ ImmutableMap.of("service_id", "storage-only", "tag", "searchable", "new_tag", "searchable"),
+ ImmutableSet.of("value", "new_value")
+ );
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ MeasureQueryResponse updatedResp = client.query(updatedQuery);
+ assertNotNull(updatedResp);
+ assertEquals(2, updatedResp.getDataPoints().size());
+ // Index by service_id so the assertions don't depend on server-side ordering
+ // (MeasureQuery doesn't set an orderBy and BanyanDB result order is not contractually stable).
+ Map<String, DataPoint> byService = updatedResp.getDataPoints().stream()
+ .collect(Collectors.toMap(dp -> (String) dp.getTagValue("service_id"), dp -> dp));
+ DataPoint dp1 = byService.get("service1");
+ assertNotNull(dp1);
+ assertEquals("tag1", dp1.getTagValue("tag"));
+ assertEquals(100, (Long) dp1.getFieldValue("value"));
+ DataPoint dp2 = byService.get("service2");
+ assertNotNull(dp2);
+ assertEquals("tag1", dp2.getTagValue("tag"));
+ assertEquals("new_tag1", dp2.getTagValue("new_tag"));
+ assertEquals(101, (Long) dp2.getFieldValue("value"));
+ assertEquals(1000, (Long) dp2.getFieldValue("new_value"));
+ });
+ }
+
+ @Test
+ public void testStreamInstallAndUpdate() throws Exception {
+ DownSamplingConfigService downSamplingConfigService = new DownSamplingConfigService(Arrays.asList("minute"));
+ ModuleManager moduleManager = mock(ModuleManager.class);
+ ModuleProviderHolder moduleProviderHolder = mock(ModuleProviderHolder.class);
+ ModuleServiceHolder moduleServiceHolder = mock(ModuleServiceHolder.class);
+ when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+ when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+ when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+ StorageModels models = new StorageModels();
+ Model model = models.add(TestStream.class, DefaultScopeDefine.SERVICE,
+ new Storage("testStream", true, DownSampling.Second),
+ StorageManipulationOpt.withSchemaChange()
+ );
+ BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, moduleManager, config);
+ installer.isExists(model, StorageManipulationOpt.withSchemaChange());
+ // test Group install
+ String groupName = MetadataRegistry.convertGroupName(
+ config.getGlobal().getNamespace(),
+ BanyanDB.StreamGroup.RECORDS_LOG.getName()
+ );
+ BanyandbCommon.Group group = client.client.findGroup(groupName);
+ assertEquals(BanyandbCommon.Catalog.CATALOG_STREAM, group.getCatalog());
+ assertEquals(config.getRecordsLog().getSegmentInterval(), group.getResourceOpts().getSegmentInterval().getNum());
+ assertEquals(config.getRecordsLog().getShardNum(), group.getResourceOpts().getShardNum());
+ assertEquals(BanyandbCommon.IntervalRule.Unit.UNIT_DAY, group.getResourceOpts().getSegmentInterval().getUnit());
+ assertEquals(config.getRecordsLog().getTtl(), group.getResourceOpts().getTtl().getNum());
+ assertEquals(BanyandbCommon.IntervalRule.Unit.UNIT_DAY, group.getResourceOpts().getTtl().getUnit());
+
+ installer.createTable(model);
+ // test Stream install
+ BanyandbDatabase.Stream stream = client.client.findStream(groupName, "testStream");
+ assertEquals("storage-only", stream.getTagFamilies(0).getName());
+ assertEquals("service_id", stream.getTagFamilies(0).getTags(0).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, stream.getTagFamilies(0).getTags(0).getType());
+ assertEquals("timestamp", stream.getTagFamilies(0).getTags(1).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_INT, stream.getTagFamilies(0).getTags(1).getType());
+ assertEquals("searchable", stream.getTagFamilies(1).getName());
+ assertEquals("tag", stream.getTagFamilies(1).getTags(0).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, stream.getTagFamilies(1).getTags(0).getType());
+ assertEquals("service_id", stream.getEntity().getTagNames(0));
+ // test IndexRule install
+ BanyandbDatabase.IndexRule indexRuleTag = client.client.findIndexRule(groupName, "tag");
+ assertEquals("url", indexRuleTag.getAnalyzer());
+ assertTrue(indexRuleTag.getNoSort());
+ // test IndexRuleBinding install
+ BanyandbDatabase.IndexRuleBinding indexRuleBinding = client.client.findIndexRuleBinding(
+ groupName, "testStream");
+ assertEquals("tag", indexRuleBinding.getRules(0));
+ assertEquals("testStream", indexRuleBinding.getSubject().getName());
+
+ // StorageModels.add now dedupes by (name, downsampling); evict the original
+ // registration so the UpdateTestStream registration takes effect.
+ models.remove(TestStream.class, StorageManipulationOpt.withSchemaChange());
+ Model updatedModel = models.add(UpdateTestStream.class, DefaultScopeDefine.SERVICE,
+ new Storage("testStream", true, DownSampling.Second),
+ StorageManipulationOpt.withSchemaChange()
+ );
+ BanyanDBIndexInstaller newInstaller = new BanyanDBIndexInstaller(client, moduleManager, config);
+ newInstaller.isExists(updatedModel, StorageManipulationOpt.withSchemaChange());
+ // test Stream update
+ BanyandbDatabase.Stream updatedStream = client.client.findStream(groupName, "testStream");
+ assertEquals("storage-only", updatedStream.getTagFamilies(0).getName());
+ assertEquals("service_id", updatedStream.getTagFamilies(0).getTags(0).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, updatedStream.getTagFamilies(0).getTags(0).getType());
+ assertEquals("timestamp", updatedStream.getTagFamilies(0).getTags(1).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_INT, updatedStream.getTagFamilies(0).getTags(1).getType());
+ assertEquals("searchable", updatedStream.getTagFamilies(1).getName());
+ assertEquals("tag", updatedStream.getTagFamilies(1).getTags(0).getName());
+ assertEquals("new_tag", updatedStream.getTagFamilies(1).getTags(1).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, updatedStream.getTagFamilies(1).getTags(0).getType());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, updatedStream.getTagFamilies(1).getTags(1).getType());
+ assertEquals("service_id", updatedStream.getEntity().getTagNames(0));
+ // test IndexRule update
+ BanyandbDatabase.IndexRule updatedIndexRuleTag = client.client.findIndexRule(groupName, "tag");
+ assertEquals("", updatedIndexRuleTag.getAnalyzer());
+ assertFalse(updatedIndexRuleTag.getNoSort());
+ BanyandbDatabase.IndexRule updatedIndexRuleNewTag = client.client.findIndexRule(groupName, "new_tag");
+ assertTrue(updatedIndexRuleNewTag.getNoSort());
+ // test IndexRuleBinding update
+ BanyandbDatabase.IndexRuleBinding updatedIndexRuleBinding = client.client.findIndexRuleBinding(
+ groupName, "testStream");
+ assertEquals("tag", updatedIndexRuleBinding.getRules(0));
+ assertEquals("new_tag", updatedIndexRuleBinding.getRules(1));
+ assertEquals("testStream", updatedIndexRuleBinding.getSubject().getName());
+ }
+
+ /**
+ * Drive the allowBootReshape path: install {@link TestStream} first under
+ * {@link StorageManipulationOpt#withSchemaChange()} so the stream lives on the backend
+ * with the original shape, then re-install a class that adds {@code new_tag} and opts in
+ * via {@code allowBootReshape = true} under {@link StorageManipulationOpt#schemaCreateIfAbsent()}.
+ * The installer should apply the additive update during boot rather than recording
+ * {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}.
+ */
+ @Test
+ public void testStreamAdditiveBootReshape() throws Exception {
+ DownSamplingConfigService downSamplingConfigService = new DownSamplingConfigService(Arrays.asList("minute"));
+ ModuleManager moduleManager = mock(ModuleManager.class);
+ ModuleProviderHolder moduleProviderHolder = mock(ModuleProviderHolder.class);
+ ModuleServiceHolder moduleServiceHolder = mock(ModuleServiceHolder.class);
+ when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+ when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+ when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+ StorageModels models = new StorageModels();
+ Model baseModel = models.add(TestStream.class, DefaultScopeDefine.SERVICE,
+ new Storage("testStream", true, DownSampling.Second),
+ StorageManipulationOpt.withSchemaChange());
+ BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, moduleManager, config);
+ installer.isExists(baseModel, StorageManipulationOpt.withSchemaChange());
+ installer.createTable(baseModel);
+
+ String groupName = MetadataRegistry.convertGroupName(
+ config.getGlobal().getNamespace(), BanyanDB.StreamGroup.RECORDS_LOG.getName());
+ BanyandbDatabase.Stream initial = client.client.findStream(groupName, "testStream");
+ assertEquals(1, initial.getTagFamilies(1).getTagsCount());
+
+ models.remove(TestStream.class, StorageManipulationOpt.withSchemaChange());
+ Model reshapedModel = models.add(TestStreamAdditiveReshapeOn.class, DefaultScopeDefine.SERVICE,
+ new Storage("testStream", true, DownSampling.Second),
+ StorageManipulationOpt.withSchemaChange());
+ assertTrue(reshapedModel.isAllowBootReshape());
+
+ StorageManipulationOpt bootOpt = StorageManipulationOpt.schemaCreateIfAbsent();
+ new BanyanDBIndexInstaller(client, moduleManager, config).isExists(reshapedModel, bootOpt);
+
+ BanyandbDatabase.Stream reshaped = client.client.findStream(groupName, "testStream");
+ assertEquals(2, reshaped.getTagFamilies(1).getTagsCount());
+ assertEquals("new_tag", reshaped.getTagFamilies(1).getTags(1).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, reshaped.getTagFamilies(1).getTags(1).getType());
+
+ boolean updatedRecorded = bootOpt.getOutcomes().stream()
+ .anyMatch(o -> "stream".equals(o.getResourceType())
+ && "testStream".equals(o.getResourceName())
+ && o.getStatus() == StorageManipulationOpt.Outcome.UPDATED);
+ assertTrue(updatedRecorded, "expected UPDATED outcome for additive boot reshape, got " + bootOpt.getOutcomes());
+ }
+
+ /**
+ * Same setup as {@link #testStreamAdditiveBootReshape} but the re-install class leaves
+ * {@code allowBootReshape} at its default ({@code false}). Boot must refuse to reshape
+ * even though the diff is additive, recording {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}
+ * and leaving the live stream untouched.
+ */
+ @Test
+ public void testStreamAdditiveBootReshape_optOutSkips() throws Exception {
+ DownSamplingConfigService downSamplingConfigService = new DownSamplingConfigService(Arrays.asList("minute"));
+ ModuleManager moduleManager = mock(ModuleManager.class);
+ ModuleProviderHolder moduleProviderHolder = mock(ModuleProviderHolder.class);
+ ModuleServiceHolder moduleServiceHolder = mock(ModuleServiceHolder.class);
+ when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+ when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+ when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+ StorageModels models = new StorageModels();
+ Model baseModel = models.add(TestStream.class, DefaultScopeDefine.SERVICE,
+ new Storage("testStream", true, DownSampling.Second),
+ StorageManipulationOpt.withSchemaChange());
+ BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, moduleManager, config);
+ installer.isExists(baseModel, StorageManipulationOpt.withSchemaChange());
+ installer.createTable(baseModel);
+
+ models.remove(TestStream.class, StorageManipulationOpt.withSchemaChange());
+ Model reshapedModel = models.add(TestStreamAdditiveReshapeOff.class, DefaultScopeDefine.SERVICE,
+ new Storage("testStream", true, DownSampling.Second),
+ StorageManipulationOpt.withSchemaChange());
+ assertFalse(reshapedModel.isAllowBootReshape());
+
+ StorageManipulationOpt bootOpt = StorageManipulationOpt.schemaCreateIfAbsent();
+ new BanyanDBIndexInstaller(client, moduleManager, config).isExists(reshapedModel, bootOpt);
+
+ String groupName = MetadataRegistry.convertGroupName(
+ config.getGlobal().getNamespace(), BanyanDB.StreamGroup.RECORDS_LOG.getName());
+ BanyandbDatabase.Stream live = client.client.findStream(groupName, "testStream");
+ assertEquals(1, live.getTagFamilies(1).getTagsCount());
+
+ boolean skipRecorded = bootOpt.getOutcomes().stream()
+ .anyMatch(o -> "stream".equals(o.getResourceType())
+ && "testStream".equals(o.getResourceName())
+ && o.getStatus() == StorageManipulationOpt.Outcome.SKIPPED_SHAPE_MISMATCH);
+ assertTrue(skipRecorded, "expected SKIPPED_SHAPE_MISMATCH without allowBootReshape opt-in, got " + bootOpt.getOutcomes());
+ }
+
+ /**
+ * Measure variant of {@link #testStreamAdditiveBootReshape}: install {@link TestMetric},
+ * then re-install {@link TestMetricAdditiveReshapeOn} which adds {@code new_tag} and
+ * {@code new_value} (a field) and opts in via {@code allowBootReshape = true}. The boot
+ * installer should apply the additive update and record
+ * {@link StorageManipulationOpt.Outcome#UPDATED}.
+ */
+ @Test
+ public void testMeasureAdditiveBootReshape() throws Exception {
+ DownSamplingConfigService downSamplingConfigService = new DownSamplingConfigService(Arrays.asList("minute"));
+ ModuleManager moduleManager = mock(ModuleManager.class);
+ ModuleProviderHolder moduleProviderHolder = mock(ModuleProviderHolder.class);
+ ModuleServiceHolder moduleServiceHolder = mock(ModuleServiceHolder.class);
+ when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+ when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+ when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+ StorageModels models = new StorageModels();
+ Model baseModel = models.add(TestMetric.class, DefaultScopeDefine.SERVICE,
+ new Storage("testMetric", true, DownSampling.Minute),
+ StorageManipulationOpt.withSchemaChange());
+ BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, moduleManager, config);
+ installer.isExists(baseModel, StorageManipulationOpt.withSchemaChange());
+ installer.createTable(baseModel);
+
+ models.remove(TestMetric.class, StorageManipulationOpt.withSchemaChange());
+ Model reshapedModel = models.add(TestMetricAdditiveReshapeOn.class, DefaultScopeDefine.SERVICE,
+ new Storage("testMetric", true, DownSampling.Minute),
+ StorageManipulationOpt.withSchemaChange());
+ assertTrue(reshapedModel.isAllowBootReshape());
+
+ StorageManipulationOpt bootOpt = StorageManipulationOpt.schemaCreateIfAbsent();
+ new BanyanDBIndexInstaller(client, moduleManager, config).isExists(reshapedModel, bootOpt);
+
+ String groupName = MetadataRegistry.convertGroupName(
+ config.getGlobal().getNamespace(), BanyanDB.MeasureGroup.METRICS_MINUTE.getName());
+ BanyandbDatabase.Measure reshaped = client.client.findMeasure(groupName, "testMetric_minute");
+ assertEquals(2, reshaped.getTagFamilies(1).getTagsCount());
+ assertEquals("new_tag", reshaped.getTagFamilies(1).getTags(1).getName());
+ assertEquals(2, reshaped.getFieldsCount());
+ assertEquals("new_value", reshaped.getFields(1).getName());
+
+ boolean updatedRecorded = bootOpt.getOutcomes().stream()
+ .anyMatch(o -> "measure".equals(o.getResourceType())
+ && "testMetric_minute".equals(o.getResourceName())
+ && o.getStatus() == StorageManipulationOpt.Outcome.UPDATED);
+ assertTrue(updatedRecorded, "expected UPDATED outcome for additive measure boot reshape, got " + bootOpt.getOutcomes());
+ }
+
+ /**
+ * Opt-in is necessary but not sufficient: even with {@code allowBootReshape = true} a
+ * diff that is not purely additive (here, the {@code tag} column flips from
+ * {@code String} → {@code long}, i.e. {@code TAG_TYPE_STRING} → {@code TAG_TYPE_INT})
+ * must be refused at boot. {@link StorageManipulationOpt.Outcome#SKIPPED_SHAPE_MISMATCH}
+ * is recorded and the live stream stays unchanged, forcing the operator to do an
+ * explicit drop+recreate (the only safe path for identity-breaking changes).
+ */
+ @Test
+ public void testStreamNonAdditiveBootReshape_optInStillSkips() throws Exception {
+ DownSamplingConfigService downSamplingConfigService = new DownSamplingConfigService(Arrays.asList("minute"));
+ ModuleManager moduleManager = mock(ModuleManager.class);
+ ModuleProviderHolder moduleProviderHolder = mock(ModuleProviderHolder.class);
+ ModuleServiceHolder moduleServiceHolder = mock(ModuleServiceHolder.class);
+ when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleProviderHolder);
+ when(moduleProviderHolder.provider()).thenReturn(moduleServiceHolder);
+ when(moduleServiceHolder.getService(DownSamplingConfigService.class)).thenReturn(downSamplingConfigService);
+
+ StorageModels models = new StorageModels();
+ Model baseModel = models.add(TestStream.class, DefaultScopeDefine.SERVICE,
+ new Storage("testStream", true, DownSampling.Second),
+ StorageManipulationOpt.withSchemaChange());
+ BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, moduleManager, config);
+ installer.isExists(baseModel, StorageManipulationOpt.withSchemaChange());
+ installer.createTable(baseModel);
+
+ models.remove(TestStream.class, StorageManipulationOpt.withSchemaChange());
+ Model reshapedModel = models.add(TestStreamNonAdditiveReshapeOn.class, DefaultScopeDefine.SERVICE,
+ new Storage("testStream", true, DownSampling.Second),
+ StorageManipulationOpt.withSchemaChange());
+ assertTrue(reshapedModel.isAllowBootReshape());
+
+ StorageManipulationOpt bootOpt = StorageManipulationOpt.schemaCreateIfAbsent();
+ new BanyanDBIndexInstaller(client, moduleManager, config).isExists(reshapedModel, bootOpt);
+
+ String groupName = MetadataRegistry.convertGroupName(
+ config.getGlobal().getNamespace(), BanyanDB.StreamGroup.RECORDS_LOG.getName());
+ BanyandbDatabase.Stream live = client.client.findStream(groupName, "testStream");
+ assertEquals("tag", live.getTagFamilies(1).getTags(0).getName());
+ assertEquals(BanyandbDatabase.TagType.TAG_TYPE_STRING, live.getTagFamilies(1).getTags(0).getType());
+
+ boolean skipRecorded = bootOpt.getOutcomes().stream()
+ .anyMatch(o -> "stream".equals(o.getResourceType())
+ && "testStream".equals(o.getResourceName())
+ && o.getStatus() == StorageManipulationOpt.Outcome.SKIPPED_SHAPE_MISMATCH);
+ assertTrue(skipRecorded,
+ "expected SKIPPED_SHAPE_MISMATCH for non-additive (type-change) diff even with opt-in, got " + bootOpt.getOutcomes());
+ }
+
+ /**
+ * Non-additive variant: {@code tag} is now {@code long} (TAG_TYPE_INT) where the live
+ * stream has it as {@code String} (TAG_TYPE_STRING). Boot must refuse to reshape even
+ * with {@code allowBootReshape = true}.
+ */
+ @Stream(name = "testStream", scopeId = DefaultScopeDefine.SERVICE,
+ builder = TestStreamNonAdditiveReshapeOn.Builder.class, processor = RecordStreamProcessor.class,
+ allowBootReshape = true)
+ @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG)
+ @BanyanDB.TimestampColumn("timestamp")
+ private static class TestStreamNonAdditiveReshapeOn extends Record {
+ @Column(name = "service_id")
+ @BanyanDB.SeriesID(index = 0)
+ private String serviceId;
+ @Column(name = "tag")
+ private long tag;
+ @Column(name = "timestamp")
+ private long timestamp;
+
+ @Override
+ public StorageID id() {
+ return new StorageID();
+ }
+
+ static class Builder implements StorageBuilder<StorageData> {
+ @Override
+ public StorageData storage2Entity(final Convert2Entity converter) {
+ return null;
+ }
+
+ @Override
+ public void entity2Storage(final StorageData entity, final Convert2Storage converter) {
+ }
+ }
+ }
+
+ /**
+ * Mirror of {@link UpdateTestStream} but trimmed to a purely-additive shape change
+ * (only {@code new_tag} is added; {@code tag}'s index settings stay matched to
+ * {@link TestStream}) and with the new {@code allowBootReshape} opt-in flipped on.
+ */
+ @Stream(name = "testStream", scopeId = DefaultScopeDefine.SERVICE,
+ builder = TestStreamAdditiveReshapeOn.Builder.class, processor = RecordStreamProcessor.class,
+ allowBootReshape = true)
+ @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG)
+ @BanyanDB.TimestampColumn("timestamp")
+ private static class TestStreamAdditiveReshapeOn extends Record {
+ @Column(name = "service_id")
+ @BanyanDB.SeriesID(index = 0)
+ private String serviceId;
+ @Column(name = "tag")
+ @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)
+ private String tag;
+ @Column(name = "new_tag")
+ private String newTag;
+ @Column(name = "timestamp")
+ private long timestamp;
+
+ @Override
+ public StorageID id() {
+ return new StorageID();
+ }
+
+ static class Builder implements StorageBuilder<StorageData> {
+ @Override
+ public StorageData storage2Entity(final Convert2Entity converter) {
+ return null;
+ }
+
+ @Override
+ public void entity2Storage(final StorageData entity, final Convert2Storage converter) {
+ }
+ }
+ }
+
+ /**
+ * Same additive shape as {@link TestStreamAdditiveReshapeOn} but without the opt-in;
+ * boot must refuse to reshape and record SKIPPED_SHAPE_MISMATCH.
+ */
+ @Stream(name = "testStream", scopeId = DefaultScopeDefine.SERVICE,
+ builder = TestStreamAdditiveReshapeOff.Builder.class, processor = RecordStreamProcessor.class)
+ @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG)
+ @BanyanDB.TimestampColumn("timestamp")
+ private static class TestStreamAdditiveReshapeOff extends Record {
+ @Column(name = "service_id")
+ @BanyanDB.SeriesID(index = 0)
+ private String serviceId;
+ @Column(name = "tag")
+ @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)
+ private String tag;
+ @Column(name = "new_tag")
+ private String newTag;
+ @Column(name = "timestamp")
+ private long timestamp;
+
+ @Override
+ public StorageID id() {
+ return new StorageID();
+ }
+
+ static class Builder implements StorageBuilder<StorageData> {
+ @Override
+ public StorageData storage2Entity(final Convert2Entity converter) {
+ return null;
+ }
+
+ @Override
+ public void entity2Storage(final StorageData entity, final Convert2Storage converter) {
+ }
+ }
+ }
+
+ /**
+ * Mirror of {@link UpdateTestMetric} but trimmed to a purely-additive shape change
+ * (new tag, new field) and with {@code allowBootReshape = true}.
+ */
+ @Stream(name = "testMetric", scopeId = DefaultScopeDefine.SERVICE,
+ builder = TestMetricAdditiveReshapeOn.Builder.class, processor = MetricsStreamProcessor.class,
+ allowBootReshape = true)
+ private static class TestMetricAdditiveReshapeOn {
+ @Column(name = "service_id")
+ @BanyanDB.SeriesID(index = 0)
+ @BanyanDB.ShardingKey(index = 0)
+ private String serviceId;
+ @Column(name = "tag")
+ @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)
+ private String tag;
+ @Column(name = "new_tag")
+ private String newTag;
+ @Column(name = "value", dataType = Column.ValueDataType.COMMON_VALUE)
+ @BanyanDB.MeasureField
+ private long value;
+ @Column(name = "new_value", storageOnly = true)
+ @BanyanDB.MeasureField
+ private long newValue;
+
+ static class Builder implements StorageBuilder<StorageData> {
+ @Override
+ public StorageData storage2Entity(final Convert2Entity converter) {
+ return null;
+ }
+
+ @Override
+ public void entity2Storage(final StorageData entity, final Convert2Storage converter) {
+ }
+ }
+ }
+
+ @Stream(name = "testStream", scopeId = DefaultScopeDefine.SERVICE,
+ builder = TestStream.Builder.class, processor = RecordStreamProcessor.class)
+ @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG)
+ @BanyanDB.TimestampColumn("timestamp")
+ private static class TestStream extends Record {
+ @Column(name = "service_id")
+ @BanyanDB.SeriesID(index = 0)
+ private String serviceId;
+ @Column(name = "tag")
+ @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)
+ private String tag;
+ @Column(name = "timestamp")
+ private long timestamp;
+
+ @Override
+ public StorageID id() {
+ return new StorageID();
+ }
+
+ static class Builder implements StorageBuilder<StorageData> {
+ @Override
+ public StorageData storage2Entity(final Convert2Entity converter) {
+ return null;
+ }
+
+ @Override
+ public void entity2Storage(final StorageData entity, final Convert2Storage converter) {
+
+ }
+ }
+ }
+
+ @Stream(name = "testStream", scopeId = DefaultScopeDefine.SERVICE,
+ builder = UpdateTestStream.Builder.class, processor = RecordStreamProcessor.class)
+ @BanyanDB.Group(streamGroup = BanyanDB.StreamGroup.RECORDS_LOG)
+ @BanyanDB.TimestampColumn("timestamp")
+ private static class UpdateTestStream extends Record {
+ @Column(name = "service_id")
+ @BanyanDB.SeriesID(index = 0)
+ private String serviceId;
+ @Column(name = "tag")
+ @BanyanDB.EnableSort
+ private String tag;
+ @Column(name = "new_tag")
+ private String newTag;
+ @Column(name = "timestamp")
+ private long timestamp;
+
+ @Override
+ public StorageID id() {
+ return new StorageID();
+ }
+
+ static class Builder implements StorageBuilder<StorageData> {
+ @Override
+ public StorageData storage2Entity(final Convert2Entity converter) {
+ return null;
+ }
+
+ @Override
+ public void entity2Storage(final StorageData entity, final Convert2Storage converter) {
+
+ }
+ }
+ }
+
+ @Stream(name = "testMetric", scopeId = DefaultScopeDefine.SERVICE,
+ builder = TestMetric.Builder.class, processor = MetricsStreamProcessor.class)
+ private static class TestMetric {
+ @Column(name = "service_id")
+ @BanyanDB.SeriesID(index = 0)
+ @BanyanDB.ShardingKey(index = 0)
+ private String serviceId;
+ @Column(name = "tag")
+ @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)
+ private String tag;
+ @Column(name = "value", dataType = Column.ValueDataType.COMMON_VALUE)
+ @BanyanDB.MeasureField
+ private long value;
+
+ static class Builder implements StorageBuilder<StorageData> {
+ @Override
+ public StorageData storage2Entity(final Convert2Entity converter) {
+ return null;
+ }
+
+ @Override
+ public void entity2Storage(final StorageData entity, final Convert2Storage converter) {
+
+ }
+ }
+ }
+
+ @Stream(name = "testMetric", scopeId = DefaultScopeDefine.SERVICE,
+ builder = UpdateTestMetric.Builder.class, processor = MetricsStreamProcessor.class)
+ private static class UpdateTestMetric {
+ @Column(name = "service_id")
+ @BanyanDB.SeriesID(index = 0)
+ private String serviceId;
+ @Column(name = "tag")
+ @BanyanDB.EnableSort
+ private String tag;
+ @Column(name = "new_tag")
+ private String newTag;
+ @Column(name = "value", dataType = Column.ValueDataType.COMMON_VALUE)
+ @BanyanDB.MeasureField
+ private long value;
+ @Column(name = "new_value", storageOnly = true)
+ @BanyanDB.MeasureField
+ private long newValue;
+
+ static class Builder implements StorageBuilder<StorageData> {
+ @Override
+ public StorageData storage2Entity(final Convert2Entity converter) {
+ return null;
+ }
+
+ @Override
+ public void entity2Storage(final StorageData entity, final Convert2Storage converter) {
+
+ }
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.yml b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.yml
index 1260ffa34d..b1ce48367b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.yml
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/resources/bydb.yml
@@ -43,6 +43,7 @@ global:
# The batch size for querying profile data.
profileDataQueryBatchSize: ${SW_STORAGE_BANYANDB_QUERY_PROFILE_DATA_BATCH_SIZE:100}
asyncProfilerTaskQueryMaxSize: ${SW_STORAGE_BANYANDB_ASYNC_PROFILER_TASK_QUERY_MAX_SIZE:200}
+ pprofTaskQueryMaxSize: ${SW_STORAGE_BANYANDB_PPROF_TASK_QUERY_MAX_SIZE:200}
user: ${SW_STORAGE_BANYANDB_USER:""}
password: ${SW_STORAGE_BANYANDB_PASSWORD:""}
# If the BanyanDB server is configured with TLS, configure the TLS cert file path and enable TLS connection.
@@ -52,6 +53,9 @@ global:
# The namespace in BanyanDB to store the data of OAP, if not set, the default is "sw".
# OAP will create BanyanDB Groups using the format of "{namespace}_{group name}", such as "sw_records".
namespace: ${SW_NAMESPACE:"sw"}
+ # The compatible server API versions of BanyanDB.
+ # The compatible BanyanDB Server version number can be found via the [API versions mapping](https://skywalking.apache.org/docs/skywalking-banyandb/latest/installation/versions/).
+ compatibleServerApiVersions: ${SW_STORAGE_BANYANDB_COMPATIBLE_SERVER_API_VERSIONS:"0.10"}
groups:
# The group settings of record.
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
index 931edd6615..0002de624b 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
@@ -43,15 +43,15 @@ public class TimeSeriesUtilsTest {
@BeforeEach
public void prepare() {
superDatasetModel = new Model("superDatasetModel", Lists.newArrayList(),
- 0, DownSampling.Second, true, Record.class, true,
+ 0, DownSampling.Second, true, Record.class, true, false,
new SQLDatabaseModelExtension(), new BanyanDBModelExtension(), new ElasticSearchModelExtension()
);
normalRecordModel = new Model("normalRecordModel", Lists.newArrayList(),
- 0, DownSampling.Second, false, Record.class, true,
+ 0, DownSampling.Second, false, Record.class, true, false,
new SQLDatabaseModelExtension(), new BanyanDBModelExtension(), new ElasticSearchModelExtension()
);
normalMetricsModel = new Model("normalMetricsModel", Lists.newArrayList(),
- 0, DownSampling.Minute, false, Metrics.class, true,
+ 0, DownSampling.Minute, false, Metrics.class, true, false,
new SQLDatabaseModelExtension(), new BanyanDBModelExtension(), new ElasticSearchModelExtension()
);
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCHistoryDeleteDAOIT.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCHistoryDeleteDAOIT.java
index c0a729ba29..ecbc900913 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCHistoryDeleteDAOIT.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCHistoryDeleteDAOIT.java
@@ -107,7 +107,7 @@ class JDBCHistoryDeleteDAOIT {
new SQLDatabaseExtension(), null, null),
new ModelColumn(new ColumnName(timeBucketColumn), Long.class, Long.class, false, false, false, 0,
new SQLDatabaseExtension(), null, null)
- ), 1, DownSampling.Minute, false, ServiceTraffic.class, false, new SQLDatabaseModelExtension(),
+ ), 1, DownSampling.Minute, false, ServiceTraffic.class, false, false, new SQLDatabaseModelExtension(),
new BanyanDBModelExtension(), new ElasticSearchModelExtension());
TableMetaInfo.addModel(model);