This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new d3727b7a9f Enhance LAL to support convert LogData to 
DatabaseSlowStatement. (#9575)
d3727b7a9f is described below

commit d3727b7a9f0ac9127b52e88b3996db8ae3b09452
Author: yswdqz <[email protected]>
AuthorDate: Fri Sep 16 12:42:43 2022 +0800

    Enhance LAL to support convert LogData to DatabaseSlowStatement. (#9575)
---
 .github/workflows/skywalking.yaml                  |  10 +-
 docs/en/changes/changes.md                         |   1 +
 docs/en/concepts-and-designs/lal.md                |  96 ++++++++
 .../listener/DatabaseSlowStatementBuilder.java     |   4 +-
 oap-server/analyzer/log-analyzer/pom.xml           |   5 +
 .../skywalking/oap/log/analyzer/dsl/Binding.java   |  13 ++
 .../oap/log/analyzer/dsl/spec/AbstractSpec.java    |  11 +
 .../analyzer/dsl/spec/extractor/ExtractorSpec.java |  79 ++++++-
 .../dsl/spec/extractor/slowsql/SlowSqlSpec.java    |  65 ++++++
 .../log/analyzer/dsl/spec/filter/FilterSpec.java   |   4 +-
 .../skywalking/oap/log/analyzer/dsl/DSLTest.java   | 251 +++++++++++----------
 .../src/main/resources/application.yml             |   2 +-
 .../src/main/resources/lal/mysql-slowsql.yaml      |  42 ++--
 .../expected/{service.yml => db-has-value.yml}     |  15 +-
 test/e2e-v2/cases/mysql/expected/service.yml       |   8 +-
 test/e2e-v2/cases/mysql/mysql-cases.yaml           |  35 +--
 .../docker-compose.yaml}                           |  65 ++++--
 .../e2e.yaml                                       |   4 +-
 .../fluent-bit-parser.conf}                        |  17 +-
 .../mysql/mysql-slowsql/fluent-bit-script.lua      | 116 ++++++++++
 .../cases/mysql/mysql-slowsql/fluent-bit.conf      |  52 +++++
 .../mock.sql                                       |   5 +
 .../{expected/service.yml => mysql-slowsql/my.cnf} |  19 +-
 .../otel-collector-config.yaml                     |   2 +-
 test/e2e-v2/script/env                             |   2 +-
 25 files changed, 707 insertions(+), 216 deletions(-)

diff --git a/.github/workflows/skywalking.yaml 
b/.github/workflows/skywalking.yaml
index 64ef8cff19..ddcb13a2e3 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -550,8 +550,8 @@ jobs:
             config: test/e2e-v2/cases/vm/prometheus-node-exporter/e2e.yaml
           - name: So11y
             config: test/e2e-v2/cases/so11y/e2e.yaml
-          - name: MySQL Prometheus
-            config: test/e2e-v2/cases/mysql/prometheus-mysql-exporter/e2e.yaml
+          - name: MySQL Prometheus and slowsql
+            config: test/e2e-v2/cases/mysql/mysql-slowsql/e2e.yaml
           - name: PostgreSQL Prometheus
             config: test/e2e-v2/cases/postgresql/postgres-exporter/e2e.yaml
 
@@ -595,7 +595,7 @@ jobs:
         run: |
           echo "${{ matrix.test.env }}"  >> $GITHUB_ENV
       - name: ${{ matrix.test.name }}
-        uses: 
apache/skywalking-infra-e2e@afdf1cca0519d65bc480d8680b7a27f9b41fc421
+        uses: 
apache/skywalking-infra-e2e@81fab7dbf5bfb201166d7d8e0089f99d2dc761f3
         with:
           e2e-file: $GITHUB_WORKSPACE/${{ matrix.test.config }}
       - uses: actions/upload-artifact@v2
@@ -653,7 +653,7 @@ jobs:
         with:
           go-version: "1.16"
       - name: ${{ matrix.test.name }}
-        uses: 
apache/skywalking-infra-e2e@afdf1cca0519d65bc480d8680b7a27f9b41fc421
+        uses: 
apache/skywalking-infra-e2e@81fab7dbf5bfb201166d7d8e0089f99d2dc761f3
         env:
           ISTIO_VERSION: ${{ matrix.versions.istio }}
           KUBERNETES_VERSION: ${{ matrix.versions.kubernetes }}
@@ -696,7 +696,7 @@ jobs:
         with:
           go-version: "1.16"
       - name: Java version ${{ matrix.java-version }}
-        uses: 
apache/skywalking-infra-e2e@afdf1cca0519d65bc480d8680b7a27f9b41fc421
+        uses: 
apache/skywalking-infra-e2e@81fab7dbf5bfb201166d7d8e0089f99d2dc761f3
         env:
           SW_AGENT_JDK_VERSION: ${{ matrix.java-version }}
         with:
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index c8ee9a190b..3eb591c6db 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -9,6 +9,7 @@
 * Use prepareStatement in H2SQLExecutor#getByIDs.(No function change).
 * Bump up snakeyaml to 1.31 for fixing CVE-2022-25857.
 * Fix `DurationUtils.convertToTimeBucket` missed verify date format.
+* Enhance LAL to support converting LogData to DatabaseSlowStatement.
 * [**Breaking Change**] Change the LAL script format(Add layer property).
 * Adapt ElasticSearch 8.1+, migrate from removed APIs to recommended APIs.
 
diff --git a/docs/en/concepts-and-designs/lal.md 
b/docs/en/concepts-and-designs/lal.md
index 6fbb2605cd..9511748b31 100644
--- a/docs/en/concepts-and-designs/lal.md
+++ b/docs/en/concepts-and-designs/lal.md
@@ -43,6 +43,38 @@ filter {
 Note that when you put `regexp` in an `if` statement, you need to surround the 
expression with `()`
 like `regexp(<the expression>)`, instead of `regexp <the expression>`.
 
+- `tag`
+
+The `tag` function provides a convenient way to get the value of a tag key.
+
+We can add tags like the following:
+``` JSON
+[
+   {
+      "tags":{
+         "data":[
+            {
+               "key":"TEST_KEY",
+               "value":"TEST_VALUE"
+            }
+         ]
+      },
+      "body":{
+         ...
+      }
+      ...
+   }
+]
+``` 
+And we can use this method to get the value of the tag key `TEST_KEY`.
+```groovy
+filter {
+    if (tag("TEST_KEY") == "TEST_VALUE") {
+         ...   
+    }
+}
+```
+
 ### Parser
 
 Parsers are responsible for parsing the raw logs into structured data in 
SkyWalking for further processing. There are 3
@@ -243,6 +275,70 @@ metrics:
     exp: http_response_time.sum(['le', 'service', 
'instance']).increase('PT5M').histogram().histogram_percentile([50,70,90,99])
 ```
 
+- `slowSql`
+
+`slowSql` aims to convert LogData to DatabaseSlowStatement. It extracts data 
from the `parsed` result and saves it as a DatabaseSlowStatement. `slowSql` does not 
abort or edit logs, so you can still use other LAL for further processing.
+`slowSql` reuses the `service`, `layer` and `timestamp` of the extractor, so it is 
necessary to use `slowSql` after setting these.
+We require a log tag `"LOG_KIND" = "SLOW_SQL"` to make OAP distinguish slow 
SQL logs from other log reports.
+An example of JSON sent to OAP is as follows:
+``` json
+[
+   {
+      "tags":{
+         "data":[
+            {
+               "key":"LOG_KIND",
+               "value":"SLOW_SQL"
+            }
+         ]
+      },
+      "layer":"MYSQL",
+      "body":{
+         "json":{
+            
"json":"{\"time\":\"1663063011\",\"id\":\"cb92c1a5b-2691e-fb2f-457a-9c72a392d9ed\",\"service\":\"root[root]@[localhost]\",\"statement\":\"select
 sleep(2);\",\"layer\":\"MYSQL\",\"query_time\":2000}"
+         }
+      },
+      "service":"root[root]@[localhost]"
+   }
+]
+```
+
+- `statement`
+
+`statement` extracts the SQL statement from the `parsed` result, and sets it 
into the `DatabaseSlowStatement`, which will be
+persisted (if not dropped) and is used to associate with TopNDatabaseStatement.
+
+- `latency`
+
+`latency` extracts the latency from the `parsed` result, and sets it into the 
`DatabaseSlowStatement`, which will be
+persisted (if not dropped) and is used to associate with TopNDatabaseStatement.
+
+- `id`
+
+`id` extracts the id from the `parsed` result, and sets it into the 
`DatabaseSlowStatement`, which will be persisted (if not
+dropped) and is used to associate with TopNDatabaseStatement.
+
+An example of LAL to distinguish slow logs:
+
+```groovy
+filter {
+  json{
+  }
+  extractor{
+    layer parsed.layer as String
+    service parsed.service as String
+    timestamp parsed.time as String
+    if (tag("LOG_KIND") == "SLOW_SQL") {
+      slowSql {
+        id parsed.id as String
+        statement parsed.statement as String
+        latency parsed.query_time as Long
+      }
+    }
+  }
+}
+```
+
 ### Sink
 
 Sinks are the persistent layer of the LAL. By default, all the logs of each 
filter are persisted into the storage.
diff --git 
a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java
 
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java
index 7a0c48f0d3..6a9e1e9d0a 100644
--- 
a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java
+++ 
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java
@@ -52,11 +52,11 @@ public class DatabaseSlowStatementBuilder {
     @Setter
     private long timeBucket;
 
-    void prepare() {
+    public void prepare() {
         this.serviceName = namingControl.formatServiceName(serviceName);
     }
 
-    DatabaseSlowStatement toDatabaseSlowStatement() {
+    public DatabaseSlowStatement toDatabaseSlowStatement() {
         DatabaseSlowStatement dbSlowStat = new DatabaseSlowStatement();
         dbSlowStat.setId(id);
         dbSlowStat.setTraceId(traceId);
diff --git a/oap-server/analyzer/log-analyzer/pom.xml 
b/oap-server/analyzer/log-analyzer/pom.xml
index edec538e98..5a1dd3855e 100644
--- a/oap-server/analyzer/log-analyzer/pom.xml
+++ b/oap-server/analyzer/log-analyzer/pom.xml
@@ -38,6 +38,11 @@
             <artifactId>meter-analyzer</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>agent-analyzer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy</artifactId>
diff --git 
a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java
 
b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java
index 00b67c5381..ff80fec594 100644
--- 
a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java
+++ 
b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java
@@ -30,6 +30,8 @@ import java.util.regex.Matcher;
 import lombok.Getter;
 import org.apache.skywalking.apm.network.logging.v3.LogData;
 import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
+import 
org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.DatabaseSlowStatementBuilder;
+
 import org.apache.skywalking.oap.server.core.source.Log;
 
 /**
@@ -49,6 +51,8 @@ public class Binding extends groovy.lang.Binding {
 
     public static final String KEY_LOG_CONTAINER = "log_container";
 
+    public static final String KEY_DATABASE_SLOW_STATEMENT = 
"database_slow_statement";
+
     public Binding() {
         setProperty(KEY_PARSED, new Parsed());
     }
@@ -94,6 +98,15 @@ public class Binding extends groovy.lang.Binding {
         return (Parsed) getProperty(KEY_PARSED);
     }
 
+    public DatabaseSlowStatementBuilder databaseSlowStatement() {
+        return (DatabaseSlowStatementBuilder) 
getProperty(KEY_DATABASE_SLOW_STATEMENT);
+    }
+
+    public Binding databaseSlowStatement(DatabaseSlowStatementBuilder 
databaseSlowStatementBuilder) {
+        setProperty(KEY_DATABASE_SLOW_STATEMENT, databaseSlowStatementBuilder);
+        return this;
+    }
+
     public Binding save() {
         setProperty(KEY_SAVE, true);
         return this;
diff --git 
a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/AbstractSpec.java
 
b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/AbstractSpec.java
index 260d61f488..d815095776 100644
--- 
a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/AbstractSpec.java
+++ 
b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/AbstractSpec.java
@@ -22,6 +22,7 @@ import groovy.lang.Closure;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.experimental.Accessors;
+import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
 import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
 import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
@@ -49,4 +50,14 @@ public abstract class AbstractSpec {
     public Object propertyMissing(final String name) {
         return BINDING.get().getVariable(name);
     }
+
+    @SuppressWarnings("unused")
+    public String tag(String key) {
+        return BINDING.get().log().getTags().getDataList()
+                .stream()
+                .filter(data -> key.equals(data.getKey()))
+                .map(KeyStringValuePair::getValue)
+                .findFirst()
+                .orElse("");
+    }
 }
diff --git 
a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
 
b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
index 51422c8c17..7eb6ce48e7 100644
--- 
a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
+++ 
b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
@@ -22,36 +22,60 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import groovy.lang.Closure;
 import groovy.lang.DelegatesTo;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.List;
+
 import lombok.experimental.Delegate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
 import org.apache.skywalking.apm.network.logging.v3.LogData;
 import org.apache.skywalking.apm.network.logging.v3.TraceContext;
 import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+import 
org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor.slowsql.SlowSqlSpec;
 import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
 import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
 import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
 import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamilyBuilder;
+import 
org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.DatabaseSlowStatementBuilder;
 import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
+import org.apache.skywalking.oap.server.core.config.NamingControl;
+import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement;
+import org.apache.skywalking.oap.server.core.source.ServiceMeta;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.nonNull;
 import static 
org.apache.skywalking.oap.server.library.util.StringUtil.isNotBlank;
 
 public class ExtractorSpec extends AbstractSpec {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SlowSqlSpec.class);
 
     private final List<MetricConvert> metricConverts;
 
+    private final SlowSqlSpec slowSql;
+
+    private final NamingControl namingControl;
+
+    private final SourceReceiver sourceReceiver;
+
+    private static final DateTimeFormatter DTF = 
DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
+
     public ExtractorSpec(final ModuleManager moduleManager,
                          final LogAnalyzerModuleConfig moduleConfig) throws 
ModuleStartException {
         super(moduleManager, moduleConfig);
@@ -63,6 +87,14 @@ public class ExtractorSpec extends AbstractSpec {
                                      .stream()
                                      .map(it -> new MetricConvert(it, 
meterSystem))
                                      .collect(Collectors.toList());
+
+        slowSql = new SlowSqlSpec(moduleManager(), moduleConfig());
+
+        namingControl = moduleManager.find(CoreModule.NAME)
+                .provider()
+                .getService(NamingControl.class);
+
+        sourceReceiver = 
moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
     }
 
     @SuppressWarnings("unused")
@@ -215,6 +247,51 @@ public class ExtractorSpec extends AbstractSpec {
         }
     }
 
+    @SuppressWarnings("unused")
+    public void slowSql(@DelegatesTo(SlowSqlSpec.class) final Closure<?> cl) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        LogData.Builder log = BINDING.get().log();
+        if (log.getLayer() == null
+                || log.getService() == null
+                || log.getTimestamp() < 1) {
+            LOGGER.warn("SlowSql extracts failed, maybe something is not 
configured.");
+            return;
+        }
+        DatabaseSlowStatementBuilder builder = new 
DatabaseSlowStatementBuilder(namingControl);
+        builder.setLayer(Layer.nameOf(log.getLayer()));
+
+        LocalDateTime localDateTime = 
Instant.ofEpochSecond(log.getTimestamp()).atZone(ZoneId.systemDefault()).toLocalDateTime();
+        String timeBucket = DTF.format(localDateTime);
+        builder.setTimeBucket(Long.parseLong(timeBucket));
+        builder.setServiceName(log.getService());
+
+        ServiceMeta serviceMeta = new ServiceMeta();
+        serviceMeta.setName(namingControl.formatServiceName(log.getService()));
+        serviceMeta.setLayer(builder.getLayer());
+        serviceMeta.setTimeBucket(builder.getTimeBucket());
+        BINDING.get().databaseSlowStatement(builder);
+
+        cl.setDelegate(slowSql);
+        cl.call();
+
+        if (builder.getId() == null
+                || builder.getLatency() < 1
+                || builder.getStatement() == null) {
+            LOGGER.warn("SlowSql extracts failed, maybe something is not 
configured.");
+            return;
+        }
+
+        String entityId = serviceMeta.getEntityId();
+        builder.prepare();
+        DatabaseSlowStatement databaseSlowStatement = 
builder.toDatabaseSlowStatement();
+        databaseSlowStatement.setDatabaseServiceId(entityId);
+
+        sourceReceiver.receive(databaseSlowStatement);
+        sourceReceiver.receive(serviceMeta);
+    }
+
     public static class SampleBuilder {
         @Delegate
         private final Sample.SampleBuilder sampleBuilder = Sample.builder();
diff --git 
a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/slowsql/SlowSqlSpec.java
 
b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/slowsql/SlowSqlSpec.java
new file mode 100644
index 0000000000..5230352528
--- /dev/null
+++ 
b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/slowsql/SlowSqlSpec.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor.slowsql;
+
+import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
+
+import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
+import 
org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.DatabaseSlowStatementBuilder;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+import static java.util.Objects.nonNull;
+
+public class SlowSqlSpec extends AbstractSpec {
+
+    public SlowSqlSpec(final ModuleManager moduleManager,
+                       final LogAnalyzerModuleConfig moduleConfig) {
+        super(moduleManager, moduleConfig);
+    }
+
+    public void latency(final Long latency) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (nonNull(latency)) {
+            final DatabaseSlowStatementBuilder databaseSlowStatementBuilder = 
BINDING.get().databaseSlowStatement();
+            databaseSlowStatementBuilder.setLatency(latency);
+        }
+    }
+
+    public void statement(final String statement) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (nonNull(statement)) {
+            final DatabaseSlowStatementBuilder databaseSlowStatementBuilder = 
BINDING.get().databaseSlowStatement();
+            databaseSlowStatementBuilder.setStatement(statement);
+        }
+    }
+
+    public void id(final String id) {
+        if (BINDING.get().shouldAbort()) {
+            return;
+        }
+        if (nonNull(id)) {
+            final DatabaseSlowStatementBuilder databaseSlowStatementBuilder = 
BINDING.get().databaseSlowStatement();
+            databaseSlowStatementBuilder.setId(id);
+        }
+    }
+}
diff --git 
a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java
 
b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java
index c5a0fed13a..7fb7557b75 100644
--- 
a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java
+++ 
b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/filter/FilterSpec.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.skywalking.apm.network.logging.v3.LogData;
 import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
 import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
@@ -41,6 +42,7 @@ import 
org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogSinkListe
 import 
org.apache.skywalking.oap.log.analyzer.provider.log.listener.RecordSinkListener;
 import 
org.apache.skywalking.oap.log.analyzer.provider.log.listener.TrafficSinkListener;
 import org.apache.skywalking.oap.server.core.source.Log;
+
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.slf4j.Logger;
@@ -127,7 +129,7 @@ public class FilterSpec extends AbstractSpec {
         final LogData.Builder logData = BINDING.get().log();
         try {
             final Map<String, Object> parsed = yamlParser.create().load(
-                logData.getBody().getYaml().getYaml()
+                    logData.getBody().getYaml().getYaml()
             );
 
             BINDING.get().parsed(parsed);
diff --git 
a/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLTest.java
 
b/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLTest.java
index 6a5a733fe0..c0296359cf 100644
--- 
a/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLTest.java
+++ 
b/oap-server/analyzer/log-analyzer/src/test/java/org/apache/skywalking/oap/log/analyzer/dsl/DSLTest.java
@@ -45,120 +45,139 @@ public class DSLTest {
     @Parameterized.Parameters(name = "{index}: {0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(
-            new String[] {
-                "parser",
-                "filter {\n" +
-                    "  json {\n" +
-                    "    abortOnFailure false // for test purpose, we want to 
persist all logs\n" +
-                    "  }\n" +
-                    "  text {\n" +
-                    "    abortOnFailure false // for test purpose, we want to 
persist all logs\n" +
-                    "    regexp $/(?s)(?<timestamp>\\d{4}-\\d{2}-\\d{2} 
\\d{2}:\\d{2}:\\d{2}.\\d{3}) \\[TID:(?<tid>.+?)] \\[(?<thread>.+?)] 
(?<level>\\w{4,}) (?<logger>.{1,36}) (?<msg>.+)/$" +
-                    "  }\n" +
-                    "  yaml {\n" +
-                    "    abortOnFailure false // for test purpose, we want to 
persist all logs\n" +
-                    "  }" +
-                    "}",
+                new String[] {
+                        "parser",
+                        "filter {\n" +
+                                "  json {\n" +
+                                "    abortOnFailure false // for test purpose, 
we want to persist all logs\n" +
+                                "  }\n" +
+                                "  text {\n" +
+                                "    abortOnFailure false // for test purpose, 
we want to persist all logs\n" +
+                                "    regexp 
$/(?s)(?<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3}) 
\\[TID:(?<tid>.+?)] \\[(?<thread>.+?)] (?<level>\\w{4,}) (?<logger>.{1,36}) 
(?<msg>.+)/$" +
+                                "  }\n" +
+                                "  yaml {\n" +
+                                "    abortOnFailure false // for test purpose, 
we want to persist all logs\n" +
+                                "  }" +
+                                "}",
                 },
-            new String[] {
-                "extractor",
-                "filter {\n" +
-                    "  extractor {\n" +
-                    "    service \"test\"\n" +
-                    "    instance \"test\"\n" +
-                    "    endpoint \"test\"\n" +
-                    "    layer \"mesh\"\n" +
-                    "    traceId \"123\"\n" +
-                    "    segmentId \"123\"\n" +
-                    "    spanId \"123\"\n" +
-                    "    timestamp \"123\"\n" +
-                    "    metrics {\n" +
-                    "      name \"metricsName\"\n" +
-                    "      value 123\n" +
-                    "      timestamp \"123\"\n" +
-                    "      labels \"k1\": \"v1\"\n" +
-                    "    }\n" +
-                    "  }\n" +
-                    "}",
+                new String[] {
+                        "extractor",
+                        "filter {\n" +
+                                "  extractor {\n" +
+                                "    service \"test\"\n" +
+                                "    instance \"test\"\n" +
+                                "    endpoint \"test\"\n" +
+                                "    layer \"mesh\"\n" +
+                                "    traceId \"123\"\n" +
+                                "    segmentId \"123\"\n" +
+                                "    spanId \"123\"\n" +
+                                "    timestamp \"123\"\n" +
+                                "    metrics {\n" +
+                                "      name \"metricsName\"\n" +
+                                "      value 123\n" +
+                                "      timestamp \"123\"\n" +
+                                "      labels \"k1\": \"v1\"\n" +
+                                "    }\n" +
+                                "  }\n" +
+                                "}",
                 },
-            new String[] {
-                "sink",
-                "filter {\n" +
-                    "  sink {\n" +
-                    "    enforcer {\n" +
-                    "    }\n" +
-                    "    dropper {\n" +
-                    "    }\n" +
-                    "    sampler {\n" +
-                    "      if (parsed?.commonProperties?.responseFlags) {\n" +
-                    "        // use service:errorCode as sampler id so that 
each service:errorCode has its own sampler,\n" +
-                    "        // e.g. 
checkoutservice:[upstreamConnectionFailure], 
checkoutservice:[upstreamRetryLimitExceeded]\n" +
-                    "        
rateLimit(\"${log.service}:${log.body.json.json}:${log.tags.getData(0).key}:${parsed?.commonProperties?.responseFlags}\")
 {\n" +
-                    "          rpm 100\n" +
-                    "        }\n" +
-                    "      } else {\n" +
-                    "        // use service:responseCode as sampler id so that 
each service:responseCode has its own sampler,\n" +
-                    "        // e.g. checkoutservice:500, 
checkoutservice:404.\n" +
-                    "        
rateLimit(\"${log.service}:${log.body?.type}:${log.traceContext?.traceId}:${parsed?.response?.responseCode}\")
 {\n" +
-                    "          rpm 100\n" +
-                    "        }\n" +
-                    "      }\n" +
-                    "    }\n" +
-                    "  }\n" +
-                    "}",
+                new String[] {
+                        "sink",
+                        "filter {\n" +
+                                "  sink {\n" +
+                                "    enforcer {\n" +
+                                "    }\n" +
+                                "    dropper {\n" +
+                                "    }\n" +
+                                "    sampler {\n" +
+                                "      if 
(parsed?.commonProperties?.responseFlags) {\n" +
+                                "        // use service:errorCode as sampler 
id so that each service:errorCode has its own sampler,\n" +
+                                "        // e.g. 
checkoutservice:[upstreamConnectionFailure], 
checkoutservice:[upstreamRetryLimitExceeded]\n" +
+                                "        
rateLimit(\"${log.service}:${log.body.json.json}:${log.tags.getData(0).key}:${parsed?.commonProperties?.responseFlags}\")
 {\n" +
+                                "          rpm 100\n" +
+                                "        }\n" +
+                                "      } else {\n" +
+                                "        // use service:responseCode as 
sampler id so that each service:responseCode has its own sampler,\n" +
+                                "        // e.g. checkoutservice:500, 
checkoutservice:404.\n" +
+                                "        
rateLimit(\"${log.service}:${log.body?.type}:${log.traceContext?.traceId}:${parsed?.response?.responseCode}\")
 {\n" +
+                                "          rpm 100\n" +
+                                "        }\n" +
+                                "      }\n" +
+                                "    }\n" +
+                                "  }\n" +
+                                "}",
                 },
-            new String[] {
-                "e2e",
-                "filter {\n" +
-                    "  text {\n" +
-                    "    abortOnFailure false // for test purpose, we want to 
persist all logs\n" +
-                    "    regexp $/(?s)(?<timestamp>\\d{4}-\\d{2}-\\d{2} 
\\d{2}:\\d{2}:\\d{2}.\\d{3}) \\[TID:(?<tid>.+?)] \\[(?<thread>.+?)] 
(?<level>\\w{4,}) (?<logger>.{1,36}) (?<msg>.+)/$\n" +
-                    "  }\n" +
-                    "  extractor {\n" +
-                    "    metrics {\n" +
-                    "      timestamp \"${log.timestamp}\"\n" +
-                    "      labels level: parsed.level, service: log.service, 
instance: log.serviceInstance\n" +
-                    "      name \"log_count\"\n" +
-                    "      value 1\n" +
-                    "    }\n" +
-                    "  }\n" +
-                    "  sink {\n" +
-                    "  }\n" +
-                    "}\n",
+                new String[] {
+                        "e2e",
+                        "filter {\n" +
+                                "  text {\n" +
+                                "    abortOnFailure false // for test purpose, 
we want to persist all logs\n" +
+                                "    regexp 
$/(?s)(?<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3}) 
\\[TID:(?<tid>.+?)] \\[(?<thread>.+?)] (?<level>\\w{4,}) (?<logger>.{1,36}) 
(?<msg>.+)/$\n" +
+                                "  }\n" +
+                                "  extractor {\n" +
+                                "    metrics {\n" +
+                                "      timestamp \"${log.timestamp}\"\n" +
+                                "      labels level: parsed.level, service: 
log.service, instance: log.serviceInstance\n" +
+                                "      name \"log_count\"\n" +
+                                "      value 1\n" +
+                                "    }\n" +
+                                "  }\n" +
+                                "  sink {\n" +
+                                "  }\n" +
+                                "}\n",
                 },
-            new String[] {
-                "e2e",
-                "filter {\n" +
-                    "  json {\n" +
-                    "  }\n" +
-                    "  // only collect abnormal logs (http status code >= 300, 
or commonProperties?.responseFlags is not empty)\n" +
-                    "  if (parsed?.response?.responseCode as Integer < 400 && 
!parsed?.commonProperties?.responseFlags) {\n" +
-                    "    abort {}\n" +
-                    "  }\n" +
-                    "  extractor {\n" +
-                    "    if (parsed?.response?.responseCode) {\n" +
-                    "      tag 'status.code': parsed?.response?.responseCode 
as int\n" +
-                    "    }\n" +
-                    "    tag 'response.flag': 
(parsed?.commonProperties?.responseFlags as Map)?.keySet()\n" +
-                    "  }\n" +
-                    "  sink {\n" +
-                    "    sampler {\n" +
-                    "      if (parsed?.commonProperties?.responseFlags) {\n" +
-                    "        // use service:errorCode as sampler id so that 
each service:errorCode has its own sampler,\n" +
-                    "        // e.g. 
checkoutservice:[upstreamConnectionFailure], 
checkoutservice:[upstreamRetryLimitExceeded]\n" +
-                    "        
rateLimit(\"${log.service}:${(parsed?.commonProperties?.responseFlags as 
Map)?.keySet()}\") {\n" +
-                    "          rpm 100\n" +
-                    "        }\n" +
-                    "      } else {\n" +
-                    "        // use service:responseCode as sampler id so that 
each service:responseCode has its own sampler,\n" +
-                    "        // e.g. checkoutservice:500, 
checkoutservice:404.\n" +
-                    "        
rateLimit(\"${log.service}:${parsed?.response?.responseCode}\") {\n" +
-                    "          rpm 100\n" +
-                    "        }\n" +
-                    "      }\n" +
-                    "    }\n" +
-                    "  }\n" +
-                    "}\n",
+                new String[] {
+                        "e2e",
+                        "filter {\n" +
+                                "  json {\n" +
+                                "  }\n" +
+                                "  // only collect abnormal logs (http status 
code >= 300, or commonProperties?.responseFlags is not empty)\n" +
+                                "  if (parsed?.response?.responseCode as 
Integer < 400 && !parsed?.commonProperties?.responseFlags) {\n" +
+                                "    abort {}\n" +
+                                "  }\n" +
+                                "  extractor {\n" +
+                                "    if (parsed?.response?.responseCode) {\n" +
+                                "      tag 'status.code': 
parsed?.response?.responseCode as int\n" +
+                                "    }\n" +
+                                "    tag 'response.flag': 
(parsed?.commonProperties?.responseFlags as Map)?.keySet()\n" +
+                                "  }\n" +
+                                "  sink {\n" +
+                                "    sampler {\n" +
+                                "      if 
(parsed?.commonProperties?.responseFlags) {\n" +
+                                "        // use service:errorCode as sampler 
id so that each service:errorCode has its own sampler,\n" +
+                                "        // e.g. 
checkoutservice:[upstreamConnectionFailure], 
checkoutservice:[upstreamRetryLimitExceeded]\n" +
+                                "        
rateLimit(\"${log.service}:${(parsed?.commonProperties?.responseFlags as 
Map)?.keySet()}\") {\n" +
+                                "          rpm 100\n" +
+                                "        }\n" +
+                                "      } else {\n" +
+                                "        // use service:responseCode as 
sampler id so that each service:responseCode has its own sampler,\n" +
+                                "        // e.g. checkoutservice:500, 
checkoutservice:404.\n" +
+                                "        
rateLimit(\"${log.service}:${parsed?.response?.responseCode}\") {\n" +
+                                "          rpm 100\n" +
+                                "        }\n" +
+                                "      }\n" +
+                                "    }\n" +
+                                "  }\n" +
+                                "}\n",
+                },
+                new String[] {
+                        "extractor-slowSql",
+                        "filter {\n" +
+                                "        json{\n" +
+                                "        }\n" +
+                                "        extractor{\n" +
+                                "          layer parsed.layer as String\n" +
+                                "          service parsed.service as String\n" 
+
+                                "          timestamp parsed.time as String\n" +
+                                "          if (tag(\"LOG_KIND\") == 
\"SLOW_SQL\") {\n" +
+                                "             slowSql {\n" +
+                                "                      id parsed.id as 
String\n" +
+                                "                      statement 
parsed.statement as String\n" +
+                                "                      latency 
parsed.query_time as Long\n" +
+                                "                     }\n" +
+                                "          }\n" +
+                                "        }\n" +
+                                "      }"
                 }
         );
     }
@@ -177,14 +196,14 @@ public class DSLTest {
         
when(manager.find(anyString())).thenReturn(mock(ModuleProviderHolder.class));
         
when(manager.find(CoreModule.NAME).provider()).thenReturn(mock(ModuleServiceHolder.class));
         
when(manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class))
-            .thenReturn(mock(SourceReceiver.class));
+                .thenReturn(mock(SourceReceiver.class));
         
when(manager.find(CoreModule.NAME).provider().getService(ConfigService.class))
-            .thenReturn(mock(ConfigService.class));
+                .thenReturn(mock(ConfigService.class));
         when(manager.find(CoreModule.NAME)
-                    .provider()
-                    .getService(ConfigService.class)
-                    .getSearchableLogsTags())
-            .thenReturn("");
+                .provider()
+                .getService(ConfigService.class)
+                .getSearchableLogsTags())
+                .thenReturn("");
     }
 
     @Test
diff --git a/oap-server/server-starter/src/main/resources/application.yml 
b/oap-server/server-starter/src/main/resources/application.yml
index 9d6488e4d9..f52ecafabc 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -239,7 +239,7 @@ agent-analyzer:
 log-analyzer:
   selector: ${SW_LOG_ANALYZER:default}
   default:
-    lalFiles: ${SW_LOG_LAL_FILES:default}
+    lalFiles: ${SW_LOG_LAL_FILES:envoy-als,mysql-slowsql,default}
     malFiles: ${SW_LOG_MAL_FILES:""}
 
 event-analyzer:
diff --git a/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/mock.sql 
b/oap-server/server-starter/src/main/resources/lal/mysql-slowsql.yaml
similarity index 61%
copy from test/e2e-v2/cases/mysql/prometheus-mysql-exporter/mock.sql
copy to oap-server/server-starter/src/main/resources/lal/mysql-slowsql.yaml
index a3d60701ed..774da2955d 100644
--- a/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/mock.sql
+++ b/oap-server/server-starter/src/main/resources/lal/mysql-slowsql.yaml
@@ -13,25 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-CREATE DATABASE IF NOT EXISTS test;
-USE test;
-CREATE TABLE IF NOT EXISTS `t1`(
-   `te1` VARCHAR(100) NOT NULL
-)ENGINE=InnoDB DEFAULT CHARSET=utf8;
-SET GLOBAL event_scheduler = 1;
-
-CREATE EVENT `event_1` 
-ON SCHEDULE EVERY 1 SECOND 
-DO INSERT INTO t1 values('test');
-
-CREATE EVENT `event_2` 
-ON SCHEDULE EVERY 1 SECOND 
-DO UPDATE t1 SET `te1` = 1;
-
-CREATE EVENT `event_3` 
-ON SCHEDULE EVERY 1 SECOND 
-DO DELETE FROM t1;
-
-CREATE EVENT `event_4` 
-ON SCHEDULE EVERY 1 SECOND 
-DO COMMIT;
+rules:
+  - name: mysql-slowsql
+    layer: MYSQL
+    dsl: |
+      filter {
+        json{
+        }
+        extractor{
+          layer parsed.layer as String
+          service parsed.service as String
+          timestamp parsed.time as String
+          if (tag("LOG_KIND") == "SLOW_SQL") {
+             slowSql {
+               id parsed.id as String
+               statement parsed.statement as String
+               latency parsed.query_time as Long
+             }
+          }
+        }
+      }
diff --git a/test/e2e-v2/cases/mysql/expected/service.yml 
b/test/e2e-v2/cases/mysql/expected/db-has-value.yml
similarity index 84%
copy from test/e2e-v2/cases/mysql/expected/service.yml
copy to test/e2e-v2/cases/mysql/expected/db-has-value.yml
index f499c9d32e..a9ec0add46 100644
--- a/test/e2e-v2/cases/mysql/expected/service.yml
+++ b/test/e2e-v2/cases/mysql/expected/db-has-value.yml
@@ -14,11 +14,10 @@
 # limitations under the License.
 
 {{- contains . }}
-- id: {{ b64enc "mysql::showcase" }}.1
-  name: mysql::showcase
-  shortname: showcase
-  group: mysql
-  normal: true
-  layers:
-    - MYSQL
-{{- end }}
\ No newline at end of file
+- key: 0
+  value:
+  {{- contains .value }}
+  - key: {{ notEmpty .key }}
+    value: {{ notEmpty .value }}
+  {{- end }}
+{{- end }}
diff --git a/test/e2e-v2/cases/mysql/expected/service.yml 
b/test/e2e-v2/cases/mysql/expected/service.yml
index f499c9d32e..4b2fb10958 100644
--- a/test/e2e-v2/cases/mysql/expected/service.yml
+++ b/test/e2e-v2/cases/mysql/expected/service.yml
@@ -14,11 +14,11 @@
 # limitations under the License.
 
 {{- contains . }}
-- id: {{ b64enc "mysql::showcase" }}.1
-  name: mysql::showcase
-  shortname: showcase
-  group: mysql
+- id: {{ b64enc "mysql::root[root]" }}.1
+  name: mysql::root[root]
+  shortname: root[root]
   normal: true
+  group: "mysql"
   layers:
     - MYSQL
 {{- end }}
\ No newline at end of file
diff --git a/test/e2e-v2/cases/mysql/mysql-cases.yaml 
b/test/e2e-v2/cases/mysql/mysql-cases.yaml
index 7f4a79d69d..ea3d7839f9 100644
--- a/test/e2e-v2/cases/mysql/mysql-cases.yaml
+++ b/test/e2e-v2/cases/mysql/mysql-cases.yaml
@@ -20,36 +20,39 @@
     - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql service ls
       expected:  expected/service.yml
     # metrics
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_uptime --service-name=mysql::showcase |yq e 'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_uptime --service-name=mysql::root[root] |yq e 'to_entries' -
       expected:  expected/metrics-has-value.yml
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_innodb_buffer_pool_size --service-name=mysql::showcase |yq e 
'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_innodb_buffer_pool_size --service-name=mysql::root[root] |yq 
e 'to_entries' -
       expected:  expected/metrics-has-value.yml
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_max_connections --service-name=mysql::showcase |yq e 
'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_max_connections --service-name=mysql::root[root] |yq e 
'to_entries' -
       expected:  expected/metrics-has-value.yml
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_thread_cache_size --service-name=mysql::showcase |yq e 
'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_thread_cache_size --service-name=mysql::root[root] |yq e 
'to_entries' -
       expected:  expected/metrics-has-value.yml
       
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_commands_select_rate --service-name=mysql::showcase |yq e 
'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_commands_select_rate --service-name=mysql::root[root] |yq e 
'to_entries' -
       expected:  expected/metrics-has-value.yml
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_commands_insert_rate --service-name=mysql::showcase |yq e 
'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_commands_insert_rate --service-name=mysql::root[root] |yq e 
'to_entries' -
       expected:  expected/metrics-has-value.yml
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_commands_update_rate --service-name=mysql::showcase |yq e 
'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_commands_update_rate --service-name=mysql::root[root] |yq e 
'to_entries' -
       expected:  expected/metrics-has-value.yml
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_commands_delete_rate --service-name=mysql::showcase |yq e 
'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_commands_delete_rate --service-name=mysql::root[root] |yq e 
'to_entries' -
       expected:  expected/metrics-has-value.yml
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_qps --service-name=mysql::showcase |yq e 'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_qps --service-name=mysql::root[root] |yq e 'to_entries' -
       expected:  expected/metrics-has-value.yml
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_tps --service-name=mysql::showcase |yq e 'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_tps --service-name=mysql::root[root] |yq e 'to_entries' -
       expected:  expected/metrics-has-value.yml
 
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_threads_connected --service-name=mysql::showcase |yq e 
'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_threads_connected --service-name=mysql::root[root] |yq e 
'to_entries' -
       expected:  expected/metrics-has-value.yml
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_threads_created --service-name=mysql::showcase |yq e 
'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_threads_created --service-name=mysql::root[root] |yq e 
'to_entries' -
       expected:  expected/metrics-has-value.yml
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_threads_running --service-name=mysql::showcase |yq e 
'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_threads_running --service-name=mysql::root[root] |yq e 
'to_entries' -
       expected:  expected/metrics-has-value.yml
 
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_connects_aborted --service-name=mysql::showcase |yq e 
'to_entries' -
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_connects_aborted --service-name=mysql::root[root] |yq e 
'to_entries' -
       expected:  expected/metrics-has-value.yml
-    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_connects_available --service-name=mysql::showcase |yq e 
'to_entries' -
-      expected:  expected/metrics-has-value.yml
\ No newline at end of file
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics linear 
--name=meter_mysql_connects_available --service-name=mysql::root[root] |yq e 
'to_entries' -
+      expected:  expected/metrics-has-value.yml
+    # slow sql
+    - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics sampled-record 
--name=top_n_database_statement |yq e 'to_entries | with(.[] ; .value=(.value | 
to_entries))' -
+      expected:  expected/db-has-value.yml
\ No newline at end of file
diff --git 
a/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/docker-compose.yml 
b/test/e2e-v2/cases/mysql/mysql-slowsql/docker-compose.yaml
similarity index 52%
rename from test/e2e-v2/cases/mysql/prometheus-mysql-exporter/docker-compose.yml
rename to test/e2e-v2/cases/mysql/mysql-slowsql/docker-compose.yaml
index 44b09559e6..2a950de0b9 100644
--- a/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/docker-compose.yml
+++ b/test/e2e-v2/cases/mysql/mysql-slowsql/docker-compose.yaml
@@ -20,34 +20,66 @@ services:
     extends:
       file: ../../../script/docker-compose/base-compose.yml
       service: oap
-    environment:
-      SW_OTEL_RECEIVER: default
-      SW_OTEL_RECEIVER_ENABLED_OTEL_RULES: mysql
     ports:
       - 12800
+    entrypoint: ['sh', '-c', '/download-mysql.sh && chmod 777 
/skywalking/docker-entrypoint.sh && /skywalking/docker-entrypoint.sh']
+    networks:
+      - e2e
+    environment:
+      - TZ=Asia/Shanghai
+      - SW_STORAGE=mysql
+      - SW_DATA_SOURCE_USER=root
+      - SW_DATA_SOURCE_PASSWORD=password
+      - SW_JDBC_URL=jdbc:mysql://mysql:3306/swtest
+      - SW_CORE_TOPN_REPORT_PERIOD=2
+      - SW_OTEL_RECEIVER=default
+      - SW_OTEL_RECEIVER_ENABLED_OTEL_RULES=mysql
+    depends_on:
+      mysql:
+        condition: service_healthy
   mysql:
-    image: mysql:8.0.30
+    image: mysql:8.0.13
     networks:
       - e2e
-    ports:
-      - 3306
+    volumes:
+      - ../mysql-slowsql/log:/var/lib/mysql
+      - ../mysql-slowsql/my.cnf:/etc/my.cnf
     environment:
-      - MYSQL_ROOT_PASSWORD=password
-      - MYSQL_DATABASE=test2
-
+      - "MYSQL_ROOT_PASSWORD=password"
+      - "MYSQL_DATABASE=swtest"
+    ports:
+      - 3306:3306
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/3306"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
   mysql-load:
-    image: mysql:8.0.30
+    image: mysql:8.0.13
     depends_on:
-      - otel-collector
+      oap:
+        condition: service_healthy
     networks:
       - e2e
     entrypoint: bash
     volumes:
-     - ../prometheus-mysql-exporter:/docker
+     - ./mock.sql:/docker/mock.sql
     command:
       - -c
       - "mysql -h mysql -uroot -ppassword < /docker/mock.sql"
-
+  fluentbit:
+    image: fluent/fluent-bit:1.9
+    environment:
+      - TZ=Asia/Shanghai
+    volumes:
+      - ../mysql-slowsql/log:/tmp/skywalking-logs
+      - ./fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf
+      - ./fluent-bit-parser.conf:/fluent-bit/etc/fluent-bit-parser.conf
+      - ./fluent-bit-script.lua:/fluent-bit/etc/fluent-bit-script.lua
+    networks:
+      - e2e
+    depends_on:
+      - mysql
   mysql-service:
     image: prom/mysqld-exporter:v0.14.0
     ports:
@@ -58,20 +90,17 @@ services:
       - e2e
     depends_on:
       - mysql
-
-
   otel-collector:
     image: otel/opentelemetry-collector:0.50.0
     networks:
       - e2e
     command: [ "--config=/etc/otel-collector-config.yaml" ]
     volumes:
-      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
+      - 
../prometheus-mysql-exporter/otel-collector-config.yaml:/etc/otel-collector-config.yaml
     expose:
       - 55678
     depends_on:
       oap:
         condition: service_healthy
-
 networks:
-  e2e:
+  e2e:
\ No newline at end of file
diff --git a/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/e2e.yaml 
b/test/e2e-v2/cases/mysql/mysql-slowsql/e2e.yaml
similarity index 96%
rename from test/e2e-v2/cases/mysql/prometheus-mysql-exporter/e2e.yaml
rename to test/e2e-v2/cases/mysql/mysql-slowsql/e2e.yaml
index 87b91a1c02..6cc194ae42 100644
--- a/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/e2e.yaml
+++ b/test/e2e-v2/cases/mysql/mysql-slowsql/e2e.yaml
@@ -17,7 +17,7 @@
 
 setup:
   env: compose
-  file: docker-compose.yml
+  file: docker-compose.yaml
   timeout: 20m
   init-system-environment: ../../../script/env
   steps:
@@ -30,7 +30,7 @@ setup:
 
 verify:
   retry:
-    count: 20
+    count: 60
     interval: 3s
   cases:
     - includes:
diff --git a/test/e2e-v2/cases/mysql/expected/service.yml 
b/test/e2e-v2/cases/mysql/mysql-slowsql/fluent-bit-parser.conf
similarity index 68%
copy from test/e2e-v2/cases/mysql/expected/service.yml
copy to test/e2e-v2/cases/mysql/mysql-slowsql/fluent-bit-parser.conf
index f499c9d32e..205793d9ed 100644
--- a/test/e2e-v2/cases/mysql/expected/service.yml
+++ b/test/e2e-v2/cases/mysql/mysql-slowsql/fluent-bit-parser.conf
@@ -13,12 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-{{- contains . }}
-- id: {{ b64enc "mysql::showcase" }}.1
-  name: mysql::showcase
-  shortname: showcase
-  group: mysql
-  normal: true
-  layers:
-    - MYSQL
-{{- end }}
\ No newline at end of file
+[MULTILINE_PARSER]
+    name          my-log-format
+    type          regex
+    flush_timeout 1000
+    rule      "start_state"   "^(# Time: .*)"  "second_state"
+    rule      "second_state"  "^(# User@Host.*)"  "third_state"
+    rule      "third_state"   "^(# Query_time: .*)"    "statement"
+    rule      "statement"     "^\w+.*"             "statement"
\ No newline at end of file
diff --git a/test/e2e-v2/cases/mysql/mysql-slowsql/fluent-bit-script.lua 
b/test/e2e-v2/cases/mysql/mysql-slowsql/fluent-bit-script.lua
new file mode 100644
index 0000000000..c01d3ad5ae
--- /dev/null
+++ b/test/e2e-v2/cases/mysql/mysql-slowsql/fluent-bit-script.lua
@@ -0,0 +1,116 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- Fluent Bit Lua filter callback: rewrites one multi-line MySQL slow-query log
+-- entry (held in record["log"]) into the JSON log record that is posted to the
+-- OAP log endpoint.
+--
+-- Expected entry layout, one item per "\n"-separated line:
+--   line 1   "# Time: ..."
+--   line 2   "# User@Host: ... Id: ..."
+--   line 3   "# Query_time: <seconds>  Lock..."
+--   line 4+  the SQL statement (lines are joined without a separator)
+--
+-- Parameters:
+--   tag, timestamp  passed through untouched
+--   record          the Fluent Bit record table; modified in place on success
+--
+-- Returns (code, timestamp, record):
+--   1  the record was rewritten
+--   0  the entry did not have the expected layout; the record is returned
+--      unmodified instead of raising a Lua error on a nil index
+function rewrite_body(tag, timestamp, record)
+    local lines = split(record["log"], "\n")
+    local user_host = lines[2]
+    local query_line = lines[3]
+
+    -- Locate the markers that delimit the fields we slice out below.
+    local id_at = user_host and string.find(user_host, "Id:")
+    local lock_at = query_line and string.find(query_line, "Lock")
+
+    -- Column 15 is the first character after the 14-character prefix
+    -- "# Query_time: "; the value ends three characters before "Lock"
+    -- (presumably two separating spaces - verify against real log output).
+    local seconds = lock_at and tonumber(string.sub(query_line, 15, lock_at - 3))
+    if not id_at or not seconds then
+        return 0, timestamp, record
+    end
+
+    -- Column 14 is the first character after the 13-character prefix
+    -- "# User@Host: ". Spaces are removed and only the first 10 characters are
+    -- kept as the service name (e.g. "root[root]").
+    local service = string.sub(user_host, 14, id_at - 1)
+    service = string.gsub(service, " ", "")
+    service = "mysql::" .. string.sub(service, 1, 10)
+
+    -- Query time is reported in whole milliseconds.
+    local millis = math.modf(seconds * 1000)
+
+    local payload = {
+        -- The event time is the processing time, not the "# Time:" line.
+        time = os.time(),
+        layer = "MYSQL",
+        service = service,
+        query_time = millis,
+        statement = table.concat(lines, "", 4),
+        id = uuid(),
+    }
+
+    record["log"] = nil
+    record["date"] = nil
+    record["tags"] = {data={{key="LOG_KIND", value="SLOW_SQL"}}}
+    record["layer"] = "MYSQL"
+    record["service"] = service
+    record["body"] = {json={json=table2json(payload)}}
+    return 1, timestamp, record
+end
+-- Splits `input` on every literal (non-pattern) occurrence of `delimiter`.
+-- Both arguments are converted with tostring() first. Returns an array of the
+-- pieces (always at least one element), or false when the delimiter is empty.
+function split(input, delimiter)
+    local text = tostring(input)
+    local sep = tostring(delimiter)
+    if sep == "" then
+        return false
+    end
+
+    local pieces = {}
+    local cursor = 0
+    while true do
+        -- Fourth argument `true` disables pattern matching.
+        local first, last = string.find(text, sep, cursor, true)
+        if not first then
+            break
+        end
+        pieces[#pieces + 1] = string.sub(text, cursor, first - 1)
+        cursor = last + 1
+    end
+    -- Whatever follows the final delimiter (possibly the empty string).
+    pieces[#pieces + 1] = string.sub(text, cursor)
+    return pieces
+end
+
+-- Returns a random identifier made of 32 lowercase hexadecimal digits grouped
+-- 8-4-4-4-12 with dashes. The digits are drawn uniformly from 0-f; no
+-- version/variant bits are set, so this is UUID-shaped rather than a
+-- standards-conformant UUID.
+-- NOTE(review): math.random is not seeded anywhere in this block, so the
+-- sequence may repeat across restarts depending on the Lua runtime - confirm.
+function uuid()
+    -- All sixteen hex digits, each exactly once (the previous seed table
+    -- listed 'e' twice and had no '0').
+    local digits = "0123456789abcdef"
+    local picked = {}
+    for i = 1, 32 do
+        local at = math.random(1, 16)
+        picked[i] = string.sub(digits, at, at)
+    end
+    local sid = table.concat(picked)
+    return string.format('%s-%s-%s-%s-%s',
+        string.sub(sid, 1, 8),
+        string.sub(sid, 9, 12),
+        string.sub(sid, 13, 16),
+        string.sub(sid, 17, 20),
+        string.sub(sid, 21, 32)
+    )
+end
+
+-- Serializes a Lua table to a JSON string.
+--
+-- A table with no positive numeric keys (table.maxn == 0) is written as a JSON
+-- object, otherwise as a JSON array. String, number, boolean and nested table
+-- values are supported; entries with any other key or value type are skipped.
+-- String keys and values are JSON-escaped so that text containing quotes,
+-- backslashes or control characters (e.g. a SQL statement) stays valid JSON.
+-- Raises an assertion error when `t` is not a table.
+-- Relies on table.maxn, which is available in Lua 5.1 / LuaJIT.
+function table2json(t)
+  -- Short escapes defined by JSON; other control characters use \u00XX.
+  local escapes = {
+    ['"'] = '\\"',
+    ['\\'] = '\\\\',
+    ['\n'] = '\\n',
+    ['\r'] = '\\r',
+    ['\t'] = '\\t',
+  }
+  local function escape(str)
+    -- Outer parentheses drop gsub's second return value (the match count).
+    return (string.gsub(str, '[%c"\\]', function(ch)
+      return escapes[ch] or string.format('\\u%04x', string.byte(ch))
+    end))
+  end
+  local function serialize(tbl)
+    local tmp = {}
+    for k, v in pairs(tbl) do
+      local k_type = type(k)
+      local v_type = type(v)
+      -- Numeric keys contribute no key text (array element).
+      local key = (k_type == "string" and '"' .. escape(k) .. '":') or (k_type == "number" and "")
+      local value =
+        (v_type == "table" and serialize(v)) or (v_type == "boolean" and tostring(v)) or
+        (v_type == "string" and '"' .. escape(v) .. '"') or
+        (v_type == "number" and v)
+      tmp[#tmp + 1] = key and value and tostring(key) .. tostring(value) or nil
+    end
+    if table.maxn(tbl) == 0 then
+      return "{" .. table.concat(tmp, ",") .. "}"
+    else
+      return "[" .. table.concat(tmp, ",") .. "]"
+    end
+  end
+  assert(type(t) == "table")
+  return serialize(t)
+end
+
diff --git a/test/e2e-v2/cases/mysql/mysql-slowsql/fluent-bit.conf b/test/e2e-v2/cases/mysql/mysql-slowsql/fluent-bit.conf
new file mode 100644
index 0000000000..547d4263e0
--- /dev/null
+++ b/test/e2e-v2/cases/mysql/mysql-slowsql/fluent-bit.conf
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Fluent Bit pipeline for the MySQL slow-query e2e case:
+# tail the slow log -> drop non-entry records -> rewrite with Lua -> ship to OAP.
+
+# Global settings; loads the multiline parser definition used by the input.
+[SERVICE]
+    flush          1
+    log_level      info
+    parsers_File   fluent-bit-parser.conf
+# Read the slow-query log from its beginning, grouping lines into entries with
+# the "my-log-format" multiline parser.
+[INPUT]
+    name             tail
+    path             /tmp/skywalking-logs/slow.log
+    read_from_head   true
+    multiline.parser my-log-format
+# The three grep filters below discard records whose "log" field matches the
+# given regex (presumably the banner lines at the top of the log file - verify).
+[FILTER]
+    name   grep
+    match  *
+    exclude  log mysqld, Version:.*
+[FILTER]
+    name   grep
+    match  *
+    exclude  log Tcp port: 0.*
+[FILTER]
+    name   grep
+    match  *
+    exclude  log Time      .*
+# Rewrite every remaining record with rewrite_body() from fluent-bit-script.lua.
+[FILTER]
+    name           lua
+    match          *
+    script         fluent-bit-script.lua
+    call           rewrite_body
+# Echo records to stdout as JSON.
+[OUTPUT]
+    name            stdout
+    match           *
+    format          json
+# POST records as JSON to the OAP host at oap:12800/v3/logs.
+[OUTPUT]
+    name            http
+    match           *
+    host            oap
+    port            12800
+    uri             /v3/logs
+    format          json
diff --git a/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/mock.sql b/test/e2e-v2/cases/mysql/mysql-slowsql/mock.sql
similarity index 95%
rename from test/e2e-v2/cases/mysql/prometheus-mysql-exporter/mock.sql
rename to test/e2e-v2/cases/mysql/mysql-slowsql/mock.sql
index a3d60701ed..ec4c385640 100644
--- a/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/mock.sql
+++ b/test/e2e-v2/cases/mysql/mysql-slowsql/mock.sql
@@ -35,3 +35,8 @@ DO DELETE FROM t1;
 CREATE EVENT `event_4` 
 ON SCHEDULE EVERY 1 SECOND 
 DO COMMIT;
+
+use mysql;
+select sleep(3);
+select sleep(4);
+select sleep(5);
\ No newline at end of file
diff --git a/test/e2e-v2/cases/mysql/expected/service.yml b/test/e2e-v2/cases/mysql/mysql-slowsql/my.cnf
similarity index 78%
copy from test/e2e-v2/cases/mysql/expected/service.yml
copy to test/e2e-v2/cases/mysql/mysql-slowsql/my.cnf
index f499c9d32e..cf18eed18b 100644
--- a/test/e2e-v2/cases/mysql/expected/service.yml
+++ b/test/e2e-v2/cases/mysql/mysql-slowsql/my.cnf
@@ -13,12 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-{{- contains . }}
-- id: {{ b64enc "mysql::showcase" }}.1
-  name: mysql::showcase
-  shortname: showcase
-  group: mysql
-  normal: true
-  layers:
-    - MYSQL
-{{- end }}
\ No newline at end of file
+# MySQL configuration for the slow-query e2e case.
+# Server section: turns on the slow query log and the event scheduler, and
+# writes statements that run longer than long_query_time to
+# slow_query_log_file.
+# NOTE(review): fluent-bit.conf tails /tmp/skywalking-logs/slow.log, so this
+# file is presumably shared through a volume mount in docker-compose.yaml -
+# verify.
+[mysqld]
+init_connect='SET NAMES utf8'
+slow_query_log=ON
+event_scheduler=ON
+long_query_time=1
+slow_query_log_file=/var/lib/mysql/slow.log
+# Client-side tools use utf8 as their default character set.
+[client]
+default-character-set=utf8
+[mysql]
+default-character-set=utf8
\ No newline at end of file
diff --git a/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/otel-collector-config.yaml b/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/otel-collector-config.yaml
index 77844c95e4..c0b2f270be 100644
--- a/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/otel-collector-config.yaml
+++ b/test/e2e-v2/cases/mysql/prometheus-mysql-exporter/otel-collector-config.yaml
@@ -22,7 +22,7 @@ receivers:
          static_configs:
            - targets: ['mysql-service:9104']
              labels:
-               host_name: showcase
+               host_name: root[root]
 processors:
   batch:
 
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 94bf5980ee..79303a1af6 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -25,4 +25,4 @@ SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
 SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
 SW_BANYANDB_COMMIT=5a326d7e36a008c5ea10e3ae506309cb29733c53
 
-SW_CTL_COMMIT=9f750c8ffe4d4dcea63e811dc1f5f857756a443c
+SW_CTL_COMMIT=a7c4ccae106e4a5a0b6296f860396f289444058d

Reply via email to