This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch components
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/components by this push:
new 0a1a4f838e Support multiple component IDs in the service topology level
0a1a4f838e is described below
commit 0a1a4f838eae716cac448cd5ff299c16a396023f
Author: Wu Sheng <[email protected]>
AuthorDate: Fri Dec 16 09:49:33 2022 +0800
Support multiple component IDs in the service topology level
---
docs/en/changes/changes.md | 3 +
.../skywalking/oap/meter/analyzer/Analyzer.java | 4 +-
.../service/ServiceCallRelationDispatcher.java | 4 +-
.../service/ServiceRelationClientSideMetrics.java | 37 +++---
.../service/ServiceRelationServerSideMetrics.java | 37 +++---
.../service/TCPServiceCallRelationDispatcher.java | 4 +-
.../oap/server/core/analysis/metrics/IntList.java | 4 +-
.../server/core/query/ServiceTopologyBuilder.java | 3 +
.../oap/server/core/query/type/Call.java | 3 -
.../core/storage/annotation/ElasticSearch.java | 17 ++-
.../core/storage/model/ElasticSearchExtension.java | 2 +
.../server/core/storage/model/StorageModels.java | 18 +--
.../server/core/storage/model/ModelColumnTest.java | 10 +-
.../banyandb/measure/BanyanDBTopologyQueryDAO.java | 129 +++++++++++++--------
.../elasticsearch/base/ColumnTypeEsMapping.java | 11 +-
.../elasticsearch/base/StorageEsInstaller.java | 2 +-
.../elasticsearch/query/TopologyQueryEsDAO.java | 17 +--
.../jdbc/common/dao/JDBCTopologyQueryDAO.java | 18 +--
18 files changed, 202 insertions(+), 121 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 43d910fc15..3b1bb1e95a 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -41,6 +41,9 @@
* Use TTL-driven interval settings for the `measure-default` group in BanyanDB.
* Fix wrong group of non time-relative metadata in BanyanDB.
* Refactor `StorageData#id` to the new StorageID object from a String type.
+* Support multiple component IDs in the service topology level.
+* Add `ElasticSearch.Keyword` annotation to declare the target field type as
`keyword`.
+* [Breaking Change] Field `component_id` of `service_relation_client_side` and
`service_relation_server_side` have been replaced by `component_ids`(keyword).
#### UI
diff --git
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index 90a0a9e83d..be96151c0b 100644
---
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -324,7 +324,7 @@ public class Analyzer {
metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
metrics.setSourceServiceId(entity.sourceServiceId());
metrics.setDestServiceId(entity.destServiceId());
- metrics.setComponentId(0);
+ metrics.getComponentIds().add(0);
metrics.setEntityId(entity.id());
MetricsStreamProcessor.getInstance().in(metrics);
}
@@ -334,7 +334,7 @@ public class Analyzer {
metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
metrics.setSourceServiceId(entity.sourceServiceId());
metrics.setDestServiceId(entity.destServiceId());
- metrics.setComponentId(0);
+ metrics.getComponentIds().add(0);
metrics.setEntityId(entity.id());
MetricsStreamProcessor.getInstance().in(metrics);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceCallRelationDispatcher.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceCallRelationDispatcher.java
index 867f546858..2f87bce5d5 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceCallRelationDispatcher.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceCallRelationDispatcher.java
@@ -41,7 +41,7 @@ public class ServiceCallRelationDispatcher implements
SourceDispatcher<ServiceRe
metrics.setTimeBucket(source.getTimeBucket());
metrics.setSourceServiceId(source.getSourceServiceId());
metrics.setDestServiceId(source.getDestServiceId());
- metrics.setComponentId(source.getComponentId());
+ metrics.getComponentIds().add(source.getComponentId());
metrics.setEntityId(source.getEntityId());
MetricsStreamProcessor.getInstance().in(metrics);
}
@@ -51,7 +51,7 @@ public class ServiceCallRelationDispatcher implements
SourceDispatcher<ServiceRe
metrics.setTimeBucket(source.getTimeBucket());
metrics.setSourceServiceId(source.getSourceServiceId());
metrics.setDestServiceId(source.getDestServiceId());
- metrics.setComponentId(source.getComponentId());
+ metrics.getComponentIds().add(source.getComponentId());
metrics.setEntityId(source.getEntityId());
MetricsStreamProcessor.getInstance().in(metrics);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationClientSideMetrics.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationClientSideMetrics.java
index 7f1ffa402a..83c40d8ad5 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationClientSideMetrics.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationClientSideMetrics.java
@@ -22,6 +22,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import
org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
@@ -29,6 +30,7 @@ import
org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.ShardingAlgorithm;
import org.apache.skywalking.oap.server.core.storage.StorageID;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -48,7 +50,7 @@ public class ServiceRelationClientSideMetrics extends Metrics
{
public static final String INDEX_NAME = "service_relation_client_side";
public static final String SOURCE_SERVICE_ID = "source_service_id";
public static final String DEST_SERVICE_ID = "dest_service_id";
- public static final String COMPONENT_ID = "component_id";
+ public static final String COMPONENT_IDS = "component_ids";
@Setter
@Getter
@@ -60,8 +62,9 @@ public class ServiceRelationClientSideMetrics extends Metrics
{
private String destServiceId;
@Setter
@Getter
- @Column(columnName = COMPONENT_ID, storageOnly = true)
- private int componentId;
+ @Column(columnName = COMPONENT_IDS, storageOnly = true)
+ @ElasticSearch.Keyword
+ private IntList componentIds = new IntList(3);
@Setter
@Getter
@Column(columnName = ENTITY_ID, length = 512)
@@ -76,11 +79,17 @@ public class ServiceRelationClientSideMetrics extends
Metrics {
@Override
public boolean combine(Metrics metrics) {
ServiceRelationClientSideMetrics serviceRelationClientSideMetrics =
(ServiceRelationClientSideMetrics) metrics;
- if (this.getComponentId() == 0 &&
serviceRelationClientSideMetrics.getComponentId() != 0) {
- this.componentId =
serviceRelationClientSideMetrics.getComponentId();
- return true;
+ final IntList sourceIDs = this.getComponentIds();
+ final IntList targetIDs =
serviceRelationClientSideMetrics.getComponentIds();
+ boolean changed = false;
+ for (int i = 0; i < targetIDs.size(); i++) {
+ final int targetID = targetIDs.get(i);
+ if (!sourceIDs.include(targetID)) {
+ sourceIDs.add(targetID);
+ changed = true;
+ }
}
- return false;
+ return changed;
}
@Override
@@ -95,7 +104,7 @@ public class ServiceRelationClientSideMetrics extends
Metrics {
metrics.setTimeBucket(toTimeBucketInHour());
metrics.setSourceServiceId(getSourceServiceId());
metrics.setDestServiceId(getDestServiceId());
- metrics.setComponentId(getComponentId());
+ metrics.getComponentIds().copyFrom(getComponentIds());
return metrics;
}
@@ -106,7 +115,7 @@ public class ServiceRelationClientSideMetrics extends
Metrics {
metrics.setTimeBucket(toTimeBucketInDay());
metrics.setSourceServiceId(getSourceServiceId());
metrics.setDestServiceId(getDestServiceId());
- metrics.setComponentId(getComponentId());
+ metrics.getComponentIds().copyFrom(getComponentIds());
return metrics;
}
@@ -122,8 +131,7 @@ public class ServiceRelationClientSideMetrics extends
Metrics {
setEntityId(remoteData.getDataStrings(0));
setSourceServiceId(remoteData.getDataStrings(1));
setDestServiceId(remoteData.getDataStrings(2));
-
- setComponentId(remoteData.getDataIntegers(0));
+ setComponentIds(new IntList(remoteData.getDataStrings(3)));
setTimeBucket(remoteData.getDataLongs(0));
}
@@ -134,8 +142,7 @@ public class ServiceRelationClientSideMetrics extends
Metrics {
remoteBuilder.addDataStrings(getEntityId());
remoteBuilder.addDataStrings(getSourceServiceId());
remoteBuilder.addDataStrings(getDestServiceId());
-
- remoteBuilder.addDataIntegers(getComponentId());
+ remoteBuilder.addDataStrings(getComponentIds().toStorageData());
remoteBuilder.addDataLongs(getTimeBucket());
return remoteBuilder;
@@ -147,7 +154,7 @@ public class ServiceRelationClientSideMetrics extends
Metrics {
ServiceRelationClientSideMetrics metrics = new
ServiceRelationClientSideMetrics();
metrics.setSourceServiceId((String)
converter.get(SOURCE_SERVICE_ID));
metrics.setDestServiceId((String) converter.get(DEST_SERVICE_ID));
- metrics.setComponentId(((Number)
converter.get(COMPONENT_ID)).intValue());
+ metrics.setComponentIds(new IntList((String)
converter.get(COMPONENT_IDS)));
metrics.setTimeBucket(((Number)
converter.get(TIME_BUCKET)).longValue());
metrics.setEntityId((String) converter.get(ENTITY_ID));
return metrics;
@@ -159,7 +166,7 @@ public class ServiceRelationClientSideMetrics extends
Metrics {
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
converter.accept(SOURCE_SERVICE_ID,
storageData.getSourceServiceId());
converter.accept(DEST_SERVICE_ID, storageData.getDestServiceId());
- converter.accept(COMPONENT_ID, storageData.getComponentId());
+ converter.accept(COMPONENT_IDS, storageData.getComponentIds());
converter.accept(ENTITY_ID, storageData.getEntityId());
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationServerSideMetrics.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationServerSideMetrics.java
index d519cd31a1..d2752e11d3 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationServerSideMetrics.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationServerSideMetrics.java
@@ -23,6 +23,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import
org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
@@ -31,6 +32,7 @@ import
org.apache.skywalking.oap.server.core.storage.ShardingAlgorithm;
import org.apache.skywalking.oap.server.core.storage.StorageID;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -51,7 +53,7 @@ public class ServiceRelationServerSideMetrics extends Metrics
{
public static final String INDEX_NAME = "service_relation_server_side";
public static final String SOURCE_SERVICE_ID = "source_service_id";
public static final String DEST_SERVICE_ID = "dest_service_id";
- public static final String COMPONENT_ID = "component_id";
+ public static final String COMPONENT_IDS = "component_ids";
@Setter
@Getter
@@ -63,8 +65,9 @@ public class ServiceRelationServerSideMetrics extends Metrics
{
private String destServiceId;
@Setter
@Getter
- @Column(columnName = COMPONENT_ID, storageOnly = true)
- private int componentId;
+ @Column(columnName = COMPONENT_IDS, storageOnly = true)
+ @ElasticSearch.Keyword
+ private IntList componentIds = new IntList(3);
@Setter
@Getter
@Column(columnName = ENTITY_ID, length = 512)
@@ -81,11 +84,17 @@ public class ServiceRelationServerSideMetrics extends
Metrics {
@Override
public boolean combine(Metrics metrics) {
ServiceRelationServerSideMetrics serviceRelationServerSideMetrics =
(ServiceRelationServerSideMetrics) metrics;
- if (this.getComponentId() == 0 &&
serviceRelationServerSideMetrics.getComponentId() != 0) {
- this.componentId =
serviceRelationServerSideMetrics.getComponentId();
- return true;
+ final IntList sourceIDs = this.getComponentIds();
+ final IntList targetIDs =
serviceRelationServerSideMetrics.getComponentIds();
+ boolean changed = false;
+ for (int i = 0; i < targetIDs.size(); i++) {
+ final int targetID = targetIDs.get(i);
+ if (!sourceIDs.include(targetID)) {
+ sourceIDs.add(targetID);
+ changed = true;
+ }
}
- return false;
+ return changed;
}
@Override
@@ -99,7 +108,7 @@ public class ServiceRelationServerSideMetrics extends
Metrics {
metrics.setTimeBucket(toTimeBucketInHour());
metrics.setSourceServiceId(getSourceServiceId());
metrics.setDestServiceId(getDestServiceId());
- metrics.setComponentId(getComponentId());
+ metrics.getComponentIds().copyFrom(getComponentIds());
metrics.setEntityId(getEntityId());
return metrics;
}
@@ -110,7 +119,7 @@ public class ServiceRelationServerSideMetrics extends
Metrics {
metrics.setTimeBucket(toTimeBucketInDay());
metrics.setSourceServiceId(getSourceServiceId());
metrics.setDestServiceId(getDestServiceId());
- metrics.setComponentId(getComponentId());
+ metrics.getComponentIds().copyFrom(getComponentIds());
metrics.setEntityId(getEntityId());
return metrics;
}
@@ -127,8 +136,7 @@ public class ServiceRelationServerSideMetrics extends
Metrics {
setEntityId(remoteData.getDataStrings(0));
setSourceServiceId(remoteData.getDataStrings(1));
setDestServiceId(remoteData.getDataStrings(2));
-
- setComponentId(remoteData.getDataIntegers(0));
+ setComponentIds(new IntList(remoteData.getDataStrings(3)));
setTimeBucket(remoteData.getDataLongs(0));
}
@@ -139,8 +147,7 @@ public class ServiceRelationServerSideMetrics extends
Metrics {
remoteBuilder.addDataStrings(getEntityId());
remoteBuilder.addDataStrings(getSourceServiceId());
remoteBuilder.addDataStrings(getDestServiceId());
-
- remoteBuilder.addDataIntegers(getComponentId());
+ remoteBuilder.addDataStrings(getComponentIds().toStorageData());
remoteBuilder.addDataLongs(getTimeBucket());
return remoteBuilder;
@@ -153,7 +160,7 @@ public class ServiceRelationServerSideMetrics extends
Metrics {
metrics.setEntityId((String) converter.get(ENTITY_ID));
metrics.setSourceServiceId((String)
converter.get(SOURCE_SERVICE_ID));
metrics.setDestServiceId((String) converter.get(DEST_SERVICE_ID));
- metrics.setComponentId(((Number)
converter.get(COMPONENT_ID)).intValue());
+ metrics.setComponentIds(new IntList((String)
converter.get(COMPONENT_IDS)));
metrics.setTimeBucket(((Number)
converter.get(TIME_BUCKET)).longValue());
return metrics;
}
@@ -164,7 +171,7 @@ public class ServiceRelationServerSideMetrics extends
Metrics {
converter.accept(ENTITY_ID, storageData.getEntityId());
converter.accept(SOURCE_SERVICE_ID,
storageData.getSourceServiceId());
converter.accept(DEST_SERVICE_ID, storageData.getDestServiceId());
- converter.accept(COMPONENT_ID, storageData.getComponentId());
+ converter.accept(COMPONENT_IDS, storageData.getComponentIds());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/TCPServiceCallRelationDispatcher.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/TCPServiceCallRelationDispatcher.java
index 65e9a4534c..c3f2804e61 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/TCPServiceCallRelationDispatcher.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/TCPServiceCallRelationDispatcher.java
@@ -41,7 +41,7 @@ public class TCPServiceCallRelationDispatcher implements
SourceDispatcher<TCPSer
metrics.setTimeBucket(source.getTimeBucket());
metrics.setSourceServiceId(source.getSourceServiceId());
metrics.setDestServiceId(source.getDestServiceId());
- metrics.setComponentId(source.getComponentId());
+ metrics.getComponentIds().add(source.getComponentId());
metrics.setEntityId(source.getEntityId());
MetricsStreamProcessor.getInstance().in(metrics);
}
@@ -51,7 +51,7 @@ public class TCPServiceCallRelationDispatcher implements
SourceDispatcher<TCPSer
metrics.setTimeBucket(source.getTimeBucket());
metrics.setSourceServiceId(source.getSourceServiceId());
metrics.setDestServiceId(source.getDestServiceId());
- metrics.setComponentId(source.getComponentId());
+ metrics.getComponentIds().add(source.getComponentId());
metrics.setEntityId(source.getEntityId());
MetricsStreamProcessor.getInstance().in(metrics);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
index b5bd9efe90..7315b9d31a 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
@@ -77,8 +77,8 @@ public class IntList implements
StorageDataComplexObject<IntList> {
this.data.addAll(source.data);
}
- public void add(final int rank) {
- this.data.add(rank);
+ public void add(final int value) {
+ this.data.add(value);
}
public int get(final int idx) {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ServiceTopologyBuilder.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ServiceTopologyBuilder.java
index 3767e1f908..f61ba61a35 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ServiceTopologyBuilder.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ServiceTopologyBuilder.java
@@ -108,6 +108,9 @@ class ServiceTopologyBuilder {
call.addDetectPoint(DetectPoint.CLIENT);
call.addSourceComponent(componentLibraryCatalogService.getComponentName(clientCall.getComponentId()));
calls.add(call);
+ } else {
+ Call call = callMap.get(relationId);
+
call.addSourceComponent(componentLibraryCatalogService.getComponentName(clientCall.getComponentId()));
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Call.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Call.java
index 49caff4857..4676649590 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Call.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Call.java
@@ -36,9 +36,6 @@ public class Call {
private String id;
private List<DetectPoint> detectPoints;
- private List<Integer> sourceComponentIDs;
- private List<Integer> targetComponentIDs;
-
public Call() {
sourceComponents = new ArrayList<>();
targetComponents = new ArrayList<>();
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
index 5d8b683489..b592e5564f 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
@@ -69,12 +69,27 @@ public @interface ElasticSearch {
}
}
+ /**
+ * Keyword represents the annotated field needs a keyword type in the
ElasticSearch.
+ * Typically, this annotation is for a field with
+ * {@link
org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject}
type, which uses the `text`
+ * type by default.
+ *
+ * @since 9.4.0
+ */
+ @Target({ElementType.FIELD})
+ @Retention(RetentionPolicy.RUNTIME)
+ @interface Keyword {
+
+ }
+
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@interface Column {
/**
- * Warning: this is only used to solve the conflict among the existing
columns since we need support to merge all metrics
+ * Warning: this is only used to solve the conflict among the existing
columns since we need support to merge
+ * all metrics
* in one physical index template. When creating a new column, we
should avoid the compatibility issue
* between these 2 storage modes rather than use this alias.
*/
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
index 3323bd8b57..2ff5264b2a 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
@@ -38,6 +38,8 @@ public class ElasticSearchExtension {
private final String columnAlias;
+ private final boolean isKeyword;
+
public boolean needMatchQuery() {
return analyzer != null;
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index a8cc41b518..f196f8aa5b 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -153,7 +153,8 @@ public class StorageModels implements IModelManager,
ModelCreator, ModelManipula
if (field.isAnnotationPresent(Column.class)) {
if
(field.isAnnotationPresent(SQLDatabase.AdditionalEntity.class)) {
if (!record) {
- throw new IllegalStateException("Model [" + modelName
+ "] is not a Record, @SQLDatabase.AdditionalEntity only supports Record.");
+ throw new IllegalStateException(
+ "Model [" + modelName + "] is not a Record,
@SQLDatabase.AdditionalEntity only supports Record.");
}
}
@@ -185,9 +186,11 @@ public class StorageModels implements IModelManager,
ModelCreator, ModelManipula
final ElasticSearch.MatchQuery elasticSearchAnalyzer =
field.getAnnotation(
ElasticSearch.MatchQuery.class);
final ElasticSearch.Column elasticSearchColumn =
field.getAnnotation(ElasticSearch.Column.class);
+ final ElasticSearch.Keyword keywordColumn =
field.getAnnotation(ElasticSearch.Keyword.class);
ElasticSearchExtension elasticSearchExtension = new
ElasticSearchExtension(
elasticSearchAnalyzer == null ? null :
elasticSearchAnalyzer.analyzer(),
- elasticSearchColumn == null ? null :
elasticSearchColumn.columnAlias()
+ elasticSearchColumn == null ? null :
elasticSearchColumn.columnAlias(),
+ keywordColumn == null ? false : true
);
// BanyanDB extension
@@ -198,7 +201,7 @@ public class StorageModels implements IModelManager,
ModelCreator, ModelManipula
final BanyanDB.NoIndexing banyanDBNoIndex =
field.getAnnotation(
BanyanDB.NoIndexing.class);
final BanyanDB.IndexRule banyanDBIndexRule =
field.getAnnotation(
- BanyanDB.IndexRule.class);
+ BanyanDB.IndexRule.class);
BanyanDBExtension banyanDBExtension = new BanyanDBExtension(
banyanDBSeriesID == null ? -1 : banyanDBSeriesID.index(),
banyanDBGlobalIndex != null,
@@ -248,13 +251,14 @@ public class StorageModels implements IModelManager,
ModelCreator, ModelManipula
}
}
- // For the annotation need to be declared on the superclass, the other
annotation should be declared on the subclass.
+ // For the annotation need to be declared on the superclass, the other
annotation should be declared on the subclass.
if (!sqlDBModelExtension.getSharding().isPresent() &&
clazz.isAnnotationPresent(SQLDatabase.Sharding.class)) {
SQLDatabase.Sharding sharding =
clazz.getAnnotation(SQLDatabase.Sharding.class);
sqlDBModelExtension.setSharding(
- Optional.of(new
SQLDatabaseModelExtension.Sharding(sharding.shardingAlgorithm(),
-
sharding.dataSourceShardingColumn(),
-
sharding.tableShardingColumn()
+ Optional.of(new SQLDatabaseModelExtension.Sharding(
+ sharding.shardingAlgorithm(),
+ sharding.dataSourceShardingColumn(),
+ sharding.tableShardingColumn()
)));
}
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
index e2574a1dde..51c6cb9e62 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
@@ -31,7 +31,7 @@ public class ModelColumnTest {
false, false, true, 0,
new SQLDatabaseExtension(),
new ElasticSearchExtension(
-
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
+
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
new BanyanDBExtension(-1, false,
true, BanyanDB.IndexRule.IndexType.INVERTED)
);
Assert.assertEquals(true, column.isStorageOnly());
@@ -40,7 +40,7 @@ public class ModelColumnTest {
column = new ModelColumn(new ColumnName("", "abc"), DataTable.class,
DataTable.class,
false, false, true, 200,
new SQLDatabaseExtension(),
- new
ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER,
"abc"),
+ new
ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER,
"abc", false),
new BanyanDBExtension(-1, false, true,
BanyanDB.IndexRule.IndexType.INVERTED)
);
Assert.assertEquals(true, column.isStorageOnly());
@@ -50,7 +50,7 @@ public class ModelColumnTest {
column = new ModelColumn(new ColumnName("", "abc"), String.class,
String.class,
false, false, true, 200,
new SQLDatabaseExtension(),
- new
ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER,
"abc"),
+ new
ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER,
"abc", false),
new BanyanDBExtension(-1, false, true,
BanyanDB.IndexRule.IndexType.INVERTED)
);
Assert.assertEquals(false, column.isStorageOnly());
@@ -63,7 +63,7 @@ public class ModelColumnTest {
true, false, true, 200,
new SQLDatabaseExtension(),
new ElasticSearchExtension(
-
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
+
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
new BanyanDBExtension(-1, false,
true, BanyanDB.IndexRule.IndexType.INVERTED)
);
}
@@ -74,7 +74,7 @@ public class ModelColumnTest {
true, true, false, 200,
new SQLDatabaseExtension(),
new ElasticSearchExtension(
-
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
+
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
new BanyanDBExtension(-1, false,
true, BanyanDB.IndexRule.IndexType.INVERTED)
);
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
index 5190ac43be..4138537757 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
@@ -19,6 +19,12 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
@@ -31,19 +37,12 @@ import
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.Pr
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationServerSideMetrics;
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.Call;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
@@ -57,7 +56,8 @@ public class BanyanDBTopologyQueryDAO extends
AbstractBanyanDBDAO implements ITo
}
@Override
- public List<Call.CallDetail>
loadServiceRelationsDetectedAtServerSide(Duration duration, List<String>
serviceIds) throws IOException {
+ public List<Call.CallDetail>
loadServiceRelationsDetectedAtServerSide(Duration duration,
+
List<String> serviceIds) throws IOException {
if (CollectionUtils.isEmpty(serviceIds)) {
throw new UnexpectedException("Service id is empty");
}
@@ -68,7 +68,8 @@ public class BanyanDBTopologyQueryDAO extends
AbstractBanyanDBDAO implements ITo
}
@Override
- public List<Call.CallDetail>
loadServiceRelationDetectedAtClientSide(Duration duration, List<String>
serviceIds) throws IOException {
+ public List<Call.CallDetail>
loadServiceRelationDetectedAtClientSide(Duration duration,
+
List<String> serviceIds) throws IOException {
if (CollectionUtils.isEmpty(serviceIds)) {
throw new UnexpectedException("Service id is empty");
}
@@ -108,7 +109,9 @@ public class BanyanDBTopologyQueryDAO extends
AbstractBanyanDBDAO implements ITo
return queryBuilderList;
}
- List<Call.CallDetail> queryServiceRelation(Duration duration,
List<QueryBuilder<MeasureQuery>> queryBuilderList, DetectPoint detectPoint)
throws IOException {
+ List<Call.CallDetail> queryServiceRelation(Duration duration,
+
List<QueryBuilder<MeasureQuery>> queryBuilderList,
+ DetectPoint detectPoint) throws
IOException {
long startTB = 0;
long endTB = 0;
if (nonNull(duration)) {
@@ -120,46 +123,59 @@ public class BanyanDBTopologyQueryDAO extends
AbstractBanyanDBDAO implements ITo
timestampRange = new
TimestampRange(TimeBucket.getTimestamp(startTB),
TimeBucket.getTimestamp(endTB));
}
final String modelName = detectPoint == DetectPoint.SERVER ?
ServiceRelationServerSideMetrics.INDEX_NAME :
- ServiceRelationClientSideMetrics.INDEX_NAME;
+ ServiceRelationClientSideMetrics.INDEX_NAME;
final Map<String, Call.CallDetail> callMap = new HashMap<>();
for (final QueryBuilder<MeasureQuery> q : queryBuilderList) {
MeasureQueryResponse resp = query(modelName,
-
ImmutableSet.of(ServiceRelationClientSideMetrics.COMPONENT_ID,
- ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
- ServiceRelationClientSideMetrics.DEST_SERVICE_ID,
- Metrics.ENTITY_ID),
- Collections.emptySet(), timestampRange, q);
+ ImmutableSet.of(
+
ServiceRelationClientSideMetrics.COMPONENT_IDS,
+
ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
+
ServiceRelationClientSideMetrics.DEST_SERVICE_ID,
+ Metrics.ENTITY_ID
+ ),
+ Collections.emptySet(),
timestampRange, q
+ );
if (resp.size() == 0) {
continue;
}
final Call.CallDetail call = new Call.CallDetail();
final String entityId =
resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
- final int componentId = ((Number)
resp.getDataPoints().get(0).getTagValue(ServiceRelationClientSideMetrics.COMPONENT_ID)).intValue();
- call.buildFromServiceRelation(entityId, componentId, detectPoint);
- callMap.putIfAbsent(entityId, call);
+ final IntList componentIds = new IntList(
+
resp.getDataPoints().get(0).getTagValue(ServiceRelationClientSideMetrics.COMPONENT_IDS));
+ for (int i = 0; i < componentIds.size(); i++) {
+ call.buildFromServiceRelation(entityId, componentIds.get(i),
detectPoint);
+ callMap.putIfAbsent(entityId, call);
+ }
}
return new ArrayList<>(callMap.values());
}
@Override
- public List<Call.CallDetail>
loadInstanceRelationDetectedAtServerSide(String clientServiceId, String
serverServiceId, Duration duration) throws IOException {
- List<QueryBuilder<MeasureQuery>> queryBuilderList =
buildInstanceRelationsQueries(clientServiceId, serverServiceId);
+ public List<Call.CallDetail>
loadInstanceRelationDetectedAtServerSide(String clientServiceId,
+
String serverServiceId,
+
Duration duration) throws IOException {
+ List<QueryBuilder<MeasureQuery>> queryBuilderList =
buildInstanceRelationsQueries(
+ clientServiceId, serverServiceId);
return queryInstanceRelation(duration, queryBuilderList,
DetectPoint.SERVER);
}
@Override
- public List<Call.CallDetail>
loadInstanceRelationDetectedAtClientSide(String clientServiceId, String
serverServiceId, Duration duration) throws IOException {
- List<QueryBuilder<MeasureQuery>> queryBuilderList =
buildInstanceRelationsQueries(clientServiceId, serverServiceId);
+ public List<Call.CallDetail>
loadInstanceRelationDetectedAtClientSide(String clientServiceId,
+
String serverServiceId,
+
Duration duration) throws IOException {
+ List<QueryBuilder<MeasureQuery>> queryBuilderList =
buildInstanceRelationsQueries(
+ clientServiceId, serverServiceId);
return queryInstanceRelation(duration, queryBuilderList,
DetectPoint.CLIENT);
}
- private List<QueryBuilder<MeasureQuery>>
buildInstanceRelationsQueries(String clientServiceId, String serverServiceId) {
+ private List<QueryBuilder<MeasureQuery>>
buildInstanceRelationsQueries(String clientServiceId,
+
String serverServiceId) {
List<QueryBuilder<MeasureQuery>> queryBuilderList = new ArrayList<>(2);
queryBuilderList.add(new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
clientServiceId))
-
.and(eq(ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
serverServiceId));
+
.and(eq(ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
serverServiceId));
}
});
@@ -167,13 +183,15 @@ public class BanyanDBTopologyQueryDAO extends
AbstractBanyanDBDAO implements ITo
@Override
protected void apply(MeasureQuery query) {
query.and(eq(ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
clientServiceId))
-
.and(eq(ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
serverServiceId));
+
.and(eq(ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
serverServiceId));
}
});
return queryBuilderList;
}
- List<Call.CallDetail> queryInstanceRelation(Duration duration,
List<QueryBuilder<MeasureQuery>> queryBuilderList, DetectPoint detectPoint)
throws IOException {
+ List<Call.CallDetail> queryInstanceRelation(Duration duration,
+
List<QueryBuilder<MeasureQuery>> queryBuilderList,
+ DetectPoint detectPoint)
throws IOException {
long startTB = 0;
long endTB = 0;
if (nonNull(duration)) {
@@ -189,17 +207,23 @@ public class BanyanDBTopologyQueryDAO extends
AbstractBanyanDBDAO implements ITo
final Map<String, Call.CallDetail> callMap = new HashMap<>();
for (final QueryBuilder<MeasureQuery> q : queryBuilderList) {
MeasureQueryResponse resp = query(modelName,
-
ImmutableSet.of(ServiceInstanceRelationServerSideMetrics.COMPONENT_ID,
-
ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
-
ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
- Metrics.ENTITY_ID),
- Collections.emptySet(), timestampRange, q);
+ ImmutableSet.of(
+
ServiceInstanceRelationServerSideMetrics.COMPONENT_ID,
+
ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
+
ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
+ Metrics.ENTITY_ID
+ ),
+ Collections.emptySet(),
timestampRange, q
+ );
if (resp.size() == 0) {
continue;
}
final Call.CallDetail call = new Call.CallDetail();
final String entityId =
resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
- final int componentId = ((Number)
resp.getDataPoints().get(0).getTagValue(ServiceRelationClientSideMetrics.COMPONENT_ID)).intValue();
+ final int componentId = ((Number) resp.getDataPoints()
+ .get(0)
+ .getTagValue(
+
ServiceRelationClientSideMetrics.COMPONENT_IDS)).intValue();
call.buildFromInstanceRelation(entityId, componentId, detectPoint);
callMap.putIfAbsent(entityId, call);
}
@@ -213,12 +237,14 @@ public class BanyanDBTopologyQueryDAO extends
AbstractBanyanDBDAO implements ITo
}
@Override
- public List<Call.CallDetail>
loadProcessRelationDetectedAtClientSide(String serviceInstanceId, Duration
duration) throws IOException {
+ public List<Call.CallDetail>
loadProcessRelationDetectedAtClientSide(String serviceInstanceId,
+
Duration duration) throws IOException {
return queryProcessRelation(duration, serviceInstanceId,
DetectPoint.CLIENT);
}
@Override
- public List<Call.CallDetail>
loadProcessRelationDetectedAtServerSide(String serviceInstanceId, Duration
duration) throws IOException {
+ public List<Call.CallDetail>
loadProcessRelationDetectedAtServerSide(String serviceInstanceId,
+
Duration duration) throws IOException {
return queryProcessRelation(duration, serviceInstanceId,
DetectPoint.SERVER);
}
@@ -240,7 +266,9 @@ public class BanyanDBTopologyQueryDAO extends
AbstractBanyanDBDAO implements ITo
return queryBuilderList;
}
- List<Call.CallDetail> queryEndpointRelation(Duration duration,
List<QueryBuilder<MeasureQuery>> queryBuilderList, DetectPoint detectPoint)
throws IOException {
+ List<Call.CallDetail> queryEndpointRelation(Duration duration,
+
List<QueryBuilder<MeasureQuery>> queryBuilderList,
+ DetectPoint detectPoint)
throws IOException {
long startTB = 0;
long endTB = 0;
if (nonNull(duration)) {
@@ -254,10 +282,13 @@ public class BanyanDBTopologyQueryDAO extends
AbstractBanyanDBDAO implements ITo
final Map<String, Call.CallDetail> callMap = new HashMap<>();
for (final QueryBuilder<MeasureQuery> q : queryBuilderList) {
MeasureQueryResponse resp =
query(EndpointRelationServerSideMetrics.INDEX_NAME,
-
ImmutableSet.of(EndpointRelationServerSideMetrics.DEST_ENDPOINT,
- EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
- Metrics.ENTITY_ID),
- Collections.emptySet(), timestampRange, q);
+ ImmutableSet.of(
+
EndpointRelationServerSideMetrics.DEST_ENDPOINT,
+
EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
+ Metrics.ENTITY_ID
+ ),
+ Collections.emptySet(),
timestampRange, q
+ );
if (resp.size() == 0) {
continue;
}
@@ -269,7 +300,9 @@ public class BanyanDBTopologyQueryDAO extends
AbstractBanyanDBDAO implements ITo
return new ArrayList<>(callMap.values());
}
- List<Call.CallDetail> queryProcessRelation(Duration duration, String
serviceInstanceId, DetectPoint detectPoint) throws IOException {
+ List<Call.CallDetail> queryProcessRelation(Duration duration,
+ String serviceInstanceId,
+ DetectPoint detectPoint) throws
IOException {
long startTB = 0;
long endTB = 0;
if (nonNull(duration)) {
@@ -284,16 +317,20 @@ public class BanyanDBTopologyQueryDAO extends
AbstractBanyanDBDAO implements ITo
ProcessRelationClientSideMetrics.INDEX_NAME;
final Map<String, Call.CallDetail> callMap = new HashMap<>();
MeasureQueryResponse resp = query(modelName,
- ImmutableSet.of(Metrics.ENTITY_ID,
ProcessRelationClientSideMetrics.COMPONENT_ID),
- Collections.emptySet(), timestampRange, new
QueryBuilder<MeasureQuery>() {
+ ImmutableSet.of(
+ Metrics.ENTITY_ID,
ProcessRelationClientSideMetrics.COMPONENT_ID),
+ Collections.emptySet(),
timestampRange, new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
query.and(eq(ProcessRelationServerSideMetrics.SERVICE_INSTANCE_ID,
serviceInstanceId));
}
- });
+ }
+ );
final Call.CallDetail call = new Call.CallDetail();
final String entityId =
resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
- final int componentId = ((Number)
resp.getDataPoints().get(0).getTagValue(ProcessRelationClientSideMetrics.COMPONENT_ID)).intValue();
+ final int componentId = ((Number) resp.getDataPoints()
+ .get(0)
+
.getTagValue(ProcessRelationClientSideMetrics.COMPONENT_ID)).intValue();
call.buildProcessRelation(entityId, componentId, detectPoint);
callMap.putIfAbsent(entityId, call);
return new ArrayList<>(callMap.values());
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
index 7a71bc4f4b..e0d2af9540 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -23,20 +23,19 @@ import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.Layer;
-import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping;
+import
org.apache.skywalking.oap.server.core.storage.model.ElasticSearchExtension;
import
org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
-public class ColumnTypeEsMapping implements DataTypeMapping {
+public class ColumnTypeEsMapping {
- @Override
- public String transform(Class<?> type, Type genericType) {
+ public String transform(Class<?> type, Type genericType, final
ElasticSearchExtension elasticSearchExtension) {
if (Integer.class.equals(type) || int.class.equals(type) ||
Layer.class.equals(type)) {
return "integer";
} else if (Long.class.equals(type) || long.class.equals(type)) {
return "long";
} else if (Double.class.equals(type) || double.class.equals(type)) {
return "double";
- } else if (String.class.equals(type)) {
+ } else if (String.class.equals(type) ||
elasticSearchExtension.isKeyword()) {
return "keyword";
} else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
return "text";
@@ -46,7 +45,7 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
return "text";
} else if (List.class.isAssignableFrom(type)) {
final Type elementType = ((ParameterizedType)
genericType).getActualTypeArguments()[0];
- return transform((Class<?>) elementType, elementType);
+ return transform((Class<?>) elementType, elementType,
elasticSearchExtension);
} else {
throw new IllegalArgumentException("Unsupported data type: " +
type.getName());
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index c3f9bd4619..515f1a3092 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -302,7 +302,7 @@ public class StorageEsInstaller extends ModelInstaller {
Map<String, Object> properties = new HashMap<>();
Mappings.Source source = new Mappings.Source();
for (ModelColumn columnDefine : model.getColumns()) {
- final String type =
columnTypeEsMapping.transform(columnDefine.getType(),
columnDefine.getGenericType());
+ final String type =
columnTypeEsMapping.transform(columnDefine.getType(),
columnDefine.getGenericType(), columnDefine.getElasticSearchExtension());
String columnName = columnDefine.getColumnName().getName();
String alias =
columnDefine.getElasticSearchExtension().getColumnAlias();
if (!config.isLogicSharding() && alias != null) {
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
index 5527eeb018..15137fdab8 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
@@ -37,6 +37,7 @@ import
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.Pr
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationServerSideMetrics;
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.Call;
@@ -256,7 +257,7 @@ public class TopologyQueryEsDAO extends EsDAO implements
ITopologyQueryDAO {
String entityId = (String) entityBucket.get("key");
final Map<String, Object> componentTerms =
(Map<String, Object>) entityBucket.get(
- ServiceRelationServerSideMetrics.COMPONENT_ID);
+ ServiceRelationServerSideMetrics.COMPONENT_IDS);
final List<Map<String, Object>> subAgg =
(List<Map<String, Object>>) componentTerms.get("buckets");
final int componentId = ((Number)
subAgg.iterator().next().get("key")).intValue();
@@ -275,8 +276,8 @@ public class TopologyQueryEsDAO extends EsDAO implements
ITopologyQueryDAO {
Aggregation
.terms(Metrics.ENTITY_ID).field(Metrics.ENTITY_ID)
.subAggregation(
-
Aggregation.terms(ServiceRelationServerSideMetrics.COMPONENT_ID)
-
.field(ServiceRelationServerSideMetrics.COMPONENT_ID)
+
Aggregation.terms(ServiceRelationServerSideMetrics.COMPONENT_IDS)
+
.field(ServiceRelationServerSideMetrics.COMPONENT_IDS)
.executionHint(TermsAggregationBuilder.ExecutionHint.MAP)
.collectMode(TermsAggregationBuilder.CollectMode.BREADTH_FIRST))
.executionHint(TermsAggregationBuilder.ExecutionHint.MAP)
@@ -296,14 +297,16 @@ public class TopologyQueryEsDAO extends EsDAO implements
ITopologyQueryDAO {
String entityId = (String) entityBucket.get("key");
final Map<String, Object> componentTerms =
(Map<String, Object>) entityBucket.get(
- ServiceRelationServerSideMetrics.COMPONENT_ID);
+ ServiceRelationServerSideMetrics.COMPONENT_IDS);
final List<Map<String, Object>> subAgg =
(List<Map<String, Object>>) componentTerms.get("buckets");
- final int componentId = ((Number)
subAgg.iterator().next().get("key")).intValue();
+ final IntList componentIds = new IntList((String)
subAgg.iterator().next().get("key"));
Call.CallDetail call = new Call.CallDetail();
- call.buildFromServiceRelation(entityId, componentId, detectPoint);
- calls.add(call);
+ for (int i = 0; i < componentIds.size(); i++) {
+ call.buildFromServiceRelation(entityId, componentIds.get(i),
detectPoint);
+ calls.add(call);
+ }
}
return calls;
}
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCTopologyQueryDAO.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCTopologyQueryDAO.java
index 1f485929a0..165f9c5958 100644
---
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCTopologyQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCTopologyQueryDAO.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import lombok.RequiredArgsConstructor;
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics;
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
@@ -31,13 +32,13 @@ import
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.Pr
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationServerSideMetrics;
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
import
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.Call;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class JDBCTopologyQueryDAO implements ITopologyQueryDAO {
@@ -157,11 +158,11 @@ public class JDBCTopologyQueryDAO implements
ITopologyQueryDAO {
try (Connection connection = jdbcClient.getConnection()) {
try (ResultSet resultSet = jdbcClient.executeQuery(
connection,
- "select " + Metrics.ENTITY_ID + ", " +
ServiceRelationServerSideMetrics.COMPONENT_ID
+ "select " + Metrics.ENTITY_ID + ", " +
ServiceRelationServerSideMetrics.COMPONENT_IDS
+ " from " + tableName + " where " + Metrics.TIME_BUCKET +
">= ? and "
+ Metrics.TIME_BUCKET + "<=? " + serviceIdMatchSql
.toString() +
- " group by " + Metrics.ENTITY_ID + "," +
ServiceRelationServerSideMetrics.COMPONENT_ID, conditions
+ " group by " + Metrics.ENTITY_ID + "," +
ServiceRelationServerSideMetrics.COMPONENT_IDS, conditions
)) {
buildServiceCalls(resultSet, calls, detectPoint);
}
@@ -272,9 +273,12 @@ public class JDBCTopologyQueryDAO implements
ITopologyQueryDAO {
while (resultSet.next()) {
Call.CallDetail call = new Call.CallDetail();
String entityId = resultSet.getString(Metrics.ENTITY_ID);
- final int componentId =
resultSet.getInt(ServiceRelationServerSideMetrics.COMPONENT_ID);
- call.buildFromServiceRelation(entityId, componentId, detectPoint);
- calls.add(call);
+ final IntList componentIds = new IntList(
+
resultSet.getString(ServiceRelationServerSideMetrics.COMPONENT_IDS));
+ for (int i = 0; i < componentIds.size(); i++) {
+ call.buildFromServiceRelation(entityId, componentIds.get(i),
detectPoint);
+ calls.add(call);
+ }
}
}
@@ -283,7 +287,7 @@ public class JDBCTopologyQueryDAO implements
ITopologyQueryDAO {
while (resultSet.next()) {
Call.CallDetail call = new Call.CallDetail();
String entityId = resultSet.getString(Metrics.ENTITY_ID);
- final int componentId =
resultSet.getInt(ServiceRelationServerSideMetrics.COMPONENT_ID);
+ final int componentId =
resultSet.getInt(ServiceRelationServerSideMetrics.COMPONENT_IDS);
call.buildFromInstanceRelation(entityId, componentId, detectPoint);
calls.add(call);
}