This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit cb705ed0cc8d014839f2f13078442307b6e2b9df Author: Megrez Lu <[email protected]> AuthorDate: Wed Dec 1 23:29:21 2021 +0800 add alarm, networkAddressAlias and logs --- .../banyandb/converter/ProfileTaskMapper.java | 28 --------- .../ProfileThreadSnapshotRecordMapper.java | 27 -------- .../banyandb/deserializer/AlarmMessageMapper.java | 51 +++++++++++++++ .../BasicTraceMapper.java | 2 +- .../deserializer/BrowserErrorLogMapper.java | 59 ++++++++++++++++++ .../DashboardConfigurationMapper.java | 2 +- .../plugin/banyandb/deserializer/EventMapper.java | 24 ++++++++ .../plugin/banyandb/deserializer/LogMapper.java | 61 ++++++++++++++++++ .../deserializer/NetworkAddressAliasMapper.java | 40 ++++++++++++ .../deserializer/ProfileTaskLogMapper.java | 39 ++++++++++++ .../banyandb/deserializer/ProfileTaskMapper.java | 42 +++++++++++++ .../ProfileThreadSnapshotRecordMapper.java | 36 +++++++++++ .../RowEntityMapper.java | 2 +- .../SegmentRecordMapper.java | 2 +- .../banyandb/stream/BanyanDBAlarmQueryDAO.java | 47 +++++++++++++- .../stream/BanyanDBBrowserLogQueryDAO.java | 50 ++++++++++++++- .../banyandb/stream/BanyanDBLogQueryDAO.java | 72 +++++++++++++++++++++- .../stream/BanyanDBNetworkAddressAliasDAO.java | 23 ++++++- .../stream/BanyanDBProfileTaskLogQueryDAO.java | 31 +++++++++- .../stream/BanyanDBProfileTaskQueryDAO.java | 32 +++++++--- .../BanyanDBProfileThreadSnapshotQueryDAO.java | 8 +-- .../banyandb/stream/BanyanDBTraceQueryDAO.java | 6 +- .../stream/BanyanDBUITemplateManagementDAO.java | 4 +- 23 files changed, 597 insertions(+), 91 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java deleted file mode 100644 index ea1ca09cb0..0000000000 --- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; - -import com.google.common.collect.ImmutableList; -import org.apache.skywalking.banyandb.v1.client.RowEntity; -import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord; -import org.apache.skywalking.oap.server.core.query.type.ProfileTask; - -import java.util.Collections; -import java.util.List; - -public class ProfileTaskMapper implements RowEntityMapper<ProfileTask> { - @Override - public List<String> searchableProjection() { - return ImmutableList.of(ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME, - ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD, ProfileTaskRecord.DUMP_PERIOD, - ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT); - } - - @Override - public List<String> dataProjection() { - return Collections.emptyList(); - } - - @Override - public ProfileTask map(RowEntity row) { - return null; - } -} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java deleted file mode 100644 index 9a25688132..0000000000 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; - -import com.google.common.collect.ImmutableList; -import 
org.apache.skywalking.banyandb.v1.client.RowEntity; -import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord; - -import java.util.Collections; -import java.util.List; - -public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> { - @Override - public List<String> searchableProjection() { - return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, - ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE, - ProfileThreadSnapshotRecord.STACK_BINARY); - } - - @Override - public List<String> dataProjection() { - return Collections.emptyList(); - } - - @Override - public ProfileThreadSnapshotRecord map(RowEntity row) { - return null; - } -} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java new file mode 100644 index 0000000000..d906dbb78f --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java @@ -0,0 +1,51 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import lombok.RequiredArgsConstructor; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.alarm.AlarmRecord; +import org.apache.skywalking.oap.server.core.query.enumeration.Scope; +import org.apache.skywalking.oap.server.core.query.type.AlarmMessage; +import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO; + +import 
java.util.List; + +@RequiredArgsConstructor +public class AlarmMessageMapper implements RowEntityMapper<AlarmMessage> { + private final IAlarmQueryDAO alarmQueryDAO; + + @Override + public List<String> searchableProjection() { + return ImmutableList.of(AlarmRecord.SCOPE, // 0 + AlarmRecord.START_TIME); // 1 + } + + @Override + public List<String> dataProjection() { + return ImmutableList.of(AlarmRecord.ID0, // 0 + AlarmRecord.ID1, // 1 + AlarmRecord.ALARM_MESSAGE, // 2 + AlarmRecord.TAGS_RAW_DATA); // 3 + } + + @Override + public AlarmMessage map(RowEntity row) { + AlarmMessage alarmMessage = new AlarmMessage(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + int scopeID = ((Number) searchable.get(0).getValue()).intValue(); + alarmMessage.setScopeId(scopeID); + alarmMessage.setScope(Scope.Finder.valueOf(scopeID)); + alarmMessage.setStartTime(((Number) searchable.get(1).getValue()).longValue()); + final List<TagAndValue<?>> data = row.getTagFamilies().get(1); + alarmMessage.setId((String) data.get(0).getValue()); + alarmMessage.setId1((String) data.get(1).getValue()); + alarmMessage.setMessage((String) data.get(2).getValue()); + Object o = data.get(3).getValue(); + if (o instanceof ByteString && !((ByteString) o).isEmpty()) { + this.alarmQueryDAO.parserDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags()); + } + return alarmMessage; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java similarity index 98% rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java rename to 
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java index b3512158f2..bbcf7091d0 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java @@ -1,4 +1,4 @@ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; import com.google.common.collect.ImmutableList; import org.apache.skywalking.banyandb.v1.client.RowEntity; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java new file mode 100644 index 0000000000..0807955376 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java @@ -0,0 +1,59 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord; +import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog; +import 
org.apache.skywalking.oap.server.core.query.type.ErrorCategory; + +import java.util.List; + +public class BrowserErrorLogMapper implements RowEntityMapper<BrowserErrorLog> { + @Override + public List<String> searchableProjection() { + return ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID, + BrowserErrorLogRecord.SERVICE_VERSION_ID, + BrowserErrorLogRecord.PAGE_PATH_ID, + BrowserErrorLogRecord.ERROR_CATEGORY, + BrowserErrorLogRecord.TIMESTAMP + ); + } + + @Override + public List<String> dataProjection() { + return ImmutableList.of(BrowserErrorLogRecord.DATA_BINARY); + } + + @Override + public BrowserErrorLog map(RowEntity row) { + // FIXME: use protobuf directly + BrowserErrorLog log = new BrowserErrorLog(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + log.setService((String) searchable.get(0).getValue()); + log.setServiceVersion((String) searchable.get(1).getValue()); + log.setPagePath((String) searchable.get(2).getValue()); + log.setCategory(ErrorCategory.valueOf((String) searchable.get(3).getValue())); + log.setTime(((Number) searchable.get(4).getValue()).longValue()); + final List<TagAndValue<?>> data = row.getTagFamilies().get(1); + Object o = data.get(0).getValue(); + if (o instanceof ByteString && !((ByteString) o).isEmpty()) { + try { + org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog + .parseFrom((ByteString) o); + log.setGrade(browserErrorLog.getGrade()); + log.setCol(browserErrorLog.getCol()); + log.setLine(browserErrorLog.getLine()); + log.setMessage(browserErrorLog.getMessage()); + log.setErrorUrl(browserErrorLog.getErrorUrl()); + log.setStack(browserErrorLog.getStack()); + log.setFirstReportedError(browserErrorLog.getFirstReportedError()); + } catch (InvalidProtocolBufferException ex) { + throw new RuntimeException("fail to parse proto buffer", ex); + } + } + return log; + } +} diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java similarity index 99% rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java index 3f7df9cb72..6b9e0962e6 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java @@ -1,4 +1,4 @@ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; import com.google.common.collect.ImmutableList; import org.apache.skywalking.banyandb.v1.client.RowEntity; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java new file mode 100644 index 0000000000..1c94903cbd --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java @@ -0,0 +1,24 @@ 
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.oap.server.core.query.type.event.Event; + +import java.util.List; + +public class EventMapper implements RowEntityMapper<Event> { + @Override + public List<String> searchableProjection() { + return ImmutableList.of(); + } + + @Override + public List<String> dataProjection() { + return null; + } + + @Override + public Event map(RowEntity row) { + return null; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java new file mode 100644 index 0000000000..f1c5aaf442 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java @@ -0,0 +1,61 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair; +import org.apache.skywalking.apm.network.logging.v3.LogTags; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord; +import org.apache.skywalking.oap.server.core.query.type.KeyValue; +import org.apache.skywalking.oap.server.core.query.type.Log; + +import java.util.List; + +public class LogMapper implements RowEntityMapper<Log> { + @Override + public List<String> 
searchableProjection() { + return ImmutableList.of( + AbstractLogRecord.SERVICE_ID, // 0 + AbstractLogRecord.SERVICE_INSTANCE_ID, // 1 + AbstractLogRecord.ENDPOINT_ID, // 2 + AbstractLogRecord.TRACE_ID, // 3 + AbstractLogRecord.TRACE_SEGMENT_ID, + AbstractLogRecord.SPAN_ID, + AbstractLogRecord.TIMESTAMP); // 6 + } + + @Override + public List<String> dataProjection() { + return ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, + AbstractLogRecord.CONTENT, + AbstractLogRecord.TAGS_RAW_DATA); // 2 + } + + @Override + public Log map(RowEntity row) { + Log log = new Log(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + log.setServiceId((String) searchable.get(0).getValue()); + log.setServiceInstanceId((String) searchable.get(1).getValue()); + log.setEndpointId((String) searchable.get(2).getValue()); + log.setTraceId((String) searchable.get(3).getValue()); + log.setTimestamp(((Number) searchable.get(6).getValue()).longValue()); + final List<TagAndValue<?>> data = row.getTagFamilies().get(1); + if (data.get(2).getValue() == null || ((ByteString) data.get(2).getValue()).isEmpty()) { + log.setContent(""); + } else { + try { + // Decode the serialized LogTags held in data tag 2 (TAGS_RAW_DATA) into key/value tags. 
+ LogTags logTags = LogTags.parseFrom((ByteString) data.get(2).getValue()); + for (final KeyStringValuePair pair : logTags.getDataList()) { + log.getTags().add(new KeyValue(pair.getKey(), pair.getValue())); + } + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + return log; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java new file mode 100644 index 0000000000..aeb823c752 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java @@ -0,0 +1,40 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; + +import java.util.List; + +public class NetworkAddressAliasMapper implements RowEntityMapper<NetworkAddressAlias> { + @Override + public List<String> searchableProjection() { + return ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET); + } + + @Override + public List<String> dataProjection() { + // TODO: make these static fields public + return ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id"); + } + + @Override + public NetworkAddressAlias map(RowEntity row) { + NetworkAddressAlias model = new NetworkAddressAlias(); + final List<TagAndValue<?>> searchable = 
row.getTagFamilies().get(0); + // searchable - last_update_time_bucket + model.setLastUpdateTimeBucket(((Number) searchable.get(0).getValue()).longValue()); + final List<TagAndValue<?>> data = row.getTagFamilies().get(1); + // data - time_bucket + model.setTimeBucket(((Number) data.get(0).getValue()).longValue()); + // data - address + model.setAddress((String) data.get(1).getValue()); + // data - represent_service_id + model.setRepresentServiceId((String) data.get(2).getValue()); + // data - represent_service_instance_id + model.setRepresentServiceInstanceId((String) data.get(3).getValue()); + return model; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java new file mode 100644 index 0000000000..bc157253f0 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java @@ -0,0 +1,39 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord; +import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog; +import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType; + +import java.util.List; + +public class ProfileTaskLogMapper implements RowEntityMapper<ProfileTaskLog> { + @Override + public List<String> searchableProjection() { + return ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME); + } + + @Override + public List<String> 
dataProjection() { + return ImmutableList.of(ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.INSTANCE_ID, + ProfileTaskLogRecord.OPERATION_TYPE); + } + + @Override + public ProfileTaskLog map(RowEntity row) { + ProfileTaskLog profileTaskLog = new ProfileTaskLog(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + // searchable - operation_time + profileTaskLog.setOperationTime(((Number) searchable.get(0).getValue()).longValue()); + final List<TagAndValue<?>> data = row.getTagFamilies().get(1); + // data - task_id + profileTaskLog.setTaskId((String) data.get(0).getValue()); + // data - instance_id + profileTaskLog.setInstanceId((String) data.get(1).getValue()); + // data - operation_type + profileTaskLog.setOperationType(ProfileTaskLogOperationType.parse(((Number) data.get(2).getValue()).intValue())); + return profileTaskLog; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java new file mode 100644 index 0000000000..8a49404d23 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java @@ -0,0 +1,42 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord; +import org.apache.skywalking.oap.server.core.query.type.ProfileTask; + +import java.util.Collections; +import java.util.List; + +public class ProfileTaskMapper implements 
RowEntityMapper<ProfileTask> { + public static final String ID = "profile_task_query_id"; + + @Override + public List<String> searchableProjection() { + return ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME, + ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD, + ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT); + } + + @Override + public List<String> dataProjection() { + return Collections.emptyList(); + } + + @Override + public ProfileTask map(RowEntity row) { + ProfileTask profileTask = new ProfileTask(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + profileTask.setId((String) searchable.get(0).getValue()); + profileTask.setServiceId((String) searchable.get(1).getValue()); + profileTask.setEndpointName((String) searchable.get(2).getValue()); + profileTask.setStartTime(((Number) searchable.get(3).getValue()).longValue()); + profileTask.setDuration(((Number) searchable.get(4).getValue()).intValue()); + profileTask.setMinDurationThreshold(((Number) searchable.get(5).getValue()).intValue()); + profileTask.setDumpPeriod(((Number) searchable.get(6).getValue()).intValue()); + profileTask.setCreateTime(((Number) searchable.get(7).getValue()).intValue()); + profileTask.setMaxSamplingCount(((Number) searchable.get(8).getValue()).intValue()); + return null; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java new file mode 100644 index 0000000000..99f23117f2 --- /dev/null +++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java @@ -0,0 +1,36 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord; + +import java.util.Collections; +import java.util.List; + +public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> { + @Override + public List<String> searchableProjection() { + return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, + ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE); + } + + @Override + public List<String> dataProjection() { + return Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY); + } + + @Override + public ProfileThreadSnapshotRecord map(RowEntity row) { + ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + record.setTaskId((String) searchable.get(0).getValue()); + record.setSegmentId((String) searchable.get(1).getValue()); + record.setDumpTime(((Number) searchable.get(2).getValue()).longValue()); + record.setSequence(((Number) searchable.get(3).getValue()).intValue()); + final List<TagAndValue<?>> data = row.getTagFamilies().get(1); + record.setStackBinary(((ByteString) data.get(0).getValue()).toByteArray()); + return record; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java similarity index 95% rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java index b2facb3373..cc1d48d94f 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java @@ -1,4 +1,4 @@ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; import org.apache.skywalking.banyandb.v1.client.RowEntity; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java similarity index 99% rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java index 6ea579ff55..70318136f6 100644 --- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java @@ -1,4 +1,4 @@ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java index c7bc9cc13d..7e122611bf 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java @@ -1,19 +1,62 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; +import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.oap.server.core.alarm.AlarmRecord; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; +import org.apache.skywalking.oap.server.core.query.type.AlarmMessage; import org.apache.skywalking.oap.server.core.query.type.Alarms; +import 
org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.AlarmMessageMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import java.io.IOException; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream, * which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage} */ -public class BanyanDBAlarmQueryDAO implements IAlarmQueryDAO { +public class BanyanDBAlarmQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IAlarmQueryDAO { + private final RowEntityMapper<AlarmMessage> mapper; + + public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) { + super(client); + mapper = new AlarmMessageMapper(this); + } + @Override public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException { - return new Alarms(); + final StreamQuery query = new StreamQuery(AlarmRecord.INDEX_NAME, mapper.searchableProjection()); + + if (Objects.nonNull(scopeId)) { + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AlarmRecord.SCOPE, (long) scopeId)); + } + if (startTB != 0 && endTB != 0) { + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(startTB))); + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(endTB))); + } + + // TODO: support keyword search + + // TODO: support tag search + + query.setLimit(limit); + query.setOffset(from); + + StreamQueryResponse resp = getClient().query(query); + + 
List<AlarmMessage> messages = resp.getElements().stream().map(mapper::map).collect(Collectors.toList()); + + Alarms alarms = new Alarms(); + alarms.setTotal(messages.size()); + alarms.getMsgs().addAll(messages); + return alarms; } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java index 7607668eba..25f2c27f4b 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java @@ -1,17 +1,63 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; +import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord; import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory; +import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog; import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs; +import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO; +import org.apache.skywalking.oap.server.library.util.StringUtil; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BrowserErrorLogMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import java.io.IOException; +import java.util.Objects; +import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream */ -public class BanyanDBBrowserLogQueryDAO implements IBrowserLogQueryDAO { +public class BanyanDBBrowserLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IBrowserLogQueryDAO { + private static final RowEntityMapper<BrowserErrorLog> MAPPER = new BrowserErrorLogMapper(); + + public BanyanDBBrowserLogQueryDAO(BanyanDBStorageClient client) { + super(client); + } + @Override public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId, BrowserErrorCategory category, long startSecondTB, long endSecondTB, int limit, int from) throws IOException { - return new BrowserErrorLogs(); + final StreamQuery query = new StreamQuery(BrowserErrorLogRecord.INDEX_NAME, MAPPER.searchableProjection()); + query.setDataProjections(MAPPER.dataProjection()); + + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_ID, serviceId)); + + if (startSecondTB != 0 && endSecondTB != 0) { + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startSecondTB))); + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endSecondTB))); + } + if (StringUtil.isNotEmpty(serviceVersionId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId)); + } + if (StringUtil.isNotEmpty(pagePathId)) { + 
query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId)); + } + if (Objects.nonNull(category)) { + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", BrowserErrorLogRecord.ERROR_CATEGORY, (long) category.getValue())); + } + + query.setOffset(from); + query.setLimit(limit); + + final StreamQueryResponse resp = getClient().query(query); + + final BrowserErrorLogs logs = new BrowserErrorLogs(); + logs.getLogs().addAll(resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList())); + logs.setTotal(logs.getLogs().size()); + return logs; } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java index baba93b886..661f65b930 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java @@ -1,20 +1,86 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; +import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; import 
org.apache.skywalking.oap.server.core.query.enumeration.Order; import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition; +import org.apache.skywalking.oap.server.core.query.type.Log; import org.apache.skywalking.oap.server.core.query.type.Logs; +import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO; +import org.apache.skywalking.oap.server.library.util.StringUtil; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.LogMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import java.io.IOException; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream */ -public class BanyanDBLogQueryDAO implements ILogQueryDAO { +public class BanyanDBLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ILogQueryDAO { + private static final RowEntityMapper<Log> MAPPER = new LogMapper(); + + public BanyanDBLogQueryDAO(BanyanDBStorageClient client) { + super(client); + } + @Override - public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId, TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit, long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent, List<String> excludingKeywordsOfContent) throws IOException { - return new Logs(); + public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId, + TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit, + long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent, + List<String> excludingKeywordsOfContent) throws IOException { + final StreamQuery query = new StreamQuery(LogRecord.INDEX_NAME, 
MAPPER.searchableProjection()); + if (StringUtil.isNotEmpty(serviceId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_ID, serviceId)); + } + + if (startTB != 0 && endTB != 0) { + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startTB))); + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endTB))); + } + + if (StringUtil.isNotEmpty(serviceInstanceId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId)); + } + if (StringUtil.isNotEmpty(endpointId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.ENDPOINT_ID, endpointId)); + } + if (Objects.nonNull(relatedTrace)) { + if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId())); + } + if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId())); + } + if (Objects.nonNull(relatedTrace.getSpanId())) { + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId())); + } + } + + // TODO: if we allow to index tags? 
+// if (CollectionUtils.isNotEmpty(tags)) { +// for (final Tag tag : tags) { +// query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue())); +// } +// } + + StreamQueryResponse resp = getClient().query(query); + + List<Log> entities = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); + + Logs logs = new Logs(); + logs.getLogs().addAll(entities); + logs.setTotal(entities.size()); + + return logs; } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java index 10675a49a1..337c288e96 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java @@ -1,17 +1,34 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; +import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias; +import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.NetworkAddressAliasMapper; +import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; -import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; /** * {@link NetworkAddressAlias} is a stream */ -public class BanyanDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO { +public class BanyanDBNetworkAddressAliasDAO extends AbstractDAO<BanyanDBStorageClient> implements INetworkAddressAliasDAO { + private static final RowEntityMapper<NetworkAddressAlias> MAPPER = new NetworkAddressAliasMapper(); + + public BanyanDBNetworkAddressAliasDAO(BanyanDBStorageClient client) { + super(client); + } + @Override public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) { - return Collections.emptyList(); + StreamQuery query = new StreamQuery(NetworkAddressAlias.INDEX_NAME, MAPPER.searchableProjection()); + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket)); + + StreamQueryResponse resp = getClient().query(query); + return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java index 2b6290e2d2..6c19e2f701 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java @@ -1,18 +1,43 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; +import 
org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord; import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog; +import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileTaskLogMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import java.io.IOException; -import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream */ -public class BanyanDBProfileTaskLogQueryDAO implements IProfileTaskLogQueryDAO { +public class BanyanDBProfileTaskLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskLogQueryDAO { + private static final RowEntityMapper<ProfileTaskLog> MAPPER = new ProfileTaskLogMapper(); + + private final int queryMaxSize; + + public BanyanDBProfileTaskLogQueryDAO(BanyanDBStorageClient client, int queryMaxSize) { + super(client); + this.queryMaxSize = queryMaxSize; + } + @Override public List<ProfileTaskLog> getTaskLogList() throws IOException { - return Collections.emptyList(); + final StreamQuery query = new StreamQuery(ProfileTaskLogRecord.INDEX_NAME, MAPPER.searchableProjection()); + query.setDataProjections(MAPPER.dataProjection()); + query.setLimit(this.queryMaxSize); + + StreamQueryResponse resp = getClient().query(query); + return resp.getElements().stream() + .map(MAPPER::map) + .sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime)) + .collect(Collectors.toList()); } } diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java index 34af74eb93..67f28ce62f 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java @@ -10,8 +10,8 @@ import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileTaskMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileTaskMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import java.io.IOException; import java.util.List; @@ -34,34 +34,46 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClie query.setDataProjections(MAPPER.dataProjection()); if (StringUtil.isNotEmpty(serviceId)) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.SERVICE_ID, serviceId)); + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", + ProfileTaskRecord.SERVICE_ID, serviceId)); } if (StringUtil.isNotEmpty(endpointName)) { - 
query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.ENDPOINT_NAME, endpointName)); + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", + ProfileTaskRecord.ENDPOINT_NAME, endpointName)); } if (Objects.nonNull(startTimeBucket)) { - query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket))); + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", + ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket))); } if (Objects.nonNull(endTimeBucket)) { - query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket))); + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", + ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket))); } - // TODO: why delete? - if (Objects.nonNull(limit)) { query.setLimit(limit); } + query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC)); + StreamQueryResponse resp = getClient().query(query); return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); } @Override public ProfileTask getById(String id) throws IOException { - // TODO: support id query - throw new UnsupportedOperationException("element id get is not supported"); + if (StringUtil.isEmpty(id)) { + return null; + } + + final StreamQuery query = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection()); + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskMapper.ID, id)); + query.setLimit(1); + + StreamQueryResponse resp = getClient().query(query); + return resp.getElements().stream().map(MAPPER::map).findAny().orElse(null); } } diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java index 456636f287..cb5067e890 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java @@ -9,10 +9,10 @@ import org.apache.skywalking.oap.server.core.query.type.BasicTrace; import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileThreadSnapshotRecordMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileThreadSnapshotRecordMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper; import java.io.IOException; import java.util.ArrayList; diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java index a37cfbb54c..838e8a1eb3 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java @@ -31,9 +31,9 @@ import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java index a4875e65f1..33a69a969c 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java @@ -13,8 +13,8 @@ import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO; import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.DashboardConfigurationMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.DashboardConfigurationMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import java.io.IOException; import java.util.List;
