This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c273d6f80 [Feature] Add kafka monitoring (#11282)
1c273d6f80 is described below

commit 1c273d6f802c1e3b24701cff157cfe7776ae4322
Author: Zhu Wang <[email protected]>
AuthorDate: Wed Sep 13 06:12:34 2023 -0700

    [Feature] Add kafka monitoring (#11282)
---
 .github/workflows/skywalking.yaml                  |  14 +-
 docs/en/changes/changes.md                         |   1 +
 docs/en/setup/backend/backend-kafka-monitoring.md  |  84 +++
 docs/menu.yml                                      |   2 +
 .../skywalking/oap/server/core/analysis/Layer.java |   7 +-
 .../ui/template/UITemplateInitializer.java         |   1 +
 .../src/main/resources/application.yml             |   2 +-
 .../resources/otel-rules/kafka/kafka-broker.yaml   | 118 ++++
 .../resources/otel-rules/kafka/kafka-cluster.yaml  |  58 ++
 .../kafka/kafka-broker.json                        | 598 +++++++++++++++++++++
 .../kafka/kafka-cluster.json                       | 224 ++++++++
 .../ui-initialized-templates/kafka/kafka-root.json |  42 ++
 .../resources/ui-initialized-templates/menu.yaml   |   5 +
 .../e2e-v2/cases/kafka/kafka-monitoring/Dockerfile |  26 +
 .../kafka/kafka-monitoring/docker-compose.yml      | 135 +++++
 test/e2e-v2/cases/kafka/kafka-monitoring/e2e.yaml  |  37 ++
 .../kafka/kafka-monitoring/expected/instance.yml   |  26 +
 .../expected/metrics-has-value-gc-label.yml        |  46 ++
 .../expected/metrics-has-value-instance-label.yml  |  46 ++
 .../expected/metrics-has-value-jvm-label.yml       |  33 ++
 .../expected/metrics-has-value-scource-label.yml   |  59 ++
 .../expected/metrics-has-value-service-label.yml   |  33 ++
 .../expected/metrics-has-value.yml                 |  31 ++
 .../kafka/kafka-monitoring/expected/service.yml    |  24 +
 .../cases/kafka/kafka-monitoring/kafka-cases.yaml  |  85 +++
 .../kafka-monitoring/otel-collector-config.yaml    |  56 ++
 26 files changed, 1788 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/skywalking.yaml 
b/.github/workflows/skywalking.yaml
index ceef78e804..bfa47123a5 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -637,6 +637,8 @@ jobs:
             config: test/e2e-v2/cases/mongodb/e2e.yaml
           - name: RabbitMQ
             config: test/e2e-v2/cases/rabbitmq/e2e.yaml
+          - name: Kafka
+            config: test/e2e-v2/cases/kafka/kafka-monitoring/e2e.yaml
           - name: MQE Service
             config: test/e2e-v2/cases/mqe/e2e.yaml
 
@@ -674,7 +676,9 @@ jobs:
           name: docker-images-11
           path: docker-images
       - name: Load docker images
-        run: find docker-images -name "*.tar" -exec docker load -i {} \;
+        run: |
+          find docker-images -name "*.tar" -exec docker load -i {} \;
+          find docker-images -name "*.tar" -exec rm {} \;
       - name: Cache maven repository
         uses: actions/cache@v3
         with:
@@ -730,7 +734,9 @@ jobs:
           name: docker-images-11
           path: docker-images
       - name: Load docker images
-        run: find docker-images -name "*.tar" -exec docker load -i {} \;
+        run: |
+          find docker-images -name "*.tar" -exec docker load -i {} \;
+          find docker-images -name "*.tar" -exec rm {} \;
       - name: ${{ matrix.test.name }}
         uses: 
apache/skywalking-infra-e2e@0a5b398fc9668ccb848b16e6da4f09180955dc3e
         env:
@@ -768,7 +774,9 @@ jobs:
           name: docker-images-${{ matrix.java-version }}
           path: docker-images
       - name: Load docker images
-        run: find docker-images -name "*.tar" -exec docker load -i {} \;
+        run: |
+          find docker-images -name "*.tar" -exec docker load -i {} \;
+          find docker-images -name "*.tar" -exec rm {} \;
       - uses: actions/setup-java@v3
         with:
           java-version: ${{ matrix.java-version }}
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index d5287d4c40..11d4a9b00c 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -7,6 +7,7 @@
 
 * ElasticSearchClient: Add `deleteById` API.
 * Fix Custom alarm rules are overwritten by 'resource/alarm-settings.yml'
+* Support Kafka Monitoring.
 
 #### UI
 
diff --git a/docs/en/setup/backend/backend-kafka-monitoring.md 
b/docs/en/setup/backend/backend-kafka-monitoring.md
new file mode 100644
index 0000000000..3592033ac3
--- /dev/null
+++ b/docs/en/setup/backend/backend-kafka-monitoring.md
@@ -0,0 +1,84 @@
+# Kafka monitoring
+
+SkyWalking leverages Prometheus JMX Exporter to collect metrics data from 
Kafka and leverages OpenTelemetry Collector to transfer the metrics to
+[OpenTelemetry receiver](opentelemetry-receiver.md) and into the [Meter 
System](./../../concepts-and-designs/meter.md).
+The Kafka cluster is represented as a `Service` in OAP, on the `Layer: KAFKA`.
+
+## Data flow
+
+1. The `prometheus_JMX_Exporter` collects metrics data from Kafka. Note: 
The exporter runs as a Java agent.
+2. OpenTelemetry Collector fetches metrics from `prometheus_JMX_Exporter` via 
Prometheus Receiver and pushes metrics to SkyWalking OAP Server via 
OpenTelemetry gRPC exporter.
+3. The SkyWalking OAP Server parses the expression with 
[MAL](../../concepts-and-designs/mal.md) to
+   filter/calculate/aggregate and store the results.
+
+## Setup
+
+1. Setup 
[prometheus_JMX_Exporter](https://github.com/prometheus/jmx_exporter). This is 
an example for JMX Exporter configuration 
[kafka-2_0_0.yml](https://github.com/prometheus/jmx_exporter/blob/main/example_configs/kafka-2_0_0.yml).
+2. Set up [OpenTelemetry 
Collector](https://opentelemetry.io/docs/collector/getting-started/#kubernetes).
 The example
+   for OpenTelemetry Collector configuration, refer
+   to 
[here](../../../../test/e2e-v2/cases/kafka/kafka-monitoring/otel-collector-config.yaml).
+3. Config SkyWalking [OpenTelemetry receiver](opentelemetry-receiver.md).
+
+## Kafka Monitoring
+
+Kafka monitoring provides multidimensional metrics monitoring of Kafka cluster 
as `Layer: KAFKA` `Service` in
+the OAP. In each cluster, the kafka brokers are represented as `Instance`.
+
+### Kafka Cluster Supported Metrics
+
+| Monitoring Panel                    | Metric Name                            
         | Description                                                          
                                                       | Data Source            
   |
+|-------------------------------------|-------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|---------------------------|
+| Under-Replicated Partitions         | 
meter_kafka_under_replicated_partitions         | Number of under-replicated 
partitions in the broker. A higher number is a sign of potential issues.        
                 | Prometheus JMX Exporter   |
+| Offline Partitions Count            | meter_kafka_offline_partitions_count   
         | Number of partitions that are offline. Non-zero values indicate a 
problem.                                                  | Prometheus JMX 
Exporter   |
+| Partition Count                     | meter_kafka_partition_count            
         | Total number of partitions on the broker.                            
                                                       | Prometheus JMX 
Exporter   |
+| Leader Count                        | meter_kafka_leader_count               
         | Number of leader partitions on this broker.                          
                                                       | Prometheus JMX 
Exporter   |
+| Active Controller Count             | meter_kafka_active_controller_count    
         | The number of active controllers in the cluster. Typically should be 
1.                                                     | Prometheus JMX 
Exporter   |
+| Leader Election Rate                | meter_kafka_leader_election_rate       
         | The rate of leader elections per minute. High rate could be a sign 
of instability.                                          | Prometheus JMX 
Exporter   |
+| Unclean Leader Elections Per Second | 
meter_kafka_unclean_leader_elections_per_second | The rate of unclean leader 
elections per second. Non-zero values indicate a serious problem.               
                 | Prometheus JMX Exporter   |
+| Max Lag                             | meter_kafka_max_lag                    
         | The maximum lag between the leader and followers in terms of 
messages still needed to be sent. Higher lag indicates delays. | Prometheus JMX 
Exporter   |
+
+### Kafka Broker Supported Metrics
+
+| Monitoring Panel                  | Unit      | Metric Name                  
                         | Description                                          
         | Data Source                   |
+|-----------------------------------|-----------|-------------------------------------------------------|---------------------------------------------------------------|-------------------------------|
+| CPU Usage                         | %         | 
meter_kafka_broker_cpu_time_total                     | CPU usage in percentage 
                                      | Prometheus JMX Exporter       |
+| Memory Usage                      | %         | 
meter_kafka_broker_memory_usage_percentage            | JVM heap memory usage 
in percentage                           | Prometheus JMX Exporter       |
+| Incoming Messages                 | Msg/sec   | 
meter_kafka_broker_messages_per_second                | Rate of incoming 
messages                                     | Prometheus JMX Exporter       |
+| Bytes In                          | Bytes/sec | 
meter_kafka_broker_bytes_in_per_second                | Rate of incoming bytes  
                                      | Prometheus JMX Exporter       |
+| Bytes Out                         | Bytes/sec | 
meter_kafka_broker_bytes_out_per_second               | Rate of outgoing bytes  
                                      | Prometheus JMX Exporter       |
+| Replication Bytes In              | Bytes/sec | 
meter_kafka_broker_replication_bytes_in_per_second    | Rate of incoming bytes 
for replication                        | Prometheus JMX Exporter       |
+| Replication Bytes Out             | Bytes/sec | 
meter_kafka_broker_replication_bytes_out_per_second   | Rate of outgoing bytes 
for replication                        | Prometheus JMX Exporter       |
+| Under-Replicated Partitions       | Count     | 
meter_kafka_broker_under_replicated_partitions        | Number of 
under-replicated partitions                         | Prometheus JMX Exporter   
    |
+| Under Min ISR Partition Count     | Count     | 
meter_kafka_broker_under_min_isr_partition_count      | Number of partitions 
below the minimum ISR (In-Sync Replicas) | Prometheus JMX Exporter       |
+| Partition Count                   | Count     | 
meter_kafka_broker_partition_count                    | Total number of 
partitions                                    | Prometheus JMX Exporter       |
+| Leader Count                      | Count     | 
meter_kafka_broker_leader_count                       | Number of partitions 
for which this broker is the leader      | Prometheus JMX Exporter       |
+| ISR Shrinks                       | Count/sec | 
meter_kafka_broker_isr_shrinks_per_second             | Rate of ISR (In-Sync 
Replicas) shrinking                      | Prometheus JMX Exporter       |
+| ISR Expands                       | Count/sec | 
meter_kafka_broker_isr_expands_per_second             | Rate of ISR (In-Sync 
Replicas) expanding                      | Prometheus JMX Exporter       |
+| Max Lag                           | Count     | meter_kafka_broker_max_lag   
                         | Maximum lag between the leader and follower for a 
partition   | Prometheus JMX Exporter       |
+| Purgatory Size                    | Count     | 
meter_kafka_broker_purgatory_size                     | Size of purgatory for 
Produce and Fetch operations            | Prometheus JMX Exporter       |
+| Garbage Collector Count           | Count/sec | 
meter_kafka_broker_garbage_collector_count            | Rate of garbage 
collection cycles                             | Prometheus JMX Exporter       |
+| Requests Per Second               | Req/sec   | 
meter_kafka_broker_requests_per_second                | Rate of requests to the 
broker                                | Prometheus JMX Exporter       |
+| Request Queue Time                | ms        | 
meter_kafka_broker_request_queue_time_ms              | Average time a request 
spends in the request queue            | Prometheus JMX Exporter       |
+| Remote Time                       | ms        | 
meter_kafka_broker_remote_time_ms                     | Average time taken for 
a remote operation                     | Prometheus JMX Exporter       |
+| Response Queue Time               | ms        | 
meter_kafka_broker_response_queue_time_ms             | Average time a response 
spends in the response queue          | Prometheus JMX Exporter       |
+| Response Send Time                | ms        | 
meter_kafka_broker_response_send_time_ms              | Average time taken to 
send a response                         | Prometheus JMX Exporter       |
+| Network Processor Avg Idle        | %         | 
meter_kafka_broker_network_processor_avg_idle_percent | Percentage of idle time 
for the network processor             | Prometheus JMX Exporter       |
+| Topic Messages In Total           | Count     | 
meter_kafka_broker_topic_messages_in_total            | Total number of 
messages per topic                            | Prometheus JMX Exporter       |
+| Topic Bytes Out Per Second        | Bytes/sec | 
meter_kafka_broker_topic_bytesout_per_second          | Rate of outgoing bytes 
per topic                              | Prometheus JMX Exporter       |
+| Topic Bytes In Per Second         | Bytes/sec | 
meter_kafka_broker_topic_bytesin_per_second           | Rate of incoming bytes 
per topic                              | Prometheus JMX Exporter       |
+| Topic Fetch Requests Per Second   | Req/sec   | 
meter_kafka_broker_topic_fetch_requests_per_second    | Rate of fetch requests 
per topic                              | Prometheus JMX Exporter       |
+| Topic Produce Requests Per Second | Req/sec   | 
meter_kafka_broker_topic_produce_requests_per_second  | Rate of produce 
requests per topic                            | Prometheus JMX Exporter       |
+
+## Customizations
+
+You can customize your own metrics/expression/dashboard panel.
+The metrics definition and expression rules are found
+in `/config/otel-rules/kafka/kafka-cluster.yaml, 
/config/otel-rules/kafka/kafka-broker.yaml`.
+The Kafka dashboard panel configurations are found in 
`/config/ui-initialized-templates/kafka`.
+
+## Reference
+For more details on monitoring Kafka and the metrics to focus on, see the 
following articles:
+
+- [Monitoring Kafka Streams 
Applications](https://docs.confluent.io/platform/current/streams/monitoring.html)
+- [Kafka Monitoring](https://kafka.apache.org/documentation/#monitoring)
+
diff --git a/docs/menu.yml b/docs/menu.yml
index b74f238710..c1e176d6bc 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -261,6 +261,8 @@ catalog:
             catalog:
               - name: "RabbitMQ"
                 path: "/en/setup/backend/backend-rabbitmq-monitoring"
+              - name: "Kafka"
+                path: "/en/setup/backend/backend-kafka-monitoring"
           - name: "Self Observability"
             catalog:
               - name: "OAP self telemetry"
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java
index c41125279f..ab5e54d18f 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java
@@ -188,7 +188,12 @@ public enum Layer {
     /**
      * MongoDB is a document database. It stores data in a type of JSON format 
called BSON.
      */
-    MONGODB(30, true);
+    MONGODB(30, true),
+
+    /**
+     * Kafka is a distributed streaming platform that is used to publish and 
subscribe to streams of records.
+     */
+    KAFKA(31, true);
 
     private final int value;
     /**
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/management/ui/template/UITemplateInitializer.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/management/ui/template/UITemplateInitializer.java
index 45b165613c..0ae525b6a3 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/management/ui/template/UITemplateInitializer.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/management/ui/template/UITemplateInitializer.java
@@ -69,6 +69,7 @@ public class UITemplateInitializer {
         Layer.ELASTICSEARCH.name(),
         Layer.RABBITMQ.name(),
         Layer.MONGODB.name(),
+        Layer.KAFKA.name(),
         "custom"
     };
     private final UITemplateManagementService uiTemplateManagementService;
diff --git a/oap-server/server-starter/src/main/resources/application.yml 
b/oap-server/server-starter/src/main/resources/application.yml
index fd62d0355e..5629af155b 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -341,7 +341,7 @@ receiver-otel:
   selector: ${SW_OTEL_RECEIVER:default}
   default:
     enabledHandlers: 
${SW_OTEL_RECEIVER_ENABLED_HANDLERS:"otlp-metrics,otlp-logs"}
-    enabledOtelMetricsRules: 
${SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES:"apisix,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*"}
+    enabledOtelMetricsRules: 
${SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES:"apisix,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*,kafka/*"}
 
 receiver-zipkin:
   selector: ${SW_RECEIVER_ZIPKIN:-}
diff --git 
a/oap-server/server-starter/src/main/resources/otel-rules/kafka/kafka-broker.yaml
 
b/oap-server/server-starter/src/main/resources/otel-rules/kafka/kafka-broker.yaml
new file mode 100644
index 0000000000..d274366918
--- /dev/null
+++ 
b/oap-server/server-starter/src/main/resources/otel-rules/kafka/kafka-broker.yaml
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This will parse a textual representation of a duration. The formats
+# accepted are based on the ISO-8601 duration format {@code PnDTnHnMn.nS}
+# with days considered to be exactly 24 hours.
+# <p>
+# Examples:
+# <pre>
+#    "PT20.345S" -- parses as "20.345 seconds"
+#    "PT15M"     -- parses as "15 minutes" (where a minute is 60 seconds)
+#    "PT10H"     -- parses as "10 hours" (where an hour is 3600 seconds)
+#    "P2D"       -- parses as "2 days" (where a day is 24 hours or 86400 
seconds)
+#    "P2DT3H4M"  -- parses as "2 days, 3 hours and 4 minutes"
+#    "P-6H3M"    -- parses as "-6 hours and +3 minutes"
+#    "-P6H3M"    -- parses as "-6 hours and -3 minutes"
+#    "-P-6H+3M"  -- parses as "+6 hours and -3 minutes"
+# </pre>
+filter: "{ tags -> tags.job_name == 'kafka-monitoring' }" # The OpenTelemetry 
job name
+expSuffix: tag({tags -> tags.cluster = 'kafka::' + 
tags.cluster}).instance(['cluster'], ['broker'], Layer.KAFKA)
+metricPrefix: meter_kafka_broker
+metricsRules:
+
+  - name: cpu_time_total
+    exp: (process_cpu_seconds_total * 100).sum(['cluster', 
'broker']).rate('PT1M')
+
+  - name: memory_usage_percentage
+    exp: (jvm_memory_bytes_used * 100).tagMatch("area", 
"heap").sum(['cluster', 'broker', 'area']) / 
(jvm_memory_bytes_max).tagMatch("area", "heap").sum(['cluster', 'broker', 
'area'])
+
+  - name: messages_per_second
+    exp: kafka_server_brokertopicmetrics_messagesin_total.sum(['cluster', 
'broker']).rate('PT1M')
+
+  - name: bytes_in_per_second
+    exp: kafka_server_brokertopicmetrics_bytesin_total.sum(['cluster', 
'broker']).rate('PT1M')
+
+  - name: bytes_out_per_second
+    exp: kafka_server_brokertopicmetrics_bytesout_total.sum(['cluster', 
'broker']).rate('PT1M')
+
+  - name: replication_bytes_in_per_second
+    exp: 
kafka_server_brokertopicmetrics_replicationbytesin_total.sum(['cluster', 
'broker']).rate('PT1M')
+
+  - name: replication_bytes_out_per_second
+    exp: 
kafka_server_brokertopicmetrics_replicationbytesout_total.sum(['cluster', 
'broker']).rate('PT1M')
+
+  - name: under_replicated_partitions
+    exp: kafka_server_replicamanager_underreplicatedpartitions.sum(['cluster', 
'broker'])
+
+  - name: under_min_isr_partition_count
+    exp: kafka_server_replicamanager_underminisrpartitioncount.sum(['cluster', 
'broker'])
+
+  - name: partition_count
+    exp: kafka_server_replicamanager_partitioncount.sum(['cluster', 'broker'])
+
+  - name: leader_count
+    exp: kafka_server_replicamanager_leadercount.sum(['cluster', 'broker'])
+
+  - name: isr_shrinks_per_second
+    exp: kafka_server_replicamanager_isrshrinks_total.sum(['cluster', 
'broker']).rate('PT1M')
+    
+  - name: isr_expands_per_second
+    exp: kafka_server_replicamanager_isrexpands_total.sum(['cluster', 
'broker']).rate('PT1M')
+
+  - name: max_lag
+    exp: kafka_server_replicafetchermanager_maxlag.sum(['cluster', 'broker'])
+
+  - name: purgatory_size
+    exp: 
kafka_server_delayedoperationpurgatory_purgatorysize.tagMatch("delayedOperation",
 "Produce|Fetch").sum(['cluster', 'broker','delayedOperation'])
+
+  - name: garbage_collector_count
+    exp: jvm_gc_collection_seconds_count.tagMatch("gc", "G1 Young 
Generation|G1 Old Generation").sum(['cluster', 'broker','gc']).rate('PT1M')
+
+  - name: requests_per_second
+    exp: kafka_network_requestmetrics_requests_total.tagMatch("request", 
"FetchConsumer|Produce|Fetch").sum(['cluster','broker','request']).rate('PT1M')
+
+  - name: request_queue_time_ms
+    exp: 
kafka_network_requestmetrics_requestqueuetimems_count.tagMatch("request", 
"Produce|FetchConsumer|FetchFollower").sum(['cluster','broker','request']).rate('PT1M')
+
+  - name: remote_time_ms
+    exp: kafka_network_requestmetrics_remotetimems_count.tagMatch("request", 
"Produce|FetchConsumer|FetchFollower").sum(['cluster','broker','request']).rate('PT1M')
+
+  - name: response_queue_time_ms
+    exp: 
kafka_network_requestmetrics_responsequeuetimems_count.tagMatch("request", 
"Produce|FetchConsumer|FetchFollower").sum(['cluster','broker','request']).rate('PT1M')
+
+  - name: response_send_time_ms
+    exp: 
kafka_network_requestmetrics_responsesendtimems_count.tagMatch("request", 
"Produce|FetchConsumer|FetchFollower").sum(['cluster','broker','request']).rate('PT1M')
+
+  - name: network_processor_avg_idle_percent
+    exp: (kafka_network_socketserver_networkprocessoravgidlepercent * 
100).sum(['cluster','broker']).rate('PT1M')
+
+  - name: topic_messages_in_total
+    exp: 
kafka_server_brokertopicmetrics_messagesin_total.sum(['cluster','broker','topic'])
+
+  - name: topic_bytesout_per_second
+    exp: 
kafka_server_brokertopicmetrics_bytesout_total.sum(['cluster','broker','topic']).rate('PT1M')
+
+  - name: topic_bytesin_per_second # incoming bytes per topic (bytesin counter, 1-minute rate window)
+    exp: 
kafka_server_brokertopicmetrics_bytesin_total.sum(['cluster','broker','topic']).rate('PT1M')
+
+  - name: topic_fetch_requests_per_second
+    exp: 
kafka_server_brokertopicmetrics_totalfetchrequests_total.sum(['cluster','broker','topic']).rate('PT1M')
+
+  - name: topic_produce_requests_per_second
+    exp: 
kafka_server_brokertopicmetrics_totalproducerequests_total.sum(['cluster','broker','topic']).rate('PT1M')
+
+
+
diff --git 
a/oap-server/server-starter/src/main/resources/otel-rules/kafka/kafka-cluster.yaml
 
b/oap-server/server-starter/src/main/resources/otel-rules/kafka/kafka-cluster.yaml
new file mode 100644
index 0000000000..7a9bac0b24
--- /dev/null
+++ 
b/oap-server/server-starter/src/main/resources/otel-rules/kafka/kafka-cluster.yaml
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This will parse a textual representation of a duration. The formats
+# accepted are based on the ISO-8601 duration format {@code PnDTnHnMn.nS}
+# with days considered to be exactly 24 hours.
+# <p>
+# Examples:
+# <pre>
+#    "PT20.345S" -- parses as "20.345 seconds"
+#    "PT15M"     -- parses as "15 minutes" (where a minute is 60 seconds)
+#    "PT10H"     -- parses as "10 hours" (where an hour is 3600 seconds)
+#    "P2D"       -- parses as "2 days" (where a day is 24 hours or 86400 
seconds)
+#    "P2DT3H4M"  -- parses as "2 days, 3 hours and 4 minutes"
+#    "P-6H3M"    -- parses as "-6 hours and +3 minutes"
+#    "-P6H3M"    -- parses as "-6 hours and -3 minutes"
+#    "-P-6H+3M"  -- parses as "+6 hours and -3 minutes"
+# </pre>
+filter: "{ tags -> tags.job_name == 'kafka-monitoring' }" # The OpenTelemetry 
job name
+expSuffix: tag({tags -> tags.cluster = 'kafka::' + 
tags.cluster}).service(['cluster'], Layer.KAFKA)
+metricPrefix: meter_kafka
+metricsRules:
+
+  - name: under_replicated_partitions
+    exp: 
kafka_server_replicamanager_underreplicatedpartitions.sum(['cluster','broker'])
+
+  - name: offline_partitions_count
+    exp: 
kafka_controller_kafkacontroller_offlinepartitionscount.sum(['cluster','broker'])
+
+  - name: partition_count
+    exp: kafka_server_replicamanager_partitioncount.sum(['cluster', 'broker'])
+
+  - name: leader_count
+    exp: kafka_server_replicamanager_leadercount.sum(['cluster', 'broker'])
+
+  - name: active_controller_count
+    exp: 
kafka_controller_kafkacontroller_activecontrollercount.sum(['cluster', 
'broker'])
+
+  - name: leader_election_rate
+    exp: 
kafka_controller_controllerstats_leaderelectionrateandtimems_count.sum(['cluster',
 'broker']).rate('PT1M')
+
+  - name: unclean_leader_elections_per_second
+    exp: 
kafka_controller_controllerstats_uncleanleaderelections_total.sum(['cluster', 
'broker']).rate('PT1M')
+
+  - name: max_lag
+    exp: kafka_server_replicafetchermanager_maxlag.sum(['cluster', 'broker'])
diff --git 
a/oap-server/server-starter/src/main/resources/ui-initialized-templates/kafka/kafka-broker.json
 
b/oap-server/server-starter/src/main/resources/ui-initialized-templates/kafka/kafka-broker.json
new file mode 100644
index 0000000000..845114ab38
--- /dev/null
+++ 
b/oap-server/server-starter/src/main/resources/ui-initialized-templates/kafka/kafka-broker.json
@@ -0,0 +1,598 @@
+[{
+       "id": "Kafka-Broker",
+       "configuration": {
+               "children": [{
+                       "x": 0,
+                       "y": 5,
+                       "w": 6,
+                       "h": 10,
+                       "i": "1",
+                       "type": "Widget",
+                       "widget": {
+                               "title": "CPU Usage (%)",
+                               "tips": "CPU usage in percentage",
+                               "name": "cpu_time_total"
+                       },
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "expressions": ["meter_kafka_broker_cpu_time_total"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 0,
+                       "y": 47,
+                       "w": 6,
+                       "h": 10,
+                       "i": "19",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Incoming Messages (Msg/sec)",
+                               "tips": "Rate of incoming messages",
+                               "name": "messages_per_second"
+                       },
+                       "expressions": 
["meter_kafka_broker_messages_per_second"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 0,
+                       "y": 27,
+                       "w": 6,
+                       "h": 10,
+                       "i": "20",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "name": "bytes_in_per_second",
+                               "title": "Bytes In (Bytes/sec)",
+                               "tips": "Rate of incoming bytes"
+                       },
+                       "expressions": 
["meter_kafka_broker_bytes_in_per_second"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 0,
+                       "y": 37,
+                       "w": 6,
+                       "h": 10,
+                       "i": "21",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Bytes Out (Bytes/sec)",
+                               "tips": "Rate of outgoing bytes",
+                               "name": "bytes_out_per_second"
+                       },
+                       "expressions": 
["meter_kafka_broker_bytes_out_per_second"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 15,
+                       "y": 0,
+                       "w": 5,
+                       "h": 5,
+                       "i": "22",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Card",
+                               "fontSize": 14,
+                               "textAlign": "center",
+                               "showUnit": true
+                       },
+                       "widget": {
+                               "title": "Under-Replicated Partitions",
+                               "tips": "Number of under-replicated partitions",
+                               "name": "under_replicated_partitions"
+                       },
+                       "expressions": 
["latest(meter_kafka_broker_under_replicated_partitions)"],
+                       "typesOfMQE": ["SINGLE_VALUE"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 10,
+                       "y": 0,
+                       "w": 5,
+                       "h": 5,
+                       "i": "23",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Card",
+                               "fontSize": 14,
+                               "textAlign": "center",
+                               "showUnit": true
+                       },
+                       "widget": {
+                               "title": "Under Min ISR Partition Count",
+                               "tips": "Number of partitions below the minimum 
ISR (In-Sync Replicas)",
+                               "name": "under_min_isr_partition_count"
+                       },
+                       "expressions": 
["latest(meter_kafka_broker_under_min_isr_partition_count)"],
+                       "typesOfMQE": ["SINGLE_VALUE"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 18,
+                       "y": 5,
+                       "w": 6,
+                       "h": 10,
+                       "i": "24",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Partition Count",
+                               "tips": "Total number of partitions",
+                               "name": "partition_count"
+                       },
+                       "expressions": ["meter_kafka_broker_partition_count"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 6,
+                       "y": 37,
+                       "w": 6,
+                       "h": 10,
+                       "i": "25",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Replication Bytes Out (Bytes/sec)",
+                               "tips": "Rate of outgoing bytes for 
replication",
+                               "name": "replication_bytes_out_per_second"
+                       },
+                       "expressions": 
["meter_kafka_broker_replication_bytes_out_per_second"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 6,
+                       "y": 27,
+                       "w": 6,
+                       "h": 10,
+                       "i": "26",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Replication Bytes In (Bytes/sec)",
+                               "tips": "Rate of incoming bytes for 
replication",
+                               "name": "replication_bytes_in_per_second"
+                       },
+                       "expressions": 
["meter_kafka_broker_replication_bytes_in_per_second"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 6,
+                       "y": 47,
+                       "w": 6,
+                       "h": 10,
+                       "i": "27",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Leader Count",
+                               "tips": "Number of partitions for which this 
broker is the leader",
+                               "name": "leader_count"
+                       },
+                       "expressions": ["meter_kafka_broker_leader_count"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 20,
+                       "y": 0,
+                       "w": 4,
+                       "h": 5,
+                       "i": "28",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Card",
+                               "fontSize": 14,
+                               "textAlign": "center",
+                               "showUnit": true
+                       },
+                       "widget": {
+                               "title": "Max Lag",
+                               "tips": "Maximum lag between the leader and 
follower for a partition",
+                               "name": "max_lag"
+                       },
+                       "expressions": ["sum(meter_kafka_broker_max_lag)"],
+                       "typesOfMQE": ["SINGLE_VALUE"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 0,
+                       "y": 0,
+                       "w": 5,
+                       "h": 5,
+                       "i": "29",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Card",
+                               "fontSize": 14,
+                               "textAlign": "center",
+                               "showUnit": true
+                       },
+                       "widget": {
+                               "title": "ISR Shrinks",
+                               "tips": "Rate of ISR (In-Sync Replicas) shrinking",
+                               "name": "isr_shrinks_per_second"
+                       },
+                       "expressions": 
["latest(meter_kafka_broker_isr_shrinks_per_second)"],
+                       "typesOfMQE": ["SINGLE_VALUE"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 5,
+                       "y": 0,
+                       "w": 5,
+                       "h": 5,
+                       "i": "30",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Card",
+                               "fontSize": 14,
+                               "textAlign": "center",
+                               "showUnit": true
+                       },
+                       "widget": {
+                               "title": "ISR Expands",
+                               "tips": "Rate of ISR (In-Sync Replicas) 
expanding",
+                               "name": "isr_expands_per_second"
+                       },
+                       "expressions": 
["latest(meter_kafka_broker_isr_expands_per_second)"],
+                       "typesOfMQE": ["SINGLE_VALUE"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 12,
+                       "y": 47,
+                       "w": 6,
+                       "h": 10,
+                       "i": "31",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Purgatory Size",
+                               "tips": "Size of purgatory for Produce and 
Fetch operations",
+                               "name": "purgatory_size"
+                       },
+                       "expressions": ["meter_kafka_broker_purgatory_size"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 12,
+                       "y": 5,
+                       "w": 6,
+                       "h": 10,
+                       "i": "33",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Garbage Collector Count",
+                               "tips": "Garbage collection cycles",
+                               "name": "garbage_collector_count"
+                       },
+                       "expressions": 
["meter_kafka_broker_garbage_collector_count"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 12,
+                       "y": 27,
+                       "w": 6,
+                       "h": 10,
+                       "i": "34",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Request Queue Time (ms)",
+                               "tips": "Average time a request spends in the 
request queue",
+                               "name": "request_queue_time_ms"
+                       },
+                       "expressions": 
["meter_kafka_broker_request_queue_time_ms"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 18,
+                       "y": 47,
+                       "w": 6,
+                       "h": 10,
+                       "i": "35",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Remote Time (ms)",
+                               "tips": "Average time taken for a remote 
operation",
+                               "name": "remote_time_ms"
+                       },
+                       "expressions": ["meter_kafka_broker_remote_time_ms"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 12,
+                       "y": 37,
+                       "w": 6,
+                       "h": 10,
+                       "i": "36",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Response Queue Time (ms)",
+                               "tips": "Average time a response spends in the 
response queue",
+                               "name": "response_queue_time_ms"
+                       },
+                       "expressions": 
["meter_kafka_broker_response_queue_time_ms"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 18,
+                       "y": 37,
+                       "w": 6,
+                       "h": 10,
+                       "i": "37",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Response Send Time (ms)",
+                               "tips": "Average time taken to send a response",
+                               "name": "response_send_time_ms"
+                       },
+                       "expressions": 
["meter_kafka_broker_response_send_time_ms"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 18,
+                       "y": 27,
+                       "w": 6,
+                       "h": 10,
+                       "i": "38",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "title": "Requests Per Second (Req/sec)",
+                               "tips": "Rate of requests to the broker",
+                               "name": "requests_per_second"
+                       },
+                       "expressions": 
["meter_kafka_broker_requests_per_second"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                       "metricMode": "Expression"
+               }, {
+                       "x": 6,
+                       "y": 5,
+                       "w": 6,
+                       "h": 10,
+                       "i": "39",
+                       "type": "Widget",
+                       "metricMode": "Expression",
+                       "expressions": 
["meter_kafka_broker_memory_usage_percentage"],
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "name": "memory_usage_percentage",
+                               "title": "Memory Usage (%)",
+                               "tips": "JVM heap memory usage in percentage"
+                       },
+                       "typesOfMQE": ["TIME_SERIES_VALUES"]
+               }, {
+                       "x": 19,
+                       "y": 15,
+                       "w": 5,
+                       "h": 12,
+                       "i": "40",
+                       "type": "Widget",
+                       "metricMode": "Expression",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "name": "topic_produce_requests_per_second",
+                               "title": "Topic Produce Requests Per Second 
(Req/sec)",
+                               "tips": "Rate of produce requests per topic"
+                       },
+                       "expressions": 
["meter_kafka_broker_topic_produce_requests_per_second"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"]
+               }, {
+                       "x": 15,
+                       "y": 15,
+                       "w": 4,
+                       "h": 12,
+                       "i": "41",
+                       "type": "Widget",
+                       "metricMode": "Expression",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "name": "topic_fetch_requests_per_second",
+                               "title": "Topic Fetch Requests Per Second 
(Req/sec)",
+                               "tips": "Rate of fetch requests per topic"
+                       },
+                       "expressions": 
["meter_kafka_broker_topic_fetch_requests_per_second"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"]
+               }, {
+                       "x": 5,
+                       "y": 15,
+                       "w": 5,
+                       "h": 12,
+                       "i": "42",
+                       "type": "Widget",
+                       "metricMode": "Expression",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "name": "topic_bytesin_per_second",
+                               "title": "Topic Bytes In Per Second 
(Bytes/sec)",
+                               "tips": "Rate of incoming bytes per topic"
+                       },
+                       "expressions": 
["meter_kafka_broker_topic_bytesin_per_second"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"]
+               }, {
+                       "x": 10,
+                       "y": 15,
+                       "w": 5,
+                       "h": 12,
+                       "i": "43",
+                       "type": "Widget",
+                       "metricMode": "Expression",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "name": "topic_bytesout_per_second",
+                               "title": "Topic Bytes Out Per Second 
(Bytes/sec)",
+                               "tips": "Rate of outgoing bytes per topic"
+                       },
+                       "expressions": 
["meter_kafka_broker_topic_bytesout_per_second"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"]
+               }, {
+                       "x": 0,
+                       "y": 15,
+                       "w": 5,
+                       "h": 12,
+                       "i": "44",
+                       "type": "Widget",
+                       "metricMode": "Expression",
+                       "graph": {
+                               "type": "Line",
+                               "step": false,
+                               "smooth": false,
+                               "showSymbol": true,
+                               "showXAxis": true,
+                               "showYAxis": true
+                       },
+                       "widget": {
+                               "name": "topic_messages_in_total",
+                               "title": "Topic Messages In Total",
+                               "tips": "Total number of messages per topic"
+                       },
+                       "expressions": 
["meter_kafka_broker_topic_messages_in_total"],
+                       "typesOfMQE": ["TIME_SERIES_VALUES"]
+               }],
+               "layer": "KAFKA",
+               "entity": "ServiceInstance",
+               "name": "Kafka-Broker",
+               "id": "Kafka-Broker"
+       }
+}]
\ No newline at end of file
diff --git 
a/oap-server/server-starter/src/main/resources/ui-initialized-templates/kafka/kafka-cluster.json
 
b/oap-server/server-starter/src/main/resources/ui-initialized-templates/kafka/kafka-cluster.json
new file mode 100644
index 0000000000..466b711436
--- /dev/null
+++ 
b/oap-server/server-starter/src/main/resources/ui-initialized-templates/kafka/kafka-cluster.json
@@ -0,0 +1,224 @@
+[{
+       "id": "Kafka-Cluster",
+       "configuration": {
+               "children": [{
+                       "x": 0,
+                       "y": 0,
+                       "w": 24,
+                       "h": 59,
+                       "i": "16",
+                       "type": "Tab",
+                       "children": [{
+                               "name": "Overview",
+                               "children": [{
+                                       "x": 16,
+                                       "y": 0,
+                                       "w": 8,
+                                       "h": 10,
+                                       "i": "13",
+                                       "type": "Widget",
+                                       "graph": {
+                                               "type": "Line",
+                                               "step": false,
+                                               "smooth": false,
+                                               "showSymbol": true,
+                                               "showXAxis": true,
+                                               "showYAxis": true
+                                       },
+                                       "widget": {
+                                               "title": "Partition Count",
+                                               "tips": "Number of 
partitions on this broker.",
+                                               "name": "partition_count"
+                                       },
+                                       "relatedTrace": {
+                                               "enableRelate": false
+                                       },
+                                       "expressions": 
["meter_kafka_partition_count"],
+                                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                                       "metricMode": "Expression"
+                               }, {
+                                       "x": 16,
+                                       "y": 10,
+                                       "w": 8,
+                                       "h": 10,
+                                       "i": "14",
+                                       "type": "Widget",
+                                       "graph": {
+                                               "type": "Line",
+                                               "step": false,
+                                               "smooth": false,
+                                               "showSymbol": true,
+                                               "showXAxis": true,
+                                               "showYAxis": true
+                                       },
+                                       "widget": {
+                                               "title": "Leader Count",
+                                               "tips": "Number of leader 
partitions on this broker. ",
+                                               "name": "leader_count"
+                                       },
+                                       "expressions": 
["meter_kafka_leader_count"],
+                                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                                       "metricMode": "Expression"
+                               }, {
+                                       "x": 8,
+                                       "y": 20,
+                                       "w": 8,
+                                       "h": 9,
+                                       "i": "15",
+                                       "type": "Widget",
+                                       "graph": {
+                                               "type": "Line",
+                                               "step": false,
+                                               "smooth": false,
+                                               "showSymbol": true,
+                                               "showXAxis": true,
+                                               "showYAxis": true
+                                       },
+                                       "widget": {
+                                               "title": "Active Controller 
Count",
+                                               "tips": "The number of active 
controllers in the cluster. Typically should be 1. ",
+                                               "name": 
"active_controller_count"
+                                       },
+                                       "expressions": 
["meter_kafka_active_controller_count"],
+                                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                                       "metricMode": "Expression"
+                               }, {
+                                       "x": 8,
+                                       "y": 10,
+                                       "w": 8,
+                                       "h": 10,
+                                       "i": "18",
+                                       "type": "Widget",
+                                       "graph": {
+                                               "type": "Line",
+                                               "step": false,
+                                               "smooth": false,
+                                               "showSymbol": true,
+                                               "showXAxis": true,
+                                               "showYAxis": true
+                                       },
+                                       "widget": {
+                                               "title": "Max Lag",
+                                               "tips": "The maximum lag 
between the leader and followers in terms of messages still needed to be sent. 
Higher lag indicates delays.",
+                                               "name": "max_lag"
+                                       },
+                                       "expressions": ["meter_kafka_max_lag"],
+                                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                                       "metricMode": "Expression"
+                               }, {
+                                       "x": 0,
+                                       "y": 0,
+                                       "w": 8,
+                                       "h": 10,
+                                       "i": "19",
+                                       "type": "Widget",
+                                       "metricMode": "Expression",
+                                       "graph": {
+                                               "type": "Line",
+                                               "step": false,
+                                               "smooth": false,
+                                               "showSymbol": true,
+                                               "showXAxis": true,
+                                               "showYAxis": true
+                                       },
+                                       "expressions": 
["meter_kafka_under_replicated_partitions"],
+                                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                                       "widget": {
+                                               "name": 
"under_replicated_partitions",
+                                               "title": "Under Replicated 
Partitions",
+                                               "tips": "Number of 
under-replicated partitions in the broker. A higher number is a sign of 
potential issues. "
+                                       }
+                               }, {
+                                       "x": 8,
+                                       "y": 0,
+                                       "w": 8,
+                                       "h": 10,
+                                       "i": "20",
+                                       "type": "Widget",
+                                       "metricMode": "Expression",
+                                       "graph": {
+                                               "type": "Line",
+                                               "step": false,
+                                               "smooth": false,
+                                               "showSymbol": true,
+                                               "showXAxis": true,
+                                               "showYAxis": true
+                                       },
+                                       "widget": {
+                                               "name": 
"offline_partitions_count",
+                                               "title": "Offline Partitions 
Count",
+                                               "tips": "Number of partitions 
that are offline. Non-zero values indicate a problem."
+                                       },
+                                       "expressions": 
["meter_kafka_offline_partitions_count"],
+                                       "typesOfMQE": ["TIME_SERIES_VALUES"]
+                               }, {
+                                       "x": 0,
+                                       "y": 10,
+                                       "w": 8,
+                                       "h": 10,
+                                       "i": "21",
+                                       "type": "Widget",
+                                       "metricMode": "Expression",
+                                       "expressions": 
["meter_kafka_unclean_leader_elections_per_second"],
+                                       "typesOfMQE": ["TIME_SERIES_VALUES"],
+                                       "graph": {
+                                               "type": "Line",
+                                               "step": false,
+                                               "smooth": false,
+                                               "showSymbol": true,
+                                               "showXAxis": true,
+                                               "showYAxis": true
+                                       },
+                                       "widget": {
+                                               "name": 
"unclean_leader_elections_per_second",
+                                               "title": "Unclean Leader 
Elections Per Second",
+                                               "tips": "The rate of unclean 
leader elections per second. Non-zero values indicate a serious problem. "
+                                       }
+                               }, {
+                                       "x": 0,
+                                       "y": 20,
+                                       "w": 8,
+                                       "h": 9,
+                                       "i": "22",
+                                       "type": "Widget",
+                                       "metricMode": "Expression",
+                                       "graph": {
+                                               "type": "Line",
+                                               "step": false,
+                                               "smooth": false,
+                                               "showSymbol": true,
+                                               "showXAxis": true,
+                                               "showYAxis": true
+                                       },
+                                       "widget": {
+                                               "name": "leader_election_rate",
+                                               "title": "Leader Election Rate",
+                                               "tips": "The rate of 
leader elections per second. Non-zero values occur when there are broker failures."
+                                       },
+                                       "expressions": 
["meter_kafka_leader_election_rate"],
+                                       "typesOfMQE": ["TIME_SERIES_VALUES"]
+                               }]
+                       }, {
+                               "name": "Broker",
+                               "children": [{
+                                       "x": 0,
+                                       "y": 0,
+                                       "w": 24,
+                                       "h": 48,
+                                       "i": "0",
+                                       "type": "Widget",
+                                       "graph": {
+                                               "type": "InstanceList",
+                                               "dashboardName": "Kafka-Broker",
+                                               "fontSize": 12
+                                       }
+                               }]
+                       }]
+               }],
+               "layer": "KAFKA",
+               "entity": "Service",
+               "name": "Kafka-Cluster",
+               "id": "Kafka-Cluster",
+               "isRoot": false
+       }
+}]
\ No newline at end of file
diff --git 
a/oap-server/server-starter/src/main/resources/ui-initialized-templates/kafka/kafka-root.json
 
b/oap-server/server-starter/src/main/resources/ui-initialized-templates/kafka/kafka-root.json
new file mode 100644
index 0000000000..680d257ef6
--- /dev/null
+++ 
b/oap-server/server-starter/src/main/resources/ui-initialized-templates/kafka/kafka-root.json
@@ -0,0 +1,42 @@
+[{
+       "id": "Kafka-Root",
+       "configuration": {
+               "children": [{
+                       "x": 0,
+                       "y": 3,
+                       "w": 24,
+                       "h": 29,
+                       "i": "0",
+                       "type": "Widget",
+                       "graph": {
+                               "type": "ServiceList",
+                               "dashboardName": "Kafka-Cluster",
+                               "fontSize": 12,
+                               "showXAxis": false,
+                               "showYAxis": false,
+                               "showGroup": true
+                       }
+               }, {
+                       "x": 0,
+                       "y": 0,
+                       "w": 24,
+                       "h": 3,
+                       "i": "1",
+                       "type": "Text",
+                       "graph": {
+                               "fontColor": "blue",
+                               "backgroundColor": "white",
+                               "content": "Provide Kafka monitoring through 
OpenTelemetry's Prometheus Receiver",
+                               "fontSize": 14,
+                               "textAlign": "left",
+                               "url": 
"https://skywalking.apache.org/docs/main/next/en/setup/backend/backend-kafka-monitoring/";
+                       }
+               }],
+               "id": "Kafka-Root",
+               "layer": "KAFKA",
+               "entity": "All",
+               "name": "Kafka-Root",
+               "isRoot": true,
+               "path": "/Message-Queue/Kafka"
+       }
+}]
\ No newline at end of file
diff --git 
a/oap-server/server-starter/src/main/resources/ui-initialized-templates/menu.yaml
 
b/oap-server/server-starter/src/main/resources/ui-initialized-templates/menu.yaml
index 0ba044653d..673df5d122 100644
--- 
a/oap-server/server-starter/src/main/resources/ui-initialized-templates/menu.yaml
+++ 
b/oap-server/server-starter/src/main/resources/ui-initialized-templates/menu.yaml
@@ -198,6 +198,11 @@ menus:
         description: Provide RabbitMQ monitoring through OpenTelemetry's 
Prometheus Receiver.
         documentLink: 
https://skywalking.apache.org/docs/main/next/en/setup/backend/backend-rabbitmq-monitoring/
         i18nKey: mq_rabbitmq
+      - title: Kafka
+        layer: KAFKA
+        description: Provide Kafka monitoring through OpenTelemetry's 
Prometheus Receiver.
+        documentLink: 
https://skywalking.apache.org/docs/main/next/en/setup/backend/backend-kafka-monitoring/
+        i18nKey: mq_kafka
   - title: Self Observability
     icon: self_observability
     description: Self Observability provides the observabilities for running 
components and servers from the SkyWalking ecosystem.
diff --git a/test/e2e-v2/cases/kafka/kafka-monitoring/Dockerfile 
b/test/e2e-v2/cases/kafka/kafka-monitoring/Dockerfile
new file mode 100644
index 0000000000..8c5a1906cb
--- /dev/null
+++ b/test/e2e-v2/cases/kafka/kafka-monitoring/Dockerfile
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Use the Bitnami Kafka image as the base image
+FROM bitnami/kafka:latest
+
+# Download the JMX Prometheus Java Agent and config file
+ADD 
https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.18.0/jmx_prometheus_javaagent-0.18.0.jar
 /etc/jmx_prometheus_javaagent-0.18.0.jar
+ADD 
https://raw.githubusercontent.com/prometheus/jmx_exporter/main/example_configs/kafka-2_0_0.yml
 /etc/kafka-2_0_0.yml
+
+# Set the necessary permissions for the jar and yml
+USER root
+RUN chmod 644 /etc/jmx_prometheus_javaagent-0.18.0.jar
+RUN chmod 644 /etc/kafka-2_0_0.yml
diff --git a/test/e2e-v2/cases/kafka/kafka-monitoring/docker-compose.yml 
b/test/e2e-v2/cases/kafka/kafka-monitoring/docker-compose.yml
new file mode 100644
index 0000000000..d0c672b015
--- /dev/null
+++ b/test/e2e-v2/cases/kafka/kafka-monitoring/docker-compose.yml
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3'
+
+services:
+  oap:
+    extends:
+      file: ../../../script/docker-compose/base-compose.yml
+      service: oap
+    ports:
+      - "12800:12800"
+    networks:
+      - e2e
+
+  zookeeper:
+    image: zookeeper:latest
+    expose:
+      - 2181
+    networks:
+      - e2e
+    environment:
+      - ALLOW_ANONYMOUS_LOGIN=yes
+    healthcheck:
+      test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 2181"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+
+  broker1:
+    build:
+      context: .
+    expose:
+      - 9092
+      - 7071
+    ports:
+      - '9092:9092'
+      - '7071:7071'
+    networks:
+      - e2e
+    environment:
+      - KAFKA_ENABLE_KRAFT=no
+      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://broker1:9092
+      - ALLOW_PLAINTEXT_LISTENER=yes
+      - 
KAFKA_OPTS=-javaagent:/etc/jmx_prometheus_javaagent-0.18.0.jar=7071:/etc/kafka-2_0_0.yml
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+    healthcheck:
+       test: ["CMD", "kafka-topics.sh", "--list", "--zookeeper", 
"zookeeper:2181"]
+       interval: 5s
+       timeout: 60s
+       retries: 120
+    
+  broker2:
+    build:
+      context: .
+    expose:
+      - 9093
+      - 7072
+    ports:
+      - '9093:9093'
+      - '7072:7072'
+    networks:
+      - e2e
+    environment:
+      - KAFKA_ENABLE_KRAFT=no
+      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://broker2:9093
+      - ALLOW_PLAINTEXT_LISTENER=yes
+      - 
KAFKA_OPTS=-javaagent:/etc/jmx_prometheus_javaagent-0.18.0.jar=7072:/etc/kafka-2_0_0.yml
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "kafka-topics.sh", "--list", "--zookeeper", 
"zookeeper:2181" ]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+
+  kafka-producer-perf-test:
+    image: bitnami/kafka:latest
+    networks:
+      - e2e
+    depends_on:
+      - broker1
+      - broker2
+    command:
+      - bash
+      - -c
+      - >
+        kafka-producer-perf-test.sh --topic perftest --num-records 100000 
--record-size 100 --throughput 500 --producer-props 
bootstrap.servers=broker1:9092,broker2:9093
+
+  kafka-consumer-perf-test:
+    image: bitnami/kafka:latest
+    networks:
+      - e2e
+    depends_on:
+      - kafka-producer-perf-test
+    command:
+      - bash
+      - -c
+      - >
+        kafka-consumer-perf-test.sh --bootstrap-server 
broker1:9092,broker2:9093 --topic perftest --messages 100000
+
+  otel-collector:
+    image: otel/opentelemetry-collector:0.50.0
+    networks:
+      - e2e
+    command: [ "--config=/etc/otel-collector-config.yaml" ]
+    volumes:
+      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
+    expose:
+      - 55678
+    depends_on:
+      - broker1
+      - broker2
+
+networks:
+  e2e:
diff --git a/test/e2e-v2/cases/kafka/kafka-monitoring/e2e.yaml 
b/test/e2e-v2/cases/kafka/kafka-monitoring/e2e.yaml
new file mode 100644
index 0000000000..7b622598ad
--- /dev/null
+++ b/test/e2e-v2/cases/kafka/kafka-monitoring/e2e.yaml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used 
to test.
+
+setup:
+  env: compose
+  file: docker-compose.yml
+  timeout: 20m
+  init-system-environment: ../../../script/env
+  steps:
+    - name: set PATH
+      command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
+    - name: install yq
+      command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+    - name: install swctl
+      command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+verify:
+  retry:
+    count: 20
+    interval: 3s
+  cases:
+    - includes:
+        - ./kafka-cases.yaml
diff --git a/test/e2e-v2/cases/kafka/kafka-monitoring/expected/instance.yml 
b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/instance.yml
new file mode 100644
index 0000000000..ef669d2d86
--- /dev/null
+++ b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/instance.yml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+  {{- contains . }}
+- id: {{ notEmpty .id }}
+  name: broker1:7071
+  instanceuuid: {{ notEmpty .instanceuuid }}
+  attributes: []
+  language: UNKNOWN
+- id: {{ notEmpty .id }}
+  name: broker2:7072
+  instanceuuid: {{ notEmpty .instanceuuid }}
+  attributes: []
+  language: UNKNOWN
+  {{- end }}
\ No newline at end of file
diff --git 
a/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-gc-label.yml
 
b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-gc-label.yml
new file mode 100644
index 0000000000..734b642cde
--- /dev/null
+++ 
b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-gc-label.yml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+type: TIME_SERIES_VALUES
+results:
+  {{- contains .results }}
+  - metric:
+      labels:
+        - key: _
+          value: "G1 Young Generation"
+    values:
+      {{- contains .values }}
+      - id: {{ notEmpty .id }}
+        value: {{ .value }}
+        traceid: null
+      - id: {{ notEmpty .id }}
+        value: null
+        traceid: null
+      {{- end}}
+  - metric:
+      labels:
+        - key: _
+          value: "G1 Old Generation"
+    values:
+      {{- contains .values }}
+      - id: {{ notEmpty .id }}
+        value: {{ .value }}
+        traceid: null
+      - id: {{ notEmpty .id }}
+        value: null
+        traceid: null
+      {{- end}}
+  {{- end}}
+error: null
\ No newline at end of file
diff --git 
a/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-instance-label.yml
 
b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-instance-label.yml
new file mode 100644
index 0000000000..5cb8e9f069
--- /dev/null
+++ 
b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-instance-label.yml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+type: TIME_SERIES_VALUES
+results:
+  {{- contains .results }}
+  - metric:
+      labels:
+        - key: _
+          value: "Produce"
+    values:
+      {{- contains .values }}
+      - id: {{ notEmpty .id }}
+        value: {{ .value }}
+        traceid: null
+      - id: {{ notEmpty .id }}
+        value: null
+        traceid: null
+      {{- end}}
+  - metric:
+      labels:
+        - key: _
+          value: "Fetch"
+    values:
+      {{- contains .values }}
+      - id: {{ notEmpty .id }}
+        value: {{ .value }}
+        traceid: null
+      - id: {{ notEmpty .id }}
+        value: null
+        traceid: null
+      {{- end}}
+  {{- end}}
+error: null
\ No newline at end of file
diff --git 
a/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-jvm-label.yml
 
b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-jvm-label.yml
new file mode 100644
index 0000000000..e0bac36858
--- /dev/null
+++ 
b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-jvm-label.yml
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+type: TIME_SERIES_VALUES
+results:
+  {{- contains .results }}
+  - metric:
+      labels:
+        - key: _
+          value: "heap"
+    values:
+      {{- contains .values }}
+      - id: {{ notEmpty .id }}
+        value: {{ .value }}
+        traceid: null
+      - id: {{ notEmpty .id }}
+        value: null
+        traceid: null
+      {{- end}}
+  {{- end}}
+error: null
\ No newline at end of file
diff --git 
a/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-scource-label.yml
 
b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-scource-label.yml
new file mode 100644
index 0000000000..b1119ebff8
--- /dev/null
+++ 
b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-scource-label.yml
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+type: TIME_SERIES_VALUES  # result type expected from the `metrics exec` queries that use this file
+results:
+  {{- contains .results }}
+  - metric:  # series 1 of 3: label value "Produce"
+      labels:
+        - key: _
+          value: "Produce"
+    values:
+      {{- contains .values }}
+      - id: {{ notEmpty .id }}
+        value: {{ .value }}  # presumably filled from the actual result at render time, so no fixed number is pinned - confirm
+        traceid: null
+      - id: {{ notEmpty .id }}
+        value: null  # NOTE(review): looks like points without a value are tolerated as well - confirm against the verifier's `contains` semantics
+        traceid: null
+      {{- end}}
+  - metric:  # series 2 of 3: label value "FetchFollower"
+      labels:
+        - key: _
+          value: "FetchFollower"
+    values:
+      {{- contains .values }}
+      - id: {{ notEmpty .id }}
+        value: {{ .value }}
+        traceid: null
+      - id: {{ notEmpty .id }}
+        value: null
+        traceid: null
+      {{- end}}
+  - metric:  # series 3 of 3: label value "FetchConsumer"
+      labels:
+        - key: _
+          value: "FetchConsumer"
+    values:
+      {{- contains .values }}
+      - id: {{ notEmpty .id }}
+        value: {{ .value }}
+        traceid: null
+      - id: {{ notEmpty .id }}
+        value: null
+        traceid: null
+      {{- end}}
+  {{- end}}
+error: null  # the query must not report an error
\ No newline at end of file
diff --git a/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-service-label.yml b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-service-label.yml
new file mode 100644
index 0000000000..c39b4ecf04
--- /dev/null
+++ b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value-service-label.yml
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+type: TIME_SERIES_VALUES  # result type expected from the `metrics exec` queries that use this file
+results:
+  {{- contains .results }}
+  - metric:
+      labels:
+        - key: _
+          value: "broker1:7071"  # the only label value this expectation asks for
+    values:
+      {{- contains .values }}
+      - id: {{ notEmpty .id }}
+        value: {{ .value }}  # presumably filled from the actual result at render time, so no fixed number is pinned - confirm
+        traceid: null
+      - id: {{ notEmpty .id }}
+        value: null  # NOTE(review): looks like points without a value are tolerated as well - confirm against the verifier's `contains` semantics
+        traceid: null
+      {{- end}}
+  {{- end}}
+error: null  # the query must not report an error
\ No newline at end of file
diff --git a/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value.yml b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value.yml
new file mode 100644
index 0000000000..c4bbbafa3b
--- /dev/null
+++ b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/metrics-has-value.yml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+type: TIME_SERIES_VALUES  # result type expected from the `metrics exec` queries that use this file
+results:
+  {{- contains .results }}
+  - metric:
+      labels: []  # the expected series carries no labels
+    values:
+      {{- contains .values }}
+      - id: {{ notEmpty .id }}
+        value: {{ .value }}  # presumably filled from the actual result at render time, so no fixed number is pinned - confirm
+        traceid: null
+      - id: {{ notEmpty .id }}
+        value: null  # NOTE(review): looks like points without a value are tolerated as well - confirm against the verifier's `contains` semantics
+        traceid: null
+      {{- end}}
+  {{- end}}
+error: null  # the query must not report an error
\ No newline at end of file
diff --git a/test/e2e-v2/cases/kafka/kafka-monitoring/expected/service.yml b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/service.yml
new file mode 100644
index 0000000000..6427a9e8fe
--- /dev/null
+++ b/test/e2e-v2/cases/kafka/kafka-monitoring/expected/service.yml
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+  {{- contains . }}
+- id: {{ b64enc "kafka::kafka-cluster" }}.1  # NOTE(review): presumably the base64 of the service name plus a ".1" suffix - confirm the id format with the OAP
+  name: kafka::kafka-cluster  # written as <group>::<shortname>, see the two fields below
+  group: kafka  # the part of the name before "::"
+  shortname: kafka-cluster  # the part of the name after "::"
+  layers:
+    - KAFKA  # the service is expected to be listed under this single layer
+  normal: true
+  {{- end }}
\ No newline at end of file
diff --git a/test/e2e-v2/cases/kafka/kafka-monitoring/kafka-cases.yaml b/test/e2e-v2/cases/kafka/kafka-monitoring/kafka-cases.yaml
new file mode 100644
index 0000000000..2696944734
--- /dev/null
+++ b/test/e2e-v2/cases/kafka/kafka-monitoring/kafka-cases.yaml
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+cases:
+  # service cases: the service listing plus the cluster-level metrics of kafka::kafka-cluster
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls
+    expected: expected/service.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_under_replicated_partitions --service-name=kafka::kafka-cluster
+    expected: expected/metrics-has-value-service-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_offline_partitions_count --service-name=kafka::kafka-cluster
+    expected: expected/metrics-has-value-service-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_partition_count --service-name=kafka::kafka-cluster
+    expected: expected/metrics-has-value-service-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_leader_count --service-name=kafka::kafka-cluster
+    expected: expected/metrics-has-value-service-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_active_controller_count --service-name=kafka::kafka-cluster
+    expected: expected/metrics-has-value-service-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_leader_election_rate --service-name=kafka::kafka-cluster
+    expected: expected/metrics-has-value-service-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_unclean_leader_elections_per_second --service-name=kafka::kafka-cluster
+    expected: expected/metrics-has-value-service-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_max_lag --service-name=kafka::kafka-cluster
+    expected: expected/metrics-has-value-service-label.yml
+
+  # instance cases: the instance listing plus the per-broker metrics of instance broker1:7071
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance ls --service-name=kafka::kafka-cluster
+    expected: expected/instance.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_cpu_time_total --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_memory_usage_percentage --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value-jvm-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_messages_per_second --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_bytes_in_per_second --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_bytes_out_per_second --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_replication_bytes_in_per_second --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_replication_bytes_out_per_second --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_under_replicated_partitions --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_under_min_isr_partition_count --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_partition_count --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_leader_count --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_isr_shrinks_per_second --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_isr_expands_per_second --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_max_lag --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_purgatory_size --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value-instance-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_garbage_collector_count --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value-gc-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_request_queue_time_ms --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value-scource-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_remote_time_ms --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value-scource-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_response_queue_time_ms --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value-scource-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_response_send_time_ms --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value-scource-label.yml
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_kafka_broker_network_processor_avg_idle_percent --service-name=kafka::kafka-cluster --instance-name=broker1:7071
+    expected: expected/metrics-has-value.yml
+
+
diff --git a/test/e2e-v2/cases/kafka/kafka-monitoring/otel-collector-config.yaml b/test/e2e-v2/cases/kafka/kafka-monitoring/otel-collector-config.yaml
new file mode 100644
index 0000000000..11dd27a1d3
--- /dev/null
+++ b/test/e2e-v2/cases/kafka/kafka-monitoring/otel-collector-config.yaml
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+receivers:
+  prometheus:
+    config:
+      scrape_configs:
+        - job_name: "kafka-monitoring"
+          scrape_interval: 30s
+          static_configs:
+            - targets:
+                - broker1:7071
+                - broker2:7072
+          relabel_configs:
+            - source_labels: []  # no source label: the rule attaches a constant label to every scraped series
+              target_label: cluster
+              replacement: kafka-cluster
+            - source_labels: [__address__]  # copy the whole scrape address (host:port) into the "broker" label
+              regex: (.+)
+              target_label: broker
+              replacement: $$1  # "$$" keeps the collector from expanding "$1" itself, so the capture-group reference reaches the relabel rule
+
+processors:
+  batch:
+
+exporters:
+  otlp:
+    # The OAP Server address
+    endpoint: "oap:11800"
+    # The config format of OTEL version prior to 0.34.0, eg. 0.29.0, should be:
+    # insecure: true
+    tls:
+      insecure: true
+    # (the pre-0.34.0 form would be a plain "insecure: true" here instead of the "tls" block)
+  # Exports data to the console
+  logging:
+    logLevel: debug  # NOTE(review): the accepted option name for log verbosity depends on the collector version - confirm against the image in use
+
+service:
+  pipelines:
+    metrics:
+      receivers: [prometheus]
+      processors: [batch]
+      exporters: [otlp, logging]
\ No newline at end of file

Reply via email to