This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 9a96d1577fc567f872aacd4bd86c40d27b87c50c Author: Megrez Lu <[email protected]> AuthorDate: Sun May 1 22:07:20 2022 +0800 add metadata integration --- .../banyandb/{stream => }/BanyanDBBatchDAO.java | 36 ++- .../storage/plugin/banyandb/BanyanDBConverter.java | 95 ++++-- .../plugin/banyandb/BanyanDBIndexInstaller.java | 6 +- .../plugin/banyandb/BanyanDBStorageClient.java | 7 +- .../plugin/banyandb/BanyanDBStorageProvider.java | 1 - .../storage/plugin/banyandb/MetadataRegistry.java | 6 +- .../BanyanDBMeasureInsertRequest.java} | 23 +- .../banyandb/measure/BanyanDBMetadataQueryDAO.java | 325 ++++++++++++++++++++- .../banyandb/{ => measure}/BanyanDBMetricsDAO.java | 11 +- .../banyandb/stream/BanyanDBAlarmQueryDAO.java | 2 +- .../BanyanDBProfileThreadSnapshotQueryDAO.java | 4 +- .../plugin/banyandb/stream/BanyanDBRecordDAO.java | 2 +- .../plugin/banyandb/stream/BanyanDBStorageDAO.java | 2 +- .../banyandb/stream/BanyanDBTraceQueryDAO.java | 2 +- .../storage/plugin/banyandb/util/ByteUtil.java | 19 +- .../plugin/banyandb/util/ByteUtilTest.java} | 27 +- 16 files changed, 485 insertions(+), 83 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java similarity index 61% rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java index e17b8b66ac..bb776c6dfe 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java +++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java @@ -16,25 +16,35 @@ * */ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; +package org.apache.skywalking.oap.server.storage.plugin.banyandb; +import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor; import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor; import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.IBatchDAO; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; import org.apache.skywalking.oap.server.library.util.CollectionUtils; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureInsertRequest; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO { - private StreamBulkWriteProcessor bulkProcessor; + private StreamBulkWriteProcessor streamBulkWriteProcessor; + + private MeasureBulkWriteProcessor measureBulkWriteProcessor; + private final int maxBulkSize; + private final int flushInterval; + private final int concurrency; + private final AtomicBoolean initialized = new AtomicBoolean(false); + public BanyanDBBatchDAO(BanyanDBStorageClient client, int maxBulkSize, int flushInterval, int concurrency) { super(client); this.maxBulkSize = maxBulkSize; @@ -44,27 +54,31 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme @Override public void insert(InsertRequest insertRequest) { 
- if (bulkProcessor == null) { - this.bulkProcessor = getClient().createBulkProcessor(maxBulkSize, flushInterval, concurrency); + if (initialized.compareAndSet(false, true)) { + this.streamBulkWriteProcessor = getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency); + this.measureBulkWriteProcessor = getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency); } - if (insertRequest instanceof BanyanDBStreamInsertRequest) { - this.bulkProcessor.add(((BanyanDBStreamInsertRequest) insertRequest).getStreamWrite()); + this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest) insertRequest).getStreamWrite()); + } else if (insertRequest instanceof BanyanDBMeasureInsertRequest) { + this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest) insertRequest).getMeasureWrite()); } } @Override public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) { - if (bulkProcessor == null) { - this.bulkProcessor = getClient().createBulkProcessor(maxBulkSize, flushInterval, concurrency); + if (initialized.compareAndSet(false, true)) { + this.streamBulkWriteProcessor = getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency); + this.measureBulkWriteProcessor = getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency); } if (CollectionUtils.isNotEmpty(prepareRequests)) { return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> { if (prepareRequest instanceof BanyanDBStreamInsertRequest) { // TODO: return CompletableFuture<Void> - this.bulkProcessor.add(((BanyanDBStreamInsertRequest) prepareRequest).getStreamWrite()); - } else { + this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest) prepareRequest).getStreamWrite()); + } else if (prepareRequest instanceof BanyanDBMeasureInsertRequest) { + this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest) prepareRequest).getMeasureWrite()); } return CompletableFuture.completedFuture(null); 
}).toArray(CompletableFuture[]::new)); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java index c3c936ef84..fe4d15f9ea 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java @@ -18,15 +18,18 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; +import com.google.gson.JsonObject; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.banyandb.model.v1.BanyandbModel; +import org.apache.skywalking.banyandb.v1.client.DataPoint; import org.apache.skywalking.banyandb.v1.client.MeasureWrite; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.StreamWrite; import org.apache.skywalking.banyandb.v1.client.TagAndValue; import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.banyandb.v1.client.metadata.Serializable; +import org.apache.skywalking.oap.server.core.analysis.Layer; import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity; import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject; @@ -37,11 +40,15 @@ import java.util.List; import java.util.function.Function; public class BanyanDBConverter { - @RequiredArgsConstructor - public static class StreamToEntity implements Convert2Entity { + public static class StorageToStream implements Convert2Entity { private final 
MetadataRegistry.Schema schema; private final RowEntity rowEntity; + public StorageToStream(String modelName, RowEntity rowEntity) { + this.schema = MetadataRegistry.INSTANCE.findMetadata(modelName); + this.rowEntity = rowEntity; + } + @Override public Object get(String fieldName) { MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName); @@ -61,13 +68,17 @@ public class BanyanDBConverter { @Slf4j @RequiredArgsConstructor public static class StreamToStorage implements Convert2Storage<StreamWrite> { + private final MetadataRegistry.Schema schema; private final StreamWrite streamWrite; @Override public void accept(String fieldName, Object fieldValue) { - // TODO: skip "time_bucket" + MetadataRegistry.ColumnSpec columnSpec = this.schema.getSpec(fieldName); + if (columnSpec == null) { + throw new IllegalArgumentException("fail to find field[" + fieldName + "]"); + } try { - this.streamWrite.tag(fieldName, buildTag(fieldValue)); + this.streamWrite.tag(fieldName, buildTag(fieldValue, columnSpec.getColumnClass())); } catch (BanyanDBException ex) { log.error("fail to add tag", ex); } @@ -118,17 +129,28 @@ public class BanyanDBConverter { @Override public void accept(String fieldName, Object fieldValue) { MetadataRegistry.ColumnSpec columnSpec = this.schema.getSpec(fieldName); + if (columnSpec == null) { + throw new IllegalArgumentException("fail to find field[" + fieldName + "]"); + } try { if (columnSpec.getColumnType() == MetadataRegistry.ColumnType.TAG) { - this.measureWrite.tag(fieldName, buildTag(fieldValue)); + this.measureWrite.tag(fieldName, buildTag(fieldValue, columnSpec.getColumnClass())); } else { - this.measureWrite.field(fieldName, buildField(fieldValue)); + this.measureWrite.field(fieldName, buildField(fieldValue, columnSpec.getColumnClass())); } } catch (BanyanDBException ex) { log.error("fail to add tag", ex); } } + public void acceptID(String id) { + try { + this.measureWrite.tag(MetadataRegistry.ID, TagAndValue.idTagValue(id)); + } catch 
(BanyanDBException ex) { + log.error("fail to add ID tag", ex); + } + } + @Override public void accept(String fieldName, byte[] fieldValue) { MetadataRegistry.ColumnSpec columnSpec = this.schema.getSpec(fieldName); @@ -170,29 +192,62 @@ public class BanyanDBConverter { } } - private static Serializable<BanyandbModel.TagValue> buildTag(Object value) { - if (Integer.class.equals(value.getClass()) || Long.class.equals(value.getClass())) { - return TagAndValue.longTagValue((long) value); - } else if (String.class.equals(value.getClass())) { + private static Serializable<BanyandbModel.TagValue> buildTag(Object value, final Class<?> clazz) { + if (Integer.class.equals(clazz) || int.class.equals(clazz)) { + return TagAndValue.longTagValue(((Number) value).longValue()); + } else if (Long.class.equals(clazz) || long.class.equals(clazz)) { + return TagAndValue.longTagValue((Long) value); + } else if (String.class.equals(clazz)) { return TagAndValue.stringTagValue((String) value); - } else if (Double.class.equals(value.getClass())) { + } else if (Double.class.equals(clazz) || double.class.equals(clazz)) { return TagAndValue.binaryTagValue(ByteUtil.double2Bytes((double) value)); - } else if (value instanceof StorageDataComplexObject) { + } else if (StorageDataComplexObject.class.isAssignableFrom(clazz)) { return TagAndValue.stringTagValue(((StorageDataComplexObject<?>) value).toStorageData()); + } else if (Layer.class.equals(clazz)) { + return TagAndValue.longTagValue(((Integer) value).longValue()); + } else if (JsonObject.class.equals(clazz)) { + return TagAndValue.stringTagValue((String) value); } - throw new IllegalStateException(value.getClass() + " is not supported"); + throw new IllegalStateException(clazz.getSimpleName() + " is not supported"); } - private static Serializable<BanyandbModel.FieldValue> buildField(Object value) { - if (Integer.class.equals(value.getClass()) || Long.class.equals(value.getClass())) { - return TagAndValue.longFieldValue((long) value); - } 
else if (String.class.equals(value.getClass())) { + private static Serializable<BanyandbModel.FieldValue> buildField(Object value, final Class<?> clazz) { + if (Integer.class.equals(clazz) || int.class.equals(clazz)) { + return TagAndValue.longFieldValue(((Number) value).longValue()); + } else if (Long.class.equals(clazz) || long.class.equals(clazz)) { + return TagAndValue.longFieldValue((Long) value); + } else if (String.class.equals(clazz)) { return TagAndValue.stringFieldValue((String) value); - } else if (Double.class.equals(value.getClass())) { + } else if (Double.class.equals(clazz) || double.class.equals(clazz)) { return TagAndValue.binaryFieldValue(ByteUtil.double2Bytes((double) value)); - } else if (value instanceof StorageDataComplexObject) { + } else if (StorageDataComplexObject.class.isAssignableFrom(clazz)) { return TagAndValue.stringFieldValue(((StorageDataComplexObject<?>) value).toStorageData()); } - throw new IllegalStateException(value.getClass() + " is not supported"); + throw new IllegalStateException(clazz.getSimpleName() + " is not supported"); + } + + public static class StorageToMeasure implements Convert2Entity { + private final MetadataRegistry.Schema schema; + private final DataPoint dataPoint; + + public StorageToMeasure(String modelName, DataPoint dataPoint) { + this.schema = MetadataRegistry.INSTANCE.findMetadata(modelName); + this.dataPoint = dataPoint; + } + + @Override + public Object get(String fieldName) { + MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName); + if (double.class.equals(spec.getColumnClass())) { + return ByteUtil.bytes2Double(dataPoint.getTagValue(fieldName)); + } else { + return dataPoint.getTagValue(fieldName); + } + } + + @Override + public <T, R> R getWith(String fieldName, Function<T, R> typeDecoder) { + return (R) this.get(fieldName); + } } } diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java index 75109aa781..17d284c9a4 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java @@ -57,7 +57,11 @@ public class BanyanDBIndexInstaller extends ModelInstaller { } log.info("group {} created", g.name()); // then check entity schema - return metadata.findRemoteSchema(c).isPresent(); + if (metadata.findRemoteSchema(c).isPresent()) { + MetadataRegistry.INSTANCE.registerModel(model, this.configService); + return true; + } + return false; } catch (BanyanDBException ex) { throw new StorageException("fail to check existence", ex); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java index a92c270023..9ba378125c 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import io.grpc.Status; import org.apache.skywalking.banyandb.v1.client.BanyanDBClient; +import 
org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor; import org.apache.skywalking.banyandb.v1.client.MeasureQuery; import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse; import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor; @@ -149,10 +150,14 @@ public class BanyanDBStorageClient implements Client, HealthCheckable { this.client.write(streamWrite); } - public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) { + public StreamBulkWriteProcessor createStreamBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) { return this.client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency); } + public MeasureBulkWriteProcessor createMeasureBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) { + return this.client.buildMeasureWriteProcessor(maxBulkSize, flushInterval, concurrency); + } + @Override public void registerChecker(HealthChecker healthChecker) { this.healthChecker.register(healthChecker); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java index 26c6b2529a..6bff41e16f 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java @@ -57,7 +57,6 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDB import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBServiceLabelDAO; import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBTopologyQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBAlarmQueryDAO; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBatchDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBrowserLogQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingDataDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingScheduleQueryDAO; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index f0b88fe538..18767ae21e 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -67,7 +67,7 @@ import java.util.stream.Collectors; public enum MetadataRegistry { INSTANCE; - private static final String ID = "id"; + public static final String ID = "id"; private final Map<String, Schema> registry = new ConcurrentHashMap<>(); public NamedSchema<?> registerModel(Model model, ConfigService configService) { @@ -91,8 +91,7 @@ public enum MetadataRegistry { if (partialMetadata.getKind() == Kind.STREAM) { final Stream.Builder builder = Stream.create(partialMetadata.getGroup(), partialMetadata.getName()); if (entities.isEmpty()) { - log.warn("sharding keys of model[stream.{}] must not be empty", model.getName()); -// throw new IllegalStateException("sharding keys of model[" + model.getName() + "] must not be empty"); + throw 
new IllegalStateException("sharding keys of model[stream." + model.getName() + "] must not be empty"); } builder.setEntityRelativeTags(entities); builder.addTagFamilies(tagFamilySpecs); @@ -401,6 +400,7 @@ public enum MetadataRegistry { private final PartialMetadata metadata; @Singular private final Map<String, ColumnSpec> specs; + private final boolean useIdAsShardingKey; public ColumnSpec getSpec(String columnName) { return this.specs.get(columnName); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java similarity index 67% copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java index 5ae1102e38..3f965bff6f 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java @@ -16,18 +16,15 @@ * */ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.util; +package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; -import java.nio.ByteBuffer; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.skywalking.banyandb.v1.client.MeasureWrite; +import org.apache.skywalking.oap.server.library.client.request.InsertRequest; -public class ByteUtil { - private static final ThreadLocal<ByteBuffer> 
BYTE_BUFFER = ThreadLocal.withInitial(() -> ByteBuffer.allocate(8)); - - public static Double bytes2Double(byte[] bytes) { - return BYTE_BUFFER.get().put(bytes).getDouble(); - } - - public static byte[] double2Bytes(double number) { - return BYTE_BUFFER.get().putDouble(number).array(); - } -} +@RequiredArgsConstructor +@Getter +public class BanyanDBMeasureInsertRequest implements InsertRequest { + private final MeasureWrite measureWrite; +} \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java index 23828464da..23f096115b 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java @@ -18,10 +18,26 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.apache.skywalking.banyandb.v1.client.DataPoint; import org.apache.skywalking.banyandb.v1.client.MeasureQuery; import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse; +import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.oap.server.core.analysis.IDManager; +import org.apache.skywalking.oap.server.core.analysis.Layer; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import 
org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType; +import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic; import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic; +import org.apache.skywalking.oap.server.core.query.enumeration.Language; +import org.apache.skywalking.oap.server.core.query.enumeration.ProfilingSupportStatus; +import org.apache.skywalking.oap.server.core.query.type.Attribute; import org.apache.skywalking.oap.server.core.query.type.Endpoint; import org.apache.skywalking.oap.server.core.query.type.Process; import org.apache.skywalking.oap.server.core.query.type.Service; @@ -29,13 +45,22 @@ import org.apache.skywalking.oap.server.core.query.type.ServiceInstance; import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic.PropertyUtil.LANGUAGE; public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMetadataQueryDAO { + private static final Gson GSON = new Gson(); + public BanyanDBMetadataQueryDAO(BanyanDBStorageClient client) { super(client); } @@ -43,7 +68,12 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO 
implements IMe @Override public List<Service> listServices(String layer, String group) throws IOException { MeasureQueryResponse resp = query(ServiceTraffic.INDEX_NAME, - ImmutableSet.of(ServiceTraffic.NAME, ServiceTraffic.SHORT_NAME), + ImmutableSet.of(ServiceTraffic.NAME, + ServiceTraffic.SHORT_NAME, + ServiceTraffic.GROUP, + ServiceTraffic.LAYER, + ServiceTraffic.SERVICE_ID, + MetadataRegistry.ID), Collections.emptySet(), new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { @@ -56,41 +86,314 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe } }); - return Collections.emptyList(); + final List<Service> services = new ArrayList<>(); + + for (final DataPoint dataPoint : resp.getDataPoints()) { + services.add(buildService(dataPoint)); + } + + return services; } @Override public List<Service> getServices(String serviceId) throws IOException { - return Collections.emptyList(); + MeasureQueryResponse resp = query(ServiceTraffic.INDEX_NAME, + ImmutableSet.of(ServiceTraffic.NAME, + ServiceTraffic.SHORT_NAME, + ServiceTraffic.GROUP, + ServiceTraffic.LAYER, + ServiceTraffic.SERVICE_ID, + MetadataRegistry.ID), + Collections.emptySet(), new QueryBuilder<MeasureQuery>() { + @Override + protected void apply(MeasureQuery query) { + if (StringUtil.isNotEmpty(serviceId)) { + query.appendCondition(eq(ServiceTraffic.SERVICE_ID, serviceId)); + } + } + }); + + final List<Service> services = new ArrayList<>(); + + for (final DataPoint dataPoint : resp.getDataPoints()) { + services.add(buildService(dataPoint)); + } + + return services; } @Override public List<ServiceInstance> listInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException { - return Collections.emptyList(); + final long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp); + + MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME, + ImmutableSet.of(InstanceTraffic.NAME, + InstanceTraffic.LAYER, + 
InstanceTraffic.PROPERTIES, + InstanceTraffic.LAST_PING_TIME_BUCKET, + InstanceTraffic.SERVICE_ID, + MetadataRegistry.ID), + Collections.emptySet(), + new QueryBuilder<MeasureQuery>() { + @Override + protected void apply(MeasureQuery query) { + if (StringUtil.isNotEmpty(serviceId)) { + query.appendCondition(eq(InstanceTraffic.SERVICE_ID, serviceId)); + } + query.appendCondition(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket)); + } + }); + + final List<ServiceInstance> instances = new ArrayList<>(); + + for (final DataPoint dataPoint : resp.getDataPoints()) { + instances.add(buildInstance(dataPoint)); + } + + return instances; } @Override public ServiceInstance getInstance(String instanceId) throws IOException { - return null; + MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME, + ImmutableSet.of(InstanceTraffic.NAME, + InstanceTraffic.LAYER, + InstanceTraffic.PROPERTIES, + MetadataRegistry.ID), + Collections.emptySet(), + new QueryBuilder<MeasureQuery>() { + @Override + protected void apply(MeasureQuery query) { + if (StringUtil.isNotEmpty(instanceId)) { + query.appendCondition(PairQueryCondition.IDQueryCondition.eq(MetadataRegistry.ID, instanceId)); + } + } + }); + + return resp.size() > 0 ? 
buildInstance(resp.getDataPoints().get(0)) : null; } @Override public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit) throws IOException { - return Collections.emptyList(); + MeasureQueryResponse resp = query(EndpointTraffic.INDEX_NAME, + ImmutableSet.of(EndpointTraffic.NAME, + EndpointTraffic.SERVICE_ID, + MetadataRegistry.ID), + Collections.emptySet(), + new QueryBuilder<MeasureQuery>() { + @Override + protected void apply(MeasureQuery query) { + if (StringUtil.isNotEmpty(serviceId)) { + query.appendCondition(eq(EndpointTraffic.SERVICE_ID, serviceId)); + } + } + }); + + final List<Endpoint> endpoints = new ArrayList<>(); + + for (final DataPoint dataPoint : resp.getDataPoints()) { + endpoints.add(buildEndpoint(dataPoint)); + } + + if (StringUtil.isNotEmpty(serviceId)) { + return endpoints.stream().filter(e -> e.getName().contains(keyword)).collect(Collectors.toList()); + } + return endpoints; } @Override - public List<Process> listProcesses(String serviceId, String instanceId, String agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { - return Collections.emptyList(); + public List<Process> listProcesses(String serviceId, String instanceId, String agentId, ProfilingSupportStatus profilingSupportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { + MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME, + ImmutableSet.of(ProcessTraffic.NAME, + ProcessTraffic.SERVICE_ID, + ProcessTraffic.INSTANCE_ID, + ProcessTraffic.AGENT_ID, + ProcessTraffic.LAYER, + ProcessTraffic.DETECT_TYPE, + ProcessTraffic.PROPERTIES, + ProcessTraffic.LABELS_JSON, + ProcessTraffic.LAST_PING_TIME_BUCKET, + ProcessTraffic.PROFILING_SUPPORT_STATUS, + MetadataRegistry.ID), + Collections.emptySet(), + new QueryBuilder<MeasureQuery>() { + @Override + protected void apply(MeasureQuery query) { + if (StringUtil.isNotEmpty(serviceId)) { + query.appendCondition(eq(ProcessTraffic.SERVICE_ID, 
serviceId)); + } + if (StringUtil.isNotEmpty(instanceId)) { + query.appendCondition(eq(ProcessTraffic.INSTANCE_ID, instanceId)); + } + if (StringUtil.isNotEmpty(agentId)) { + query.appendCondition(eq(ProcessTraffic.AGENT_ID, agentId)); /* filter on the agent id itself, not the instance id */ + } + if (lastPingStartTimeBucket > 0) { + query.appendCondition(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket)); + } + if (lastPingEndTimeBucket > 0) { + query.appendCondition(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket)); + } + if (profilingSupportStatus != null) { + query.appendCondition(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value())); + } + } + }); + + final List<Process> processes = new ArrayList<>(); + + for (final DataPoint dataPoint : resp.getDataPoints()) { + processes.add(buildProcess(dataPoint)); + } + + return processes; } @Override - public long getProcessesCount(String serviceId, String instanceId, String agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { - return 0; + public long getProcessesCount(String serviceId, String instanceId, String agentId, ProfilingSupportStatus profilingSupportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { /* Counts ProcessTraffic entries matching the given filters. Every filter is optional: empty ids, non-positive time buckets and a null profilingSupportStatus add no condition. */ + MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME, + ImmutableSet.of(ProcessTraffic.NAME, + ProcessTraffic.SERVICE_ID, + ProcessTraffic.INSTANCE_ID, + ProcessTraffic.AGENT_ID, + ProcessTraffic.LAYER, + ProcessTraffic.DETECT_TYPE, + ProcessTraffic.PROPERTIES, + ProcessTraffic.LABELS_JSON, + ProcessTraffic.LAST_PING_TIME_BUCKET, + ProcessTraffic.PROFILING_SUPPORT_STATUS, + MetadataRegistry.ID), + Collections.emptySet(), + new QueryBuilder<MeasureQuery>() { + @Override + protected void apply(MeasureQuery query) { + if (StringUtil.isNotEmpty(serviceId)) { + query.appendCondition(eq(ProcessTraffic.SERVICE_ID, serviceId)); + } + if (StringUtil.isNotEmpty(instanceId)) { + query.appendCondition(eq(ProcessTraffic.INSTANCE_ID, instanceId)); 
+ } + if (StringUtil.isNotEmpty(agentId)) { + query.appendCondition(eq(ProcessTraffic.AGENT_ID, agentId)); /* filter on the agent id itself, not the instance id */ + } + if (lastPingStartTimeBucket > 0) { + query.appendCondition(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket)); + } + if (lastPingEndTimeBucket > 0) { + query.appendCondition(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket)); + } + if (profilingSupportStatus != null) { + query.appendCondition(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value())); + } + } + }); + + return resp.getDataPoints() /* NOTE(review): the result is the number of distinct PROPERTIES tag values among the matched data points, not the number of data points; confirm this is intended */ + .stream() + .collect(Collectors.groupingBy((Function<DataPoint, String>) dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES))) + .size(); } @Override public Process getProcess(String processId) throws IOException { - return null; + MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME, + ImmutableSet.of(ProcessTraffic.NAME, + ProcessTraffic.SERVICE_ID, + ProcessTraffic.INSTANCE_ID, + ProcessTraffic.AGENT_ID, + ProcessTraffic.LAYER, + ProcessTraffic.DETECT_TYPE, + ProcessTraffic.PROPERTIES, + ProcessTraffic.LABELS_JSON, + MetadataRegistry.ID), + Collections.emptySet(), + new QueryBuilder<MeasureQuery>() { + @Override + protected void apply(MeasureQuery query) { + if (StringUtil.isNotEmpty(processId)) { + query.appendCondition(PairQueryCondition.IDQueryCondition.eq(MetadataRegistry.ID, processId)); + } + } + }); + + return resp.size() > 0 ? 
buildProcess(resp.getDataPoints().get(0)) : null; + } + + private Service buildService(DataPoint dataPoint) { /* Builds a Service from the ServiceTraffic tags of one data point; the numeric LAYER tag is resolved to the name of the matching Layer. */ + Service service = new Service(); + service.setId(dataPoint.getTagValue(ServiceTraffic.SERVICE_ID)); + service.setName(dataPoint.getTagValue(ServiceTraffic.NAME)); + service.setShortName(dataPoint.getTagValue(ServiceTraffic.SHORT_NAME)); + service.setGroup(dataPoint.getTagValue(ServiceTraffic.GROUP)); + service.getLayers().add(Layer.valueOf(((Number) dataPoint.getTagValue(ServiceTraffic.LAYER)).intValue()).name()); + return service; + } + + private ServiceInstance buildInstance(DataPoint dataPoint) { /* Builds a ServiceInstance from one data point; the data point id is used both as the instance id and as the instance UUID. */ + ServiceInstance serviceInstance = new ServiceInstance(); + serviceInstance.setId(dataPoint.getId()); + serviceInstance.setName(dataPoint.getTagValue(InstanceTraffic.NAME)); + serviceInstance.setInstanceUUID(dataPoint.getId()); + serviceInstance.setLayer(Layer.valueOf(((Number) dataPoint.getTagValue(InstanceTraffic.LAYER)).intValue()).name()); + + final String propString = dataPoint.getTagValue(InstanceTraffic.PROPERTIES); /* PROPERTIES is parsed below as a JSON object when it is not empty */ + JsonObject properties = null; + if (StringUtil.isNotEmpty(propString)) { + properties = GSON.fromJson(propString, JsonObject.class); + } + if (properties != null) { + for (Map.Entry<String, JsonElement> property : properties.entrySet()) { /* the LANGUAGE entry sets the language; every other entry becomes an attribute */ + String key = property.getKey(); + String value = property.getValue().getAsString(); + if (key.equals(LANGUAGE)) { + serviceInstance.setLanguage(Language.value(value)); + } else { + serviceInstance.getAttributes().add(new Attribute(key, value)); + } + } + } else { + serviceInstance.setLanguage(Language.UNKNOWN); /* no parsed properties: fall back to UNKNOWN */ + } + + return serviceInstance; + } + + private Endpoint buildEndpoint(DataPoint dataPoint) { /* Builds an Endpoint carrying only the data point id and the NAME tag. */ + Endpoint endpoint = new Endpoint(); + endpoint.setId(dataPoint.getId()); + endpoint.setName(dataPoint.getTagValue(EndpointTraffic.NAME)); + return endpoint; + } + + private Process buildProcess(DataPoint dataPoint) { /* Builds a Process from the ProcessTraffic tags of one data point. */ + Process process = new Process(); + + process.setId(dataPoint.getId()); + 
process.setName(dataPoint.getTagValue(ProcessTraffic.NAME)); + String serviceId = dataPoint.getTagValue(ProcessTraffic.SERVICE_ID); + process.setServiceId(serviceId); + process.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName()); /* the service name is derived from the service id via IDManager */ + String instanceId = dataPoint.getTagValue(ProcessTraffic.INSTANCE_ID); + process.setInstanceId(instanceId); + process.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName()); /* likewise, the instance name is derived from the instance id */ + process.setLayer(Layer.valueOf(((Number) dataPoint.getTagValue(ProcessTraffic.LAYER)).intValue()).name()); + process.setAgentId(dataPoint.getTagValue(ProcessTraffic.AGENT_ID)); + process.setDetectType(ProcessDetectType.valueOf(((Number) dataPoint.getTagValue(ProcessTraffic.DETECT_TYPE)).intValue()).name()); + + String propString = dataPoint.getTagValue(ProcessTraffic.PROPERTIES); /* when present, each entry of the PROPERTIES JSON object becomes an attribute */ + if (!Strings.isNullOrEmpty(propString)) { + JsonObject properties = GSON.fromJson(propString, JsonObject.class); + for (Map.Entry<String, JsonElement> property : properties.entrySet()) { + String key = property.getKey(); + String value = property.getValue().getAsString(); + process.getAttributes().add(new Attribute(key, value)); + } + } + String labelJson = dataPoint.getTagValue(ProcessTraffic.LABELS_JSON); /* when present, LABELS_JSON is parsed into a list whose elements are appended to the labels */ + if (!Strings.isNullOrEmpty(labelJson)) { + List<String> labels = GSON.<List<String>>fromJson(labelJson, ArrayList.class); + process.getLabels().addAll(labels); + } + return process; } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java similarity index 81% rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java rename to 
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java index 20a300c3b0..fdb7950c0a 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java @@ -1,8 +1,9 @@ -package org.apache.skywalking.oap.server.storage.plugin.banyandb; +package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.banyandb.v1.client.MeasureWrite; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; @@ -11,6 +12,8 @@ import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; import java.io.IOException; import java.util.Collections; @@ -36,10 +39,10 @@ public class BanyanDBMetricsDAO implements IMetricsDAO { MeasureWrite measureWrite = new MeasureWrite(schema.getMetadata().getGroup(), // group name model.getName(), // index-name TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp - final Convert2Storage<MeasureWrite> toStorage = new 
BanyanDBConverter.MeasureToStorage(schema, measureWrite); + final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite); storageBuilder.entity2Storage(metrics, toStorage); - return new InsertRequest() { - }; + toStorage.acceptID(metrics.id()); + return new BanyanDBMeasureInsertRequest(toStorage.obtain()); } @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java index 55f2cf9ec7..ef1b7768a6 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java @@ -84,7 +84,7 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm for (final RowEntity rowEntity : resp.getElements()) { AlarmRecord.Builder builder = new AlarmRecord.Builder(); AlarmRecord alarmRecord = builder.storage2Entity( - new BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(AlarmRecord.INDEX_NAME), rowEntity) + new BanyanDBConverter.StorageToStream(AlarmRecord.INDEX_NAME, rowEntity) ); AlarmMessage message = new AlarmMessage(); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java index d60484b6f0..b3d8ed77e9 100644 --- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java @@ -142,7 +142,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i List<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence); for (final RowEntity rowEntity : resp.getElements()) { ProfileThreadSnapshotRecord record = this.builder.storage2Entity( - new BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(ProfileThreadSnapshotRecord.INDEX_NAME), rowEntity)); + new BanyanDBConverter.StorageToStream(ProfileThreadSnapshotRecord.INDEX_NAME, rowEntity)); result.add(record); } @@ -201,7 +201,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i List<ProfileThreadSnapshotRecord> records = new ArrayList<>(); for (final RowEntity rowEntity : resp.getElements()) { ProfileThreadSnapshotRecord record = this.builder.storage2Entity( - new BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(ProfileThreadSnapshotRecord.INDEX_NAME), rowEntity)); + new BanyanDBConverter.StorageToStream(ProfileThreadSnapshotRecord.INDEX_NAME, rowEntity)); records.add(record); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java index 48443b9e51..1598dc73d1 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java +++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java @@ -46,7 +46,7 @@ public class BanyanDBRecordDAO implements IRecordDAO { model.getName(), // index-name record.id(), // identity TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling())); // timestamp - Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(streamWrite); + Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(schema, streamWrite); storageBuilder.entity2Storage(record, convert2Storage); return new BanyanDBStreamInsertRequest(convert2Storage.obtain()); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java index c3f75c8cef..76a78030e3 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java @@ -31,7 +31,7 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBManagementDAO; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBMetricsDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetricsDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBNoneStreamDAO; import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java index 44749dc0fb..2e3f4f71e7 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java @@ -168,7 +168,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace for (final RowEntity rowEntity : resp.getElements()) { SegmentRecord segmentRecord = new SegmentRecord.Builder().storage2Entity( - new BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(SegmentRecord.INDEX_NAME), rowEntity)); + new BanyanDBConverter.StorageToStream(SegmentRecord.INDEX_NAME, rowEntity)); segmentRecords.add(segmentRecord); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java index 5ae1102e38..cf35457b7a 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java @@ -18,16 +18,27 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.util; 
-import java.nio.ByteBuffer; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; public class ByteUtil { - private static final ThreadLocal<ByteBuffer> BYTE_BUFFER = ThreadLocal.withInitial(() -> ByteBuffer.allocate(8)); + private static final ThreadLocal<ByteBuf> BYTE_BUFFER = ThreadLocal.withInitial(() -> Unpooled.buffer(8)); /* one scratch buffer per thread, shared by both conversions */ public static Double bytes2Double(byte[] bytes) { - return BYTE_BUFFER.get().put(bytes).getDouble(); + final ByteBuf buf = BYTE_BUFFER.get(); /* decodes the first 8 bytes of the input as a double */ + try { + return buf.writeBytes(bytes).readDouble(); + } finally { + buf.clear(); /* reset the indexes so the next call on this thread starts from an empty buffer */ + } } public static byte[] double2Bytes(double number) { - return BYTE_BUFFER.get().putDouble(number).array(); + final ByteBuf buf = BYTE_BUFFER.get(); /* encodes the double into a fresh 8-byte array */ + try { + return java.util.Arrays.copyOf(buf.writeDouble(number).array(), Double.BYTES); /* array() is the thread-local buffer's own backing array, so hand back an 8-byte copy that later calls on this thread cannot overwrite */ + } finally { + buf.clear(); /* reset the indexes so the next call on this thread starts from an empty buffer */ + } } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtilTest.java similarity index 52% copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtilTest.java index 5ae1102e38..a041dc1454 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtilTest.java @@ -18,16 +18,27 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.util; -import java.nio.ByteBuffer; +import org.junit.Assert; +import org.junit.Test; -public class ByteUtil { 
- private static final ThreadLocal<ByteBuffer> BYTE_BUFFER = ThreadLocal.withInitial(() -> ByteBuffer.allocate(8)); - - public static Double bytes2Double(byte[] bytes) { - return BYTE_BUFFER.get().put(bytes).getDouble(); +public class ByteUtilTest { /* Round-trip tests for the double and byte[] conversions in ByteUtil. */ + @Test + public void testConvertDoubleAndBackOnce() { /* a single encode then decode must give 8 bytes and the original value within 0.00001 */ + double pi = 3.14159; + byte[] data = ByteUtil.double2Bytes(pi); + Assert.assertEquals(8, data.length); + Assert.assertEquals(pi, ByteUtil.bytes2Double(data), 0.00001); } - public static byte[] double2Bytes(double number) { - return BYTE_BUFFER.get().putDouble(number).array(); + @Test + public void testConvertDoubleAndBackTwice() { /* two consecutive round trips in the same test must both give 8 bytes and the original value */ + double pi = 3.14159; + byte[] binaryPI = ByteUtil.double2Bytes(pi); + Assert.assertEquals(8, binaryPI.length); + Assert.assertEquals(pi, ByteUtil.bytes2Double(binaryPI), 0.00001); + double e = 2.71828; + byte[] binaryE = ByteUtil.double2Bytes(e); + Assert.assertEquals(8, binaryE.length); + Assert.assertEquals(e, ByteUtil.bytes2Double(binaryE), 0.00001); } }
