This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch feature/update-banyandb-0.4
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 2df823f8e7a2905dd9894e915cc2b17b1942c22f Author: Megrez Lu <[email protected]> AuthorDate: Tue Jul 4 23:35:37 2023 +0800 use banyandb java client 0.4.0 --- oap-server-bom/pom.xml | 2 +- .../storage/plugin/banyandb/BanyanDBNoneStreamDAO.java | 2 +- .../storage/plugin/banyandb/BanyanDBStorageClient.java | 11 ++++++++++- .../storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java | 4 ++-- .../storage/plugin/banyandb/stream/BanyanDBRecordDAO.java | 12 ++++++++---- .../storage/plugin/banyandb/stream/BanyanDBStorageDAO.java | 2 +- test/e2e-v2/script/env | 2 +- 7 files changed, 24 insertions(+), 11 deletions(-) diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml index a41ad75d64..f25bb37a39 100644 --- a/oap-server-bom/pom.xml +++ b/oap-server-bom/pom.xml @@ -72,7 +72,7 @@ <awaitility.version>3.0.0</awaitility.version> <httpcore.version>4.4.13</httpcore.version> <commons-compress.version>1.21</commons-compress.version> - <banyandb-java-client.version>0.4.0-rc0</banyandb-java-client.version> + <banyandb-java-client.version>0.4.0</banyandb-java-client.version> <kafka-clients.version>2.8.1</kafka-clients.version> <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version> <consul.client.version>1.5.3</consul.client.version> diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java index 25f698a8fe..1dc2c2b7d2 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java @@ -44,7 +44,7 @@ public class BanyanDBNoneStreamDAO 
extends AbstractDAO<BanyanDBStorageClient> im if (schema == null) { throw new IOException(model.getName() + " is not registered"); } - StreamWrite streamWrite = new StreamWrite( + StreamWrite streamWrite = getClient().client.createStreamWrite( schema.getMetadata().getGroup(), // group name schema.getMetadata().name(), // stream-name noneStream.id().build() // identity diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java index bb242a19ee..136786865d 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java @@ -23,13 +23,14 @@ import org.apache.skywalking.banyandb.v1.client.BanyanDBClient; import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor; import org.apache.skywalking.banyandb.v1.client.MeasureQuery; import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse; +import org.apache.skywalking.banyandb.v1.client.MeasureWrite; import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.banyandb.v1.client.StreamWrite; -import org.apache.skywalking.banyandb.v1.client.grpc.exception.AlreadyExistsException; import org.apache.skywalking.banyandb.v1.client.TopNQuery; import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse; +import org.apache.skywalking.banyandb.v1.client.grpc.exception.AlreadyExistsException; import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.banyandb.v1.client.metadata.Group; import org.apache.skywalking.banyandb.v1.client.metadata.Measure; @@ -197,6 +198,14 @@ public class BanyanDBStorageClient implements Client, HealthCheckable { } } + public StreamWrite createStreamWrite(String group, String name, String elementId) { + return this.client.createStreamWrite(group, name, elementId); + } + + public MeasureWrite createMeasureWrite(String group, String name, long timestamp) { + return this.client.createMeasureWrite(group, name, timestamp); + } + public void write(StreamWrite streamWrite) { this.client.write(streamWrite); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java index d1350cfa64..e971dd7032 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java @@ -153,7 +153,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD if (schema == null) { throw new IOException(model.getName() + " is not registered"); } - MeasureWrite measureWrite = new MeasureWrite(schema.getMetadata().getGroup(), // group name + MeasureWrite measureWrite = getClient().createMeasureWrite(schema.getMetadata().getGroup(), // group name schema.getMetadata().name(), // measure-name TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp final BanyanDBConverter.MeasureToStorage toStorage = new 
BanyanDBConverter.MeasureToStorage(schema, measureWrite); @@ -171,7 +171,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD if (schema == null) { throw new IOException(model.getName() + " is not registered"); } - MeasureWrite measureWrite = new MeasureWrite(schema.getMetadata().getGroup(), // group name + MeasureWrite measureWrite = getClient().createMeasureWrite(schema.getMetadata().getGroup(), // group name schema.getMetadata().name(), // measure-name TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java index 74e2d48cc9..1439ca174a 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java @@ -18,7 +18,6 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; -import lombok.RequiredArgsConstructor; import org.apache.skywalking.banyandb.v1.client.StreamWrite; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.storage.IRecordDAO; @@ -27,21 +26,26 @@ import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; import java.io.IOException; -@RequiredArgsConstructor -public class BanyanDBRecordDAO implements IRecordDAO { +public class BanyanDBRecordDAO extends AbstractBanyanDBDAO implements IRecordDAO { private final StorageBuilder<Record> storageBuilder; + public BanyanDBRecordDAO(BanyanDBStorageClient client, StorageBuilder<Record> storageBuilder) { + super(client); + this.storageBuilder = storageBuilder; + } + @Override public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException { MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); if (schema == null) { throw new IOException(model.getName() + " is not registered"); } - StreamWrite streamWrite = new StreamWrite( + StreamWrite streamWrite = getClient().createStreamWrite( schema.getMetadata().getGroup(), // group name model.getName(), // index-name record.id().build() // identity diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java index 12ce3c016b..24c8270cf3 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java @@ -48,7 +48,7 @@ public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> imple @Override public IRecordDAO newRecordDao(StorageBuilder 
storageBuilder) { - return new BanyanDBRecordDAO((StorageBuilder<Record>) storageBuilder); + return new BanyanDBRecordDAO(getClient(), (StorageBuilder<Record>) storageBuilder); } @Override diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env index acd9241501..6df3f375b2 100644 --- a/test/e2e-v2/script/env +++ b/test/e2e-v2/script/env @@ -23,6 +23,6 @@ SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016 SW_KUBERNETES_COMMIT_SHA=e2c61c6774cf377b23516fca6f8a1e119d3191c5 SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5 -SW_BANYANDB_COMMIT=dea8c1e37d4dc19fe18397deb576151a22e2fad8 +SW_BANYANDB_COMMIT=df34a83663d66af9687ff484884ceae669c83980 SW_CTL_COMMIT=890a7bc8c015ad407ec0c1f1f1658fc480652577
