This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch feature/update-banyandb-0.4
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 2df823f8e7a2905dd9894e915cc2b17b1942c22f
Author: Megrez Lu <[email protected]>
AuthorDate: Tue Jul 4 23:35:37 2023 +0800

    use banyandb java client 0.4.0
---
 oap-server-bom/pom.xml                                       |  2 +-
 .../storage/plugin/banyandb/BanyanDBNoneStreamDAO.java       |  2 +-
 .../storage/plugin/banyandb/BanyanDBStorageClient.java       | 11 ++++++++++-
 .../storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java  |  4 ++--
 .../storage/plugin/banyandb/stream/BanyanDBRecordDAO.java    | 12 ++++++++----
 .../storage/plugin/banyandb/stream/BanyanDBStorageDAO.java   |  2 +-
 test/e2e-v2/script/env                                       |  2 +-
 7 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index a41ad75d64..f25bb37a39 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -72,7 +72,7 @@
         <awaitility.version>3.0.0</awaitility.version>
         <httpcore.version>4.4.13</httpcore.version>
         <commons-compress.version>1.21</commons-compress.version>
-        <banyandb-java-client.version>0.4.0-rc0</banyandb-java-client.version>
+        <banyandb-java-client.version>0.4.0</banyandb-java-client.version>
         <kafka-clients.version>2.8.1</kafka-clients.version>
         <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
         <consul.client.version>1.5.3</consul.client.version>
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
index 25f698a8fe..1dc2c2b7d2 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
@@ -44,7 +44,7 @@ public class BanyanDBNoneStreamDAO extends AbstractDAO<BanyanDBStorageClient> im
         if (schema == null) {
             throw new IOException(model.getName() + " is not registered");
         }
-        StreamWrite streamWrite = new StreamWrite(
+        StreamWrite streamWrite = getClient().client.createStreamWrite(
             schema.getMetadata().getGroup(), // group name
             schema.getMetadata().name(), // stream-name
             noneStream.id().build() // identity
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index bb242a19ee..136786865d 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -23,13 +23,14 @@ import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
 import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
 import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
 import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.banyandb.v1.client.grpc.exception.AlreadyExistsException;
 import org.apache.skywalking.banyandb.v1.client.TopNQuery;
 import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.AlreadyExistsException;
 import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Group;
 import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
@@ -197,6 +198,14 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
         }
     }
 
+    public StreamWrite createStreamWrite(String group, String name, String elementId) {
+        return this.client.createStreamWrite(group, name, elementId);
+    }
+
+    public MeasureWrite createMeasureWrite(String group, String name, long timestamp) {
+        return this.client.createMeasureWrite(group, name, timestamp);
+    }
+
     public void write(StreamWrite streamWrite) {
         this.client.write(streamWrite);
     }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index d1350cfa64..e971dd7032 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -153,7 +153,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
         if (schema == null) {
             throw new IOException(model.getName() + " is not registered");
         }
-        MeasureWrite measureWrite = new MeasureWrite(schema.getMetadata().getGroup(), // group name
+        MeasureWrite measureWrite = getClient().createMeasureWrite(schema.getMetadata().getGroup(), // group name
                 schema.getMetadata().name(), // measure-name
                 TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp
         final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
@@ -171,7 +171,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
         if (schema == null) {
             throw new IOException(model.getName() + " is not registered");
         }
-        MeasureWrite measureWrite = new MeasureWrite(schema.getMetadata().getGroup(), // group name
+        MeasureWrite measureWrite = getClient().createMeasureWrite(schema.getMetadata().getGroup(), // group name
                 schema.getMetadata().name(), // measure-name
                 TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp
         final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index 74e2d48cc9..1439ca174a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
@@ -27,21 +26,26 @@ import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
 
 import java.io.IOException;
 
-@RequiredArgsConstructor
-public class BanyanDBRecordDAO implements IRecordDAO {
+public class BanyanDBRecordDAO extends AbstractBanyanDBDAO implements IRecordDAO {
     private final StorageBuilder<Record> storageBuilder;
 
+    public BanyanDBRecordDAO(BanyanDBStorageClient client, StorageBuilder<Record> storageBuilder) {
+        super(client);
+        this.storageBuilder = storageBuilder;
+    }
+
     @Override
     public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
         if (schema == null) {
             throw new IOException(model.getName() + " is not registered");
         }
-        StreamWrite streamWrite = new StreamWrite(
+        StreamWrite streamWrite = getClient().createStreamWrite(
             schema.getMetadata().getGroup(), // group name
             model.getName(), // index-name
             record.id().build() // identity
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
index 12ce3c016b..24c8270cf3 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -48,7 +48,7 @@ public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> imple
 
     @Override
     public IRecordDAO newRecordDao(StorageBuilder storageBuilder) {
-        return new BanyanDBRecordDAO((StorageBuilder<Record>) storageBuilder);
+        return new BanyanDBRecordDAO(getClient(), (StorageBuilder<Record>) storageBuilder);
     }
 
     @Override
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index acd9241501..6df3f375b2 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -23,6 +23,6 @@ SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
 SW_KUBERNETES_COMMIT_SHA=e2c61c6774cf377b23516fca6f8a1e119d3191c5
 SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
-SW_BANYANDB_COMMIT=dea8c1e37d4dc19fe18397deb576151a22e2fad8
+SW_BANYANDB_COMMIT=df34a83663d66af9687ff484884ceae669c83980
 
 SW_CTL_COMMIT=890a7bc8c015ad407ec0c1f1f1658fc480652577

Reply via email to