This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/banyandb-integration-stream by
this push:
new 40e1b27d98 add metadata integration
40e1b27d98 is described below
commit 40e1b27d98f2e4e0896d1a7e126ff8203524078e
Author: Megrez Lu <[email protected]>
AuthorDate: Sun May 1 22:07:20 2022 +0800
add metadata integration
---
.../banyandb/{stream => }/BanyanDBBatchDAO.java | 36 ++-
.../storage/plugin/banyandb/BanyanDBConverter.java | 95 ++++--
.../plugin/banyandb/BanyanDBIndexInstaller.java | 6 +-
.../plugin/banyandb/BanyanDBStorageClient.java | 7 +-
.../plugin/banyandb/BanyanDBStorageProvider.java | 1 -
.../storage/plugin/banyandb/MetadataRegistry.java | 6 +-
.../BanyanDBMeasureInsertRequest.java} | 23 +-
.../banyandb/measure/BanyanDBMetadataQueryDAO.java | 325 ++++++++++++++++++++-
.../banyandb/{ => measure}/BanyanDBMetricsDAO.java | 11 +-
.../banyandb/stream/BanyanDBAlarmQueryDAO.java | 2 +-
.../BanyanDBProfileThreadSnapshotQueryDAO.java | 4 +-
.../plugin/banyandb/stream/BanyanDBRecordDAO.java | 2 +-
.../plugin/banyandb/stream/BanyanDBStorageDAO.java | 2 +-
.../banyandb/stream/BanyanDBTraceQueryDAO.java | 2 +-
.../storage/plugin/banyandb/util/ByteUtil.java | 19 +-
.../plugin/banyandb/util/ByteUtilTest.java} | 27 +-
16 files changed, 485 insertions(+), 83 deletions(-)
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
similarity index 61%
rename from
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
rename to
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
index e17b8b66ac..bb776c6dfe 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
@@ -16,25 +16,35 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureInsertRequest;
+import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient>
implements IBatchDAO {
- private StreamBulkWriteProcessor bulkProcessor;
+ private StreamBulkWriteProcessor streamBulkWriteProcessor;
+
+ private MeasureBulkWriteProcessor measureBulkWriteProcessor;
+
private final int maxBulkSize;
+
private final int flushInterval;
+
private final int concurrency;
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+
public BanyanDBBatchDAO(BanyanDBStorageClient client, int maxBulkSize, int
flushInterval, int concurrency) {
super(client);
this.maxBulkSize = maxBulkSize;
@@ -44,27 +54,31 @@ public class BanyanDBBatchDAO extends
AbstractDAO<BanyanDBStorageClient> impleme
@Override
public void insert(InsertRequest insertRequest) {
- if (bulkProcessor == null) {
- this.bulkProcessor = getClient().createBulkProcessor(maxBulkSize,
flushInterval, concurrency);
+ if (initialized.compareAndSet(false, true)) {
+ this.streamBulkWriteProcessor =
getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency);
+ this.measureBulkWriteProcessor =
getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency);
}
-
if (insertRequest instanceof BanyanDBStreamInsertRequest) {
- this.bulkProcessor.add(((BanyanDBStreamInsertRequest)
insertRequest).getStreamWrite());
+ this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest)
insertRequest).getStreamWrite());
+ } else if (insertRequest instanceof BanyanDBMeasureInsertRequest) {
+ this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest)
insertRequest).getMeasureWrite());
}
}
@Override
public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests)
{
- if (bulkProcessor == null) {
- this.bulkProcessor = getClient().createBulkProcessor(maxBulkSize,
flushInterval, concurrency);
+ if (initialized.compareAndSet(false, true)) {
+ this.streamBulkWriteProcessor =
getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency);
+ this.measureBulkWriteProcessor =
getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency);
}
if (CollectionUtils.isNotEmpty(prepareRequests)) {
return
CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
if (prepareRequest instanceof BanyanDBStreamInsertRequest) {
// TODO: return CompletableFuture<Void>
- this.bulkProcessor.add(((BanyanDBStreamInsertRequest)
prepareRequest).getStreamWrite());
- } else {
+
this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest)
prepareRequest).getStreamWrite());
+ } else if (prepareRequest instanceof
BanyanDBMeasureInsertRequest) {
+
this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest)
prepareRequest).getMeasureWrite());
}
return CompletableFuture.completedFuture(null);
}).toArray(CompletableFuture[]::new));
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index c3c936ef84..fe4d15f9ea 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -18,15 +18,18 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+import com.google.gson.JsonObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.v1.client.DataPoint;
import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Serializable;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import
org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
@@ -37,11 +40,15 @@ import java.util.List;
import java.util.function.Function;
public class BanyanDBConverter {
- @RequiredArgsConstructor
- public static class StreamToEntity implements Convert2Entity {
+ public static class StorageToStream implements Convert2Entity {
private final MetadataRegistry.Schema schema;
private final RowEntity rowEntity;
+ public StorageToStream(String modelName, RowEntity rowEntity) {
+ this.schema = MetadataRegistry.INSTANCE.findMetadata(modelName);
+ this.rowEntity = rowEntity;
+ }
+
@Override
public Object get(String fieldName) {
MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName);
@@ -61,13 +68,17 @@ public class BanyanDBConverter {
@Slf4j
@RequiredArgsConstructor
public static class StreamToStorage implements
Convert2Storage<StreamWrite> {
+ private final MetadataRegistry.Schema schema;
private final StreamWrite streamWrite;
@Override
public void accept(String fieldName, Object fieldValue) {
- // TODO: skip "time_bucket"
+ MetadataRegistry.ColumnSpec columnSpec =
this.schema.getSpec(fieldName);
+ if (columnSpec == null) {
+ throw new IllegalArgumentException("fail to find field[" +
fieldName + "]");
+ }
try {
- this.streamWrite.tag(fieldName, buildTag(fieldValue));
+ this.streamWrite.tag(fieldName, buildTag(fieldValue,
columnSpec.getColumnClass()));
} catch (BanyanDBException ex) {
log.error("fail to add tag", ex);
}
@@ -118,17 +129,28 @@ public class BanyanDBConverter {
@Override
public void accept(String fieldName, Object fieldValue) {
MetadataRegistry.ColumnSpec columnSpec =
this.schema.getSpec(fieldName);
+ if (columnSpec == null) {
+ throw new IllegalArgumentException("fail to find field[" +
fieldName + "]");
+ }
try {
if (columnSpec.getColumnType() ==
MetadataRegistry.ColumnType.TAG) {
- this.measureWrite.tag(fieldName, buildTag(fieldValue));
+ this.measureWrite.tag(fieldName, buildTag(fieldValue,
columnSpec.getColumnClass()));
} else {
- this.measureWrite.field(fieldName, buildField(fieldValue));
+ this.measureWrite.field(fieldName, buildField(fieldValue,
columnSpec.getColumnClass()));
}
} catch (BanyanDBException ex) {
log.error("fail to add tag", ex);
}
}
+ public void acceptID(String id) {
+ try {
+ this.measureWrite.tag(MetadataRegistry.ID,
TagAndValue.idTagValue(id));
+ } catch (BanyanDBException ex) {
+ log.error("fail to add ID tag", ex);
+ }
+ }
+
@Override
public void accept(String fieldName, byte[] fieldValue) {
MetadataRegistry.ColumnSpec columnSpec =
this.schema.getSpec(fieldName);
@@ -170,29 +192,62 @@ public class BanyanDBConverter {
}
}
- private static Serializable<BanyandbModel.TagValue> buildTag(Object value)
{
- if (Integer.class.equals(value.getClass()) ||
Long.class.equals(value.getClass())) {
- return TagAndValue.longTagValue((long) value);
- } else if (String.class.equals(value.getClass())) {
+ private static Serializable<BanyandbModel.TagValue> buildTag(Object value,
final Class<?> clazz) {
+ if (Integer.class.equals(clazz) || int.class.equals(clazz)) {
+ return TagAndValue.longTagValue(((Number) value).longValue());
+ } else if (Long.class.equals(clazz) || long.class.equals(clazz)) {
+ return TagAndValue.longTagValue((Long) value);
+ } else if (String.class.equals(clazz)) {
return TagAndValue.stringTagValue((String) value);
- } else if (Double.class.equals(value.getClass())) {
+ } else if (Double.class.equals(clazz) || double.class.equals(clazz)) {
return TagAndValue.binaryTagValue(ByteUtil.double2Bytes((double)
value));
- } else if (value instanceof StorageDataComplexObject) {
+ } else if (StorageDataComplexObject.class.isAssignableFrom(clazz)) {
return TagAndValue.stringTagValue(((StorageDataComplexObject<?>)
value).toStorageData());
+ } else if (Layer.class.equals(clazz)) {
+ return TagAndValue.longTagValue(((Integer) value).longValue());
+ } else if (JsonObject.class.equals(clazz)) {
+ return TagAndValue.stringTagValue((String) value);
}
- throw new IllegalStateException(value.getClass() + " is not
supported");
+ throw new IllegalStateException(clazz.getSimpleName() + " is not
supported");
}
- private static Serializable<BanyandbModel.FieldValue> buildField(Object
value) {
- if (Integer.class.equals(value.getClass()) ||
Long.class.equals(value.getClass())) {
- return TagAndValue.longFieldValue((long) value);
- } else if (String.class.equals(value.getClass())) {
+ private static Serializable<BanyandbModel.FieldValue> buildField(Object
value, final Class<?> clazz) {
+ if (Integer.class.equals(clazz) || int.class.equals(clazz)) {
+ return TagAndValue.longFieldValue(((Number) value).longValue());
+ } else if (Long.class.equals(clazz) || long.class.equals(clazz)) {
+ return TagAndValue.longFieldValue((Long) value);
+ } else if (String.class.equals(clazz)) {
return TagAndValue.stringFieldValue((String) value);
- } else if (Double.class.equals(value.getClass())) {
+ } else if (Double.class.equals(clazz) || double.class.equals(clazz)) {
return TagAndValue.binaryFieldValue(ByteUtil.double2Bytes((double)
value));
- } else if (value instanceof StorageDataComplexObject) {
+ } else if (StorageDataComplexObject.class.isAssignableFrom(clazz)) {
return TagAndValue.stringFieldValue(((StorageDataComplexObject<?>)
value).toStorageData());
}
- throw new IllegalStateException(value.getClass() + " is not
supported");
+ throw new IllegalStateException(clazz.getSimpleName() + " is not
supported");
+ }
+
+ public static class StorageToMeasure implements Convert2Entity {
+ private final MetadataRegistry.Schema schema;
+ private final DataPoint dataPoint;
+
+ public StorageToMeasure(String modelName, DataPoint dataPoint) {
+ this.schema = MetadataRegistry.INSTANCE.findMetadata(modelName);
+ this.dataPoint = dataPoint;
+ }
+
+ @Override
+ public Object get(String fieldName) {
+ MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName);
+ if (double.class.equals(spec.getColumnClass())) {
+ return ByteUtil.bytes2Double(dataPoint.getTagValue(fieldName));
+ } else {
+ return dataPoint.getTagValue(fieldName);
+ }
+ }
+
+ @Override
+ public <T, R> R getWith(String fieldName, Function<T, R> typeDecoder) {
+ return (R) this.get(fieldName);
+ }
}
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index 75109aa781..17d284c9a4 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -57,7 +57,11 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
}
log.info("group {} created", g.name());
// then check entity schema
- return metadata.findRemoteSchema(c).isPresent();
+ if (metadata.findRemoteSchema(c).isPresent()) {
+ MetadataRegistry.INSTANCE.registerModel(model,
this.configService);
+ return true;
+ }
+ return false;
} catch (BanyanDBException ex) {
throw new StorageException("fail to check existence", ex);
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index a92c270023..9ba378125c 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.oap.server.storage.plugin.banyandb;
import io.grpc.Status;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
+import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
@@ -149,10 +150,14 @@ public class BanyanDBStorageClient implements Client,
HealthCheckable {
this.client.write(streamWrite);
}
- public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int
flushInterval, int concurrency) {
+ public StreamBulkWriteProcessor createStreamBulkProcessor(int maxBulkSize,
int flushInterval, int concurrency) {
return this.client.buildStreamWriteProcessor(maxBulkSize,
flushInterval, concurrency);
}
+ public MeasureBulkWriteProcessor createMeasureBulkProcessor(int
maxBulkSize, int flushInterval, int concurrency) {
+ return this.client.buildMeasureWriteProcessor(maxBulkSize,
flushInterval, concurrency);
+ }
+
@Override
public void registerChecker(HealthChecker healthChecker) {
this.healthChecker.register(healthChecker);
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index 26c6b2529a..6bff41e16f 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -57,7 +57,6 @@ import
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDB
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBServiceLabelDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBTopologyQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBAlarmQueryDAO;
-import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBatchDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBrowserLogQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingDataDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingScheduleQueryDAO;
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index f0b88fe538..18767ae21e 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -67,7 +67,7 @@ import java.util.stream.Collectors;
public enum MetadataRegistry {
INSTANCE;
- private static final String ID = "id";
+ public static final String ID = "id";
private final Map<String, Schema> registry = new ConcurrentHashMap<>();
public NamedSchema<?> registerModel(Model model, ConfigService
configService) {
@@ -91,8 +91,7 @@ public enum MetadataRegistry {
if (partialMetadata.getKind() == Kind.STREAM) {
final Stream.Builder builder =
Stream.create(partialMetadata.getGroup(), partialMetadata.getName());
if (entities.isEmpty()) {
- log.warn("sharding keys of model[stream.{}] must not be
empty", model.getName());
-// throw new IllegalStateException("sharding keys of model[" +
model.getName() + "] must not be empty");
+ throw new IllegalStateException("sharding keys of
model[stream." + model.getName() + "] must not be empty");
}
builder.setEntityRelativeTags(entities);
builder.addTagFamilies(tagFamilySpecs);
@@ -401,6 +400,7 @@ public enum MetadataRegistry {
private final PartialMetadata metadata;
@Singular
private final Map<String, ColumnSpec> specs;
+ private final boolean useIdAsShardingKey;
public ColumnSpec getSpec(String columnName) {
return this.specs.get(columnName);
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
similarity index 67%
copy from
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
copy to
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
index 5ae1102e38..3f965bff6f 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
@@ -16,18 +16,15 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.util;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
-import java.nio.ByteBuffer;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-public class ByteUtil {
- private static final ThreadLocal<ByteBuffer> BYTE_BUFFER =
ThreadLocal.withInitial(() -> ByteBuffer.allocate(8));
-
- public static Double bytes2Double(byte[] bytes) {
- return BYTE_BUFFER.get().put(bytes).getDouble();
- }
-
- public static byte[] double2Bytes(double number) {
- return BYTE_BUFFER.get().putDouble(number).array();
- }
-}
+@RequiredArgsConstructor
+@Getter
+public class BanyanDBMeasureInsertRequest implements InsertRequest {
+ private final MeasureWrite measureWrite;
+}
\ No newline at end of file
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index 23828464da..23f096115b 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -18,10 +18,26 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.skywalking.banyandb.v1.client.DataPoint;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import
org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import
org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import
org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType;
+import
org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
import
org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.query.enumeration.Language;
+import
org.apache.skywalking.oap.server.core.query.enumeration.ProfilingSupportStatus;
+import org.apache.skywalking.oap.server.core.query.type.Attribute;
import org.apache.skywalking.oap.server.core.query.type.Endpoint;
import org.apache.skywalking.oap.server.core.query.type.Process;
import org.apache.skywalking.oap.server.core.query.type.Service;
@@ -29,13 +45,22 @@ import
org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import
org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static
org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic.PropertyUtil.LANGUAGE;
public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements
IMetadataQueryDAO {
+ private static final Gson GSON = new Gson();
+
public BanyanDBMetadataQueryDAO(BanyanDBStorageClient client) {
super(client);
}
@@ -43,7 +68,12 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
@Override
public List<Service> listServices(String layer, String group) throws
IOException {
MeasureQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
- ImmutableSet.of(ServiceTraffic.NAME,
ServiceTraffic.SHORT_NAME),
+ ImmutableSet.of(ServiceTraffic.NAME,
+ ServiceTraffic.SHORT_NAME,
+ ServiceTraffic.GROUP,
+ ServiceTraffic.LAYER,
+ ServiceTraffic.SERVICE_ID,
+ MetadataRegistry.ID),
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
@@ -56,41 +86,314 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
}
});
- return Collections.emptyList();
+ final List<Service> services = new ArrayList<>();
+
+ for (final DataPoint dataPoint : resp.getDataPoints()) {
+ services.add(buildService(dataPoint));
+ }
+
+ return services;
}
@Override
public List<Service> getServices(String serviceId) throws IOException {
- return Collections.emptyList();
+ MeasureQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
+ ImmutableSet.of(ServiceTraffic.NAME,
+ ServiceTraffic.SHORT_NAME,
+ ServiceTraffic.GROUP,
+ ServiceTraffic.LAYER,
+ ServiceTraffic.SERVICE_ID,
+ MetadataRegistry.ID),
+ Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ if (StringUtil.isNotEmpty(serviceId)) {
+
query.appendCondition(eq(ServiceTraffic.SERVICE_ID, serviceId));
+ }
+ }
+ });
+
+ final List<Service> services = new ArrayList<>();
+
+ for (final DataPoint dataPoint : resp.getDataPoints()) {
+ services.add(buildService(dataPoint));
+ }
+
+ return services;
}
@Override
public List<ServiceInstance> listInstances(long startTimestamp, long
endTimestamp, String serviceId) throws IOException {
- return Collections.emptyList();
+ final long minuteTimeBucket =
TimeBucket.getMinuteTimeBucket(startTimestamp);
+
+ MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
+ ImmutableSet.of(InstanceTraffic.NAME,
+ InstanceTraffic.LAYER,
+ InstanceTraffic.PROPERTIES,
+ InstanceTraffic.LAST_PING_TIME_BUCKET,
+ InstanceTraffic.SERVICE_ID,
+ MetadataRegistry.ID),
+ Collections.emptySet(),
+ new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ if (StringUtil.isNotEmpty(serviceId)) {
+
query.appendCondition(eq(InstanceTraffic.SERVICE_ID, serviceId));
+ }
+
query.appendCondition(gte(InstanceTraffic.LAST_PING_TIME_BUCKET,
minuteTimeBucket));
+ }
+ });
+
+ final List<ServiceInstance> instances = new ArrayList<>();
+
+ for (final DataPoint dataPoint : resp.getDataPoints()) {
+ instances.add(buildInstance(dataPoint));
+ }
+
+ return instances;
}
@Override
public ServiceInstance getInstance(String instanceId) throws IOException {
- return null;
+ MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
+ ImmutableSet.of(InstanceTraffic.NAME,
+ InstanceTraffic.LAYER,
+ InstanceTraffic.PROPERTIES,
+ MetadataRegistry.ID),
+ Collections.emptySet(),
+ new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ if (StringUtil.isNotEmpty(instanceId)) {
+
query.appendCondition(PairQueryCondition.IDQueryCondition.eq(MetadataRegistry.ID,
instanceId));
+ }
+ }
+ });
+
+ return resp.size() > 0 ? buildInstance(resp.getDataPoints().get(0)) :
null;
}
@Override
public List<Endpoint> findEndpoint(String keyword, String serviceId, int
limit) throws IOException {
- return Collections.emptyList();
+ MeasureQueryResponse resp = query(EndpointTraffic.INDEX_NAME,
+ ImmutableSet.of(EndpointTraffic.NAME,
+ EndpointTraffic.SERVICE_ID,
+ MetadataRegistry.ID),
+ Collections.emptySet(),
+ new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ if (StringUtil.isNotEmpty(serviceId)) {
+
query.appendCondition(eq(EndpointTraffic.SERVICE_ID, serviceId));
+ }
+ }
+ });
+
+ final List<Endpoint> endpoints = new ArrayList<>();
+
+ for (final DataPoint dataPoint : resp.getDataPoints()) {
+ endpoints.add(buildEndpoint(dataPoint));
+ }
+
+ if (StringUtil.isNotEmpty(keyword)) { // guard on keyword: it is the value contains() consumes below
+ return endpoints.stream().filter(e ->
e.getName().contains(keyword)).collect(Collectors.toList());
+ }
+ return endpoints;
}
@Override
- public List<Process> listProcesses(String serviceId, String instanceId,
String agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket)
throws IOException {
- return Collections.emptyList();
+ public List<Process> listProcesses(String serviceId, String instanceId,
String agentId, ProfilingSupportStatus profilingSupportStatus, long
lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
+ MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
+ ImmutableSet.of(ProcessTraffic.NAME,
+ ProcessTraffic.SERVICE_ID,
+ ProcessTraffic.INSTANCE_ID,
+ ProcessTraffic.AGENT_ID,
+ ProcessTraffic.LAYER,
+ ProcessTraffic.DETECT_TYPE,
+ ProcessTraffic.PROPERTIES,
+ ProcessTraffic.LABELS_JSON,
+ ProcessTraffic.LAST_PING_TIME_BUCKET,
+ ProcessTraffic.PROFILING_SUPPORT_STATUS,
+ MetadataRegistry.ID),
+ Collections.emptySet(),
+ new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ if (StringUtil.isNotEmpty(serviceId)) {
+
query.appendCondition(eq(ProcessTraffic.SERVICE_ID, serviceId));
+ }
+ if (StringUtil.isNotEmpty(instanceId)) {
+
query.appendCondition(eq(ProcessTraffic.INSTANCE_ID, instanceId));
+ }
+ if (StringUtil.isNotEmpty(agentId)) {
+ query.appendCondition(eq(ProcessTraffic.AGENT_ID,
agentId)); // compare AGENT_ID with the agentId checked by the guard above
+ }
+ if (lastPingStartTimeBucket > 0) {
+
query.appendCondition(gte(ProcessTraffic.LAST_PING_TIME_BUCKET,
lastPingStartTimeBucket));
+ }
+ if (lastPingEndTimeBucket > 0) {
+
query.appendCondition(lte(ProcessTraffic.LAST_PING_TIME_BUCKET,
lastPingEndTimeBucket));
+ }
+ if (profilingSupportStatus != null) {
+
query.appendCondition(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS,
profilingSupportStatus.value()));
+ }
+ }
+ });
+
+ final List<Process> processes = new ArrayList<>();
+
+ for (final DataPoint dataPoint : resp.getDataPoints()) {
+ processes.add(buildProcess(dataPoint));
+ }
+
+ return processes;
}
@Override
- public long getProcessesCount(String serviceId, String instanceId, String
agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws
IOException {
- return 0;
+ public long getProcessesCount(String serviceId, String instanceId, String
agentId, ProfilingSupportStatus profilingSupportStatus, long
lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { // Counts processes matching the same optional filters as listProcesses.
+ MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
+ ImmutableSet.of(ProcessTraffic.NAME,
+ ProcessTraffic.SERVICE_ID,
+ ProcessTraffic.INSTANCE_ID,
+ ProcessTraffic.AGENT_ID,
+ ProcessTraffic.LAYER,
+ ProcessTraffic.DETECT_TYPE,
+ ProcessTraffic.PROPERTIES,
+ ProcessTraffic.LABELS_JSON,
+ ProcessTraffic.LAST_PING_TIME_BUCKET,
+ ProcessTraffic.PROFILING_SUPPORT_STATUS,
+ MetadataRegistry.ID),
+ Collections.emptySet(), // NOTE(review): presumably the field projection (none requested) - confirm against query()
+ new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ if (StringUtil.isNotEmpty(serviceId)) {
+
query.appendCondition(eq(ProcessTraffic.SERVICE_ID, serviceId));
+ }
+ if (StringUtil.isNotEmpty(instanceId)) {
+
query.appendCondition(eq(ProcessTraffic.INSTANCE_ID, instanceId));
+ }
+ if (StringUtil.isNotEmpty(agentId)) {
+ query.appendCondition(eq(ProcessTraffic.AGENT_ID,
agentId)); // match on the agentId argument; comparing against instanceId would never select by agent
+ }
+ if (lastPingStartTimeBucket > 0) { // a non-positive bucket means no lower bound
+
query.appendCondition(gte(ProcessTraffic.LAST_PING_TIME_BUCKET,
lastPingStartTimeBucket));
+ }
+ if (lastPingEndTimeBucket > 0) { // a non-positive bucket means no upper bound
+
query.appendCondition(lte(ProcessTraffic.LAST_PING_TIME_BUCKET,
lastPingEndTimeBucket));
+ }
+ if (profilingSupportStatus != null) {
+
query.appendCondition(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS,
profilingSupportStatus.value()));
+ }
+ }
+ });
+
+ return resp.getDataPoints()
+ .stream()
+ .collect(Collectors.groupingBy((Function<DataPoint, String>)
dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES))) // NOTE(review): groupingBy rejects null keys - assumes PROPERTIES is always present
+ .size(); // number of distinct PROPERTIES values among the matches, not the raw data point count
}
@Override
public Process getProcess(String processId) throws IOException { // Looks up a single process by id; returns null when the query yields nothing.
- return null;
+ MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
+ ImmutableSet.of(ProcessTraffic.NAME,
+ ProcessTraffic.SERVICE_ID,
+ ProcessTraffic.INSTANCE_ID,
+ ProcessTraffic.AGENT_ID,
+ ProcessTraffic.LAYER,
+ ProcessTraffic.DETECT_TYPE,
+ ProcessTraffic.PROPERTIES,
+ ProcessTraffic.LABELS_JSON,
+ MetadataRegistry.ID),
+ Collections.emptySet(),
+ new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ if (StringUtil.isNotEmpty(processId)) { // with an empty id no condition is appended, so the query is unfiltered
+
query.appendCondition(PairQueryCondition.IDQueryCondition.eq(MetadataRegistry.ID,
processId));
+ }
+ }
+ });
+
+ return resp.size() > 0 ? buildProcess(resp.getDataPoints().get(0)) :
null; // only the first returned data point is used
+ }
+
+ private Service buildService(DataPoint dataPoint) { // Builds a Service from the ServiceTraffic tags of one data point.
+ Service service = new Service();
+ service.setId(dataPoint.getTagValue(ServiceTraffic.SERVICE_ID)); // id comes from the SERVICE_ID tag, not dataPoint.getId()
+ service.setName(dataPoint.getTagValue(ServiceTraffic.NAME));
+ service.setShortName(dataPoint.getTagValue(ServiceTraffic.SHORT_NAME));
+ service.setGroup(dataPoint.getTagValue(ServiceTraffic.GROUP));
+ service.getLayers().add(Layer.valueOf(((Number)
dataPoint.getTagValue(ServiceTraffic.LAYER)).intValue()).name()); // LAYER tag is read as a Number and resolved to the Layer's name
+ return service;
+ }
+
+ private ServiceInstance buildInstance(DataPoint dataPoint) { // Builds a ServiceInstance from one InstanceTraffic data point.
+ ServiceInstance serviceInstance = new ServiceInstance();
+ serviceInstance.setId(dataPoint.getId());
+ serviceInstance.setName(dataPoint.getTagValue(InstanceTraffic.NAME));
+ serviceInstance.setInstanceUUID(dataPoint.getId()); // same value as the id set above
+ serviceInstance.setLayer(Layer.valueOf(((Number)
dataPoint.getTagValue(InstanceTraffic.LAYER)).intValue()).name());
+
+ final String propString =
dataPoint.getTagValue(InstanceTraffic.PROPERTIES);
+ JsonObject properties = null;
+ if (StringUtil.isNotEmpty(propString)) {
+ properties = GSON.fromJson(propString, JsonObject.class); // PROPERTIES tag is parsed as a JSON object
+ }
+ if (properties != null) {
+ for (Map.Entry<String, JsonElement> property :
properties.entrySet()) {
+ String key = property.getKey();
+ String value = property.getValue().getAsString();
+ if (key.equals(LANGUAGE)) { // the LANGUAGE entry sets the language; every other entry becomes an attribute
+ serviceInstance.setLanguage(Language.value(value));
+ } else {
+ serviceInstance.getAttributes().add(new Attribute(key,
value));
+ }
+ }
+ } else { // no parsed properties: language is set to UNKNOWN
+ serviceInstance.setLanguage(Language.UNKNOWN);
+ }
+
+ return serviceInstance;
+ }
+
+ private Endpoint buildEndpoint(DataPoint dataPoint) { // Builds an Endpoint from the data point's id and its EndpointTraffic NAME tag.
+ Endpoint endpoint = new Endpoint();
+ endpoint.setId(dataPoint.getId());
+ endpoint.setName(dataPoint.getTagValue(EndpointTraffic.NAME));
+ return endpoint;
+ }
+
+ private Process buildProcess(DataPoint dataPoint) { // Builds a Process from the ProcessTraffic tags of one data point.
+ Process process = new Process();
+
+ process.setId(dataPoint.getId());
+ process.setName(dataPoint.getTagValue(ProcessTraffic.NAME));
+ String serviceId = dataPoint.getTagValue(ProcessTraffic.SERVICE_ID);
+ process.setServiceId(serviceId);
+
process.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName()); // service name is obtained from the id via IDManager, not from a tag
+ String instanceId = dataPoint.getTagValue(ProcessTraffic.INSTANCE_ID);
+ process.setInstanceId(instanceId);
+
process.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName()); // instance name is likewise obtained from the id
+ process.setLayer(Layer.valueOf(((Number)
dataPoint.getTagValue(ProcessTraffic.LAYER)).intValue()).name());
+ process.setAgentId(dataPoint.getTagValue(ProcessTraffic.AGENT_ID));
+ process.setDetectType(ProcessDetectType.valueOf(((Number)
dataPoint.getTagValue(ProcessTraffic.DETECT_TYPE)).intValue()).name());
+
+ String propString = dataPoint.getTagValue(ProcessTraffic.PROPERTIES);
+ if (!Strings.isNullOrEmpty(propString)) { // PROPERTIES is parsed as a JSON object; each entry becomes an attribute
+ JsonObject properties = GSON.fromJson(propString,
JsonObject.class);
+ for (Map.Entry<String, JsonElement> property :
properties.entrySet()) {
+ String key = property.getKey();
+ String value = property.getValue().getAsString();
+ process.getAttributes().add(new Attribute(key, value));
+ }
+ }
+ String labelJson = dataPoint.getTagValue(ProcessTraffic.LABELS_JSON);
+ if (!Strings.isNullOrEmpty(labelJson)) {
+ List<String> labels = GSON.<List<String>>fromJson(labelJson,
ArrayList.class); // raw ArrayList target, element type unchecked - NOTE(review): assumes a JSON array of strings
+ process.getLabels().addAll(labels);
+ }
+ return process;
}
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
similarity index 81%
rename from
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
rename to
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index 20a300c3b0..fdb7950c0a 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -1,8 +1,9 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
@@ -11,6 +12,8 @@ import
org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
+import
org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
import java.io.IOException;
import java.util.Collections;
@@ -36,10 +39,10 @@ public class BanyanDBMetricsDAO implements IMetricsDAO {
MeasureWrite measureWrite = new
MeasureWrite(schema.getMetadata().getGroup(), // group name
model.getName(), // index-name
TimeBucket.getTimestamp(metrics.getTimeBucket(),
model.getDownsampling())); // timestamp
- final Convert2Storage<MeasureWrite> toStorage = new
BanyanDBConverter.MeasureToStorage(schema, measureWrite);
+ final BanyanDBConverter.MeasureToStorage toStorage = new
BanyanDBConverter.MeasureToStorage(schema, measureWrite);
storageBuilder.entity2Storage(metrics, toStorage);
- return new InsertRequest() {
- };
+ toStorage.acceptID(metrics.id());
+ return new BanyanDBMeasureInsertRequest(toStorage.obtain());
}
@Override
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index 55f2cf9ec7..ef1b7768a6 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -84,7 +84,7 @@ public class BanyanDBAlarmQueryDAO extends
AbstractBanyanDBDAO implements IAlarm
for (final RowEntity rowEntity : resp.getElements()) {
AlarmRecord.Builder builder = new AlarmRecord.Builder();
AlarmRecord alarmRecord = builder.storage2Entity(
- new
BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(AlarmRecord.INDEX_NAME),
rowEntity)
+ new
BanyanDBConverter.StorageToStream(AlarmRecord.INDEX_NAME, rowEntity)
);
AlarmMessage message = new AlarmMessage();
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index d60484b6f0..b3d8ed77e9 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -142,7 +142,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends
AbstractBanyanDBDAO i
List<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence
- minSequence);
for (final RowEntity rowEntity : resp.getElements()) {
ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
- new
BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(ProfileThreadSnapshotRecord.INDEX_NAME),
rowEntity));
+ new
BanyanDBConverter.StorageToStream(ProfileThreadSnapshotRecord.INDEX_NAME,
rowEntity));
result.add(record);
}
@@ -201,7 +201,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends
AbstractBanyanDBDAO i
List<ProfileThreadSnapshotRecord> records = new ArrayList<>();
for (final RowEntity rowEntity : resp.getElements()) {
ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
- new
BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(ProfileThreadSnapshotRecord.INDEX_NAME),
rowEntity));
+ new
BanyanDBConverter.StorageToStream(ProfileThreadSnapshotRecord.INDEX_NAME,
rowEntity));
records.add(record);
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index 48443b9e51..1598dc73d1 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -46,7 +46,7 @@ public class BanyanDBRecordDAO implements IRecordDAO {
model.getName(), // index-name
record.id(), // identity
TimeBucket.getTimestamp(record.getTimeBucket(),
model.getDownsampling())); // timestamp
- Convert2Storage<StreamWrite> convert2Storage = new
BanyanDBConverter.StreamToStorage(streamWrite);
+ Convert2Storage<StreamWrite> convert2Storage = new
BanyanDBConverter.StreamToStorage(schema, streamWrite);
storageBuilder.entity2Storage(record, convert2Storage);
return new BanyanDBStreamInsertRequest(convert2Storage.obtain());
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
index c3f75c8cef..76a78030e3 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -31,7 +31,7 @@ import
org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBManagementDAO;
-import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBMetricsDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetricsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBNoneStreamDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 44749dc0fb..2e3f4f71e7 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -168,7 +168,7 @@ public class BanyanDBTraceQueryDAO extends
AbstractBanyanDBDAO implements ITrace
for (final RowEntity rowEntity : resp.getElements()) {
SegmentRecord segmentRecord = new
SegmentRecord.Builder().storage2Entity(
- new
BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(SegmentRecord.INDEX_NAME),
rowEntity));
+ new
BanyanDBConverter.StorageToStream(SegmentRecord.INDEX_NAME, rowEntity));
segmentRecords.add(segmentRecord);
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
index 5ae1102e38..cf35457b7a 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
@@ -18,16 +18,27 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.util;
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
public class ByteUtil { // Converts between double values and their 8-byte encoding using a per-thread scratch buffer.
- private static final ThreadLocal<ByteBuffer> BYTE_BUFFER =
ThreadLocal.withInitial(() -> ByteBuffer.allocate(8));
+ private static final ThreadLocal<ByteBuf> BYTE_BUFFER =
ThreadLocal.withInitial(() -> Unpooled.buffer(8)); // one reusable 8-byte buffer per thread
public static Double bytes2Double(byte[] bytes) { // Decodes the first 8 bytes of the given array as a double.
- return BYTE_BUFFER.get().put(bytes).getDouble();
+ final ByteBuf buf = BYTE_BUFFER.get();
+ try {
+ return buf.writeBytes(bytes).readDouble();
+ } finally {
+ buf.clear(); // reset reader/writer indices so the buffer can be reused by the next call
+ }
}
public static byte[] double2Bytes(double number) { // Encodes the double into a freshly allocated 8-byte array.
- return BYTE_BUFFER.get().putDouble(number).array();
+ final ByteBuf buf = BYTE_BUFFER.get();
+ try {
+ return java.util.Arrays.copyOf(buf.writeDouble(number).array(), Double.BYTES); // copy: array() exposes the shared backing array, which the next call on this thread overwrites
+ } finally {
+ buf.clear(); // reset reader/writer indices so the buffer can be reused by the next call
+ }
}
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtilTest.java
similarity index 52%
copy from
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
copy to
oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtilTest.java
index 5ae1102e38..a041dc1454 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtilTest.java
@@ -18,16 +18,27 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.util;
-import java.nio.ByteBuffer;
+import org.junit.Assert;
+import org.junit.Test;
-public class ByteUtil {
- private static final ThreadLocal<ByteBuffer> BYTE_BUFFER =
ThreadLocal.withInitial(() -> ByteBuffer.allocate(8));
-
- public static Double bytes2Double(byte[] bytes) {
- return BYTE_BUFFER.get().put(bytes).getDouble();
+public class ByteUtilTest { // Round-trip tests for ByteUtil's double <-> byte[] conversion.
+ @Test
+ public void testConvertDoubleAndBackOnce() { // a single encode/decode round trip
+ double pi = 3.14159;
+ byte[] data = ByteUtil.double2Bytes(pi);
+ Assert.assertEquals(8, data.length); // the encoded form is expected to be 8 bytes long
+ Assert.assertEquals(pi, ByteUtil.bytes2Double(data), 0.00001);
}
- public static byte[] double2Bytes(double number) {
- return BYTE_BUFFER.get().putDouble(number).array();
+ @Test
+ public void testConvertDoubleAndBackTwice() { // two consecutive round trips - presumably checks that ByteUtil can be called repeatedly
+ double pi = 3.14159;
+ byte[] binaryPI = ByteUtil.double2Bytes(pi);
+ Assert.assertEquals(8, binaryPI.length);
+ Assert.assertEquals(pi, ByteUtil.bytes2Double(binaryPI), 0.00001);
+ double e = 2.71828;
+ byte[] binaryE = ByteUtil.double2Bytes(e);
+ Assert.assertEquals(8, binaryE.length);
+ Assert.assertEquals(e, ByteUtil.bytes2Double(binaryE), 0.00001);
}
}