This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 896223ab37b634dca2676ebafc13c795cc836dfa Author: Megrez Lu <[email protected]> AuthorDate: Sun Nov 28 23:08:31 2021 +0800 start from new code base and copy all previous stream impl --- oap-server/server-storage-plugin/pom.xml | 1 + .../{ => storage-banyandb-plugin}/pom.xml | 40 +++-- .../storage/plugin/banyandb/BanyanDBSchema.java | 66 ++++++++ .../plugin/banyandb/BanyanDBStorageClient.java | 55 ++++++ .../plugin/banyandb/BanyanDBStorageConfig.java | 47 ++++++ .../plugin/banyandb/BanyanDBStorageProvider.java | 89 ++++++++++ .../banyandb/stream/BanyanDBAlarmQueryDAO.java | 19 +++ .../plugin/banyandb/stream/BanyanDBBatchDAO.java | 75 +++++++++ .../stream/BanyanDBBrowserLogQueryDAO.java | 17 ++ .../banyandb/stream/BanyanDBEventQueryDAO.java | 23 +++ .../banyandb/stream/BanyanDBLogQueryDAO.java | 20 +++ .../banyandb/stream/BanyanDBMetadataQueryDAO.java | 55 ++++++ .../stream/BanyanDBNetworkAddressAliasDAO.java | 17 ++ .../stream/BanyanDBProfileTaskLogQueryDAO.java | 18 ++ .../stream/BanyanDBProfileTaskQueryDAO.java | 23 +++ .../BanyanDBProfileThreadSnapshotQueryDAO.java | 40 +++++ .../plugin/banyandb/stream/BanyanDBRecordDAO.java | 79 +++++++++ .../stream/BanyanDBSegmentRecordBuilder.java | 58 +++++++ .../plugin/banyandb/stream/BanyanDBStorageDAO.java | 85 ++++++++++ .../stream/BanyanDBStreamInsertRequest.java | 31 ++++ .../banyandb/stream/BanyanDBTraceQueryDAO.java | 186 +++++++++++++++++++++ .../stream/BanyanDBUITemplateManagementDAO.java | 35 ++++ ...alking.oap.server.library.module.ModuleProvider | 19 +++ 23 files changed, 1086 insertions(+), 12 deletions(-) diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/pom.xml index fc7f896467..82e3353f4f 100644 --- a/oap-server/server-storage-plugin/pom.xml +++ b/oap-server/server-storage-plugin/pom.xml @@ -34,5 +34,6 @@ <module>storage-influxdb-plugin</module> <module>storage-tidb-plugin</module> <module>storage-iotdb-plugin</module> + <module>storage-banyandb-plugin</module> 
</modules> </project> diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml similarity index 53% copy from oap-server/server-storage-plugin/pom.xml copy to oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml index fc7f896467..76848c1b4b 100644 --- a/oap-server/server-storage-plugin/pom.xml +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml @@ -19,20 +19,36 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <artifactId>oap-server</artifactId> + <artifactId>server-storage-plugin</artifactId> <groupId>org.apache.skywalking</groupId> - <version>9.1.0-SNAPSHOT</version> + <version>8.9.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>server-storage-plugin</artifactId> - <packaging>pom</packaging> - <modules> - <module>storage-jdbc-hikaricp-plugin</module> - <module>storage-elasticsearch-plugin</module> - <module>storage-zipkin-elasticsearch-plugin</module> - <module>storage-influxdb-plugin</module> - <module>storage-tidb-plugin</module> - <module>storage-iotdb-plugin</module> - </modules> + <artifactId>storage-banyandb-plugin</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>server-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>library-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>library-datacarrier-queue</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>banyandb-java-client</artifactId> + 
<version>0.1.0-SNAPSHOT</version> + </dependency> + </dependencies> + </project> diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBSchema.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBSchema.java new file mode 100644 index 0000000000..3ebf87776b --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBSchema.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import lombok.Getter; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +public class BanyanDBSchema { + public static final String NAME = "sw"; + public static final String GROUP = "default"; + public static final List<String> FIELD_NAMES; + + public static final Set<String> INDEX_FIELDS = ImmutableSet.of("http.method", "status_code", "db.type", + "db.instance", "mq.queue", "mq.topic", "mq.broker"); + + static { + Set<String> fields = new LinkedHashSet<>(); + fields.add("trace_id"); + fields.add("state"); + fields.add("service_id"); + fields.add("service_instance_id"); + fields.add("endpoint_id"); + fields.add("duration"); + fields.add("start_time"); + fields.add("http.method"); + fields.add("status_code"); + fields.add("db.type"); + fields.add("db.instance"); + fields.add("mq.queue"); + fields.add("mq.topic"); + fields.add("mq.broker"); + FIELD_NAMES = ImmutableList.copyOf(fields); + } + + public enum TraceState { + ALL(0), SUCCESS(1), ERROR(2); + + @Getter + private final int state; + + TraceState(int state) { + this.state = state; + } + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java new file mode 100644 index 0000000000..eed4386e57 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java @@ -0,0 +1,55 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb; + +import org.apache.skywalking.banyandb.v1.client.BanyanDBClient; +import 
org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.oap.server.library.client.Client; +import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker; +import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable; +import org.apache.skywalking.oap.server.library.util.HealthChecker; + +import java.io.IOException; + +/** + * BanyanDBStorageClient is a simple wrapper for the underlying {@link BanyanDBClient}, + * which implements {@link Client} and {@link HealthCheckable}. + */ +public class BanyanDBStorageClient implements Client, HealthCheckable { + private final BanyanDBClient client; + private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker(); + + public BanyanDBStorageClient(String host, int port, String group) { + this.client = new BanyanDBClient(host, port, group); + } + + @Override + public void connect() throws Exception { + this.client.connect(); + } + + @Override + public void shutdown() throws IOException { + this.client.close(); + } + + public StreamQueryResponse query(StreamQuery streamQuery) { + try { + StreamQueryResponse response = this.client.queryStreams(streamQuery); + this.healthChecker.health(); + return response; + } catch (Throwable t) { + healthChecker.unHealth(t); + throw t; + } + } + + public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) { + return this.client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency); + } + + @Override + public void registerChecker(HealthChecker healthChecker) { + this.healthChecker.register(healthChecker); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java new file mode 100644 index 0000000000..c37aed1162 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb; + +import lombok.Getter; +import lombok.Setter; +import org.apache.skywalking.oap.server.library.module.ModuleConfig; + +@Getter +@Setter +public class BanyanDBStorageConfig extends ModuleConfig { + private String host = "127.0.0.1"; + private int port = 17912; + /** + * Group of the schema in BanyanDB + */ + private String group = "default"; + + /** + * The maximum size of write entities in a single batch write call. + */ + private int maxBulkSize = 5000; + /** + * Period of flush interval. In the timeunit of seconds. + */ + private int flushInterval = 15; + /** + * Concurrent consumer threads for batch writing. 
+ */ + private int concurrentWriteThreads = 2; +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java new file mode 100644 index 0000000000..c4aa1016ca --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb; + +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.config.ConfigService; +import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory; +import org.apache.skywalking.oap.server.core.storage.StorageModule; +import org.apache.skywalking.oap.server.library.module.*; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; + +public class BanyanDBStorageProvider extends ModuleProvider { + private BanyanDBStorageConfig config; + private BanyanDBStorageClient client; + + public BanyanDBStorageProvider() { + this.config = new BanyanDBStorageConfig(); + } + + @Override + public String name() { + return "banyandb"; + } + + @Override + public Class<? 
extends ModuleDefine> module() { + return StorageModule.class; + } + + @Override + public ModuleConfig createConfigBeanIfAbsent() { + return config; + } + + @Override + public void prepare() throws ServiceNotProvidedException, ModuleStartException { + this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default()); + + this.client = new BanyanDBStorageClient(config.getHost(), config.getPort(), config.getGroup()); + } + + @Override + public void start() throws ServiceNotProvidedException, ModuleStartException { + final ConfigService configService = getManager().find(CoreModule.NAME) + .provider() + .getService(ConfigService.class); + + MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME) + .provider() + .getService(MetricsCreator.class); + HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge( + "storage_banyandb", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); + this.client.registerChecker(healthChecker); + try { + this.client.connect(); + } catch (Exception e) { + throw new ModuleStartException(e.getMessage(), e); + } + } + + @Override + public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { + + } + + @Override + public String[] requiredModules() { + return new String[]{CoreModule.NAME}; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java new file mode 100644 index 0000000000..c7bc9cc13d --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java @@ -0,0 +1,19 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + 
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; +import org.apache.skywalking.oap.server.core.query.type.Alarms; +import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO; + +import java.io.IOException; +import java.util.List; + +/** + * {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream, + * which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage} + */ +public class BanyanDBAlarmQueryDAO implements IAlarmQueryDAO { + @Override + public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException { + return new Alarms(); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java new file mode 100644 index 0000000000..9404996887 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor; +import org.apache.skywalking.oap.server.core.storage.AbstractDAO; +import org.apache.skywalking.oap.server.core.storage.IBatchDAO; +import org.apache.skywalking.oap.server.library.client.request.InsertRequest; +import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO { + private StreamBulkWriteProcessor bulkProcessor; + + private final int maxBulkSize; + private final int flushInterval; + private final int concurrency; + + public BanyanDBBatchDAO(BanyanDBStorageClient client, int maxBulkSize, int flushInterval, int concurrency) { + super(client); + this.maxBulkSize = maxBulkSize; + this.flushInterval = flushInterval; + this.concurrency = concurrency; + } + + @Override + public void insert(InsertRequest insertRequest) { + if (bulkProcessor == null) { + this.bulkProcessor = getClient().createBulkProcessor(maxBulkSize, flushInterval, concurrency); + } + + if (insertRequest instanceof BanyanDBStreamInsertRequest) { + this.bulkProcessor.add(((BanyanDBStreamInsertRequest) insertRequest).getStreamWrite()); + } + } + + @Override + public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) { + if (bulkProcessor == null) { + this.bulkProcessor = getClient().createBulkProcessor(maxBulkSize, flushInterval, concurrency); + } + + if (CollectionUtils.isNotEmpty(prepareRequests)) { + return 
CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> { + if (prepareRequest instanceof BanyanDBStreamInsertRequest) { // same guard as insert(): any other request type would fail the cast below + // TODO: return CompletableFuture<Void> + this.bulkProcessor.add(((BanyanDBStreamInsertRequest) prepareRequest).getStreamWrite()); + } + return CompletableFuture.completedFuture(null); + }).toArray(CompletableFuture[]::new)); + } + + return CompletableFuture.completedFuture(null); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java new file mode 100644 index 0000000000..7607668eba --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java @@ -0,0 +1,17 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory; +import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs; +import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO; + +import java.io.IOException; + +/** + * {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream + */ +public class BanyanDBBrowserLogQueryDAO implements IBrowserLogQueryDAO { + @Override + public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId, BrowserErrorCategory category, long startSecondTB, long endSecondTB, int limit, int from) throws IOException { + return new BrowserErrorLogs(); + } +} diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java new file mode 100644 index 0000000000..5ab67d95c6 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java @@ -0,0 +1,23 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition; +import org.apache.skywalking.oap.server.core.query.type.event.Events; +import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO; + +import java.util.List; + +/** + * ??? + * {@link org.apache.skywalking.oap.server.core.source.Event} is a stream + */ +public class BanyanDBEventQueryDAO implements IEventQueryDAO { + @Override + public Events queryEvents(EventQueryCondition condition) throws Exception { + return new Events(); + } + + @Override + public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception { + return new Events(); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java new file mode 100644 index 0000000000..baba93b886 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java @@ -0,0 +1,20 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import 
org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; +import org.apache.skywalking.oap.server.core.query.enumeration.Order; +import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition; +import org.apache.skywalking.oap.server.core.query.type.Logs; +import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO; + +import java.io.IOException; +import java.util.List; + +/** + * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream + */ +public class BanyanDBLogQueryDAO implements ILogQueryDAO { + @Override + public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId, TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit, long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent, List<String> excludingKeywordsOfContent) throws IOException { + return new Logs(); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java new file mode 100644 index 0000000000..678b981f16 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java @@ -0,0 +1,55 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import org.apache.skywalking.oap.server.core.analysis.NodeType; +import org.apache.skywalking.oap.server.core.query.type.Database; +import org.apache.skywalking.oap.server.core.query.type.Endpoint; +import org.apache.skywalking.oap.server.core.query.type.Service; +import org.apache.skywalking.oap.server.core.query.type.ServiceInstance; +import 
org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * {@link org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic}, + * {@link org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic}, and + * {@link org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic} + * are all streams. + */ +public class BanyanDBMetadataQueryDAO implements IMetadataQueryDAO { + @Override + public List<Service> getAllServices(String group) throws IOException { + return Collections.emptyList(); + } + + @Override + public List<Service> getAllBrowserServices() throws IOException { + return Collections.emptyList(); + } + + @Override + public List<Database> getAllDatabases() throws IOException { + return Collections.emptyList(); + } + + @Override + public List<Service> searchServices(NodeType nodeType, String keyword) throws IOException { + return Collections.emptyList(); + } + + @Override + public Service searchService(NodeType nodeType, String serviceCode) throws IOException { + return null; + } + + @Override + public List<Endpoint> searchEndpoint(String keyword, String serviceId, int limit) throws IOException { + return Collections.emptyList(); + } + + @Override + public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException { + return Collections.emptyList(); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java new file mode 100644 index 0000000000..10675a49a1 --- /dev/null +++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java @@ -0,0 +1,17 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias; +import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; + +import java.util.Collections; +import java.util.List; + +/** + * {@link NetworkAddressAlias} is a stream + */ +public class BanyanDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO { + @Override + public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) { + return Collections.emptyList(); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java new file mode 100644 index 0000000000..2b6290e2d2 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java @@ -0,0 +1,18 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog; +import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream + */ +public class BanyanDBProfileTaskLogQueryDAO implements IProfileTaskLogQueryDAO { + @Override + public List<ProfileTaskLog> getTaskLogList() throws IOException { + return Collections.emptyList(); 
+ } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java new file mode 100644 index 0000000000..c99d267cbe --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java @@ -0,0 +1,23 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import org.apache.skywalking.oap.server.core.query.type.ProfileTask; +import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream + */ +public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO { + @Override + public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException { + return Collections.emptyList(); + } + + @Override + public ProfileTask getById(String id) throws IOException { + return null; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java new file mode 100644 index 0000000000..8a27e885fe --- /dev/null +++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java @@ -0,0 +1,40 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord; +import org.apache.skywalking.oap.server.core.query.type.BasicTrace; +import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * {@link ProfileThreadSnapshotRecord} is a stream + */ +public class BanyanDBProfileThreadSnapshotQueryDAO implements IProfileThreadSnapshotQueryDAO { + @Override + public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException { + return Collections.emptyList(); + } + + @Override + public int queryMinSequence(String segmentId, long start, long end) throws IOException { + return 0; + } + + @Override + public int queryMaxSequence(String segmentId, long start, long end) throws IOException { + return 0; + } + + @Override + public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException { + return Collections.emptyList(); + } + + @Override + public SegmentRecord getProfiledSegment(String segmentId) throws IOException { + return null; + } +} \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java new file mode 100644 index 0000000000..f59bc25f9e --- /dev/null +++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import lombok.RequiredArgsConstructor; +import org.apache.skywalking.banyandb.v1.Banyandb; +import org.apache.skywalking.banyandb.v1.client.SerializableTag; +import org.apache.skywalking.banyandb.v1.client.StreamWrite; +import org.apache.skywalking.banyandb.v1.client.Tag; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.storage.IRecordDAO; +import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder; +import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.library.client.request.InsertRequest; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + 
+@RequiredArgsConstructor +public class BanyanDBRecordDAO implements IRecordDAO { + private final StorageHashMapBuilder<Record> storageBuilder; + + @Override + public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException { + if (SegmentRecord.INDEX_NAME.equals(model.getName())) { + SegmentRecord segmentRecord = (SegmentRecord) record; + StreamWrite streamWrite = StreamWrite.builder() + .name(BanyanDBSchema.NAME) + .binary(segmentRecord.getDataBinary()) + .timestamp(segmentRecord.getStartTime()) + .elementId(segmentRecord.getSegmentId()) + .tags(buildFieldObjects(this.storageBuilder.entity2Storage(segmentRecord))) + .build(); + return new BanyanDBStreamInsertRequest(streamWrite); + } + // TODO: support other stream types + return new InsertRequest() { + }; + } + + /** + * Convert storageEntity in Map to an ordered list of {@link SerializableTag} + * + * @param segmentRecordMap which comes from {@link SegmentRecord} + * @return an ordered list of {@link SerializableTag} which is accepted by BanyanDB Client + */ + static List<SerializableTag<Banyandb.TagValue>> buildFieldObjects(Map<String, Object> segmentRecordMap) { + List<SerializableTag<Banyandb.TagValue>> tagList = new ArrayList<>(BanyanDBSchema.FIELD_NAMES.size()); + for (String fieldName : BanyanDBSchema.FIELD_NAMES) { + Object val = segmentRecordMap.get(fieldName); + if (val == null) { + tagList.add(Tag.nullField()); + } else { + tagList.add((SerializableTag<Banyandb.TagValue>) val); + } + } + return tagList; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSegmentRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSegmentRecordBuilder.java new file mode 100644 index 0000000000..b8c4744380 --- /dev/null +++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSegmentRecordBuilder.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder; + +import java.util.HashMap; +import java.util.Map; + +public class BanyanDBSegmentRecordBuilder implements StorageHashMapBuilder<Record> { + @Override + public SegmentRecord storage2Entity(Map<String, Object> dbMap) { + return new SegmentRecord(); + } + + /** + * Map SegmentRecord to Skywalking-BanyanDB compatible Map with indexed tags and + * without binaryData, entityId + */ + @Override + public Map<String, Object> entity2Storage(Record record) { + final SegmentRecord segmentRecord = (SegmentRecord) record; + Map<String, Object> map = 
new HashMap<>(); + map.put(SegmentRecord.TRACE_ID, TagAndValue.stringField(segmentRecord.getTraceId())); + map.put(SegmentRecord.SERVICE_ID, TagAndValue.stringField(segmentRecord.getServiceId())); + map.put(SegmentRecord.SERVICE_INSTANCE_ID, TagAndValue.stringField(segmentRecord.getServiceInstanceId())); + map.put(SegmentRecord.ENDPOINT_ID, TagAndValue.stringField(segmentRecord.getEndpointId())); + map.put(SegmentRecord.START_TIME, TagAndValue.longField(segmentRecord.getStartTime())); + map.put("duration", TagAndValue.longField(segmentRecord.getLatency())); + map.put("state", TagAndValue.longField(segmentRecord.getIsError())); + if (segmentRecord.getTagsRawData() != null) { + for (final Tag tag : segmentRecord.getTagsRawData()) { + map.put(tag.getKey().toLowerCase(), TagAndValue.stringField(tag.getValue())); + } + } + return map; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java new file mode 100644 index 0000000000..f5ee5fb255 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; +import org.apache.skywalking.oap.server.library.client.request.InsertRequest; +import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@Slf4j +public class BanyanDBStorageDAO implements StorageDAO { + @Override + public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) { + return new IMetricsDAO() { + @Override + public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException { + return Collections.emptyList(); + } + + @Override + public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException { + return new InsertRequest() { + }; + } + + @Override + public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException { + return new UpdateRequest() { + }; + } + }; + } + + @Override + public IRecordDAO newRecordDao(StorageBuilder storageBuilder) { + try { + if (SegmentRecord.class.equals(storageBuilder.getClass().getMethod("storage2Entity", Map.class).getReturnType())) { + return 
new BanyanDBRecordDAO(new BanyanDBSegmentRecordBuilder()); + } else { + return (model, record) -> new InsertRequest() { + }; + } + } catch (NoSuchMethodException noSuchMethodException) { + log.error("cannot find method storage2Entity", noSuchMethodException); + throw new RuntimeException("cannot find method storage2Entity"); + } + } + + @Override + public INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder) { + return (model, noneStream) -> { + }; + } + + @Override + public IManagementDAO newManagementDao(StorageBuilder storageBuilder) { + return (model, storageData) -> { + }; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java new file mode 100644 index 0000000000..aef8ec94e7 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.skywalking.banyandb.v1.client.StreamWrite; +import org.apache.skywalking.oap.server.library.client.request.InsertRequest; + +@RequiredArgsConstructor +public class BanyanDBStreamInsertRequest implements InsertRequest { + + @Getter + private final StreamWrite streamWrite; +} \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java new file mode 100644 index 0000000000..9fc403afc6 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import org.apache.skywalking.banyandb.v1.client.*; +import org.apache.skywalking.oap.server.core.analysis.IDManager; +import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.query.type.*; +import org.apache.skywalking.oap.server.core.storage.AbstractDAO; +import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ITraceQueryDAO { + private static final DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormat.forPattern("yyyyMMddHHmmss"); + + private static final List<String> BASIC_QUERY_PROJ = ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"); + private static final List<String> TRACE_ID_QUERY_PROJ = ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"); + + public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) { + super(client); + } + + @Override + public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, TraceState 
traceState, QueryOrder queryOrder, List<Tag> tags) throws IOException { + StreamQuery query; + if (startSecondTB != 0 && endSecondTB != 0) { + query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB), parseMillisFromEndSecondTB(endSecondTB)), BASIC_QUERY_PROJ); + } else { + query = new StreamQuery(BanyanDBSchema.NAME, BASIC_QUERY_PROJ); + } + if (minDuration != 0) { + // duration >= minDuration + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", "duration", minDuration)); + } + if (maxDuration != 0) { + // duration <= maxDuration + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", "duration", maxDuration)); + } + + if (!Strings.isNullOrEmpty(serviceId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_id", serviceId)); + } + + if (!Strings.isNullOrEmpty(serviceInstanceId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_instance_id", serviceInstanceId)); + } + + if (!Strings.isNullOrEmpty(endpointId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "endpoint_id", endpointId)); + } + + switch (traceState) { + case ERROR: + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ERROR.getState())); + break; + case SUCCESS: + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.SUCCESS.getState())); + break; + default: + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ALL.getState())); + break; + } + + switch (queryOrder) { + case BY_START_TIME: + query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC)); + break; + case BY_DURATION: + query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC)); 
+ break; + } + + if (CollectionUtils.isNotEmpty(tags)) { + for (final Tag tag : tags) { + if (BanyanDBSchema.INDEX_FIELDS.contains(tag.getKey())) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue())); + } + } + } + + query.setLimit(limit); + query.setOffset(from); + + // build request + StreamQueryResponse response = this.getClient().query(query); + TraceBrief brief = new TraceBrief(); + brief.setTotal(response.size()); + brief.getTraces().addAll(response.getElements().stream().map(elem -> { + BasicTrace trace = new BasicTrace(); + trace.setSegmentId(elem.getId()); + final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0); + trace.getTraceIds().add((String) searchable.get(0).getValue()); + trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1); + trace.getEndpointNames().add(IDManager.EndpointID.analysisId( + (String) searchable.get(2).getValue() + ).getEndpointName()); + trace.setDuration(((Long) searchable.get(3).getValue()).intValue()); + trace.setStart(String.valueOf(searchable.get(4).getValue())); + return trace; + }).collect(Collectors.toList())); + return brief; + } + + @Override + public List<SegmentRecord> queryByTraceId(String traceId) throws IOException { + StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, TRACE_ID_QUERY_PROJ); + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId)); + query.setDataBinary(true); + StreamQueryResponse response = this.getClient().query(query); + return response.getElements().stream().map(elem -> { + SegmentRecord record = new SegmentRecord(); + final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0); + record.setSegmentId(elem.getId()); + record.setTraceId((String) searchable.get(0).getValue()); + record.setIsError(((Number) searchable.get(1).getValue()).intValue()); + record.setServiceId((String) searchable.get(2).getValue()); + record.setServiceInstanceId((String) 
searchable.get(3).getValue()); + record.setEndpointId((String) searchable.get(4).getValue()); + record.setLatency(((Number) searchable.get(5).getValue()).intValue()); + record.setStartTime(((Number) searchable.get(6).getValue()).longValue()); + final List<TagAndValue<?>> data = elem.getTagFamilies().get(1); + // TODO: support binary data in the client SDK + record.setDataBinary((byte[]) data.get(0).getValue()); + return record; + }).collect(Collectors.toList()); + } + + @Override + public List<Span> doFlexibleTraceQuery(String traceId) throws IOException { + return Collections.emptyList(); + } + + static long parseMillisFromStartSecondTB(long startSecondTB) { + return YYYYMMDDHHMMSS.withZone(DateTimeZone.UTC).parseMillis(String.valueOf(startSecondTB)); + } + + static long parseMillisFromEndSecondTB(long endSecondTB) { + long t = endSecondTB; + long second = t % 100; + if (second > 59) { + second = 0; + } + t = t / 100; + long minute = t % 100; + if (minute > 59) { + minute = 0; + } + t = t / 100; + long hour = t % 100; + if (hour > 23) { + hour = 0; + } + t = t / 100; + return YYYYMMDDHHMMSS.withZone(DateTimeZone.UTC) + .parseMillis(String.valueOf(((t * 100 + hour) * 100 + minute) * 100 + second)); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java new file mode 100644 index 0000000000..ad8fed926e --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java @@ -0,0 +1,35 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import org.apache.skywalking.oap.server.core.query.input.DashboardSetting; 
+import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration; +import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus; +import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream + */ +public class BanyanDBUITemplateManagementDAO implements UITemplateManagementDAO { + @Override + public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException { + return Collections.emptyList(); + } + + @Override + public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException { + return TemplateChangeStatus.builder().status(false).message("Can't add a new template").build(); + } + + @Override + public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException { + return TemplateChangeStatus.builder().status(false).message("Can't add/update the template").build(); + } + + @Override + public TemplateChangeStatus disableTemplate(String name) throws IOException { + return TemplateChangeStatus.builder().status(false).message("Can't add/update the template").build(); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider new file mode 100644 index 0000000000..6b9674593d --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageProvider
