This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new ddbed6d091 Adapt BanyanDB Java Client 0.7.0. (#12621)
ddbed6d091 is described below

commit ddbed6d091c4c20dbb43294fbd03778bdbb8471a
Author: Wan Kai <[email protected]>
AuthorDate: Sat Sep 14 15:05:20 2024 +0800

    Adapt BanyanDB Java Client 0.7.0. (#12621)
---
 docs/en/changes/changes.md                         |   1 +
 oap-server-bom/pom.xml                             |   2 +-
 .../plugin/banyandb/BanyanDBIndexInstaller.java    |  45 ++--
 .../plugin/banyandb/BanyanDBNoneStreamDAO.java     |   2 +-
 .../plugin/banyandb/BanyanDBStorageClient.java     |  58 +++--
 .../plugin/banyandb/BanyanDBStorageProvider.java   |  10 +-
 .../banyandb/BanyanDBUIMenuManagementDAO.java      |  27 ++-
 .../banyandb/BanyanDBUITemplateManagementDAO.java  |  55 +++--
 .../storage/plugin/banyandb/MeasureModel.java      |  32 +++
 .../storage/plugin/banyandb/MetadataRegistry.java  | 265 +++++++++++++--------
 .../storage/plugin/banyandb/StreamModel.java       |  32 +++
 .../BanyanDBContinuousProfilingPolicyDAO.java      |  34 ++-
 test/e2e-v2/script/env                             |   2 +-
 13 files changed, 391 insertions(+), 174 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 96e4410e41..0e5e806f81 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -65,6 +65,7 @@
 * Fix the previous analysis result missing in the ALS `k8s-mesh` analyzer.
 * Fix `findEndpoint` query require `keyword` when using BanyanDB.
 * Support to analysis the ztunnel mapped IP address in eBPF Access Log 
Receiver.
+* Adapt BanyanDB Java Client 0.7.0-rc3.
 
 #### UI
 
diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index f5385435d8..158f66f033 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -73,7 +73,7 @@
         <httpcore.version>4.4.13</httpcore.version>
         <httpasyncclient.version>4.1.5</httpasyncclient.version>
         <commons-compress.version>1.21</commons-compress.version>
-        <banyandb-java-client.version>0.7.0-rc2</banyandb-java-client.version>
+        <banyandb-java-client.version>0.7-rc3</banyandb-java-client.version>
         <kafka-clients.version>3.4.0</kafka-clients.version>
         <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
         <consul.client.version>1.5.3</consul.client.version>
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index b95737843b..9c98a99d2b 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -22,8 +22,9 @@ import io.grpc.Status;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
 import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
-import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
+import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
@@ -31,6 +32,7 @@ import 
org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 
 @Slf4j
 public class BanyanDBIndexInstaller extends ModelInstaller {
@@ -58,20 +60,20 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
             final boolean resourceExist = metadata.checkResourceExistence(c);
             if (!resourceExist) {
                 return false;
-            }
-
-            // then check entity schema
-            if (metadata.findRemoteSchema(c).isPresent()) {
-                // register models only locally but not remotely
+            } else {
+                // register models only locally(Schema cache) but not remotely
                 if (model.isRecord()) { // stream
                     MetadataRegistry.INSTANCE.registerStreamModel(model, 
config, configService);
                 } else { // measure
                     MetadataRegistry.INSTANCE.registerMeasureModel(model, 
config, configService);
                 }
+                // pre-load remote schema for java client
+                MetadataCache.EntityMetadata remoteMeta = 
metadata.updateRemoteSchema(c);
+                if (remoteMeta == null) {
+                    throw new IllegalStateException("inconsistent state: 
metadata:" + metadata + ", remoteMeta: null");
+                }
                 return true;
             }
-
-            throw new IllegalStateException("inconsistent state:" + metadata);
         } catch (BanyanDBException ex) {
             throw new StorageException("fail to check existence", ex);
         }
@@ -84,11 +86,17 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
                                                        .provider()
                                                        
.getService(ConfigService.class);
             if (model.isRecord()) { // stream
-                Stream stream = 
MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
+                StreamModel streamModel = 
MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
+                Stream stream = streamModel.getStream();
                 if (stream != null) {
                     log.info("install stream schema {}", model.getName());
+                    final BanyanDBClient client = ((BanyanDBStorageClient) 
this.client).client;
                     try {
-                        ((BanyanDBStorageClient) client).define(stream);
+                        if 
(CollectionUtils.isNotEmpty(streamModel.getIndexRules())) {
+                            client.define(stream, streamModel.getIndexRules());
+                        } else {
+                            client.define(stream);
+                        }
                     } catch (BanyanDBException ex) {
                         if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) 
{
                             log.info(
@@ -102,12 +110,17 @@ public class BanyanDBIndexInstaller extends 
ModelInstaller {
                     }
                 }
             } else { // measure
-                Measure measure = 
MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
+                MeasureModel measureModel = 
MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
+                Measure measure = measureModel.getMeasure();
                 if (measure != null) {
-                    log.info("install measure schema {}", measure.name());
-                    final BanyanDBClient c = ((BanyanDBStorageClient) 
this.client).client;
+                    log.info("install measure schema {}", model.getName());
+                    final BanyanDBClient client = ((BanyanDBStorageClient) 
this.client).client;
                     try {
-                        c.define(measure);
+                        if 
(CollectionUtils.isNotEmpty(measureModel.getIndexRules())) {
+                            client.define(measure, 
measureModel.getIndexRules());
+                        } else {
+                            client.define(measure);
+                        }
                     } catch (BanyanDBException ex) {
                         if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) 
{
                             log.info("Measure schema {}_{} already created by 
another OAP node",
@@ -119,7 +132,7 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
                     }
                     final MetadataRegistry.Schema schema = 
MetadataRegistry.INSTANCE.findMetadata(model);
                     try {
-                        schema.installTopNAggregation(c);
+                        schema.installTopNAggregation(client);
                     } catch (BanyanDBException ex) {
                         if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) 
{
                             log.info("Measure schema {}_{} TopN({}) already 
created by another OAP node",
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
index 1dc2c2b7d2..51ce5381ac 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
@@ -44,7 +44,7 @@ public class BanyanDBNoneStreamDAO extends 
AbstractDAO<BanyanDBStorageClient> im
         if (schema == null) {
             throw new IOException(model.getName() + " is not registered");
         }
-        StreamWrite streamWrite = getClient().client.createStreamWrite(
+        StreamWrite streamWrite = getClient().createStreamWrite(
             schema.getMetadata().getGroup(), // group name
             schema.getMetadata().name(), // stream-name
             noneStream.id().build() // identity
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 26069cdc0d..35ce37194e 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import io.grpc.Status;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
 import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
 import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
@@ -30,14 +31,15 @@ import 
org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
 import org.apache.skywalking.banyandb.v1.client.TopNQuery;
 import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
+import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
+import 
org.apache.skywalking.banyandb.property.v1.BanyandbProperty.ApplyRequest.Strategy;
+import 
org.apache.skywalking.banyandb.property.v1.BanyandbProperty.DeleteResponse;
 import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.AlreadyExistsException;
 import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.metadata.Group;
-import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
-import org.apache.skywalking.banyandb.v1.client.metadata.Property;
-import org.apache.skywalking.banyandb.v1.client.metadata.PropertyStore;
-import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
-import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
 import org.apache.skywalking.oap.server.library.client.Client;
 import 
org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
 import 
org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
@@ -103,9 +105,9 @@ public class BanyanDBStorageClient implements Client, 
HealthCheckable {
         }
     }
 
-    public PropertyStore.DeleteResult deleteProperty(String group, String 
name, String id, String... tags) throws IOException {
+    public DeleteResponse deleteProperty(String group, String name, String id, 
String... tags) throws IOException {
         try {
-            PropertyStore.DeleteResult result = 
this.client.deleteProperty(group, name, id, tags);
+            DeleteResponse result = this.client.deleteProperty(group, name, 
id, tags);
             this.healthChecker.health();
             return result;
         } catch (BanyanDBException ex) {
@@ -158,7 +160,7 @@ public class BanyanDBStorageClient implements Client, 
HealthCheckable {
     }
 
     /**
-     * PropertyStore.Strategy is default to {@link 
PropertyStore.Strategy#MERGE}
+     * PropertyStore.Strategy is default to {@link Strategy#STRATEGY_MERGE}
      */
     public void define(Property property) throws IOException {
         try {
@@ -170,7 +172,7 @@ public class BanyanDBStorageClient implements Client, 
HealthCheckable {
         }
     }
 
-    public void define(Property property, PropertyStore.Strategy strategy) 
throws IOException {
+    public void define(Property property, Strategy strategy) throws 
IOException {
         try {
             this.client.apply(property, strategy);
             this.healthChecker.health();
@@ -190,6 +192,16 @@ public class BanyanDBStorageClient implements Client, 
HealthCheckable {
         }
     }
 
+    public void define(Stream stream, List<BanyandbDatabase.IndexRule> 
indexRules) throws BanyanDBException {
+        try {
+            this.client.define(stream, indexRules);
+            this.healthChecker.health();
+        } catch (BanyanDBException ex) {
+            healthChecker.unHealth(ex);
+            throw ex;
+        }
+    }
+
     public void define(Measure measure) throws BanyanDBException {
         try {
             this.client.define(measure);
@@ -200,6 +212,16 @@ public class BanyanDBStorageClient implements Client, 
HealthCheckable {
         }
     }
 
+    public void define(Measure measure, List<BanyandbDatabase.IndexRule> 
indexRules) throws BanyanDBException {
+        try {
+            this.client.define(measure, indexRules);
+            this.healthChecker.health();
+        } catch (BanyanDBException ex) {
+            healthChecker.unHealth(ex);
+            throw ex;
+        }
+    }
+
     public void defineIfEmpty(Group group) throws IOException {
         try {
             try {
@@ -223,12 +245,20 @@ public class BanyanDBStorageClient implements Client, 
HealthCheckable {
         }
     }
 
-    public StreamWrite createStreamWrite(String group, String name, String 
elementId) {
-        return this.client.createStreamWrite(group, name, elementId);
+    public StreamWrite createStreamWrite(String group, String name, String 
elementId) throws IOException {
+        try {
+            return this.client.createStreamWrite(group, name, elementId);
+        } catch (BanyanDBException e) {
+            throw new IOException("fail to create stream write", e);
+        }
     }
 
-    public MeasureWrite createMeasureWrite(String group, String name, long 
timestamp) {
-        return this.client.createMeasureWrite(group, name, timestamp);
+    public MeasureWrite createMeasureWrite(String group, String name, long 
timestamp) throws IOException {
+        try {
+            return this.client.createMeasureWrite(group, name, timestamp);
+        } catch (BanyanDBException e) {
+            throw new IOException("fail to create measure write", e);
+        }
     }
 
     public void write(StreamWrite streamWrite) {
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index 5a6c443d7a..e7660921b3 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
-import org.apache.skywalking.banyandb.v1.client.metadata.Group;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
@@ -175,7 +175,13 @@ public class BanyanDBStorageProvider extends 
ModuleProvider {
         this.client.registerChecker(healthChecker);
         try {
             this.client.connect();
-            
this.client.defineIfEmpty(Group.create(BanyanDBUITemplateManagementDAO.GROUP));
+            this.client.defineIfEmpty(BanyandbCommon.Group.newBuilder()
+                                                          .setMetadata(
+                                                              
BanyandbCommon.Metadata.newBuilder()
+                                                                               
      .setName(
+                                                                               
          BanyanDBUITemplateManagementDAO.GROUP))
+                                                          
.setCatalog(BanyandbCommon.Catalog.CATALOG_UNSPECIFIED)
+                                                          .build());
             this.modelInstaller.start();
 
             
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(modelInstaller);
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUIMenuManagementDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUIMenuManagementDAO.java
index e6b923c904..74e7cc75dd 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUIMenuManagementDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUIMenuManagementDAO.java
@@ -19,8 +19,11 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.banyandb.v1.client.metadata.Property;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
 import org.apache.skywalking.oap.server.core.management.ui.menu.UIMenu;
 import 
org.apache.skywalking.oap.server.core.storage.management.UIMenuManagementDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
@@ -46,17 +49,27 @@ public class BanyanDBUIMenuManagementDAO extends 
AbstractBanyanDBDAO implements
 
     @Override
     public void saveMenu(UIMenu menu) throws IOException {
-        this.getClient().define(Property.create(GROUP, UIMenu.INDEX_NAME, 
menu.id().build())
-            .addTag(TagAndValue.newStringTag(UIMenu.CONFIGURATION, 
menu.getConfigurationJson()))
-            .addTag(TagAndValue.newLongTag(UIMenu.UPDATE_TIME, 
menu.getUpdateTime()))
-            .build());
+        Property property = Property.newBuilder()
+                                    
.setMetadata(BanyandbProperty.Metadata.newBuilder().setId(menu.getMenuId())
+                                                                          
.setContainer(
+                                                                              
BanyandbCommon.Metadata.newBuilder()
+                                                                               
                      .setGroup(GROUP)
+                                                                               
                      .setName(
+                                                                               
                          UIMenu.INDEX_NAME)))
+
+                                    
.addTags(TagAndValue.newStringTag(UIMenu.CONFIGURATION, 
menu.getConfigurationJson())
+                                                        .build())
+                                    
.addTags(TagAndValue.newLongTag(UIMenu.UPDATE_TIME, 
menu.getUpdateTime()).build())
+                                    .build();
+        this.getClient().define(property);
     }
 
     public UIMenu parse(Property property) {
         UIMenu menu = new UIMenu();
-        menu.setMenuId(property.id());
+        menu.setMenuId(property.getMetadata().getId());
 
-        for (TagAndValue<?> tagAndValue : property.tags()) {
+        for (BanyandbModel.Tag tag : property.getTagsList()) {
+            TagAndValue<?> tagAndValue = TagAndValue.fromProtobuf(tag);
             if (tagAndValue.getTagName().equals(UIMenu.CONFIGURATION)) {
                 menu.setConfigurationJson((String) tagAndValue.getValue());
             } else if (tagAndValue.getTagName().equals(UIMenu.UPDATE_TIME)) {
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUITemplateManagementDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUITemplateManagementDAO.java
index f6f6414828..9bd4ef26bf 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUITemplateManagementDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBUITemplateManagementDAO.java
@@ -19,8 +19,11 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.banyandb.v1.client.metadata.Property;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
 import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
 import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
 import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
@@ -64,7 +67,7 @@ public class BanyanDBUITemplateManagementDAO extends 
AbstractBanyanDBDAO impleme
             this.getClient().define(newTemplate);
             return TemplateChangeStatus.builder()
                     .status(true)
-                    .id(newTemplate.id())
+                    .id(newTemplate.getMetadata().getId())
                     .build();
         } catch (IOException ioEx) {
             log.error("fail to add new template", ioEx);
@@ -80,7 +83,7 @@ public class BanyanDBUITemplateManagementDAO extends 
AbstractBanyanDBDAO impleme
             this.getClient().define(newTemplate);
             return TemplateChangeStatus.builder()
                     .status(true)
-                    .id(newTemplate.id())
+                    .id(newTemplate.getMetadata().getId())
                     .build();
         } catch (IOException ioEx) {
             log.error("fail to modify the template", ioEx);
@@ -118,9 +121,10 @@ public class BanyanDBUITemplateManagementDAO extends 
AbstractBanyanDBDAO impleme
 
     public UITemplate parse(Property property) {
         UITemplate uiTemplate = new UITemplate();
-        uiTemplate.setTemplateId(property.id());
+        uiTemplate.setTemplateId(property.getMetadata().getId());
 
-        for (TagAndValue<?> tagAndValue : property.tags()) {
+        for (BanyandbModel.Tag tag : property.getTagsList()) {
+            TagAndValue<?> tagAndValue = TagAndValue.fromProtobuf(tag);
             if (tagAndValue.getTagName().equals(UITemplate.CONFIGURATION)) {
                 uiTemplate.setConfiguration((String) tagAndValue.getValue());
             } else if (tagAndValue.getTagName().equals(UITemplate.DISABLED)) {
@@ -133,11 +137,16 @@ public class BanyanDBUITemplateManagementDAO extends 
AbstractBanyanDBDAO impleme
     }
 
     public Property applyAll(UITemplate uiTemplate) {
-        return Property.create(GROUP, UITemplate.INDEX_NAME, 
uiTemplate.id().build())
-                .addTag(TagAndValue.newStringTag(UITemplate.CONFIGURATION, 
uiTemplate.getConfiguration()))
-                .addTag(TagAndValue.newLongTag(UITemplate.DISABLED, 
uiTemplate.getDisabled()))
-                .addTag(TagAndValue.newLongTag(UITemplate.UPDATE_TIME, 
uiTemplate.getUpdateTime()))
-                .build();
+        return Property.newBuilder()
+                .setMetadata(BanyandbProperty.Metadata.newBuilder()
+                        .setId(uiTemplate.id().build())
+                        .setContainer(BanyandbCommon.Metadata.newBuilder()
+                                .setGroup(GROUP)
+                                .setName(UITemplate.INDEX_NAME)))
+            .addTags(TagAndValue.newStringTag(UITemplate.CONFIGURATION, 
uiTemplate.getConfiguration()).build())
+            .addTags(TagAndValue.newLongTag(UITemplate.DISABLED, 
uiTemplate.getDisabled()).build())
+            .addTags(TagAndValue.newLongTag(UITemplate.UPDATE_TIME, 
uiTemplate.getUpdateTime()).build())
+            .build();
     }
 
     /**
@@ -147,10 +156,15 @@ public class BanyanDBUITemplateManagementDAO extends 
AbstractBanyanDBDAO impleme
      * @return new property (patch) to be applied
      */
     public Property applyStatus(UITemplate uiTemplate) {
-        return Property.create(GROUP, UITemplate.INDEX_NAME, 
uiTemplate.id().build())
-                .addTag(TagAndValue.newLongTag(UITemplate.DISABLED, 
uiTemplate.getDisabled()))
-                .addTag(TagAndValue.newLongTag(UITemplate.UPDATE_TIME, 
uiTemplate.getUpdateTime()))
-                .build();
+        return Property.newBuilder()
+                .setMetadata(BanyandbProperty.Metadata.newBuilder()
+                        .setId(uiTemplate.id().build())
+                        .setContainer(BanyandbCommon.Metadata.newBuilder()
+                                .setGroup(GROUP)
+                                .setName(UITemplate.INDEX_NAME)))
+                .addTags(TagAndValue.newLongTag(UITemplate.DISABLED, 
uiTemplate.getDisabled()).build())
+                .addTags(TagAndValue.newLongTag(UITemplate.UPDATE_TIME, 
uiTemplate.getUpdateTime()).build())
+            .build();
     }
 
     /**
@@ -160,9 +174,14 @@ public class BanyanDBUITemplateManagementDAO extends 
AbstractBanyanDBDAO impleme
      * @return new property (patch) to be applied
      */
     public Property applyConfiguration(UITemplate uiTemplate) {
-        return Property.create(GROUP, UITemplate.INDEX_NAME, 
uiTemplate.id().build())
-                .addTag(TagAndValue.newStringTag(UITemplate.CONFIGURATION, 
uiTemplate.getConfiguration()))
-                .addTag(TagAndValue.newLongTag(UITemplate.UPDATE_TIME, 
uiTemplate.getUpdateTime()))
-                .build();
+        return Property.newBuilder()
+                .setMetadata(BanyandbProperty.Metadata.newBuilder()
+                        .setId(uiTemplate.id().build())
+                        .setContainer(BanyandbCommon.Metadata.newBuilder()
+                                .setGroup(GROUP)
+                                .setName(UITemplate.INDEX_NAME)))
+                .addTags(TagAndValue.newStringTag(UITemplate.CONFIGURATION, 
uiTemplate.getConfiguration()).build())
+                .addTags(TagAndValue.newLongTag(UITemplate.UPDATE_TIME, 
uiTemplate.getUpdateTime()).build())
+            .build();
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MeasureModel.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MeasureModel.java
new file mode 100644
index 0000000000..09f3904839
--- /dev/null
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MeasureModel.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import java.util.List;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
+
+@RequiredArgsConstructor
+@Getter
+public class MeasureModel {
+    private final Measure measure;
+    private final List<IndexRule> indexRules;
+}
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 86795cf496..bdf53a6761 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -48,20 +48,30 @@ import lombok.Setter;
 import lombok.Singular;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Metadata;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.IntervalRule;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Catalog;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.ResourceOpts;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
+import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagFamilySpec;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagSpec;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType;
+import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.FieldSpec;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.FieldType;
+import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.CompressionMethod;
+import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.EncodingMethod;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
 import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
 import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
 import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
-import org.apache.skywalking.banyandb.v1.client.metadata.Group;
-import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
-import org.apache.skywalking.banyandb.v1.client.metadata.IntervalRule;
-import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
-import org.apache.skywalking.banyandb.v1.client.metadata.NamedSchema;
+import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
 import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
-import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
-import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
-import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -86,7 +96,7 @@ public enum MetadataRegistry {
 
     private Map<String, GroupSetting> specificGroupSettings = new HashMap<>();
 
-    public Stream registerStreamModel(Model model, BanyanDBStorageConfig 
config, ConfigService configService) {
+    public StreamModel registerStreamModel(Model model, BanyanDBStorageConfig 
config, ConfigService configService) {
         final SchemaMetadata schemaMetadata = parseMetadata(model, config, 
configService);
         Schema.SchemaBuilder schemaBuilder = 
Schema.builder().metadata(schemaMetadata);
         Map<String, ModelColumn> modelColumnMap = model.getColumns().stream()
@@ -100,12 +110,12 @@ public enum MetadataRegistry {
         // this can be used to build both
         // 1) a list of TagFamilySpec,
         // 2) a list of IndexRule,
-        List<TagMetadata> tags = parseTagMetadata(model, schemaBuilder, 
shardingColumns);
+        List<TagMetadata> tags = parseTagMetadata(model, schemaBuilder, 
shardingColumns, schemaMetadata.group);
         List<TagFamilySpec> tagFamilySpecs = 
schemaMetadata.extractTagFamilySpec(tags, false);
         // iterate over tagFamilySpecs to save tag names
         for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
-            for (final TagFamilySpec.TagSpec tagSpec : 
tagFamilySpec.tagSpecs()) {
-                schemaBuilder.tag(tagSpec.getTagName());
+            for (final TagSpec tagSpec : tagFamilySpec.getTagsList()) {
+                schemaBuilder.tag(tagSpec.getName());
             }
         }
         String timestampColumn4Stream = 
model.getBanyanDBModelExtension().getTimestampColumn();
@@ -119,15 +129,18 @@ public enum MetadataRegistry {
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
 
-        final Stream.Builder builder = 
Stream.create(schemaMetadata.getGroup(), schemaMetadata.name());
-        builder.setEntityRelativeTags(shardingColumns);
-        builder.addTagFamilies(tagFamilySpecs);
-        builder.addIndexes(indexRules);
+        final Stream.Builder builder = Stream.newBuilder();
+        
builder.setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(schemaMetadata.getGroup())
+                                .setName(schemaMetadata.name()));
+        
builder.setEntity(BanyandbDatabase.Entity.newBuilder().addAllTagNames(shardingColumns));
+        builder.addAllTagFamilies(tagFamilySpecs);
+
+        //builder.addIndexes(indexRules);
         registry.put(schemaMetadata.name(), schemaBuilder.build());
-        return builder.build();
+        return new StreamModel(builder.build(), indexRules);
     }
 
-    public Measure registerMeasureModel(Model model, BanyanDBStorageConfig 
config, ConfigService configService) throws StorageException {
+    public MeasureModel registerMeasureModel(Model model, 
BanyanDBStorageConfig config, ConfigService configService) throws 
StorageException {
         final SchemaMetadata schemaMetadata = parseMetadata(model, config, 
configService);
         Schema.SchemaBuilder schemaBuilder = 
Schema.builder().metadata(schemaMetadata);
         Map<String, ModelColumn> modelColumnMap = model.getColumns().stream()
@@ -141,12 +154,12 @@ public enum MetadataRegistry {
         // this can be used to build both
         // 1) a list of TagFamilySpec,
         // 2) a list of IndexRule,
-        MeasureMetadata tagsAndFields = parseTagAndFieldMetadata(model, 
schemaBuilder, shardingColumns);
+        MeasureMetadata tagsAndFields = parseTagAndFieldMetadata(model, 
schemaBuilder, shardingColumns, schemaMetadata.group);
         List<TagFamilySpec> tagFamilySpecs = 
schemaMetadata.extractTagFamilySpec(tagsAndFields.tags, 
model.getBanyanDBModelExtension().isStoreIDTag());
         // iterate over tagFamilySpecs to save tag names
         for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
-            for (final TagFamilySpec.TagSpec tagSpec : 
tagFamilySpec.tagSpecs()) {
-                schemaBuilder.tag(tagSpec.getTagName());
+            for (final TagSpec tagSpec : tagFamilySpec.getTagsList()) {
+                schemaBuilder.tag(tagSpec.getName());
             }
         }
         List<IndexRule> indexRules = tagsAndFields.tags.stream()
@@ -155,26 +168,29 @@ public enum MetadataRegistry {
                 .collect(Collectors.toList());
 
         if (model.getBanyanDBModelExtension().isStoreIDTag()) {
-            indexRules.add(IndexRule.create(BanyanDBConverter.ID, 
IndexRule.IndexType.INVERTED));
-        }
-
-        final Measure.Builder builder = 
Measure.create(schemaMetadata.getGroup(), schemaMetadata.name(),
-                downSamplingDuration(model.getDownsampling()));
-        builder.setEntityRelativeTags(shardingColumns);
-        builder.addTagFamilies(tagFamilySpecs);
-        if (!indexRules.isEmpty()) {
-            builder.addIndexes(indexRules);
-        }
+            indexRules.add(indexRule(schemaMetadata.group, 
BanyanDBConverter.ID));
+           // indexRules.add(IndexRule.create(BanyanDBConverter.ID, 
IndexRule.IndexType.INVERTED));
+        }
+
+        final Measure.Builder builder = Measure.newBuilder();
+        
builder.setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(schemaMetadata.getGroup())
+                                .setName(schemaMetadata.name()));
+        
builder.setInterval(downSamplingDuration(model.getDownsampling()).format());
+        
builder.setEntity(BanyandbDatabase.Entity.newBuilder().addAllTagNames(shardingColumns));
+        builder.addAllTagFamilies(tagFamilySpecs);
+//        if (!indexRules.isEmpty()) {
+//            builder.addIndexes(indexRules);
+//        }
         // parse and set field
-        for (Measure.FieldSpec field : tagsAndFields.fields) {
-            builder.addField(field);
+        for (BanyandbDatabase.FieldSpec field : tagsAndFields.fields) {
+            builder.addFields(field);
             schemaBuilder.field(field.getName());
         }
         // parse TopN
         schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.name()));
 
         registry.put(schemaMetadata.name(), schemaBuilder.build());
-        return builder.build();
+        return new MeasureModel(builder.build(), indexRules);
     }
 
     private TopNSpec parseTopNSpec(final Model model, final String measureName)
@@ -198,7 +214,7 @@ public enum MetadataRegistry {
                 
.countersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber())
                 .fieldName(valueColumnOpt.get().getValueCName())
                 
.groupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())
-                .sort(AbstractQuery.Sort.UNSPECIFIED) // include both TopN and 
BottomN
+                .sort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both 
TopN and BottomN
                 .build();
     }
 
@@ -245,27 +261,31 @@ public enum MetadataRegistry {
         return this.registry.get(SchemaMetadata.formatName(modelName, 
downSampling));
     }
 
-    private Measure.FieldSpec parseFieldSpec(ModelColumn modelColumn) {
+    private FieldSpec parseFieldSpec(ModelColumn modelColumn) {
         String colName = modelColumn.getColumnName().getStorageName();
         if (String.class.equals(modelColumn.getType())) {
-            return Measure.FieldSpec.newIntField(colName)
-                    .compressWithZSTD()
-                    .build();
+            return FieldSpec.newBuilder().setName(colName)
+                            .setFieldType(FieldType.FIELD_TYPE_STRING)
+                            
.setCompressionMethod(CompressionMethod.COMPRESSION_METHOD_ZSTD)
+                            .build();
         } else if (long.class.equals(modelColumn.getType()) || 
int.class.equals(modelColumn.getType())) {
-            return Measure.FieldSpec.newIntField(colName)
-                    .compressWithZSTD()
-                    .encodeWithGorilla()
-                    .build();
+            return FieldSpec.newBuilder().setName(colName)
+                            .setFieldType(FieldType.FIELD_TYPE_INT)
+                            
.setCompressionMethod(CompressionMethod.COMPRESSION_METHOD_ZSTD)
+                            
.setEncodingMethod(EncodingMethod.ENCODING_METHOD_GORILLA)
+                            .build();
         } else if 
(StorageDataComplexObject.class.isAssignableFrom(modelColumn.getType()) || 
JsonObject.class.equals(modelColumn.getType())) {
-            return Measure.FieldSpec.newStringField(colName)
-                    .compressWithZSTD()
-                    .build();
+            return FieldSpec.newBuilder().setName(colName)
+                            .setFieldType(FieldType.FIELD_TYPE_STRING)
+                            
.setCompressionMethod(CompressionMethod.COMPRESSION_METHOD_ZSTD)
+                            .build();
         } else if (double.class.equals(modelColumn.getType())) {
             // TODO: natively support double/float in BanyanDB
             log.warn("Double is stored as binary");
-            return Measure.FieldSpec.newBinaryField(colName)
-                    .compressWithZSTD()
-                    .build();
+            return FieldSpec.newBuilder().setName(colName)
+                            .setFieldType(FieldType.FIELD_TYPE_DATA_BINARY)
+                            
.setCompressionMethod(CompressionMethod.COMPRESSION_METHOD_ZSTD)
+                            .build();
         } else {
             throw new 
UnsupportedOperationException(modelColumn.getType().getSimpleName() + " is not 
supported for field");
         }
@@ -284,8 +304,11 @@ public enum MetadataRegistry {
         }
     }
 
-    IndexRule indexRule(String tagName) {
-        return IndexRule.create(tagName, IndexRule.IndexType.INVERTED);
+    IndexRule indexRule(String group, String tagName) {
+        return IndexRule.newBuilder()
+                        
.setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group))
+                        
.setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName).build();
+        //return IndexRule.create(tagName, IndexRule.IndexType.INVERTED);
     }
 
     /**
@@ -314,18 +337,18 @@ public enum MetadataRegistry {
      *
      * @since 9.4.0 Skip {@link Record#TIME_BUCKET}
      */
-    List<TagMetadata> parseTagMetadata(Model model, Schema.SchemaBuilder 
builder, List<String> shardingColumns) {
+    List<TagMetadata> parseTagMetadata(Model model, Schema.SchemaBuilder 
builder, List<String> shardingColumns, String group) {
         List<TagMetadata> tagMetadataList = new ArrayList<>();
         for (final ModelColumn col : model.getColumns()) {
             final String columnStorageName = 
col.getColumnName().getStorageName();
             if (columnStorageName.equals(Record.TIME_BUCKET)) {
                 continue;
             }
-            final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col);
+            final TagSpec tagSpec = parseTagSpec(col);
             builder.spec(columnStorageName, new ColumnSpec(ColumnType.TAG, 
col.getType()));
             String colName = col.getColumnName().getStorageName();
             if (!shardingColumns.contains(colName) && 
col.getBanyanDBExtension().shouldIndex()) {
-                tagMetadataList.add(new 
TagMetadata(indexRule(tagSpec.getTagName()), tagSpec));
+                tagMetadataList.add(new TagMetadata(indexRule(group, 
tagSpec.getName()), tagSpec));
             } else {
                 tagMetadataList.add(new TagMetadata(null, tagSpec));
             }
@@ -339,7 +362,7 @@ public enum MetadataRegistry {
         @Singular
         private final List<TagMetadata> tags;
         @Singular
-        private final List<Measure.FieldSpec> fields;
+        private final List<BanyandbDatabase.FieldSpec> fields;
     }
 
     /**
@@ -349,7 +372,7 @@ public enum MetadataRegistry {
      *
      * @since 9.4.0 Skip {@link Metrics#TIME_BUCKET}
      */
-    MeasureMetadata parseTagAndFieldMetadata(Model model, Schema.SchemaBuilder 
builder, List<String> shardingColumns) {
+    MeasureMetadata parseTagAndFieldMetadata(Model model, Schema.SchemaBuilder 
builder, List<String> shardingColumns, String group) {
         // skip metric
         MeasureMetadata.MeasureMetadataBuilder result = 
MeasureMetadata.builder();
         for (final ModelColumn col : model.getColumns()) {
@@ -362,10 +385,10 @@ public enum MetadataRegistry {
                 result.field(parseFieldSpec(col));
                 continue;
             }
-            final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col);
+            final TagSpec tagSpec = parseTagSpec(col);
             builder.spec(columnStorageName, new ColumnSpec(ColumnType.TAG, 
col.getType()));
             String colName = col.getColumnName().getStorageName();
-            result.tag(new TagMetadata(!shardingColumns.contains(colName) && 
col.getBanyanDBExtension().shouldIndex() ? indexRule(tagSpec.getTagName()) : 
null, tagSpec));
+            result.tag(new TagMetadata(!shardingColumns.contains(colName) && 
col.getBanyanDBExtension().shouldIndex() ? indexRule(group, tagSpec.getName()) 
: null, tagSpec));
         }
 
         return result.build();
@@ -378,36 +401,36 @@ public enum MetadataRegistry {
      * @return a typed tag spec
      */
     @Nonnull
-    private TagFamilySpec.TagSpec parseTagSpec(ModelColumn modelColumn) {
+    private TagSpec parseTagSpec(ModelColumn modelColumn) {
         final Class<?> clazz = modelColumn.getType();
         final String colName = modelColumn.getColumnName().getStorageName();
-        TagFamilySpec.TagSpec tagSpec = null;
+        TagSpec.Builder tagSpec = TagSpec.newBuilder().setName(colName);
         if (String.class.equals(clazz) || 
StorageDataComplexObject.class.isAssignableFrom(clazz) || 
JsonObject.class.equals(clazz)) {
-            tagSpec = TagFamilySpec.TagSpec.newStringTag(colName);
+            tagSpec = tagSpec.setType(TagType.TAG_TYPE_STRING);
         } else if (int.class.equals(clazz) || long.class.equals(clazz)) {
-            tagSpec = TagFamilySpec.TagSpec.newIntTag(colName);
+            tagSpec = tagSpec.setType(TagType.TAG_TYPE_INT);
         } else if (byte[].class.equals(clazz)) {
-            tagSpec = TagFamilySpec.TagSpec.newBinaryTag(colName);
+            tagSpec = tagSpec.setType(TagType.TAG_TYPE_DATA_BINARY);
         } else if (clazz.isEnum()) {
-            tagSpec = TagFamilySpec.TagSpec.newIntTag(colName);
+            tagSpec = tagSpec.setType(TagType.TAG_TYPE_INT);
         } else if (double.class.equals(clazz) || Double.class.equals(clazz)) {
             // serialize double as binary
-            tagSpec = TagFamilySpec.TagSpec.newBinaryTag(colName);
+            tagSpec = tagSpec.setType(TagType.TAG_TYPE_DATA_BINARY);
         } else if (IntList.class.isAssignableFrom(clazz)) {
-            tagSpec = TagFamilySpec.TagSpec.newIntArrayTag(colName);
+            tagSpec = tagSpec.setType(TagType.TAG_TYPE_INT_ARRAY);
         } else if (List.class.isAssignableFrom(clazz)) { // handle exceptions
             ParameterizedType t = (ParameterizedType) 
modelColumn.getGenericType();
             if (String.class.equals(t.getActualTypeArguments()[0])) {
-                tagSpec = TagFamilySpec.TagSpec.newStringArrayTag(colName);
+                tagSpec = tagSpec.setType(TagType.TAG_TYPE_STRING_ARRAY);
             }
-        }
-        if (tagSpec == null) {
+        } else {
             throw new IllegalStateException("type " + 
modelColumn.getType().toString() + " is not supported");
         }
+
         if (modelColumn.isIndexOnly()) {
-            tagSpec.indexedOnly();
+            tagSpec.setIndexedOnly(true);
         }
-        return tagSpec;
+        return tagSpec.build();
     }
 
     public void initializeIntervals(String specificGroupSettingsStr) {
@@ -507,7 +530,7 @@ public enum MetadataRegistry {
             return modelName + "_" + downSampling.getName();
         }
 
-        public Optional<NamedSchema<?>> findRemoteSchema(BanyanDBClient 
client) throws BanyanDBException {
+        public Optional<Object> findRemoteSchema(BanyanDBClient client) throws 
BanyanDBException {
             try {
                 switch (kind) {
                     case STREAM:
@@ -526,6 +549,17 @@ public enum MetadataRegistry {
             }
         }
 
+        public MetadataCache.EntityMetadata updateRemoteSchema(BanyanDBClient 
client) throws BanyanDBException {
+            switch (kind) {
+                case STREAM:
+                    return 
client.updateStreamMetadataCacheFromSever(this.group, this.name());
+                case MEASURE:
+                    return 
client.updateMeasureMetadataCacheFromSever(this.group, this.name());
+                default:
+                    throw new IllegalStateException("should not reach here");
+            }
+        }
+
         private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> 
tagMetadataList, boolean shouldAddID) {
             final String indexFamily = SchemaMetadata.this.indexFamily();
             final String nonIndexFamily = SchemaMetadata.this.nonIndexFamily();
@@ -534,10 +568,11 @@ public enum MetadataRegistry {
 
             final List<TagFamilySpec> tagFamilySpecs = new 
ArrayList<>(tagMetadataMap.size());
             for (final Map.Entry<String, List<TagMetadata>> entry : 
tagMetadataMap.entrySet()) {
-                final TagFamilySpec.Builder b = 
TagFamilySpec.create(entry.getKey())
-                        
.addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList()));
+                final TagFamilySpec.Builder b = TagFamilySpec.newBuilder();
+                b.setName(entry.getKey());
+                
b.addAllTags(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList()));
                 if (shouldAddID && indexFamily.equals(entry.getKey())) {
-                    
b.addTagSpec(TagFamilySpec.TagSpec.newStringTag(BanyanDBConverter.ID));
+                    
b.addTags(TagSpec.newBuilder().setType(TagType.TAG_TYPE_STRING).setName(BanyanDBConverter.ID));
                 }
                 tagFamilySpecs.add(b.build());
             }
@@ -547,19 +582,31 @@ public enum MetadataRegistry {
 
         public boolean checkResourceExistence(BanyanDBClient client) throws 
BanyanDBException {
             ResourceExist resourceExist;
+            Group.Builder gBuilder
+                = Group.newBuilder()
+                       .setMetadata(Metadata.newBuilder().setName(this.group))
+                       .setResourceOpts(ResourceOpts.newBuilder()
+                                                    .setShardNum(this.shard)
+                                                    .setSegmentInterval(
+                                                        
IntervalRule.newBuilder()
+                                                                    .setUnit(
+                                                                        
IntervalRule.Unit.UNIT_DAY)
+                                                                    .setNum(
+                                                                        
this.segmentIntervalDays))
+                                                    .setTtl(
+                                                        
IntervalRule.newBuilder()
+                                                                    .setUnit(
+                                                                        
IntervalRule.Unit.UNIT_DAY)
+                                                                    .setNum(
+                                                                        
this.ttlDays)));
             switch (kind) {
                 case STREAM:
                     resourceExist = client.existStream(this.group, 
this.name());
                     if (!resourceExist.hasGroup()) {
                         try {
-                            Group g = client.define(Group.create(this.group, 
Catalog.STREAM, this.shard,
-                                                                 
IntervalRule.create(
-                                                                     
IntervalRule.Unit.DAY, this.segmentIntervalDays),
-                                                                 
IntervalRule.create(
-                                                                     
IntervalRule.Unit.DAY, this.ttlDays)
-                            ));
+                            Group g = 
client.define(gBuilder.setCatalog(Catalog.CATALOG_STREAM).build());
                             if (g != null) {
-                                log.info("group {} created", g.name());
+                                log.info("group {} created", 
g.getMetadata().getName());
                             }
                         } catch (BanyanDBException ex) {
                             if 
(ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
@@ -574,14 +621,15 @@ public enum MetadataRegistry {
                     resourceExist = client.existMeasure(this.group, 
this.name());
                     try {
                         if (!resourceExist.hasGroup()) {
-                            Group g = client.define(Group.create(this.group, 
Catalog.MEASURE, this.shard,
-                                                                 
IntervalRule.create(
-                                                                     
IntervalRule.Unit.DAY, this.segmentIntervalDays),
-                                                                 
IntervalRule.create(
-                                                                     
IntervalRule.Unit.DAY, this.ttlDays)
-                            ));
+                            Group g = 
client.define(gBuilder.setCatalog(Catalog.CATALOG_MEASURE).build());
+//                                Group.create(this.group, Catalog.MEASURE, 
this.shard,
+//                                                                 
IntervalRule.create(
+//                                                                     
IntervalRule.Unit.DAY, this.segmentIntervalDays),
+//                                                                 
IntervalRule.create(
+//                                                                     
IntervalRule.Unit.DAY, this.ttlDays)
+//                            ));
                             if (g != null) {
-                                log.info("group {} created", g.name());
+                                log.info("group {} created", 
g.getMetadata().getName());
                             }
                         }
                     } catch (BanyanDBException ex) {
@@ -637,7 +685,7 @@ public enum MetadataRegistry {
     @Getter
     private static class TagMetadata {
         private final IndexRule indexRule;
-        private final TagFamilySpec.TagSpec tagSpec;
+        private final TagSpec tagSpec;
 
         boolean isIndex() {
             return this.indexRule != null;
@@ -679,14 +727,29 @@ public enum MetadataRegistry {
                 }
                 return;
             }
-            client.define(TopNAggregation.create(getMetadata().getGroup(), 
this.getTopNSpec().getName())
-                    .setSourceMeasureName(getMetadata().name())
-                    .setFieldValueSort(this.getTopNSpec().getSort())
-                    .setFieldName(this.getTopNSpec().getFieldName())
-                    
.setGroupByTagNames(this.getTopNSpec().getGroupByTagNames())
-                    .setCountersNumber(this.getTopNSpec().getCountersNumber())
-                    .setLruSize(this.getTopNSpec().getLruSize())
-                    .build());
+            TopNAggregation.Builder builder
+                = TopNAggregation.newBuilder()
+                                 .setMetadata(Metadata.newBuilder()
+                                                      
.setGroup(getMetadata().getGroup())
+                                                      
.setName(this.getTopNSpec().getName()))
+
+                                 .setSourceMeasure(Metadata.newBuilder()
+                                                           
.setGroup(getMetadata().getGroup())
+                                                           
.setName(getMetadata().name()))
+                                 
.setFieldValueSort(this.getTopNSpec().getSort())
+                                 
.setFieldName(this.getTopNSpec().getFieldName())
+                                 
.addAllGroupByTagNames(this.getTopNSpec().getGroupByTagNames())
+                                 
.setCountersNumber(this.getTopNSpec().getCountersNumber())
+                                 .setLruSize(this.getTopNSpec().getLruSize());
+            client.define(builder.build());
+//            client.define(TopNAggregation.create(getMetadata().getGroup(), 
this.getTopNSpec().getName())
+//                    .setSourceMeasureName(getMetadata().name())
+//                    .setFieldValueSort(this.getTopNSpec().getSort())
+//                    .setFieldName(this.getTopNSpec().getFieldName())
+//                    
.setGroupByTagNames(this.getTopNSpec().getGroupByTagNames())
+//                    
.setCountersNumber(this.getTopNSpec().getCountersNumber())
+//                    .setLruSize(this.getTopNSpec().getLruSize())
+//                    .build());
             log.info("installed TopN schema for measure {}", 
getMetadata().name());
         }
     }
@@ -700,7 +763,7 @@ public enum MetadataRegistry {
         @Singular
         private final List<String> groupByTagNames;
         private final String fieldName;
-        private final AbstractQuery.Sort sort;
+        private final BanyandbModel.Sort sort;
         private final int lruSize;
         private final int countersNumber;
     }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamModel.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamModel.java
new file mode 100644
index 0000000000..ce163115af
--- /dev/null
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamModel.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import java.util.List;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
+
+@RequiredArgsConstructor
+@Getter
+public class StreamModel {
+    private final Stream stream;
+    private final List<IndexRule> indexRules;
+}
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBContinuousProfilingPolicyDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBContinuousProfilingPolicyDAO.java
index c37160261d..52e69a4574 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBContinuousProfilingPolicyDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBContinuousProfilingPolicyDAO.java
@@ -19,8 +19,11 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.banyandb.v1.client.metadata.Property;
+import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
 import 
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingPolicy;
 import 
org.apache.skywalking.oap.server.core.storage.profiling.continuous.IContinuousProfilingPolicyDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
@@ -48,10 +51,15 @@ public class BanyanDBContinuousProfilingPolicyDAO extends 
AbstractBanyanDBDAO im
     }
 
     public Property applyAll(ContinuousProfilingPolicy policy) {
-        return Property.create(GROUP, ContinuousProfilingPolicy.INDEX_NAME, 
policy.id().build())
-            .addTag(TagAndValue.newStringTag(ContinuousProfilingPolicy.UUID, 
policy.getUuid()))
-            
.addTag(TagAndValue.newStringTag(ContinuousProfilingPolicy.CONFIGURATION_JSON, 
policy.getConfigurationJson()))
-            .build();
+        return Property.newBuilder()
+                       .setMetadata(BanyandbProperty.Metadata.newBuilder()
+                                                             
.setId(policy.id().build())
+                                                             
.setContainer(BanyandbCommon.Metadata.newBuilder()
+                                                                               
.setGroup(GROUP)
+                                                                               
.setName(ContinuousProfilingPolicy.INDEX_NAME)))
+            .addTags(TagAndValue.newStringTag(ContinuousProfilingPolicy.UUID, 
policy.getUuid()).build())
+            
.addTags(TagAndValue.newStringTag(ContinuousProfilingPolicy.CONFIGURATION_JSON, 
policy.getConfigurationJson()).build())
+                       .build();
     }
 
     @Override
@@ -65,16 +73,16 @@ public class BanyanDBContinuousProfilingPolicyDAO extends 
AbstractBanyanDBDAO im
             }
         }).filter(Objects::nonNull).map(properties -> {
             final ContinuousProfilingPolicy policy = new 
ContinuousProfilingPolicy();
-            policy.setServiceId(properties.id());
-            for (TagAndValue<?> tag : properties.tags()) {
-                if 
(tag.getTagName().equals(ContinuousProfilingPolicy.CONFIGURATION_JSON)) {
-                    policy.setConfigurationJson((String) tag.getValue());
-                } else if 
(tag.getTagName().equals(ContinuousProfilingPolicy.UUID)) {
-                    policy.setUuid((String) tag.getValue());
+            policy.setServiceId(properties.getMetadata().getId());
+            for (BanyandbModel.Tag tag : properties.getTagsList()) {
+                TagAndValue<?> tagAndValue = TagAndValue.fromProtobuf(tag);
+                if 
(tagAndValue.getTagName().equals(ContinuousProfilingPolicy.CONFIGURATION_JSON)) 
{
+                    policy.setConfigurationJson((String) 
tagAndValue.getValue());
+                } else if 
(tagAndValue.getTagName().equals(ContinuousProfilingPolicy.UUID)) {
+                    policy.setUuid((String) tagAndValue.getValue());
                 }
             }
             return policy;
         }).collect(Collectors.toList());
     }
-
-}
\ No newline at end of file
+}
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 4dcf1af099..8631e0da5f 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -23,7 +23,7 @@ 
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
 SW_KUBERNETES_COMMIT_SHA=1335f15bf821a40a7cd71448fa805f0be265afcc
 SW_ROVER_COMMIT=6bbd39aa701984482330d9dfb4dbaaff0527d55c
-SW_BANYANDB_COMMIT=59c396870ac2d81ec81113802d54277fe070d91b
+SW_BANYANDB_COMMIT=0e734c462571dcf55dbb7761211c07d8b156521e
 SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3
 
 SW_CTL_COMMIT=d5f3597733aa5217373986d776a3ee5ee8b3c468

Reply via email to