This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new ddbed6d091 Adapt BanyanDB Java Client 0.7.0. (#12621)
ddbed6d091 is described below
commit ddbed6d091c4c20dbb43294fbd03778bdbb8471a
Author: Wan Kai <[email protected]>
AuthorDate: Sat Sep 14 15:05:20 2024 +0800
Adapt BanyanDB Java Client 0.7.0. (#12621)
---
docs/en/changes/changes.md | 1 +
oap-server-bom/pom.xml | 2 +-
.../plugin/banyandb/BanyanDBIndexInstaller.java | 45 ++--
.../plugin/banyandb/BanyanDBNoneStreamDAO.java | 2 +-
.../plugin/banyandb/BanyanDBStorageClient.java | 58 +++--
.../plugin/banyandb/BanyanDBStorageProvider.java | 10 +-
.../banyandb/BanyanDBUIMenuManagementDAO.java | 27 ++-
.../banyandb/BanyanDBUITemplateManagementDAO.java | 55 +++--
.../storage/plugin/banyandb/MeasureModel.java | 32 +++
.../storage/plugin/banyandb/MetadataRegistry.java | 265 +++++++++++++--------
.../storage/plugin/banyandb/StreamModel.java | 32 +++
.../BanyanDBContinuousProfilingPolicyDAO.java | 34 ++-
test/e2e-v2/script/env | 2 +-
13 files changed, 391 insertions(+), 174 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 96e4410e41..0e5e806f81 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -65,6 +65,7 @@
* Fix the previous analysis result missing in the ALS `k8s-mesh` analyzer.
* Fix `findEndpoint` query require `keyword` when using BanyanDB.
* Support to analysis the ztunnel mapped IP address in eBPF Access Log
Receiver.
+* Adapt BanyanDB Java Client 0.7.0-rc3.
#### UI
diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index f5385435d8..158f66f033 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -73,7 +73,7 @@
<httpcore.version>4.4.13</httpcore.version>
<httpasyncclient.version>4.1.5</httpasyncclient.version>
<commons-compress.version>1.21</commons-compress.version>
- <banyandb-java-client.version>0.7.0-rc2</banyandb-java-client.version>
+ <banyandb-java-client.version>0.7-rc3</banyandb-java-client.version>
<kafka-clients.version>3.4.0</kafka-clients.version>
<spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
<consul.client.version>1.5.3</consul.client.version>
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index b95737843b..9c98a99d2b 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -22,8 +22,9 @@ import io.grpc.Status;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
-import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
+import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.StorageException;
@@ -31,6 +32,7 @@ import
org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@Slf4j
public class BanyanDBIndexInstaller extends ModelInstaller {
@@ -58,20 +60,20 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
final boolean resourceExist = metadata.checkResourceExistence(c);
if (!resourceExist) {
return false;
- }
-
- // then check entity schema
- if (metadata.findRemoteSchema(c).isPresent()) {
- // register models only locally but not remotely
+ } else {
+ // register models only locally(Schema cache) but not remotely
if (model.isRecord()) { // stream
MetadataRegistry.INSTANCE.registerStreamModel(model,
config, configService);
} else { // measure
MetadataRegistry.INSTANCE.registerMeasureModel(model,
config, configService);
}
+ // pre-load remote schema for java client
+ MetadataCache.EntityMetadata remoteMeta =
metadata.updateRemoteSchema(c);
+ if (remoteMeta == null) {
+ throw new IllegalStateException("inconsistent state:
metadata:" + metadata + ", remoteMeta: null");
+ }
return true;
}
-
- throw new IllegalStateException("inconsistent state:" + metadata);
} catch (BanyanDBException ex) {
throw new StorageException("fail to check existence", ex);
}
@@ -84,11 +86,17 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
.provider()
.getService(ConfigService.class);
if (model.isRecord()) { // stream
- Stream stream =
MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
+ StreamModel streamModel =
MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
+ Stream stream = streamModel.getStream();
if (stream != null) {
log.info("install stream schema {}", model.getName());
+ final BanyanDBClient client = ((BanyanDBStorageClient)
this.client).client;
try {
- ((BanyanDBStorageClient) client).define(stream);
+ if
(CollectionUtils.isNotEmpty(streamModel.getIndexRules())) {
+ client.define(stream, streamModel.getIndexRules());
+ } else {
+ client.define(stream);
+ }
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS))
{
log.info(
@@ -102,12 +110,17 @@ public class BanyanDBIndexInstaller extends
ModelInstaller {
}
}
} else { // measure
- Measure measure =
MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
+ MeasureModel measureModel =
MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
+ Measure measure = measureModel.getMeasure();
if (measure != null) {
- log.info("install measure schema {}", measure.name());
- final BanyanDBClient c = ((BanyanDBStorageClient)
this.client).client;
+ log.info("install measure schema {}", model.getName());
+ final BanyanDBClient client = ((BanyanDBStorageClient)
this.client).client;
try {
- c.define(measure);
+ if
(CollectionUtils.isNotEmpty(measureModel.getIndexRules())) {
+ client.define(measure,
measureModel.getIndexRules());
+ } else {
+ client.define(measure);
+ }
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS))
{
log.info("Measure schema {}_{} already created by
another OAP node",
@@ -119,7 +132,7 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
}
final MetadataRegistry.Schema schema =
MetadataRegistry.INSTANCE.findMetadata(model);
try {
- schema.installTopNAggregation(c);
+ schema.installTopNAggregation(client);
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS))
{
log.info("Measure schema {}_{} TopN({}) already
created by another OAP node",
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
index 1dc2c2b7d2..51ce5381ac 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
@@ -44,7 +44,7 @@ public class BanyanDBNoneStreamDAO extends
AbstractDAO<BanyanDBStorageClient> im
if (schema == null) {
throw new IOException(model.getName() + " is not registered");
}
- StreamWrite streamWrite = getClient().client.createStreamWrite(
+ StreamWrite streamWrite = getClient().createStreamWrite(
schema.getMetadata().getGroup(), // group name
schema.getMetadata().name(), // stream-name
noneStream.id().build() // identity
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 26069cdc0d..35ce37194e 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import io.grpc.Status;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
@@ -30,14 +31,15 @@ import
org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
import org.apache.skywalking.banyandb.v1.client.TopNQuery;
import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
+import
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
+import
org.apache.skywalking.banyandb.property.v1.BanyandbProperty.ApplyRequest.Strategy;
+import
org.apache.skywalking.banyandb.property.v1.BanyandbProperty.DeleteResponse;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.AlreadyExistsException;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.metadata.Group;
-import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
-import org.apache.skywalking.banyandb.v1.client.metadata.Property;
-import org.apache.skywalking.banyandb.v1.client.metadata.PropertyStore;
-import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
-import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
import org.apache.skywalking.oap.server.library.client.Client;
import
org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
import
org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
@@ -103,9 +105,9 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
}
}
- public PropertyStore.DeleteResult deleteProperty(String group, String
name, String id, String... tags) throws IOException {
+ public DeleteResponse deleteProperty(String group, String name, String id,
String... tags) throws IOException {
try {
- PropertyStore.DeleteResult result =
this.client.deleteProperty(group, name, id, tags);
+ DeleteResponse result = this.client.deleteProperty(group, name,
id, tags);
this.healthChecker.health();
return result;
} catch (BanyanDBException ex) {
@@ -158,7 +160,7 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
}
/**
- * PropertyStore.Strategy is default to {@link
PropertyStore.Strategy#MERGE}
+ * PropertyStore.Strategy is default to {@link Strategy#STRATEGY_MERGE}
*/
public void define(Property property) throws IOException {
try {
@@ -170,7 +172,7 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
}
}
- public void define(Property property, PropertyStore.Strategy strategy)
throws IOException {
+ public void define(Property property, Strategy strategy) throws
IOException {
try {
this.client.apply(property, strategy);
this.healthChecker.health();
@@ -190,6 +192,16 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
}
}
+ public void define(Stream stream, List<BanyandbDatabase.IndexRule>
indexRules) throws BanyanDBException {
+ try {
+ this.client.define(stream, indexRules);
+ this.healthChecker.health();
+ } catch (BanyanDBException ex) {
+ healthChecker.unHealth(ex);
+ throw ex;
+ }
+ }
+
public void define(Measure measure) throws BanyanDBException {
try {
this.client.define(measure);
@@ -200,6 +212,16 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
}
}
+ public void define(Measure measure, List<BanyandbDatabase.IndexRule>
indexRules) throws BanyanDBException {
+ try {
+ this.client.define(measure, indexRules);
+ this.healthChecker.health();
+ } catch (BanyanDBException ex) {
+ healthChecker.unHealth(ex);
+ throw ex;
+ }
+ }
+
public void defineIfEmpty(Group group) throws IOException {
try {
try {
@@ -223,12 +245,20 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
}
}
- public StreamWrite createStreamWrite(String group, String name, String
elementId) {
- return this.client.createStreamWrite(group, name, elementId);
+ public StreamWrite createStreamWrite(String group, String name, String
elementId) throws IOException {
+ try {
+ return this.client.createStreamWrite(group, name, elementId);
+ } catch (BanyanDBException e) {
+ throw new IOException("fail to create stream write", e);
+ }
}
- public MeasureWrite createMeasureWrite(String group, String name, long
timestamp) {
- return this.client.createMeasureWrite(group, name, timestamp);
+ public MeasureWrite createMeasureWrite(String group, String name, long
timestamp) throws IOException {
+ try {
+ return this.client.createMeasureWrite(group, name, timestamp);
+ } catch (BanyanDBException e) {
+ throw new IOException("fail to create measure write", e);
+ }
}
public void write(StreamWrite streamWrite) {
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index 5a6c443d7a..e7660921b3 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
-import org.apache.skywalking.banyandb.v1.client.metadata.Group;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
@@ -175,7 +175,13 @@ public class BanyanDBStorageProvider extends
ModuleProvider {
this.client.registerChecker(healthChecker);
try {
this.client.connect();
-
this.client.defineIfEmpty(Group.create(BanyanDBUITemplateManagementDAO.GROUP));
+ this.client.defineIfEmpty(BanyandbCommon.Group.newBuilder()
+ .setMetadata(
+
BanyandbCommon.Metadata.newBuilder()
+
.setName(
+
BanyanDBUITemplateManagementDAO.GROUP))
+
.setCatalog(BanyandbCommon.Catalog.CATALOG_UNSPECIFIED)
+ .build());
this.modelInstaller.start();
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(modelInstaller);
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUIMenuManagementDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUIMenuManagementDAO.java
index e6b923c904..74e7cc75dd 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUIMenuManagementDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUIMenuManagementDAO.java
@@ -19,8 +19,11 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.banyandb.v1.client.metadata.Property;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
import org.apache.skywalking.oap.server.core.management.ui.menu.UIMenu;
import
org.apache.skywalking.oap.server.core.storage.management.UIMenuManagementDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
@@ -46,17 +49,27 @@ public class BanyanDBUIMenuManagementDAO extends
AbstractBanyanDBDAO implements
@Override
public void saveMenu(UIMenu menu) throws IOException {
- this.getClient().define(Property.create(GROUP, UIMenu.INDEX_NAME,
menu.id().build())
- .addTag(TagAndValue.newStringTag(UIMenu.CONFIGURATION,
menu.getConfigurationJson()))
- .addTag(TagAndValue.newLongTag(UIMenu.UPDATE_TIME,
menu.getUpdateTime()))
- .build());
+ Property property = Property.newBuilder()
+
.setMetadata(BanyandbProperty.Metadata.newBuilder().setId(menu.getMenuId())
+
.setContainer(
+
BanyandbCommon.Metadata.newBuilder()
+
.setGroup(GROUP)
+
.setName(
+
UIMenu.INDEX_NAME)))
+
+
.addTags(TagAndValue.newStringTag(UIMenu.CONFIGURATION,
menu.getConfigurationJson())
+ .build())
+
.addTags(TagAndValue.newLongTag(UIMenu.UPDATE_TIME,
menu.getUpdateTime()).build())
+ .build();
+ this.getClient().define(property);
}
public UIMenu parse(Property property) {
UIMenu menu = new UIMenu();
- menu.setMenuId(property.id());
+ menu.setMenuId(property.getMetadata().getId());
- for (TagAndValue<?> tagAndValue : property.tags()) {
+ for (BanyandbModel.Tag tag : property.getTagsList()) {
+ TagAndValue<?> tagAndValue = TagAndValue.fromProtobuf(tag);
if (tagAndValue.getTagName().equals(UIMenu.CONFIGURATION)) {
menu.setConfigurationJson((String) tagAndValue.getValue());
} else if (tagAndValue.getTagName().equals(UIMenu.UPDATE_TIME)) {
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUITemplateManagementDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUITemplateManagementDAO.java
index f6f6414828..9bd4ef26bf 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUITemplateManagementDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUITemplateManagementDAO.java
@@ -19,8 +19,11 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.banyandb.v1.client.metadata.Property;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
@@ -64,7 +67,7 @@ public class BanyanDBUITemplateManagementDAO extends
AbstractBanyanDBDAO impleme
this.getClient().define(newTemplate);
return TemplateChangeStatus.builder()
.status(true)
- .id(newTemplate.id())
+ .id(newTemplate.getMetadata().getId())
.build();
} catch (IOException ioEx) {
log.error("fail to add new template", ioEx);
@@ -80,7 +83,7 @@ public class BanyanDBUITemplateManagementDAO extends
AbstractBanyanDBDAO impleme
this.getClient().define(newTemplate);
return TemplateChangeStatus.builder()
.status(true)
- .id(newTemplate.id())
+ .id(newTemplate.getMetadata().getId())
.build();
} catch (IOException ioEx) {
log.error("fail to modify the template", ioEx);
@@ -118,9 +121,10 @@ public class BanyanDBUITemplateManagementDAO extends
AbstractBanyanDBDAO impleme
public UITemplate parse(Property property) {
UITemplate uiTemplate = new UITemplate();
- uiTemplate.setTemplateId(property.id());
+ uiTemplate.setTemplateId(property.getMetadata().getId());
- for (TagAndValue<?> tagAndValue : property.tags()) {
+ for (BanyandbModel.Tag tag : property.getTagsList()) {
+ TagAndValue<?> tagAndValue = TagAndValue.fromProtobuf(tag);
if (tagAndValue.getTagName().equals(UITemplate.CONFIGURATION)) {
uiTemplate.setConfiguration((String) tagAndValue.getValue());
} else if (tagAndValue.getTagName().equals(UITemplate.DISABLED)) {
@@ -133,11 +137,16 @@ public class BanyanDBUITemplateManagementDAO extends
AbstractBanyanDBDAO impleme
}
public Property applyAll(UITemplate uiTemplate) {
- return Property.create(GROUP, UITemplate.INDEX_NAME,
uiTemplate.id().build())
- .addTag(TagAndValue.newStringTag(UITemplate.CONFIGURATION,
uiTemplate.getConfiguration()))
- .addTag(TagAndValue.newLongTag(UITemplate.DISABLED,
uiTemplate.getDisabled()))
- .addTag(TagAndValue.newLongTag(UITemplate.UPDATE_TIME,
uiTemplate.getUpdateTime()))
- .build();
+ return Property.newBuilder()
+ .setMetadata(BanyandbProperty.Metadata.newBuilder()
+ .setId(uiTemplate.id().build())
+ .setContainer(BanyandbCommon.Metadata.newBuilder()
+ .setGroup(GROUP)
+ .setName(UITemplate.INDEX_NAME)))
+ .addTags(TagAndValue.newStringTag(UITemplate.CONFIGURATION,
uiTemplate.getConfiguration()).build())
+ .addTags(TagAndValue.newLongTag(UITemplate.DISABLED,
uiTemplate.getDisabled()).build())
+ .addTags(TagAndValue.newLongTag(UITemplate.UPDATE_TIME,
uiTemplate.getUpdateTime()).build())
+ .build();
}
/**
@@ -147,10 +156,15 @@ public class BanyanDBUITemplateManagementDAO extends
AbstractBanyanDBDAO impleme
* @return new property (patch) to be applied
*/
public Property applyStatus(UITemplate uiTemplate) {
- return Property.create(GROUP, UITemplate.INDEX_NAME,
uiTemplate.id().build())
- .addTag(TagAndValue.newLongTag(UITemplate.DISABLED,
uiTemplate.getDisabled()))
- .addTag(TagAndValue.newLongTag(UITemplate.UPDATE_TIME,
uiTemplate.getUpdateTime()))
- .build();
+ return Property.newBuilder()
+ .setMetadata(BanyandbProperty.Metadata.newBuilder()
+ .setId(uiTemplate.id().build())
+ .setContainer(BanyandbCommon.Metadata.newBuilder()
+ .setGroup(GROUP)
+ .setName(UITemplate.INDEX_NAME)))
+ .addTags(TagAndValue.newLongTag(UITemplate.DISABLED,
uiTemplate.getDisabled()).build())
+ .addTags(TagAndValue.newLongTag(UITemplate.UPDATE_TIME,
uiTemplate.getUpdateTime()).build())
+ .build();
}
/**
@@ -160,9 +174,14 @@ public class BanyanDBUITemplateManagementDAO extends
AbstractBanyanDBDAO impleme
* @return new property (patch) to be applied
*/
public Property applyConfiguration(UITemplate uiTemplate) {
- return Property.create(GROUP, UITemplate.INDEX_NAME,
uiTemplate.id().build())
- .addTag(TagAndValue.newStringTag(UITemplate.CONFIGURATION,
uiTemplate.getConfiguration()))
- .addTag(TagAndValue.newLongTag(UITemplate.UPDATE_TIME,
uiTemplate.getUpdateTime()))
- .build();
+ return Property.newBuilder()
+ .setMetadata(BanyandbProperty.Metadata.newBuilder()
+ .setId(uiTemplate.id().build())
+ .setContainer(BanyandbCommon.Metadata.newBuilder()
+ .setGroup(GROUP)
+ .setName(UITemplate.INDEX_NAME)))
+ .addTags(TagAndValue.newStringTag(UITemplate.CONFIGURATION,
uiTemplate.getConfiguration()).build())
+ .addTags(TagAndValue.newLongTag(UITemplate.UPDATE_TIME,
uiTemplate.getUpdateTime()).build())
+ .build();
}
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MeasureModel.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MeasureModel.java
new file mode 100644
index 0000000000..09f3904839
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MeasureModel.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import java.util.List;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
+
+@RequiredArgsConstructor
+@Getter
+public class MeasureModel {
+ private final Measure measure;
+ private final List<IndexRule> indexRules;
+}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 86795cf496..bdf53a6761 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -48,20 +48,30 @@ import lombok.Setter;
import lombok.Singular;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Metadata;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.IntervalRule;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Catalog;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.ResourceOpts;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
+import
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagFamilySpec;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagSpec;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType;
+import
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.FieldSpec;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.FieldType;
+import
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.CompressionMethod;
+import
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.EncodingMethod;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
-import org.apache.skywalking.banyandb.v1.client.metadata.Group;
-import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
-import org.apache.skywalking.banyandb.v1.client.metadata.IntervalRule;
-import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
-import org.apache.skywalking.banyandb.v1.client.metadata.NamedSchema;
+import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
-import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
-import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
-import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -86,7 +96,7 @@ public enum MetadataRegistry {
private Map<String, GroupSetting> specificGroupSettings = new HashMap<>();
- public Stream registerStreamModel(Model model, BanyanDBStorageConfig
config, ConfigService configService) {
+ public StreamModel registerStreamModel(Model model, BanyanDBStorageConfig
config, ConfigService configService) {
final SchemaMetadata schemaMetadata = parseMetadata(model, config,
configService);
Schema.SchemaBuilder schemaBuilder =
Schema.builder().metadata(schemaMetadata);
Map<String, ModelColumn> modelColumnMap = model.getColumns().stream()
@@ -100,12 +110,12 @@ public enum MetadataRegistry {
// this can be used to build both
// 1) a list of TagFamilySpec,
// 2) a list of IndexRule,
- List<TagMetadata> tags = parseTagMetadata(model, schemaBuilder,
shardingColumns);
+ List<TagMetadata> tags = parseTagMetadata(model, schemaBuilder,
shardingColumns, schemaMetadata.group);
List<TagFamilySpec> tagFamilySpecs =
schemaMetadata.extractTagFamilySpec(tags, false);
// iterate over tagFamilySpecs to save tag names
for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
- for (final TagFamilySpec.TagSpec tagSpec :
tagFamilySpec.tagSpecs()) {
- schemaBuilder.tag(tagSpec.getTagName());
+ for (final TagSpec tagSpec : tagFamilySpec.getTagsList()) {
+ schemaBuilder.tag(tagSpec.getName());
}
}
String timestampColumn4Stream =
model.getBanyanDBModelExtension().getTimestampColumn();
@@ -119,15 +129,18 @@ public enum MetadataRegistry {
.filter(Objects::nonNull)
.collect(Collectors.toList());
- final Stream.Builder builder =
Stream.create(schemaMetadata.getGroup(), schemaMetadata.name());
- builder.setEntityRelativeTags(shardingColumns);
- builder.addTagFamilies(tagFamilySpecs);
- builder.addIndexes(indexRules);
+ final Stream.Builder builder = Stream.newBuilder();
+
builder.setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(schemaMetadata.getGroup())
+ .setName(schemaMetadata.name()));
+
builder.setEntity(BanyandbDatabase.Entity.newBuilder().addAllTagNames(shardingColumns));
+ builder.addAllTagFamilies(tagFamilySpecs);
+
+ //builder.addIndexes(indexRules);
registry.put(schemaMetadata.name(), schemaBuilder.build());
- return builder.build();
+ return new StreamModel(builder.build(), indexRules);
}
- public Measure registerMeasureModel(Model model, BanyanDBStorageConfig
config, ConfigService configService) throws StorageException {
+ public MeasureModel registerMeasureModel(Model model,
BanyanDBStorageConfig config, ConfigService configService) throws
StorageException {
final SchemaMetadata schemaMetadata = parseMetadata(model, config,
configService);
Schema.SchemaBuilder schemaBuilder =
Schema.builder().metadata(schemaMetadata);
Map<String, ModelColumn> modelColumnMap = model.getColumns().stream()
@@ -141,12 +154,12 @@ public enum MetadataRegistry {
// this can be used to build both
// 1) a list of TagFamilySpec,
// 2) a list of IndexRule,
- MeasureMetadata tagsAndFields = parseTagAndFieldMetadata(model,
schemaBuilder, shardingColumns);
+ MeasureMetadata tagsAndFields = parseTagAndFieldMetadata(model,
schemaBuilder, shardingColumns, schemaMetadata.group);
List<TagFamilySpec> tagFamilySpecs =
schemaMetadata.extractTagFamilySpec(tagsAndFields.tags,
model.getBanyanDBModelExtension().isStoreIDTag());
// iterate over tagFamilySpecs to save tag names
for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
- for (final TagFamilySpec.TagSpec tagSpec :
tagFamilySpec.tagSpecs()) {
- schemaBuilder.tag(tagSpec.getTagName());
+ for (final TagSpec tagSpec : tagFamilySpec.getTagsList()) {
+ schemaBuilder.tag(tagSpec.getName());
}
}
List<IndexRule> indexRules = tagsAndFields.tags.stream()
@@ -155,26 +168,29 @@ public enum MetadataRegistry {
.collect(Collectors.toList());
if (model.getBanyanDBModelExtension().isStoreIDTag()) {
- indexRules.add(IndexRule.create(BanyanDBConverter.ID,
IndexRule.IndexType.INVERTED));
- }
-
- final Measure.Builder builder =
Measure.create(schemaMetadata.getGroup(), schemaMetadata.name(),
- downSamplingDuration(model.getDownsampling()));
- builder.setEntityRelativeTags(shardingColumns);
- builder.addTagFamilies(tagFamilySpecs);
- if (!indexRules.isEmpty()) {
- builder.addIndexes(indexRules);
- }
+ indexRules.add(indexRule(schemaMetadata.group,
BanyanDBConverter.ID));
+ // indexRules.add(IndexRule.create(BanyanDBConverter.ID,
IndexRule.IndexType.INVERTED));
+ }
+
+ final Measure.Builder builder = Measure.newBuilder();
+
builder.setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(schemaMetadata.getGroup())
+ .setName(schemaMetadata.name()));
+
builder.setInterval(downSamplingDuration(model.getDownsampling()).format());
+
builder.setEntity(BanyandbDatabase.Entity.newBuilder().addAllTagNames(shardingColumns));
+ builder.addAllTagFamilies(tagFamilySpecs);
+// if (!indexRules.isEmpty()) {
+// builder.addIndexes(indexRules);
+// }
// parse and set field
- for (Measure.FieldSpec field : tagsAndFields.fields) {
- builder.addField(field);
+ for (BanyandbDatabase.FieldSpec field : tagsAndFields.fields) {
+ builder.addFields(field);
schemaBuilder.field(field.getName());
}
// parse TopN
schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.name()));
registry.put(schemaMetadata.name(), schemaBuilder.build());
- return builder.build();
+ return new MeasureModel(builder.build(), indexRules);
}
private TopNSpec parseTopNSpec(final Model model, final String measureName)
@@ -198,7 +214,7 @@ public enum MetadataRegistry {
.countersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber())
.fieldName(valueColumnOpt.get().getValueCName())
.groupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())
- .sort(AbstractQuery.Sort.UNSPECIFIED) // include both TopN and
BottomN
+ .sort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both
TopN and BottomN
.build();
}
@@ -245,27 +261,31 @@ public enum MetadataRegistry {
return this.registry.get(SchemaMetadata.formatName(modelName,
downSampling));
}
- private Measure.FieldSpec parseFieldSpec(ModelColumn modelColumn) {
+ private FieldSpec parseFieldSpec(ModelColumn modelColumn) {
String colName = modelColumn.getColumnName().getStorageName();
if (String.class.equals(modelColumn.getType())) {
- return Measure.FieldSpec.newIntField(colName)
- .compressWithZSTD()
- .build();
+ return FieldSpec.newBuilder().setName(colName)
+ .setFieldType(FieldType.FIELD_TYPE_STRING)
+
.setCompressionMethod(CompressionMethod.COMPRESSION_METHOD_ZSTD)
+ .build();
} else if (long.class.equals(modelColumn.getType()) ||
int.class.equals(modelColumn.getType())) {
- return Measure.FieldSpec.newIntField(colName)
- .compressWithZSTD()
- .encodeWithGorilla()
- .build();
+ return FieldSpec.newBuilder().setName(colName)
+ .setFieldType(FieldType.FIELD_TYPE_INT)
+
.setCompressionMethod(CompressionMethod.COMPRESSION_METHOD_ZSTD)
+
.setEncodingMethod(EncodingMethod.ENCODING_METHOD_GORILLA)
+ .build();
} else if
(StorageDataComplexObject.class.isAssignableFrom(modelColumn.getType()) ||
JsonObject.class.equals(modelColumn.getType())) {
- return Measure.FieldSpec.newStringField(colName)
- .compressWithZSTD()
- .build();
+ return FieldSpec.newBuilder().setName(colName)
+ .setFieldType(FieldType.FIELD_TYPE_STRING)
+
.setCompressionMethod(CompressionMethod.COMPRESSION_METHOD_ZSTD)
+ .build();
} else if (double.class.equals(modelColumn.getType())) {
// TODO: natively support double/float in BanyanDB
log.warn("Double is stored as binary");
- return Measure.FieldSpec.newBinaryField(colName)
- .compressWithZSTD()
- .build();
+ return FieldSpec.newBuilder().setName(colName)
+ .setFieldType(FieldType.FIELD_TYPE_DATA_BINARY)
+
.setCompressionMethod(CompressionMethod.COMPRESSION_METHOD_ZSTD)
+ .build();
} else {
throw new
UnsupportedOperationException(modelColumn.getType().getSimpleName() + " is not
supported for field");
}
@@ -284,8 +304,11 @@ public enum MetadataRegistry {
}
}
- IndexRule indexRule(String tagName) {
- return IndexRule.create(tagName, IndexRule.IndexType.INVERTED);
+ IndexRule indexRule(String group, String tagName) {
+ return IndexRule.newBuilder()
+
.setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group))
+
.setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName).build();
+ //return IndexRule.create(tagName, IndexRule.IndexType.INVERTED);
}
/**
@@ -314,18 +337,18 @@ public enum MetadataRegistry {
*
* @since 9.4.0 Skip {@link Record#TIME_BUCKET}
*/
- List<TagMetadata> parseTagMetadata(Model model, Schema.SchemaBuilder
builder, List<String> shardingColumns) {
+ List<TagMetadata> parseTagMetadata(Model model, Schema.SchemaBuilder
builder, List<String> shardingColumns, String group) {
List<TagMetadata> tagMetadataList = new ArrayList<>();
for (final ModelColumn col : model.getColumns()) {
final String columnStorageName =
col.getColumnName().getStorageName();
if (columnStorageName.equals(Record.TIME_BUCKET)) {
continue;
}
- final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col);
+ final TagSpec tagSpec = parseTagSpec(col);
builder.spec(columnStorageName, new ColumnSpec(ColumnType.TAG,
col.getType()));
String colName = col.getColumnName().getStorageName();
if (!shardingColumns.contains(colName) &&
col.getBanyanDBExtension().shouldIndex()) {
- tagMetadataList.add(new
TagMetadata(indexRule(tagSpec.getTagName()), tagSpec));
+ tagMetadataList.add(new TagMetadata(indexRule(group,
tagSpec.getName()), tagSpec));
} else {
tagMetadataList.add(new TagMetadata(null, tagSpec));
}
@@ -339,7 +362,7 @@ public enum MetadataRegistry {
@Singular
private final List<TagMetadata> tags;
@Singular
- private final List<Measure.FieldSpec> fields;
+ private final List<BanyandbDatabase.FieldSpec> fields;
}
/**
@@ -349,7 +372,7 @@ public enum MetadataRegistry {
*
* @since 9.4.0 Skip {@link Metrics#TIME_BUCKET}
*/
- MeasureMetadata parseTagAndFieldMetadata(Model model, Schema.SchemaBuilder
builder, List<String> shardingColumns) {
+ MeasureMetadata parseTagAndFieldMetadata(Model model, Schema.SchemaBuilder
builder, List<String> shardingColumns, String group) {
// skip metric
MeasureMetadata.MeasureMetadataBuilder result =
MeasureMetadata.builder();
for (final ModelColumn col : model.getColumns()) {
@@ -362,10 +385,10 @@ public enum MetadataRegistry {
result.field(parseFieldSpec(col));
continue;
}
- final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col);
+ final TagSpec tagSpec = parseTagSpec(col);
builder.spec(columnStorageName, new ColumnSpec(ColumnType.TAG,
col.getType()));
String colName = col.getColumnName().getStorageName();
- result.tag(new TagMetadata(!shardingColumns.contains(colName) &&
col.getBanyanDBExtension().shouldIndex() ? indexRule(tagSpec.getTagName()) :
null, tagSpec));
+ result.tag(new TagMetadata(!shardingColumns.contains(colName) &&
col.getBanyanDBExtension().shouldIndex() ? indexRule(group, tagSpec.getName())
: null, tagSpec));
}
return result.build();
@@ -378,36 +401,36 @@ public enum MetadataRegistry {
* @return a typed tag spec
*/
@Nonnull
- private TagFamilySpec.TagSpec parseTagSpec(ModelColumn modelColumn) {
+ private TagSpec parseTagSpec(ModelColumn modelColumn) {
final Class<?> clazz = modelColumn.getType();
final String colName = modelColumn.getColumnName().getStorageName();
- TagFamilySpec.TagSpec tagSpec = null;
+ TagSpec.Builder tagSpec = TagSpec.newBuilder().setName(colName);
if (String.class.equals(clazz) ||
StorageDataComplexObject.class.isAssignableFrom(clazz) ||
JsonObject.class.equals(clazz)) {
- tagSpec = TagFamilySpec.TagSpec.newStringTag(colName);
+ tagSpec = tagSpec.setType(TagType.TAG_TYPE_STRING);
} else if (int.class.equals(clazz) || long.class.equals(clazz)) {
- tagSpec = TagFamilySpec.TagSpec.newIntTag(colName);
+ tagSpec = tagSpec.setType(TagType.TAG_TYPE_INT);
} else if (byte[].class.equals(clazz)) {
- tagSpec = TagFamilySpec.TagSpec.newBinaryTag(colName);
+ tagSpec = tagSpec.setType(TagType.TAG_TYPE_DATA_BINARY);
} else if (clazz.isEnum()) {
- tagSpec = TagFamilySpec.TagSpec.newIntTag(colName);
+ tagSpec = tagSpec.setType(TagType.TAG_TYPE_INT);
} else if (double.class.equals(clazz) || Double.class.equals(clazz)) {
// serialize double as binary
- tagSpec = TagFamilySpec.TagSpec.newBinaryTag(colName);
+ tagSpec = tagSpec.setType(TagType.TAG_TYPE_DATA_BINARY);
} else if (IntList.class.isAssignableFrom(clazz)) {
- tagSpec = TagFamilySpec.TagSpec.newIntArrayTag(colName);
+ tagSpec = tagSpec.setType(TagType.TAG_TYPE_INT_ARRAY);
} else if (List.class.isAssignableFrom(clazz)) { // handle exceptions
ParameterizedType t = (ParameterizedType)
modelColumn.getGenericType();
if (String.class.equals(t.getActualTypeArguments()[0])) {
- tagSpec = TagFamilySpec.TagSpec.newStringArrayTag(colName);
+ tagSpec = tagSpec.setType(TagType.TAG_TYPE_STRING_ARRAY);
}
- }
- if (tagSpec == null) {
+ } else {
throw new IllegalStateException("type " +
modelColumn.getType().toString() + " is not supported");
}
+
if (modelColumn.isIndexOnly()) {
- tagSpec.indexedOnly();
+ tagSpec.setIndexedOnly(true);
}
- return tagSpec;
+ return tagSpec.build();
}
public void initializeIntervals(String specificGroupSettingsStr) {
@@ -507,7 +530,7 @@ public enum MetadataRegistry {
return modelName + "_" + downSampling.getName();
}
- public Optional<NamedSchema<?>> findRemoteSchema(BanyanDBClient
client) throws BanyanDBException {
+ public Optional<Object> findRemoteSchema(BanyanDBClient client) throws
BanyanDBException {
try {
switch (kind) {
case STREAM:
@@ -526,6 +549,17 @@ public enum MetadataRegistry {
}
}
+ public MetadataCache.EntityMetadata updateRemoteSchema(BanyanDBClient
client) throws BanyanDBException {
+ switch (kind) {
+ case STREAM:
+ return
client.updateStreamMetadataCacheFromSever(this.group, this.name());
+ case MEASURE:
+ return
client.updateMeasureMetadataCacheFromSever(this.group, this.name());
+ default:
+ throw new IllegalStateException("should not reach here");
+ }
+ }
+
private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata>
tagMetadataList, boolean shouldAddID) {
final String indexFamily = SchemaMetadata.this.indexFamily();
final String nonIndexFamily = SchemaMetadata.this.nonIndexFamily();
@@ -534,10 +568,11 @@ public enum MetadataRegistry {
final List<TagFamilySpec> tagFamilySpecs = new
ArrayList<>(tagMetadataMap.size());
for (final Map.Entry<String, List<TagMetadata>> entry :
tagMetadataMap.entrySet()) {
- final TagFamilySpec.Builder b =
TagFamilySpec.create(entry.getKey())
-
.addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList()));
+ final TagFamilySpec.Builder b = TagFamilySpec.newBuilder();
+ b.setName(entry.getKey());
+
b.addAllTags(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList()));
if (shouldAddID && indexFamily.equals(entry.getKey())) {
-
b.addTagSpec(TagFamilySpec.TagSpec.newStringTag(BanyanDBConverter.ID));
+
b.addTags(TagSpec.newBuilder().setType(TagType.TAG_TYPE_STRING).setName(BanyanDBConverter.ID));
}
tagFamilySpecs.add(b.build());
}
@@ -547,19 +582,31 @@ public enum MetadataRegistry {
public boolean checkResourceExistence(BanyanDBClient client) throws
BanyanDBException {
ResourceExist resourceExist;
+ Group.Builder gBuilder
+ = Group.newBuilder()
+ .setMetadata(Metadata.newBuilder().setName(this.group))
+ .setResourceOpts(ResourceOpts.newBuilder()
+ .setShardNum(this.shard)
+ .setSegmentInterval(
+
IntervalRule.newBuilder()
+ .setUnit(
+
IntervalRule.Unit.UNIT_DAY)
+ .setNum(
+
this.segmentIntervalDays))
+ .setTtl(
+
IntervalRule.newBuilder()
+ .setUnit(
+
IntervalRule.Unit.UNIT_DAY)
+ .setNum(
+
this.ttlDays)));
switch (kind) {
case STREAM:
resourceExist = client.existStream(this.group,
this.name());
if (!resourceExist.hasGroup()) {
try {
- Group g = client.define(Group.create(this.group,
Catalog.STREAM, this.shard,
-
IntervalRule.create(
-
IntervalRule.Unit.DAY, this.segmentIntervalDays),
-
IntervalRule.create(
-
IntervalRule.Unit.DAY, this.ttlDays)
- ));
+ Group g =
client.define(gBuilder.setCatalog(Catalog.CATALOG_STREAM).build());
if (g != null) {
- log.info("group {} created", g.name());
+ log.info("group {} created",
g.getMetadata().getName());
}
} catch (BanyanDBException ex) {
if
(ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
@@ -574,14 +621,15 @@ public enum MetadataRegistry {
resourceExist = client.existMeasure(this.group,
this.name());
try {
if (!resourceExist.hasGroup()) {
- Group g = client.define(Group.create(this.group,
Catalog.MEASURE, this.shard,
-
IntervalRule.create(
-
IntervalRule.Unit.DAY, this.segmentIntervalDays),
-
IntervalRule.create(
-
IntervalRule.Unit.DAY, this.ttlDays)
- ));
+ Group g =
client.define(gBuilder.setCatalog(Catalog.CATALOG_MEASURE).build());
+// Group.create(this.group, Catalog.MEASURE,
this.shard,
+//
IntervalRule.create(
+//
IntervalRule.Unit.DAY, this.segmentIntervalDays),
+//
IntervalRule.create(
+//
IntervalRule.Unit.DAY, this.ttlDays)
+// ));
if (g != null) {
- log.info("group {} created", g.name());
+ log.info("group {} created",
g.getMetadata().getName());
}
}
} catch (BanyanDBException ex) {
@@ -637,7 +685,7 @@ public enum MetadataRegistry {
@Getter
private static class TagMetadata {
private final IndexRule indexRule;
- private final TagFamilySpec.TagSpec tagSpec;
+ private final TagSpec tagSpec;
boolean isIndex() {
return this.indexRule != null;
@@ -679,14 +727,29 @@ public enum MetadataRegistry {
}
return;
}
- client.define(TopNAggregation.create(getMetadata().getGroup(),
this.getTopNSpec().getName())
- .setSourceMeasureName(getMetadata().name())
- .setFieldValueSort(this.getTopNSpec().getSort())
- .setFieldName(this.getTopNSpec().getFieldName())
-
.setGroupByTagNames(this.getTopNSpec().getGroupByTagNames())
- .setCountersNumber(this.getTopNSpec().getCountersNumber())
- .setLruSize(this.getTopNSpec().getLruSize())
- .build());
+ TopNAggregation.Builder builder
+ = TopNAggregation.newBuilder()
+ .setMetadata(Metadata.newBuilder()
+
.setGroup(getMetadata().getGroup())
+
.setName(this.getTopNSpec().getName()))
+
+ .setSourceMeasure(Metadata.newBuilder()
+
.setGroup(getMetadata().getGroup())
+
.setName(getMetadata().name()))
+
.setFieldValueSort(this.getTopNSpec().getSort())
+
.setFieldName(this.getTopNSpec().getFieldName())
+
.addAllGroupByTagNames(this.getTopNSpec().getGroupByTagNames())
+
.setCountersNumber(this.getTopNSpec().getCountersNumber())
+ .setLruSize(this.getTopNSpec().getLruSize());
+ client.define(builder.build());
+// client.define(TopNAggregation.create(getMetadata().getGroup(),
this.getTopNSpec().getName())
+// .setSourceMeasureName(getMetadata().name())
+// .setFieldValueSort(this.getTopNSpec().getSort())
+// .setFieldName(this.getTopNSpec().getFieldName())
+//
.setGroupByTagNames(this.getTopNSpec().getGroupByTagNames())
+//
.setCountersNumber(this.getTopNSpec().getCountersNumber())
+// .setLruSize(this.getTopNSpec().getLruSize())
+// .build());
log.info("installed TopN schema for measure {}",
getMetadata().name());
}
}
@@ -700,7 +763,7 @@ public enum MetadataRegistry {
@Singular
private final List<String> groupByTagNames;
private final String fieldName;
- private final AbstractQuery.Sort sort;
+ private final BanyandbModel.Sort sort;
private final int lruSize;
private final int countersNumber;
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamModel.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamModel.java
new file mode 100644
index 0000000000..ce163115af
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamModel.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import java.util.List;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
+
+@RequiredArgsConstructor
+@Getter
+public class StreamModel {
+ private final Stream stream;
+ private final List<IndexRule> indexRules;
+}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBContinuousProfilingPolicyDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBContinuousProfilingPolicyDAO.java
index c37160261d..52e69a4574 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBContinuousProfilingPolicyDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBContinuousProfilingPolicyDAO.java
@@ -19,8 +19,11 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.banyandb.v1.client.metadata.Property;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
import
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingPolicy;
import
org.apache.skywalking.oap.server.core.storage.profiling.continuous.IContinuousProfilingPolicyDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
@@ -48,10 +51,15 @@ public class BanyanDBContinuousProfilingPolicyDAO extends
AbstractBanyanDBDAO im
}
public Property applyAll(ContinuousProfilingPolicy policy) {
- return Property.create(GROUP, ContinuousProfilingPolicy.INDEX_NAME,
policy.id().build())
- .addTag(TagAndValue.newStringTag(ContinuousProfilingPolicy.UUID,
policy.getUuid()))
-
.addTag(TagAndValue.newStringTag(ContinuousProfilingPolicy.CONFIGURATION_JSON,
policy.getConfigurationJson()))
- .build();
+ return Property.newBuilder()
+ .setMetadata(BanyandbProperty.Metadata.newBuilder()
+
.setId(policy.id().build())
+
.setContainer(BanyandbCommon.Metadata.newBuilder()
+
.setGroup(GROUP)
+
.setName(ContinuousProfilingPolicy.INDEX_NAME)))
+ .addTags(TagAndValue.newStringTag(ContinuousProfilingPolicy.UUID,
policy.getUuid()).build())
+
.addTags(TagAndValue.newStringTag(ContinuousProfilingPolicy.CONFIGURATION_JSON,
policy.getConfigurationJson()).build())
+ .build();
}
@Override
@@ -65,16 +73,16 @@ public class BanyanDBContinuousProfilingPolicyDAO extends
AbstractBanyanDBDAO im
}
}).filter(Objects::nonNull).map(properties -> {
final ContinuousProfilingPolicy policy = new
ContinuousProfilingPolicy();
- policy.setServiceId(properties.id());
- for (TagAndValue<?> tag : properties.tags()) {
- if
(tag.getTagName().equals(ContinuousProfilingPolicy.CONFIGURATION_JSON)) {
- policy.setConfigurationJson((String) tag.getValue());
- } else if
(tag.getTagName().equals(ContinuousProfilingPolicy.UUID)) {
- policy.setUuid((String) tag.getValue());
+ policy.setServiceId(properties.getMetadata().getId());
+ for (BanyandbModel.Tag tag : properties.getTagsList()) {
+ TagAndValue<?> tagAndValue = TagAndValue.fromProtobuf(tag);
+ if
(tagAndValue.getTagName().equals(ContinuousProfilingPolicy.CONFIGURATION_JSON))
{
+ policy.setConfigurationJson((String)
tagAndValue.getValue());
+ } else if
(tagAndValue.getTagName().equals(ContinuousProfilingPolicy.UUID)) {
+ policy.setUuid((String) tagAndValue.getValue());
}
}
return policy;
}).collect(Collectors.toList());
}
-
-}
\ No newline at end of file
+}
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 4dcf1af099..8631e0da5f 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -23,7 +23,7 @@
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=1335f15bf821a40a7cd71448fa805f0be265afcc
SW_ROVER_COMMIT=6bbd39aa701984482330d9dfb4dbaaff0527d55c
-SW_BANYANDB_COMMIT=59c396870ac2d81ec81113802d54277fe070d91b
+SW_BANYANDB_COMMIT=0e734c462571dcf55dbb7761211c07d8b156521e
SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3
SW_CTL_COMMIT=d5f3597733aa5217373986d776a3ee5ee8b3c468