This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch components
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/components by this push:
     new 0a1a4f838e Support multiple component IDs in the service topology level
0a1a4f838e is described below

commit 0a1a4f838eae716cac448cd5ff299c16a396023f
Author: Wu Sheng <[email protected]>
AuthorDate: Fri Dec 16 09:49:33 2022 +0800

    Support multiple component IDs in the service topology level
---
 docs/en/changes/changes.md                         |   3 +
 .../skywalking/oap/meter/analyzer/Analyzer.java    |   4 +-
 .../service/ServiceCallRelationDispatcher.java     |   4 +-
 .../service/ServiceRelationClientSideMetrics.java  |  37 +++---
 .../service/ServiceRelationServerSideMetrics.java  |  37 +++---
 .../service/TCPServiceCallRelationDispatcher.java  |   4 +-
 .../oap/server/core/analysis/metrics/IntList.java  |   4 +-
 .../server/core/query/ServiceTopologyBuilder.java  |   3 +
 .../oap/server/core/query/type/Call.java           |   3 -
 .../core/storage/annotation/ElasticSearch.java     |  17 ++-
 .../core/storage/model/ElasticSearchExtension.java |   2 +
 .../server/core/storage/model/StorageModels.java   |  18 +--
 .../server/core/storage/model/ModelColumnTest.java |  10 +-
 .../banyandb/measure/BanyanDBTopologyQueryDAO.java | 129 +++++++++++++--------
 .../elasticsearch/base/ColumnTypeEsMapping.java    |  11 +-
 .../elasticsearch/base/StorageEsInstaller.java     |   2 +-
 .../elasticsearch/query/TopologyQueryEsDAO.java    |  17 +--
 .../jdbc/common/dao/JDBCTopologyQueryDAO.java      |  18 +--
 18 files changed, 202 insertions(+), 121 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 43d910fc15..3b1bb1e95a 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -41,6 +41,9 @@
 * Use TTL-driven interval settings for the `measure-default` group in BanyanDB.
 * Fix wrong group of non time-relative metadata in BanyanDB.
 * Refactor `StorageData#id` to the new StorageID object from a String type.
+* Support multiple component IDs in the service topology level.
+* Add `ElasticSearch.Keyword` annotation to declare the target field type as 
`keyword`.
+* [Breaking Change] Field `component_id` of `service_relation_client_side` and 
`service_relation_server_side` has been replaced by `component_ids` (keyword).
 
 #### UI
 
diff --git 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index 90a0a9e83d..be96151c0b 100644
--- 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -324,7 +324,7 @@ public class Analyzer {
         
metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
         metrics.setSourceServiceId(entity.sourceServiceId());
         metrics.setDestServiceId(entity.destServiceId());
-        metrics.setComponentId(0);
+        metrics.getComponentIds().add(0);
         metrics.setEntityId(entity.id());
         MetricsStreamProcessor.getInstance().in(metrics);
     }
@@ -334,7 +334,7 @@ public class Analyzer {
         
metrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
         metrics.setSourceServiceId(entity.sourceServiceId());
         metrics.setDestServiceId(entity.destServiceId());
-        metrics.setComponentId(0);
+        metrics.getComponentIds().add(0);
         metrics.setEntityId(entity.id());
         MetricsStreamProcessor.getInstance().in(metrics);
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceCallRelationDispatcher.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceCallRelationDispatcher.java
index 867f546858..2f87bce5d5 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceCallRelationDispatcher.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceCallRelationDispatcher.java
@@ -41,7 +41,7 @@ public class ServiceCallRelationDispatcher implements 
SourceDispatcher<ServiceRe
         metrics.setTimeBucket(source.getTimeBucket());
         metrics.setSourceServiceId(source.getSourceServiceId());
         metrics.setDestServiceId(source.getDestServiceId());
-        metrics.setComponentId(source.getComponentId());
+        metrics.getComponentIds().add(source.getComponentId());
         metrics.setEntityId(source.getEntityId());
         MetricsStreamProcessor.getInstance().in(metrics);
     }
@@ -51,7 +51,7 @@ public class ServiceCallRelationDispatcher implements 
SourceDispatcher<ServiceRe
         metrics.setTimeBucket(source.getTimeBucket());
         metrics.setSourceServiceId(source.getSourceServiceId());
         metrics.setDestServiceId(source.getDestServiceId());
-        metrics.setComponentId(source.getComponentId());
+        metrics.getComponentIds().add(source.getComponentId());
         metrics.setEntityId(source.getEntityId());
         MetricsStreamProcessor.getInstance().in(metrics);
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationClientSideMetrics.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationClientSideMetrics.java
index 7f1ffa402a..83c40d8ad5 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationClientSideMetrics.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationClientSideMetrics.java
@@ -22,6 +22,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import 
org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
@@ -29,6 +30,7 @@ import 
org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
 import org.apache.skywalking.oap.server.core.storage.ShardingAlgorithm;
 import org.apache.skywalking.oap.server.core.storage.StorageID;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -48,7 +50,7 @@ public class ServiceRelationClientSideMetrics extends Metrics 
{
     public static final String INDEX_NAME = "service_relation_client_side";
     public static final String SOURCE_SERVICE_ID = "source_service_id";
     public static final String DEST_SERVICE_ID = "dest_service_id";
-    public static final String COMPONENT_ID = "component_id";
+    public static final String COMPONENT_IDS = "component_ids";
 
     @Setter
     @Getter
@@ -60,8 +62,9 @@ public class ServiceRelationClientSideMetrics extends Metrics 
{
     private String destServiceId;
     @Setter
     @Getter
-    @Column(columnName = COMPONENT_ID, storageOnly = true)
-    private int componentId;
+    @Column(columnName = COMPONENT_IDS, storageOnly = true)
+    @ElasticSearch.Keyword
+    private IntList componentIds = new IntList(3);
     @Setter
     @Getter
     @Column(columnName = ENTITY_ID, length = 512)
@@ -76,11 +79,17 @@ public class ServiceRelationClientSideMetrics extends 
Metrics {
     @Override
     public boolean combine(Metrics metrics) {
         ServiceRelationClientSideMetrics serviceRelationClientSideMetrics = 
(ServiceRelationClientSideMetrics) metrics;
-        if (this.getComponentId() == 0 && 
serviceRelationClientSideMetrics.getComponentId() != 0) {
-            this.componentId = 
serviceRelationClientSideMetrics.getComponentId();
-            return true;
+        final IntList sourceIDs = this.getComponentIds();
+        final IntList targetIDs = 
serviceRelationClientSideMetrics.getComponentIds();
+        boolean changed = false;
+        for (int i = 0; i < targetIDs.size(); i++) {
+            final int targetID = targetIDs.get(i);
+            if (!sourceIDs.include(targetID)) {
+                sourceIDs.add(targetID);
+                changed = true;
+            }
         }
-        return false;
+        return changed;
     }
 
     @Override
@@ -95,7 +104,7 @@ public class ServiceRelationClientSideMetrics extends 
Metrics {
         metrics.setTimeBucket(toTimeBucketInHour());
         metrics.setSourceServiceId(getSourceServiceId());
         metrics.setDestServiceId(getDestServiceId());
-        metrics.setComponentId(getComponentId());
+        metrics.getComponentIds().copyFrom(getComponentIds());
         return metrics;
     }
 
@@ -106,7 +115,7 @@ public class ServiceRelationClientSideMetrics extends 
Metrics {
         metrics.setTimeBucket(toTimeBucketInDay());
         metrics.setSourceServiceId(getSourceServiceId());
         metrics.setDestServiceId(getDestServiceId());
-        metrics.setComponentId(getComponentId());
+        metrics.getComponentIds().copyFrom(getComponentIds());
         return metrics;
     }
 
@@ -122,8 +131,7 @@ public class ServiceRelationClientSideMetrics extends 
Metrics {
         setEntityId(remoteData.getDataStrings(0));
         setSourceServiceId(remoteData.getDataStrings(1));
         setDestServiceId(remoteData.getDataStrings(2));
-
-        setComponentId(remoteData.getDataIntegers(0));
+        setComponentIds(new IntList(remoteData.getDataStrings(3)));
 
         setTimeBucket(remoteData.getDataLongs(0));
     }
@@ -134,8 +142,7 @@ public class ServiceRelationClientSideMetrics extends 
Metrics {
         remoteBuilder.addDataStrings(getEntityId());
         remoteBuilder.addDataStrings(getSourceServiceId());
         remoteBuilder.addDataStrings(getDestServiceId());
-
-        remoteBuilder.addDataIntegers(getComponentId());
+        remoteBuilder.addDataStrings(getComponentIds().toStorageData());
 
         remoteBuilder.addDataLongs(getTimeBucket());
         return remoteBuilder;
@@ -147,7 +154,7 @@ public class ServiceRelationClientSideMetrics extends 
Metrics {
             ServiceRelationClientSideMetrics metrics = new 
ServiceRelationClientSideMetrics();
             metrics.setSourceServiceId((String) 
converter.get(SOURCE_SERVICE_ID));
             metrics.setDestServiceId((String) converter.get(DEST_SERVICE_ID));
-            metrics.setComponentId(((Number) 
converter.get(COMPONENT_ID)).intValue());
+            metrics.setComponentIds(new IntList((String) 
converter.get(COMPONENT_IDS)));
             metrics.setTimeBucket(((Number) 
converter.get(TIME_BUCKET)).longValue());
             metrics.setEntityId((String) converter.get(ENTITY_ID));
             return metrics;
@@ -159,7 +166,7 @@ public class ServiceRelationClientSideMetrics extends 
Metrics {
             converter.accept(TIME_BUCKET, storageData.getTimeBucket());
             converter.accept(SOURCE_SERVICE_ID, 
storageData.getSourceServiceId());
             converter.accept(DEST_SERVICE_ID, storageData.getDestServiceId());
-            converter.accept(COMPONENT_ID, storageData.getComponentId());
+            converter.accept(COMPONENT_IDS, storageData.getComponentIds());
             converter.accept(ENTITY_ID, storageData.getEntityId());
         }
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationServerSideMetrics.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationServerSideMetrics.java
index d519cd31a1..d2752e11d3 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationServerSideMetrics.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/ServiceRelationServerSideMetrics.java
@@ -23,6 +23,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
 import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import 
org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
@@ -31,6 +32,7 @@ import 
org.apache.skywalking.oap.server.core.storage.ShardingAlgorithm;
 import org.apache.skywalking.oap.server.core.storage.StorageID;
 import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
 import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -51,7 +53,7 @@ public class ServiceRelationServerSideMetrics extends Metrics 
{
     public static final String INDEX_NAME = "service_relation_server_side";
     public static final String SOURCE_SERVICE_ID = "source_service_id";
     public static final String DEST_SERVICE_ID = "dest_service_id";
-    public static final String COMPONENT_ID = "component_id";
+    public static final String COMPONENT_IDS = "component_ids";
 
     @Setter
     @Getter
@@ -63,8 +65,9 @@ public class ServiceRelationServerSideMetrics extends Metrics 
{
     private String destServiceId;
     @Setter
     @Getter
-    @Column(columnName = COMPONENT_ID, storageOnly = true)
-    private int componentId;
+    @Column(columnName = COMPONENT_IDS, storageOnly = true)
+    @ElasticSearch.Keyword
+    private IntList componentIds = new IntList(3);
     @Setter
     @Getter
     @Column(columnName = ENTITY_ID, length = 512)
@@ -81,11 +84,17 @@ public class ServiceRelationServerSideMetrics extends 
Metrics {
     @Override
     public boolean combine(Metrics metrics) {
         ServiceRelationServerSideMetrics serviceRelationServerSideMetrics = 
(ServiceRelationServerSideMetrics) metrics;
-        if (this.getComponentId() == 0 && 
serviceRelationServerSideMetrics.getComponentId() != 0) {
-            this.componentId = 
serviceRelationServerSideMetrics.getComponentId();
-            return true;
+        final IntList sourceIDs = this.getComponentIds();
+        final IntList targetIDs = 
serviceRelationServerSideMetrics.getComponentIds();
+        boolean changed = false;
+        for (int i = 0; i < targetIDs.size(); i++) {
+            final int targetID = targetIDs.get(i);
+            if (!sourceIDs.include(targetID)) {
+                sourceIDs.add(targetID);
+                changed = true;
+            }
         }
-        return false;
+        return changed;
     }
 
     @Override
@@ -99,7 +108,7 @@ public class ServiceRelationServerSideMetrics extends 
Metrics {
         metrics.setTimeBucket(toTimeBucketInHour());
         metrics.setSourceServiceId(getSourceServiceId());
         metrics.setDestServiceId(getDestServiceId());
-        metrics.setComponentId(getComponentId());
+        metrics.getComponentIds().copyFrom(getComponentIds());
         metrics.setEntityId(getEntityId());
         return metrics;
     }
@@ -110,7 +119,7 @@ public class ServiceRelationServerSideMetrics extends 
Metrics {
         metrics.setTimeBucket(toTimeBucketInDay());
         metrics.setSourceServiceId(getSourceServiceId());
         metrics.setDestServiceId(getDestServiceId());
-        metrics.setComponentId(getComponentId());
+        metrics.getComponentIds().copyFrom(getComponentIds());
         metrics.setEntityId(getEntityId());
         return metrics;
     }
@@ -127,8 +136,7 @@ public class ServiceRelationServerSideMetrics extends 
Metrics {
         setEntityId(remoteData.getDataStrings(0));
         setSourceServiceId(remoteData.getDataStrings(1));
         setDestServiceId(remoteData.getDataStrings(2));
-
-        setComponentId(remoteData.getDataIntegers(0));
+        setComponentIds(new IntList(remoteData.getDataStrings(3)));
 
         setTimeBucket(remoteData.getDataLongs(0));
     }
@@ -139,8 +147,7 @@ public class ServiceRelationServerSideMetrics extends 
Metrics {
         remoteBuilder.addDataStrings(getEntityId());
         remoteBuilder.addDataStrings(getSourceServiceId());
         remoteBuilder.addDataStrings(getDestServiceId());
-
-        remoteBuilder.addDataIntegers(getComponentId());
+        remoteBuilder.addDataStrings(getComponentIds().toStorageData());
 
         remoteBuilder.addDataLongs(getTimeBucket());
         return remoteBuilder;
@@ -153,7 +160,7 @@ public class ServiceRelationServerSideMetrics extends 
Metrics {
             metrics.setEntityId((String) converter.get(ENTITY_ID));
             metrics.setSourceServiceId((String) 
converter.get(SOURCE_SERVICE_ID));
             metrics.setDestServiceId((String) converter.get(DEST_SERVICE_ID));
-            metrics.setComponentId(((Number) 
converter.get(COMPONENT_ID)).intValue());
+            metrics.setComponentIds(new IntList((String) 
converter.get(COMPONENT_IDS)));
             metrics.setTimeBucket(((Number) 
converter.get(TIME_BUCKET)).longValue());
             return metrics;
         }
@@ -164,7 +171,7 @@ public class ServiceRelationServerSideMetrics extends 
Metrics {
             converter.accept(ENTITY_ID, storageData.getEntityId());
             converter.accept(SOURCE_SERVICE_ID, 
storageData.getSourceServiceId());
             converter.accept(DEST_SERVICE_ID, storageData.getDestServiceId());
-            converter.accept(COMPONENT_ID, storageData.getComponentId());
+            converter.accept(COMPONENT_IDS, storageData.getComponentIds());
             converter.accept(TIME_BUCKET, storageData.getTimeBucket());
         }
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/TCPServiceCallRelationDispatcher.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/TCPServiceCallRelationDispatcher.java
index 65e9a4534c..c3f2804e61 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/TCPServiceCallRelationDispatcher.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/relation/service/TCPServiceCallRelationDispatcher.java
@@ -41,7 +41,7 @@ public class TCPServiceCallRelationDispatcher implements 
SourceDispatcher<TCPSer
         metrics.setTimeBucket(source.getTimeBucket());
         metrics.setSourceServiceId(source.getSourceServiceId());
         metrics.setDestServiceId(source.getDestServiceId());
-        metrics.setComponentId(source.getComponentId());
+        metrics.getComponentIds().add(source.getComponentId());
         metrics.setEntityId(source.getEntityId());
         MetricsStreamProcessor.getInstance().in(metrics);
     }
@@ -51,7 +51,7 @@ public class TCPServiceCallRelationDispatcher implements 
SourceDispatcher<TCPSer
         metrics.setTimeBucket(source.getTimeBucket());
         metrics.setSourceServiceId(source.getSourceServiceId());
         metrics.setDestServiceId(source.getDestServiceId());
-        metrics.setComponentId(source.getComponentId());
+        metrics.getComponentIds().add(source.getComponentId());
         metrics.setEntityId(source.getEntityId());
         MetricsStreamProcessor.getInstance().in(metrics);
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
index b5bd9efe90..7315b9d31a 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
@@ -77,8 +77,8 @@ public class IntList implements 
StorageDataComplexObject<IntList> {
         this.data.addAll(source.data);
     }
 
-    public void add(final int rank) {
-        this.data.add(rank);
+    public void add(final int value) {
+        this.data.add(value);
     }
 
     public int get(final int idx) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ServiceTopologyBuilder.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ServiceTopologyBuilder.java
index 3767e1f908..f61ba61a35 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ServiceTopologyBuilder.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/ServiceTopologyBuilder.java
@@ -108,6 +108,9 @@ class ServiceTopologyBuilder {
                 call.addDetectPoint(DetectPoint.CLIENT);
                 
call.addSourceComponent(componentLibraryCatalogService.getComponentName(clientCall.getComponentId()));
                 calls.add(call);
+            } else {
+                Call call = callMap.get(relationId);
+                
call.addSourceComponent(componentLibraryCatalogService.getComponentName(clientCall.getComponentId()));
             }
         }
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Call.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Call.java
index 49caff4857..4676649590 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Call.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Call.java
@@ -36,9 +36,6 @@ public class Call {
     private String id;
     private List<DetectPoint> detectPoints;
 
-    private List<Integer> sourceComponentIDs;
-    private List<Integer> targetComponentIDs;
-
     public Call() {
         sourceComponents = new ArrayList<>();
         targetComponents = new ArrayList<>();
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
index 5d8b683489..b592e5564f 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ElasticSearch.java
@@ -69,12 +69,27 @@ public @interface ElasticSearch {
         }
     }
 
+    /**
+     * Keyword indicates that the annotated field needs the keyword type in 
ElasticSearch.
+     * Typically, this annotation is for a field with
+     * {@link 
org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject} 
type, which uses the `text`
+     * type by default.
+     *
+     * @since 9.4.0
+     */
+    @Target({ElementType.FIELD})
+    @Retention(RetentionPolicy.RUNTIME)
+    @interface Keyword {
+
+    }
+
     @Target({ElementType.FIELD})
     @Retention(RetentionPolicy.RUNTIME)
     @interface Column {
 
         /**
-         * Warning: this is only used to solve the conflict among the existing 
columns since we need support to merge all metrics
+         * Warning: this is only used to solve the conflict among the existing 
columns since we need support to merge
+         * all metrics
          * in one physical index template. When creating a new column, we 
should avoid the compatibility issue
          * between these 2 storage modes rather than use this alias.
          */
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
index 3323bd8b57..2ff5264b2a 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ElasticSearchExtension.java
@@ -38,6 +38,8 @@ public class ElasticSearchExtension {
 
     private final String columnAlias;
 
+    private final boolean isKeyword;
+
     public boolean needMatchQuery() {
         return analyzer != null;
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index a8cc41b518..f196f8aa5b 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -153,7 +153,8 @@ public class StorageModels implements IModelManager, 
ModelCreator, ModelManipula
             if (field.isAnnotationPresent(Column.class)) {
                 if 
(field.isAnnotationPresent(SQLDatabase.AdditionalEntity.class)) {
                     if (!record) {
-                        throw new IllegalStateException("Model [" + modelName 
+ "] is not a Record, @SQLDatabase.AdditionalEntity only supports Record.");
+                        throw new IllegalStateException(
+                            "Model [" + modelName + "] is not a Record, 
@SQLDatabase.AdditionalEntity only supports Record.");
                     }
                 }
 
@@ -185,9 +186,11 @@ public class StorageModels implements IModelManager, 
ModelCreator, ModelManipula
                 final ElasticSearch.MatchQuery elasticSearchAnalyzer = 
field.getAnnotation(
                     ElasticSearch.MatchQuery.class);
                 final ElasticSearch.Column elasticSearchColumn = 
field.getAnnotation(ElasticSearch.Column.class);
+                final ElasticSearch.Keyword keywordColumn = 
field.getAnnotation(ElasticSearch.Keyword.class);
                 ElasticSearchExtension elasticSearchExtension = new 
ElasticSearchExtension(
                     elasticSearchAnalyzer == null ? null : 
elasticSearchAnalyzer.analyzer(),
-                    elasticSearchColumn == null ? null : 
elasticSearchColumn.columnAlias()
+                    elasticSearchColumn == null ? null : 
elasticSearchColumn.columnAlias(),
+                    keywordColumn == null ? false : true
                 );
 
                 // BanyanDB extension
@@ -198,7 +201,7 @@ public class StorageModels implements IModelManager, 
ModelCreator, ModelManipula
                 final BanyanDB.NoIndexing banyanDBNoIndex = 
field.getAnnotation(
                     BanyanDB.NoIndexing.class);
                 final BanyanDB.IndexRule banyanDBIndexRule = 
field.getAnnotation(
-                        BanyanDB.IndexRule.class);
+                    BanyanDB.IndexRule.class);
                 BanyanDBExtension banyanDBExtension = new BanyanDBExtension(
                     banyanDBSeriesID == null ? -1 : banyanDBSeriesID.index(),
                     banyanDBGlobalIndex != null,
@@ -248,13 +251,14 @@ public class StorageModels implements IModelManager, 
ModelCreator, ModelManipula
             }
         }
 
-       // For the annotation need to be declared on the superclass, the other 
annotation should be declared on the subclass.
+        // For the annotation need to be declared on the superclass, the other 
annotation should be declared on the subclass.
         if (!sqlDBModelExtension.getSharding().isPresent() && 
clazz.isAnnotationPresent(SQLDatabase.Sharding.class)) {
             SQLDatabase.Sharding sharding = 
clazz.getAnnotation(SQLDatabase.Sharding.class);
             sqlDBModelExtension.setSharding(
-                Optional.of(new 
SQLDatabaseModelExtension.Sharding(sharding.shardingAlgorithm(),
-                                                                   
sharding.dataSourceShardingColumn(),
-                                                                   
sharding.tableShardingColumn()
+                Optional.of(new SQLDatabaseModelExtension.Sharding(
+                    sharding.shardingAlgorithm(),
+                    sharding.dataSourceShardingColumn(),
+                    sharding.tableShardingColumn()
                 )));
         }
 
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
index e2574a1dde..51c6cb9e62 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java
@@ -31,7 +31,7 @@ public class ModelColumnTest {
                                              false, false, true, 0,
                                              new SQLDatabaseExtension(),
                                              new ElasticSearchExtension(
-                                                 
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
+                                                 
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
                                              new BanyanDBExtension(-1, false, 
true, BanyanDB.IndexRule.IndexType.INVERTED)
         );
         Assert.assertEquals(true, column.isStorageOnly());
@@ -40,7 +40,7 @@ public class ModelColumnTest {
         column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, 
DataTable.class,
                                  false, false, true, 200,
                                  new SQLDatabaseExtension(),
-                                 new 
ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, 
"abc"),
+                                 new 
ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, 
"abc", false),
                                  new BanyanDBExtension(-1, false, true, 
BanyanDB.IndexRule.IndexType.INVERTED)
         );
         Assert.assertEquals(true, column.isStorageOnly());
@@ -50,7 +50,7 @@ public class ModelColumnTest {
         column = new ModelColumn(new ColumnName("", "abc"), String.class, 
String.class,
                                  false, false, true, 200,
                                  new SQLDatabaseExtension(),
-                                 new 
ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, 
"abc"),
+                                 new 
ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, 
"abc", false),
                                  new BanyanDBExtension(-1, false, true, 
BanyanDB.IndexRule.IndexType.INVERTED)
         );
         Assert.assertEquals(false, column.isStorageOnly());
@@ -63,7 +63,7 @@ public class ModelColumnTest {
                                              true, false, true, 200,
                                              new SQLDatabaseExtension(),
                                              new ElasticSearchExtension(
-                                                 
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
+                                                 
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
                                              new BanyanDBExtension(-1, false, 
true, BanyanDB.IndexRule.IndexType.INVERTED)
         );
     }
@@ -74,7 +74,7 @@ public class ModelColumnTest {
                                              true, true, false, 200,
                                              new SQLDatabaseExtension(),
                                              new ElasticSearchExtension(
-                                                 
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
+                                                 
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false),
                                              new BanyanDBExtension(-1, false, 
true, BanyanDB.IndexRule.IndexType.INVERTED)
         );
     }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
index 5190ac43be..4138537757 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
@@ -19,6 +19,12 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
 import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
 import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
@@ -31,19 +37,12 @@ import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.Pr
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationServerSideMetrics;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.type.Call;
 import org.apache.skywalking.oap.server.core.source.DetectPoint;
 import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
@@ -57,7 +56,8 @@ public class BanyanDBTopologyQueryDAO extends 
AbstractBanyanDBDAO implements ITo
     }
 
     @Override
-    public List<Call.CallDetail> 
loadServiceRelationsDetectedAtServerSide(Duration duration, List<String> 
serviceIds) throws IOException {
+    public List<Call.CallDetail> 
loadServiceRelationsDetectedAtServerSide(Duration duration,
+                                                                          
List<String> serviceIds) throws IOException {
         if (CollectionUtils.isEmpty(serviceIds)) {
             throw new UnexpectedException("Service id is empty");
         }
@@ -68,7 +68,8 @@ public class BanyanDBTopologyQueryDAO extends 
AbstractBanyanDBDAO implements ITo
     }
 
     @Override
-    public List<Call.CallDetail> 
loadServiceRelationDetectedAtClientSide(Duration duration, List<String> 
serviceIds) throws IOException {
+    public List<Call.CallDetail> 
loadServiceRelationDetectedAtClientSide(Duration duration,
+                                                                         
List<String> serviceIds) throws IOException {
         if (CollectionUtils.isEmpty(serviceIds)) {
             throw new UnexpectedException("Service id is empty");
         }
@@ -108,7 +109,9 @@ public class BanyanDBTopologyQueryDAO extends 
AbstractBanyanDBDAO implements ITo
         return queryBuilderList;
     }
 
-    List<Call.CallDetail> queryServiceRelation(Duration duration, 
List<QueryBuilder<MeasureQuery>> queryBuilderList, DetectPoint detectPoint) 
throws IOException {
+    List<Call.CallDetail> queryServiceRelation(Duration duration,
+                                               
List<QueryBuilder<MeasureQuery>> queryBuilderList,
+                                               DetectPoint detectPoint) throws 
IOException {
         long startTB = 0;
         long endTB = 0;
         if (nonNull(duration)) {
@@ -120,46 +123,59 @@ public class BanyanDBTopologyQueryDAO extends 
AbstractBanyanDBDAO implements ITo
             timestampRange = new 
TimestampRange(TimeBucket.getTimestamp(startTB), 
TimeBucket.getTimestamp(endTB));
         }
         final String modelName = detectPoint == DetectPoint.SERVER ? 
ServiceRelationServerSideMetrics.INDEX_NAME :
-                ServiceRelationClientSideMetrics.INDEX_NAME;
+            ServiceRelationClientSideMetrics.INDEX_NAME;
         final Map<String, Call.CallDetail> callMap = new HashMap<>();
         for (final QueryBuilder<MeasureQuery> q : queryBuilderList) {
             MeasureQueryResponse resp = query(modelName,
-                    
ImmutableSet.of(ServiceRelationClientSideMetrics.COMPONENT_ID,
-                            ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
-                            ServiceRelationClientSideMetrics.DEST_SERVICE_ID,
-                            Metrics.ENTITY_ID),
-                    Collections.emptySet(), timestampRange, q);
+                                              ImmutableSet.of(
+                                                  
ServiceRelationClientSideMetrics.COMPONENT_IDS,
+                                                  
ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
+                                                  
ServiceRelationClientSideMetrics.DEST_SERVICE_ID,
+                                                  Metrics.ENTITY_ID
+                                              ),
+                                              Collections.emptySet(), 
timestampRange, q
+            );
             if (resp.size() == 0) {
                 continue;
             }
             final Call.CallDetail call = new Call.CallDetail();
             final String entityId = 
resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
-            final int componentId = ((Number) 
resp.getDataPoints().get(0).getTagValue(ServiceRelationClientSideMetrics.COMPONENT_ID)).intValue();
-            call.buildFromServiceRelation(entityId, componentId, detectPoint);
-            callMap.putIfAbsent(entityId, call);
+            final IntList componentIds = new IntList(
+                
resp.getDataPoints().get(0).getTagValue(ServiceRelationClientSideMetrics.COMPONENT_IDS));
+            for (int i = 0; i < componentIds.size(); i++) {
+                call.buildFromServiceRelation(entityId, componentIds.get(i), 
detectPoint);
+                callMap.putIfAbsent(entityId, call);
+            }
         }
         return new ArrayList<>(callMap.values());
     }
 
     @Override
-    public List<Call.CallDetail> 
loadInstanceRelationDetectedAtServerSide(String clientServiceId, String 
serverServiceId, Duration duration) throws IOException {
-        List<QueryBuilder<MeasureQuery>> queryBuilderList = 
buildInstanceRelationsQueries(clientServiceId, serverServiceId);
+    public List<Call.CallDetail> 
loadInstanceRelationDetectedAtServerSide(String clientServiceId,
+                                                                          
String serverServiceId,
+                                                                          
Duration duration) throws IOException {
+        List<QueryBuilder<MeasureQuery>> queryBuilderList = 
buildInstanceRelationsQueries(
+            clientServiceId, serverServiceId);
         return queryInstanceRelation(duration, queryBuilderList, 
DetectPoint.SERVER);
     }
 
     @Override
-    public List<Call.CallDetail> 
loadInstanceRelationDetectedAtClientSide(String clientServiceId, String 
serverServiceId, Duration duration) throws IOException {
-        List<QueryBuilder<MeasureQuery>> queryBuilderList = 
buildInstanceRelationsQueries(clientServiceId, serverServiceId);
+    public List<Call.CallDetail> 
loadInstanceRelationDetectedAtClientSide(String clientServiceId,
+                                                                          
String serverServiceId,
+                                                                          
Duration duration) throws IOException {
+        List<QueryBuilder<MeasureQuery>> queryBuilderList = 
buildInstanceRelationsQueries(
+            clientServiceId, serverServiceId);
         return queryInstanceRelation(duration, queryBuilderList, 
DetectPoint.CLIENT);
     }
 
-    private List<QueryBuilder<MeasureQuery>> 
buildInstanceRelationsQueries(String clientServiceId, String serverServiceId) {
+    private List<QueryBuilder<MeasureQuery>> 
buildInstanceRelationsQueries(String clientServiceId,
+                                                                           
String serverServiceId) {
         List<QueryBuilder<MeasureQuery>> queryBuilderList = new ArrayList<>(2);
         queryBuilderList.add(new QueryBuilder<MeasureQuery>() {
             @Override
             protected void apply(MeasureQuery query) {
                 
query.and(eq(ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID, 
clientServiceId))
-                        
.and(eq(ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID, 
serverServiceId));
+                     
.and(eq(ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID, 
serverServiceId));
             }
         });
 
@@ -167,13 +183,15 @@ public class BanyanDBTopologyQueryDAO extends 
AbstractBanyanDBDAO implements ITo
             @Override
             protected void apply(MeasureQuery query) {
                 
query.and(eq(ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID, 
clientServiceId))
-                        
.and(eq(ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID, 
serverServiceId));
+                     
.and(eq(ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID, 
serverServiceId));
             }
         });
         return queryBuilderList;
     }
 
-    List<Call.CallDetail> queryInstanceRelation(Duration duration, 
List<QueryBuilder<MeasureQuery>> queryBuilderList, DetectPoint detectPoint) 
throws IOException {
+    List<Call.CallDetail> queryInstanceRelation(Duration duration,
+                                                
List<QueryBuilder<MeasureQuery>> queryBuilderList,
+                                                DetectPoint detectPoint) 
throws IOException {
         long startTB = 0;
         long endTB = 0;
         if (nonNull(duration)) {
@@ -189,17 +207,23 @@ public class BanyanDBTopologyQueryDAO extends 
AbstractBanyanDBDAO implements ITo
         final Map<String, Call.CallDetail> callMap = new HashMap<>();
         for (final QueryBuilder<MeasureQuery> q : queryBuilderList) {
             MeasureQueryResponse resp = query(modelName,
-                    
ImmutableSet.of(ServiceInstanceRelationServerSideMetrics.COMPONENT_ID,
-                            
ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
-                            
ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
-                            Metrics.ENTITY_ID),
-                    Collections.emptySet(), timestampRange, q);
+                                              ImmutableSet.of(
+                                                  
ServiceInstanceRelationServerSideMetrics.COMPONENT_ID,
+                                                  
ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
+                                                  
ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
+                                                  Metrics.ENTITY_ID
+                                              ),
+                                              Collections.emptySet(), 
timestampRange, q
+            );
             if (resp.size() == 0) {
                 continue;
             }
             final Call.CallDetail call = new Call.CallDetail();
             final String entityId = 
resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
-            final int componentId = ((Number) 
resp.getDataPoints().get(0).getTagValue(ServiceRelationClientSideMetrics.COMPONENT_ID)).intValue();
+            final int componentId = ((Number) resp.getDataPoints()
+                                                  .get(0)
+                                                  .getTagValue(
+                                                      
ServiceRelationClientSideMetrics.COMPONENT_IDS)).intValue();
             call.buildFromInstanceRelation(entityId, componentId, detectPoint);
             callMap.putIfAbsent(entityId, call);
         }
@@ -213,12 +237,14 @@ public class BanyanDBTopologyQueryDAO extends 
AbstractBanyanDBDAO implements ITo
     }
 
     @Override
-    public List<Call.CallDetail> 
loadProcessRelationDetectedAtClientSide(String serviceInstanceId, Duration 
duration) throws IOException {
+    public List<Call.CallDetail> 
loadProcessRelationDetectedAtClientSide(String serviceInstanceId,
+                                                                         
Duration duration) throws IOException {
         return queryProcessRelation(duration, serviceInstanceId, 
DetectPoint.CLIENT);
     }
 
     @Override
-    public List<Call.CallDetail> 
loadProcessRelationDetectedAtServerSide(String serviceInstanceId, Duration 
duration) throws IOException {
+    public List<Call.CallDetail> 
loadProcessRelationDetectedAtServerSide(String serviceInstanceId,
+                                                                         
Duration duration) throws IOException {
         return queryProcessRelation(duration, serviceInstanceId, 
DetectPoint.SERVER);
     }
 
@@ -240,7 +266,9 @@ public class BanyanDBTopologyQueryDAO extends 
AbstractBanyanDBDAO implements ITo
         return queryBuilderList;
     }
 
-    List<Call.CallDetail> queryEndpointRelation(Duration duration, 
List<QueryBuilder<MeasureQuery>> queryBuilderList, DetectPoint detectPoint) 
throws IOException {
+    List<Call.CallDetail> queryEndpointRelation(Duration duration,
+                                                
List<QueryBuilder<MeasureQuery>> queryBuilderList,
+                                                DetectPoint detectPoint) 
throws IOException {
         long startTB = 0;
         long endTB = 0;
         if (nonNull(duration)) {
@@ -254,10 +282,13 @@ public class BanyanDBTopologyQueryDAO extends 
AbstractBanyanDBDAO implements ITo
         final Map<String, Call.CallDetail> callMap = new HashMap<>();
         for (final QueryBuilder<MeasureQuery> q : queryBuilderList) {
             MeasureQueryResponse resp = 
query(EndpointRelationServerSideMetrics.INDEX_NAME,
-                    
ImmutableSet.of(EndpointRelationServerSideMetrics.DEST_ENDPOINT,
-                            EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
-                            Metrics.ENTITY_ID),
-                    Collections.emptySet(), timestampRange, q);
+                                              ImmutableSet.of(
+                                                  
EndpointRelationServerSideMetrics.DEST_ENDPOINT,
+                                                  
EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
+                                                  Metrics.ENTITY_ID
+                                              ),
+                                              Collections.emptySet(), 
timestampRange, q
+            );
             if (resp.size() == 0) {
                 continue;
             }
@@ -269,7 +300,9 @@ public class BanyanDBTopologyQueryDAO extends 
AbstractBanyanDBDAO implements ITo
         return new ArrayList<>(callMap.values());
     }
 
-    List<Call.CallDetail> queryProcessRelation(Duration duration, String 
serviceInstanceId, DetectPoint detectPoint) throws IOException {
+    List<Call.CallDetail> queryProcessRelation(Duration duration,
+                                               String serviceInstanceId,
+                                               DetectPoint detectPoint) throws 
IOException {
         long startTB = 0;
         long endTB = 0;
         if (nonNull(duration)) {
@@ -284,16 +317,20 @@ public class BanyanDBTopologyQueryDAO extends 
AbstractBanyanDBDAO implements ITo
             ProcessRelationClientSideMetrics.INDEX_NAME;
         final Map<String, Call.CallDetail> callMap = new HashMap<>();
         MeasureQueryResponse resp = query(modelName,
-            ImmutableSet.of(Metrics.ENTITY_ID, 
ProcessRelationClientSideMetrics.COMPONENT_ID),
-            Collections.emptySet(), timestampRange, new 
QueryBuilder<MeasureQuery>() {
+                                          ImmutableSet.of(
+                                              Metrics.ENTITY_ID, 
ProcessRelationClientSideMetrics.COMPONENT_ID),
+                                          Collections.emptySet(), 
timestampRange, new QueryBuilder<MeasureQuery>() {
                 @Override
                 protected void apply(MeasureQuery query) {
                     
query.and(eq(ProcessRelationServerSideMetrics.SERVICE_INSTANCE_ID, 
serviceInstanceId));
                 }
-            });
+            }
+        );
         final Call.CallDetail call = new Call.CallDetail();
         final String entityId = 
resp.getDataPoints().get(0).getTagValue(Metrics.ENTITY_ID);
-        final int componentId = ((Number) 
resp.getDataPoints().get(0).getTagValue(ProcessRelationClientSideMetrics.COMPONENT_ID)).intValue();
+        final int componentId = ((Number) resp.getDataPoints()
+                                              .get(0)
+                                              
.getTagValue(ProcessRelationClientSideMetrics.COMPONENT_ID)).intValue();
         call.buildProcessRelation(entityId, componentId, detectPoint);
         callMap.putIfAbsent(entityId, call);
         return new ArrayList<>(callMap.values());
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
index 7a71bc4f4b..e0d2af9540 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -23,20 +23,19 @@ import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.List;
 import org.apache.skywalking.oap.server.core.analysis.Layer;
-import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping;
+import 
org.apache.skywalking.oap.server.core.storage.model.ElasticSearchExtension;
 import 
org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
 
-public class ColumnTypeEsMapping implements DataTypeMapping {
+public class ColumnTypeEsMapping {
 
-    @Override
-    public String transform(Class<?> type, Type genericType) {
+    public String transform(Class<?> type, Type genericType, final 
ElasticSearchExtension elasticSearchExtension) {
         if (Integer.class.equals(type) || int.class.equals(type) || 
Layer.class.equals(type)) {
             return "integer";
         } else if (Long.class.equals(type) || long.class.equals(type)) {
             return "long";
         } else if (Double.class.equals(type) || double.class.equals(type)) {
             return "double";
-        } else if (String.class.equals(type)) {
+        } else if (String.class.equals(type) || 
elasticSearchExtension.isKeyword()) {
             return "keyword";
         } else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
             return "text";
@@ -46,7 +45,7 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
             return "text";
         } else if (List.class.isAssignableFrom(type)) {
             final Type elementType = ((ParameterizedType) 
genericType).getActualTypeArguments()[0];
-            return transform((Class<?>) elementType, elementType);
+            return transform((Class<?>) elementType, elementType, 
elasticSearchExtension);
         } else {
             throw new IllegalArgumentException("Unsupported data type: " + 
type.getName());
         }
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index c3f9bd4619..515f1a3092 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -302,7 +302,7 @@ public class StorageEsInstaller extends ModelInstaller {
         Map<String, Object> properties = new HashMap<>();
         Mappings.Source source = new Mappings.Source();
         for (ModelColumn columnDefine : model.getColumns()) {
-            final String type = 
columnTypeEsMapping.transform(columnDefine.getType(), 
columnDefine.getGenericType());
+            final String type = 
columnTypeEsMapping.transform(columnDefine.getType(), 
columnDefine.getGenericType(), columnDefine.getElasticSearchExtension());
             String columnName = columnDefine.getColumnName().getName();
             String alias = 
columnDefine.getElasticSearchExtension().getColumnAlias();
             if (!config.isLogicSharding() && alias != null) {
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
index 5527eeb018..15137fdab8 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopologyQueryEsDAO.java
@@ -37,6 +37,7 @@ import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.Pr
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationServerSideMetrics;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.type.Call;
@@ -256,7 +257,7 @@ public class TopologyQueryEsDAO extends EsDAO implements 
ITopologyQueryDAO {
             String entityId = (String) entityBucket.get("key");
             final Map<String, Object> componentTerms =
                 (Map<String, Object>) entityBucket.get(
-                    ServiceRelationServerSideMetrics.COMPONENT_ID);
+                    ServiceRelationServerSideMetrics.COMPONENT_IDS);
             final List<Map<String, Object>> subAgg =
                 (List<Map<String, Object>>) componentTerms.get("buckets");
             final int componentId = ((Number) 
subAgg.iterator().next().get("key")).intValue();
@@ -275,8 +276,8 @@ public class TopologyQueryEsDAO extends EsDAO implements 
ITopologyQueryDAO {
             Aggregation
                 .terms(Metrics.ENTITY_ID).field(Metrics.ENTITY_ID)
                 .subAggregation(
-                    
Aggregation.terms(ServiceRelationServerSideMetrics.COMPONENT_ID)
-                               
.field(ServiceRelationServerSideMetrics.COMPONENT_ID)
+                    
Aggregation.terms(ServiceRelationServerSideMetrics.COMPONENT_IDS)
+                               
.field(ServiceRelationServerSideMetrics.COMPONENT_IDS)
                                
.executionHint(TermsAggregationBuilder.ExecutionHint.MAP)
                                
.collectMode(TermsAggregationBuilder.CollectMode.BREADTH_FIRST))
                 .executionHint(TermsAggregationBuilder.ExecutionHint.MAP)
@@ -296,14 +297,16 @@ public class TopologyQueryEsDAO extends EsDAO implements 
ITopologyQueryDAO {
             String entityId = (String) entityBucket.get("key");
             final Map<String, Object> componentTerms =
                 (Map<String, Object>) entityBucket.get(
-                    ServiceRelationServerSideMetrics.COMPONENT_ID);
+                    ServiceRelationServerSideMetrics.COMPONENT_IDS);
             final List<Map<String, Object>> subAgg =
                 (List<Map<String, Object>>) componentTerms.get("buckets");
-            final int componentId = ((Number) 
subAgg.iterator().next().get("key")).intValue();
+            final IntList componentIds = new IntList((String) 
subAgg.iterator().next().get("key"));
 
             Call.CallDetail call = new Call.CallDetail();
-            call.buildFromServiceRelation(entityId, componentId, detectPoint);
-            calls.add(call);
+            for (int i = 0; i < componentIds.size(); i++) {
+                call.buildFromServiceRelation(entityId, componentIds.get(i), 
detectPoint);
+                calls.add(call);
+            }
         }
         return calls;
     }
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCTopologyQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCTopologyQueryDAO.java
index 1f485929a0..165f9c5958 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCTopologyQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCTopologyQueryDAO.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import lombok.RequiredArgsConstructor;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
@@ -31,13 +32,13 @@ import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.Pr
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationServerSideMetrics;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.type.Call;
 import org.apache.skywalking.oap.server.core.source.DetectPoint;
 import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import lombok.RequiredArgsConstructor;
 
 @RequiredArgsConstructor
 public class JDBCTopologyQueryDAO implements ITopologyQueryDAO {
@@ -157,11 +158,11 @@ public class JDBCTopologyQueryDAO implements 
ITopologyQueryDAO {
         try (Connection connection = jdbcClient.getConnection()) {
             try (ResultSet resultSet = jdbcClient.executeQuery(
                 connection,
-                "select " + Metrics.ENTITY_ID + ", " + 
ServiceRelationServerSideMetrics.COMPONENT_ID
+                "select " + Metrics.ENTITY_ID + ", " + 
ServiceRelationServerSideMetrics.COMPONENT_IDS
                     + " from " + tableName + " where " + Metrics.TIME_BUCKET + 
">= ? and "
                     + Metrics.TIME_BUCKET + "<=? " + serviceIdMatchSql
                     .toString() +
-                    " group by " + Metrics.ENTITY_ID + "," + 
ServiceRelationServerSideMetrics.COMPONENT_ID, conditions
+                    " group by " + Metrics.ENTITY_ID + "," + 
ServiceRelationServerSideMetrics.COMPONENT_IDS, conditions
             )) {
                 buildServiceCalls(resultSet, calls, detectPoint);
             }
@@ -272,9 +273,12 @@ public class JDBCTopologyQueryDAO implements 
ITopologyQueryDAO {
         while (resultSet.next()) {
             Call.CallDetail call = new Call.CallDetail();
             String entityId = resultSet.getString(Metrics.ENTITY_ID);
-            final int componentId = 
resultSet.getInt(ServiceRelationServerSideMetrics.COMPONENT_ID);
-            call.buildFromServiceRelation(entityId, componentId, detectPoint);
-            calls.add(call);
+            final IntList componentIds = new IntList(
+                
resultSet.getString(ServiceRelationServerSideMetrics.COMPONENT_IDS));
+            for (int i = 0; i < componentIds.size(); i++) {
+                call.buildFromServiceRelation(entityId, componentIds.get(i), 
detectPoint);
+                calls.add(call);
+            }
         }
     }
 
@@ -283,7 +287,7 @@ public class JDBCTopologyQueryDAO implements 
ITopologyQueryDAO {
         while (resultSet.next()) {
             Call.CallDetail call = new Call.CallDetail();
             String entityId = resultSet.getString(Metrics.ENTITY_ID);
-            final int componentId = 
resultSet.getInt(ServiceRelationServerSideMetrics.COMPONENT_ID);
+            final int componentId = 
resultSet.getInt(ServiceRelationServerSideMetrics.COMPONENT_IDS);
             call.buildFromInstanceRelation(entityId, componentId, detectPoint);
             calls.add(call);
         }

Reply via email to