This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new af3eabdd1c [mqtt] refact the MQTT based on the Paho SDK and support 
both unidirectional and bidirectional MQTT over TLS connections. (#3474)
af3eabdd1c is described below

commit af3eabdd1c6ecfa9d474a67492815713333643e4
Author: yy549159265 <40821310+yy549159...@users.noreply.github.com>
AuthorDate: Fri Jul 11 13:36:10 2025 +0800

    [mqtt] refact the MQTT based on the Paho SDK and support both 
unidirectional and bidirectional MQTT over TLS connections. (#3474)
    
    Signed-off-by: yy549159265 <40821310+yy549159...@users.noreply.github.com>
    Co-authored-by: aias00 <liuhon...@apache.org>
    Co-authored-by: Logic <zqr10...@dromara.org>
    Co-authored-by: Calvin <zhengqi...@apache.org>
    Co-authored-by: tomsun28 <tomsu...@outlook.com>
---
 .github/workflows/backend-build-test.yml           |   9 +
 .../hertzbeat-collector-basic/pom.xml              |  15 +-
 .../collect/mqtt/CertificateFormatter.java         | 195 ++++++++++++
 .../collector/collect/mqtt/MqttCollectImpl.java    | 338 ++++++++++++---------
 .../collector/collect/mqtt/MqttSslFactory.java     | 186 ++++++++++++
 .../collector/collect/mqtt/MqttCollectTest.java    | 137 ++++-----
 .../common/entity/job/protocol/MqttProtocol.java   |  77 +++--
 .../src/main/resources/define/app-mqtt.yml         | 180 ++++++-----
 8 files changed, 815 insertions(+), 322 deletions(-)

diff --git a/.github/workflows/backend-build-test.yml 
b/.github/workflows/backend-build-test.yml
index daf5678c92..1c8976375f 100644
--- a/.github/workflows/backend-build-test.yml
+++ b/.github/workflows/backend-build-test.yml
@@ -48,6 +48,15 @@ jobs:
       - name: Build with Maven
         run: mvnd clean -B package -Prelease -Dmaven.test.skip=false --file 
pom.xml
 
+      - name: Upload test reports
+        if: failure() 
+        uses: actions/upload-artifact@v4
+        with:
+          name: test-reports-${{ github.run_id }}
+          path: |
+            **/target/surefire-reports
+            **/target/failsafe-reports
+
       - name: Upload coverage reports to Codecov
         uses: codecov/codecov-action@v4.0.1
         with:
diff --git a/hertzbeat-collector/hertzbeat-collector-basic/pom.xml 
b/hertzbeat-collector/hertzbeat-collector-basic/pom.xml
index df5ea8044a..e418cde5aa 100644
--- a/hertzbeat-collector/hertzbeat-collector-basic/pom.xml
+++ b/hertzbeat-collector/hertzbeat-collector-basic/pom.xml
@@ -33,7 +33,7 @@
         <maven.compiler.source>17</maven.compiler.source>
         <maven.compiler.target>17</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <mqtt.version>1.3.3</mqtt.version>
+        <mqtt.version>1.2.5</mqtt.version>
     </properties>
 
     <dependencies>
@@ -140,10 +140,19 @@
         </dependency>
         <!-- mqtt -->
         <dependency>
-            <groupId>com.hivemq</groupId>
-            <artifactId>hivemq-mqtt-client</artifactId>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
             <version>${mqtt.version}</version>
         </dependency>
+        <!--Bouncy Castle-->
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk15on</artifactId>
+            <version>1.68</version>
+        </dependency>
+
+
+
         <!--plc-->
         <dependency>
             <groupId>org.apache.plc4x</groupId>
diff --git 
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/CertificateFormatter.java
 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/CertificateFormatter.java
new file mode 100644
index 0000000000..312d35d94e
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/CertificateFormatter.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.mqtt;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Formats the private key and certificate, supporting concatenation of 
multiple certificates in PEM format.
+ */
+public class CertificateFormatter {
+
+    public static String formatCertificateChain(String input) {
+        if (input == null || input.trim().isEmpty()) {
+            return input;
+        }
+
+        String normalized = normalizeInput(input);
+
+        List<String> certificates = extractCertificates(normalized);
+
+        if (certificates.isEmpty()) {
+            return formatAsSingleCertificate(normalized);
+        }
+
+        StringBuilder formattedChain = new StringBuilder();
+        for (String cert : certificates) {
+            if (cert.trim().isEmpty()) continue;
+
+            String formatted = formatPemBlock(cert);
+            formattedChain.append(formatted).append("\n");
+        }
+
+        return formattedChain.toString().trim();
+    }
+
+    private static String normalizeInput(String input) {
+        return input
+                .replace("\r\n", "\n")
+                .replace("\r", "\n")
+                .replaceAll("\\s*\\\\n\\s*", "\n")
+                .replaceAll("(?m)^\\s+|\\s+$", "")
+                .trim();
+    }
+
+    private static List<String> extractCertificates(String input) {
+        List<String> certificates = new ArrayList<>();
+        String regex = 
"(-----BEGIN\\s+[\\w\\s]+?-----)[\\s\\S]*?(-----END\\s+[\\w\\s]+?-----)";
+
+
+        Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(input);
+
+        int lastEnd = 0;
+        while (matcher.find()) {
+
+            if (matcher.start() > lastEnd) {
+                String gap = input.substring(lastEnd, matcher.start());
+                if (!gap.trim().isEmpty()) {
+                    certificates.add(gap);
+                }
+            }
+
+            certificates.add(matcher.group());
+            lastEnd = matcher.end();
+        }
+
+
+        if (lastEnd < input.length()) {
+            certificates.add(input.substring(lastEnd));
+        }
+
+        return certificates;
+    }
+
+    private static String formatPemBlock(String block) {
+        try {
+            Pattern pattern = Pattern.compile(
+                    
"(-----BEGIN\\s+[\\w\\s]+?-----)(.*?)(-----END\\s+[\\w\\s]+?-----)",
+                    Pattern.DOTALL | Pattern.CASE_INSENSITIVE
+            );
+
+            Matcher matcher = pattern.matcher(block);
+            if (matcher.find()) {
+                String header = matcher.group(1).trim();
+                String body = matcher.group(2);
+                String footer = matcher.group(3).trim();
+
+
+                if (body == null) body = "";
+
+                String cleanBody = body
+                        .replaceAll("\\s", "")
+                        .replaceAll("\"", "")
+                        .trim();
+
+
+                if (cleanBody.isEmpty() && body != null && 
!body.trim().isEmpty()) {
+
+                    cleanBody = body.replaceAll("[^a-zA-Z0-9+/=]", "").trim();
+                }
+
+                String formattedBody = formatBase64Body(cleanBody);
+
+                return header + "\n" + formattedBody + "\n" + footer;
+            } else {
+
+                return formatAsCertificate(block);
+            }
+        } catch (Exception e) {
+
+            return block;
+        }
+    }
+
+    private static String formatAsCertificate(String content) {
+
+        String cleanContent = content.replaceAll("[^a-zA-Z0-9+/=]", "").trim();
+
+        if (cleanContent.isEmpty()) {
+            return content;
+        }
+
+
+        String formattedBody = formatBase64Body(cleanContent);
+
+
+        if (cleanContent.toLowerCase().contains("private")) {
+            if (cleanContent.startsWith("MII") || cleanContent.length() > 
1000) {
+                return "-----BEGIN PRIVATE KEY-----\n" + formattedBody + 
"\n-----END PRIVATE KEY-----";
+            } else {
+                return "-----BEGIN RSA PRIVATE KEY-----\n" + formattedBody + 
"\n-----END RSA PRIVATE KEY-----";
+            }
+        } else {
+            return "-----BEGIN CERTIFICATE-----\n" + formattedBody + 
"\n-----END CERTIFICATE-----";
+        }
+    }
+
+    private static String formatAsSingleCertificate(String input) {
+        String cleanContent = input.replaceAll("[^a-zA-Z0-9+/=]", "").trim();
+        return formatAsCertificate(cleanContent);
+    }
+
+    private static String formatBase64Body(String body) {
+
+        StringBuilder formatted = new StringBuilder();
+        int index = 0;
+        while (index < body.length()) {
+            int end = Math.min(index + 64, body.length());
+            formatted.append(body.substring(index, end));
+            if (end < body.length()) {
+                formatted.append("\n");
+            }
+            index = end;
+        }
+        return formatted.toString().trim();
+    }
+
+    public static String formatPrivateKey(String input) {
+        if (input == null || input.trim().isEmpty()) {
+            return input;
+        }
+
+
+        String normalized = normalizeInput(input);
+
+
+        if (isPemEncapsulated(normalized)) {
+            return formatPemBlock(normalized);
+        }
+
+        return formatAsCertificate(normalized);
+    }
+
+    private static boolean isPemEncapsulated(String block) {
+        return block.contains("-----BEGIN") && block.contains("-----END");
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java
 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java
index 8bf96cc019..befd864209 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java
@@ -17,26 +17,7 @@
 
 package org.apache.hertzbeat.collector.collect.mqtt;
 
-import com.hivemq.client.mqtt.MqttVersion;
-import com.hivemq.client.mqtt.datatypes.MqttQos;
-import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
-import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
-import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder;
-import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;
-import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
-import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
-import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
-import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hertzbeat.collector.collect.AbstractCollect;
 import org.apache.hertzbeat.collector.constants.CollectorConstants;
@@ -46,13 +27,27 @@ import org.apache.hertzbeat.common.entity.job.Metrics;
 import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import 
org.apache.hertzbeat.common.entity.message.CollectRep.MetricsData.Builder;
+import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.IMqttToken;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 import org.springframework.util.StopWatch;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 /**
- * collect mqtt metrics
+ * collect mqtt metrics using Eclipse Paho
  */
 public class MqttCollectImpl extends AbstractCollect {
 
@@ -61,138 +56,224 @@ public class MqttCollectImpl extends AbstractCollect {
 
     private static final Logger logger = 
LoggerFactory.getLogger(MqttCollectImpl.class);
 
+    @Override
+    public String supportProtocol() {
+        return DispatchConstants.PROTOCOL_MQTT;
+    }
+
     @Override
     public void preCheck(Metrics metrics) throws IllegalArgumentException {
         MqttProtocol mqttProtocol = metrics.getMqtt();
         Assert.hasText(mqttProtocol.getHost(), "MQTT protocol host is 
required");
         Assert.hasText(mqttProtocol.getPort(), "MQTT protocol port is 
required");
-        Assert.hasText(mqttProtocol.getProtocolVersion(), "MQTT protocol 
version is required");
+
+        if ("mqtts".equalsIgnoreCase(mqttProtocol.getProtocol())) {
+            if (Boolean.parseBoolean(mqttProtocol.getEnableMutualAuth())) {
+                Assert.hasText(mqttProtocol.getCaCert(), "CA certificate is 
required for mutual auth");
+                Assert.hasText(mqttProtocol.getClientCert(), "Client 
certificate is required for mutual auth");
+                Assert.hasText(mqttProtocol.getClientKey(), "Client private 
key is required for mutual auth");
+            }
+        }
     }
 
     @Override
     public void collect(Builder builder, Metrics metrics) {
-        MqttProtocol mqtt = metrics.getMqtt();
-        String protocolVersion = mqtt.getProtocolVersion();
-        MqttVersion mqttVersion = MqttVersion.valueOf(protocolVersion);
-        if (mqttVersion == MqttVersion.MQTT_3_1_1) {
-            collectWithVersion3(metrics, builder);
-        } else if (mqttVersion == MqttVersion.MQTT_5_0) {
-            collectWithVersion5(metrics, builder);
+        MqttProtocol mqttProtocol = metrics.getMqtt();
+        Map<Object, String> data = new HashMap<>();
+
+        try {
+            MqttAsyncClient client = buildMqttClient(mqttProtocol);
+            long responseTime = connectClient(client, mqttProtocol);
+            testSubscribeAndPublish(client, mqttProtocol, data);
+            convertToMetricsData(builder, metrics, responseTime, data);
+            client.disconnect();
+        } catch (Exception e) {
+            logger.error("MQTT collection error: {}", e.getMessage(), e);
+            builder.setCode(CollectRep.Code.FAIL);
+            builder.setMsg("Collection failed: " + e.getMessage());
         }
     }
 
-    @Override
-    public String supportProtocol() {
-        return DispatchConstants.PROTOCOL_MQTT;
+    private MqttAsyncClient buildMqttClient(MqttProtocol protocol) throws 
Exception {
+        String clientId = protocol.getClientId();
+
+        String serverUri = String.format("%s://%s:%s",
+                StringUtils.equals(protocol.getProtocol(), "MQTT") ? "tcp" : 
"ssl",
+                protocol.getHost(),
+                protocol.getPort());
+
+        MqttClientPersistence persistence = new MemoryPersistence();
+
+        return new MqttAsyncClient(serverUri, clientId, persistence);
     }
 
-    /**
-     * collecting data of MQTT 5
-     */
-    private void collectWithVersion5(Metrics metrics, Builder builder) {
-        MqttProtocol mqttProtocol = metrics.getMqtt();
-        Map<Object, String> data = new HashMap<>();
-        Mqtt5AsyncClient client = buildMqtt5Client(mqttProtocol);
-        long responseTime = connectClient(client, mqtt5AsyncClient -> {
-            CompletableFuture<Mqtt5ConnAck> connectFuture = 
mqtt5AsyncClient.connect();
-            try {
-                connectFuture.get(Long.parseLong(mqttProtocol.getTimeout()), 
TimeUnit.MILLISECONDS);
-            } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                builder.setCode(CollectRep.Code.FAIL);
-                builder.setMsg(getErrorMessage(e.getMessage()));
+    private long connectClient(MqttAsyncClient client, MqttProtocol protocol) 
throws Exception {
+        MqttConnectOptions connOpts = new MqttConnectOptions();
+
+        if (protocol.hasAuth()) {
+            connOpts.setUserName(protocol.getUsername());
+            connOpts.setPassword(protocol.getPassword().toCharArray());
+        }
+
+        
connOpts.setKeepAliveInterval(Integer.parseInt(protocol.getKeepalive()));
+        connOpts.setConnectionTimeout(Integer.parseInt(protocol.getTimeout()) 
/ 1000);
+        connOpts.setCleanSession(true);
+        connOpts.setAutomaticReconnect(false);
+        if ("mqtts".equalsIgnoreCase(protocol.getProtocol())) {
+            boolean insecureSkipVerify = 
Boolean.parseBoolean(protocol.getInsecureSkipVerify());
+            if (insecureSkipVerify) {
+                connOpts.setHttpsHostnameVerificationEnabled(false);
             }
-        });
-        testDescribeAndPublish5(client, mqttProtocol, data);
-        convertToMetricsData(builder, metrics, responseTime, data);
-        client.disconnect();
+            if (Boolean.parseBoolean(protocol.getEnableMutualAuth())) {
+                
connOpts.setSocketFactory(MqttSslFactory.getMslSocketFactory(protocol, 
insecureSkipVerify));
+            } else {
+                
connOpts.setSocketFactory(MqttSslFactory.getSslSocketFactory(protocol, 
insecureSkipVerify));
+            }
+        }
+
+
+        StopWatch connectWatch = new StopWatch();
+        connectWatch.start();
+
+        
client.connect(connOpts).waitForCompletion(Long.parseLong(protocol.getTimeout()));
+        connectWatch.stop();
+        return connectWatch.getTotalTimeMillis();
     }
 
+
     /**
-     * collecting data of MQTT 3.1.1
+     * Test MQTT subscribe and publish capabilities
      */
-    private void collectWithVersion3(Metrics metrics, Builder builder) {
-        MqttProtocol mqttProtocol = metrics.getMqtt();
-        Map<Object, String> data = new HashMap<>();
-        Mqtt3AsyncClient client = buildMqtt3Client(mqttProtocol);
-        long responseTime = connectClient(client, mqtt3AsyncClient -> {
-            CompletableFuture<Mqtt3ConnAck> connectFuture = 
mqtt3AsyncClient.connect();
-            try {
-                connectFuture.get(Long.parseLong(mqttProtocol.getTimeout()), 
TimeUnit.MILLISECONDS);
-            } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                builder.setCode(CollectRep.Code.FAIL);
-                builder.setMsg(getErrorMessage(e.getMessage()));
+    private void testSubscribeAndPublish(MqttAsyncClient client, MqttProtocol 
protocol, Map<Object, String> data) {
+
+        // 1 test subscribe
+        if (StringUtils.isNotBlank(protocol.getTopic())) {
+            String subscribe = testSubscribe(client, protocol.getTopic());
+            if (StringUtils.isBlank(subscribe)) {
+                data.put("canSubscribe", "Subscription successful");
+            } else {
+                data.put("canSubscribe", String.format("Subscription failed: 
%s", subscribe));
             }
-        });
-        testDescribeAndPublish3(client, mqttProtocol, data);
-        convertToMetricsData(builder, metrics, responseTime, data);
-        client.disconnect();
-    }
 
-    private void testDescribeAndPublish3(Mqtt3AsyncClient client, MqttProtocol 
mqttProtocol, Map<Object, String> data) {
-        data.put("canDescribe", test(() -> {
-            
client.subscribeWith().topicFilter(mqttProtocol.getTopic()).qos(MqttQos.AT_LEAST_ONCE).send();
-            
client.unsubscribeWith().topicFilter(mqttProtocol.getTopic()).send();
-        }, "subscribe").toString());
-
-        data.put("canPublish", !mqttProtocol.testPublish() ? 
Boolean.FALSE.toString() : test(() -> {
-            client.publishWith().topic(mqttProtocol.getTopic())
-                
.payload(mqttProtocol.getTestMessage().getBytes(StandardCharsets.UTF_8))
-                .qos(MqttQos.AT_LEAST_ONCE).send();
-            data.put("canPublish", Boolean.TRUE.toString());
-        }, "publish").toString());
-    }
+        } else {
+            data.put("canSubscribe", "No topic, subscription test skipped");
+        }
+
+
+        // 2 test publish
+        if (StringUtils.isNotBlank(protocol.getTestMessage())) {
+            String publish = testPublish(client, protocol.getTopic(), 
protocol.getTestMessage());
+            if (StringUtils.isBlank(publish)) {
+                data.put("canPublish", "Message published successfully");
+
+                // 3 test receive message
+                String receivedData = getReceivedData(client, 
protocol.getTopic());
+                data.put("canReceive", receivedData);
+            } else {
+                data.put("canPublish", String.format("Message publishing 
failed: %s", publish));
+                data.put("canReceive", "Message reception skipped due to 
failed publish");
+            }
+        } else {
+            data.put("canPublish", "No test message, publish test skipped");
+            data.put("canReceive", "No test message, receive test skipped");
+        }
+
 
-    private void testDescribeAndPublish5(Mqtt5AsyncClient client, MqttProtocol 
mqttProtocol, Map<Object, String> data) {
-        data.put("canDescribe", test(() -> {
-            
client.subscribeWith().topicFilter(mqttProtocol.getTopic()).qos(MqttQos.AT_LEAST_ONCE).send();
-            
client.unsubscribeWith().topicFilter(mqttProtocol.getTopic()).send();
-        }, "subscribe").toString());
-
-        data.put("canPublish", !mqttProtocol.testPublish() ? 
Boolean.FALSE.toString() : test(() -> {
-            client.publishWith().topic(mqttProtocol.getTopic())
-                
.payload(mqttProtocol.getTestMessage().getBytes(StandardCharsets.UTF_8))
-                .qos(MqttQos.AT_LEAST_ONCE).send();
-            data.put("canPublish", Boolean.TRUE.toString());
-        }, "publish").toString());
+        // 4 test unsubscribe
+        if (StringUtils.isNotBlank(protocol.getTopic())) {
+            String subscribe = testUnSubscribe(client, protocol.getTopic());
+            if (StringUtils.isBlank(subscribe)) {
+                data.put("canUnSubscribe", "Unsubscription successful");
+            } else {
+                data.put("canUnSubscribe", String.format("Unsubscription 
failed: %s", subscribe));
+            }
+        } else {
+            data.put("canUnSubscribe", "No topic, unsubscription test 
skipped");
+        }
     }
 
-    private Mqtt5AsyncClient buildMqtt5Client(MqttProtocol mqttProtocol) {
-        Mqtt5ClientBuilder mqtt5ClientBuilder = Mqtt5Client.builder()
-            .serverHost(mqttProtocol.getHost())
-            .identifier(mqttProtocol.getClientId())
-            .serverPort(Integer.parseInt(mqttProtocol.getPort()));
+    private String getReceivedData(MqttAsyncClient client, String topic) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final StringBuilder messageHolder = new StringBuilder();
+
+
+        client.setCallback(new MqttCallback() {
+            @Override
+            public void connectionLost(Throwable cause) {
+                latch.countDown();
+            }
 
-        if (mqttProtocol.hasAuth()) {
-            
mqtt5ClientBuilder.simpleAuth().username(mqttProtocol.getUsername())
-                
.password(mqttProtocol.getPassword().getBytes(StandardCharsets.UTF_8))
-                .applySimpleAuth();
+            @Override
+            public void messageArrived(String arrivedTopic, MqttMessage 
message) {
+
+                if (topic.equals(arrivedTopic)) {
+                    messageHolder.append(new String(message.getPayload()));
+                    latch.countDown();
+                }
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken token) {
+            }
+        });
+
+        try {
+            boolean received = latch.await(5, TimeUnit.SECONDS);
+            if (messageHolder.length() > 0) {
+                return messageHolder.toString();
+            } else if (!received) {
+                return "Message reception timed out after 5 seconds";
+            } else {
+                return "No valid message received";
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return e.getMessage();
+        } finally {
+            client.setCallback(null);
         }
-        return mqtt5ClientBuilder.buildAsync();
     }
 
-    private Mqtt3AsyncClient buildMqtt3Client(MqttProtocol mqttProtocol) {
+    private String testSubscribe(MqttAsyncClient client, String topic) {
+        try {
+            IMqttToken subToken = client.subscribe(topic, 1);
+            subToken.waitForCompletion(5000);
+            return "";
+        } catch (MqttException e) {
+            logger.warn("MQTT subscribe test failed: {}", e.getMessage());
+            return e.getMessage();
+        }
+    }
+
+    private String testPublish(MqttAsyncClient client, String topic, String 
message) {
+        try {
+            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
+            mqttMessage.setQos(1);
 
-        Mqtt3ClientBuilder mqtt3ClientBuilder = Mqtt3Client.builder()
-            .serverHost(mqttProtocol.getHost())
-            .identifier(mqttProtocol.getClientId())
-            .serverPort(Integer.parseInt(mqttProtocol.getPort()));
+            IMqttToken pubToken = client.publish(topic, mqttMessage);
+            pubToken.waitForCompletion(5000);
 
-        if (mqttProtocol.hasAuth()) {
-            
mqtt3ClientBuilder.simpleAuth().username(mqttProtocol.getUsername())
-                
.password(mqttProtocol.getPassword().getBytes(StandardCharsets.UTF_8))
-                .applySimpleAuth();
+            return "";
+        } catch (MqttException e) {
+            logger.warn("MQTT publish test failed: {}", e.getMessage());
+            return e.getMessage();
         }
-        return mqtt3ClientBuilder.buildAsync();
     }
 
-    public <T> long connectClient(T client, Consumer<T> connect) {
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        connect.accept(client);
-        stopWatch.stop();
-        return stopWatch.getTotalTimeMillis();
+    private String testUnSubscribe(MqttAsyncClient client, String topic) {
+        try {
+            IMqttToken unsubToken = client.unsubscribe(topic);
+            unsubToken.waitForCompletion(5000);
+            return "";
+        } catch (MqttException e) {
+            logger.warn("MQTT unsubscribe test failed: {}", e.getMessage());
+            return e.getMessage();
+        }
     }
 
+    /**
+     * Convert collected data to MetricsData
+     */
     private void convertToMetricsData(Builder builder, Metrics metrics, long 
responseTime, Map<Object, String> data) {
         CollectRep.ValueRow.Builder valueRowBuilder = 
CollectRep.ValueRow.newBuilder();
         for (String column : metrics.getAliasFields()) {
@@ -207,25 +288,4 @@ public class MqttCollectImpl extends AbstractCollect {
         builder.addValueRow(valueRowBuilder.build());
     }
 
-    private Boolean test(Runnable runnable, String operationName) {
-        try {
-            runnable.run();
-            return true;
-        } catch (Exception e) {
-            logger.error("{} fail", operationName, e);
-        }
-        return false;
-    }
-
-    private String getErrorMessage(String errorMessage) {
-        if (StringUtils.isBlank(errorMessage)) {
-            return "connect failed";
-        }
-        String[] split = errorMessage.split(":");
-        if (split.length > 1) {
-            return 
Arrays.stream(split).skip(1).collect(Collectors.joining(":"));
-        }
-        return errorMessage;
-    }
-
 }
diff --git 
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttSslFactory.java
 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttSslFactory.java
new file mode 100644
index 0000000000..3c78bf8fc3
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttSslFactory.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.mqtt;
+
+import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol;
+import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.openssl.PEMKeyPair;
+import org.bouncycastle.openssl.PEMParser;
+import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
+
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.StringReader;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.Security;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.Collection;
+
+/**
+ * Support MQTT SSL Factory
+ */
+public class MqttSslFactory {
+
+    /**
+     * Get MSL Socket Factory
+     */
+    public static SSLSocketFactory getMslSocketFactory(MqttProtocol 
mqttProtocol, boolean insecureSkipVerify) {
+        try {
+            Security.addProvider(new BouncyCastleProvider());
+
+            KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+            ks.load(null, null);
+
+            Certificate[] chain = null;
+            if (mqttProtocol.getClientCert() != null && 
!mqttProtocol.getClientCert().isEmpty()) {
+                String formatClientCert = 
CertificateFormatter.formatCertificateChain(mqttProtocol.getClientCert());
+                try (InputStream certIn = new 
ByteArrayInputStream(formatClientCert.getBytes())) {
+                    CertificateFactory cf = 
CertificateFactory.getInstance("X.509");
+                    Collection<? extends Certificate> certs = 
cf.generateCertificates(certIn);
+                    chain = certs.toArray(new Certificate[0]);
+                }
+            }
+
+            PrivateKey privateKey;
+            if (mqttProtocol.getClientKey() != null && 
!mqttProtocol.getClientKey().isEmpty()) {
+                String formatClientKey = 
CertificateFormatter.formatPrivateKey(mqttProtocol.getClientKey());
+                try (PEMParser pemParser = new PEMParser(new 
StringReader(formatClientKey))) {
+                    JcaPEMKeyConverter converter = new 
JcaPEMKeyConverter().setProvider("BC");
+                    Object object = pemParser.readObject();
+
+                    if (object instanceof PEMKeyPair) {
+                        privateKey = converter.getPrivateKey(((PEMKeyPair) 
object).getPrivateKeyInfo());
+                    } else if (object instanceof PrivateKeyInfo) {
+                        privateKey = converter.getPrivateKey((PrivateKeyInfo) 
object);
+                    } else {
+                        throw new IllegalArgumentException("Unsupported 
private key type");
+                    }
+
+                    ks.setKeyEntry("private-key", privateKey, 
"".toCharArray(), chain);
+                }
+            }
+
+            TrustManager[] trustManagers;
+            if (insecureSkipVerify) {
+                trustManagers = createInsecureTrustManager();
+            } else {
+                String formatCaCert = 
CertificateFormatter.formatCertificateChain(mqttProtocol.getCaCert());
+                KeyStore trustStore = createMergedTrustStore(formatCaCert);
+                TrustManagerFactory tmf = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+                tmf.init(trustStore);
+                trustManagers = tmf.getTrustManagers();
+            }
+
+            KeyManagerFactory kmf = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+            kmf.init(ks, "".toCharArray());
+
+
+            SSLContext context = 
SSLContext.getInstance(mqttProtocol.getTlsVersion());
+            context.init(kmf.getKeyManagers(), trustManagers, null);
+
+            return context.getSocketFactory();
+        } catch (Exception e) {
+            throw new RuntimeException("Fails to SSL initialize: " + 
e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Get SSL Socket Factory
+     */
+    public static SSLSocketFactory getSslSocketFactory(MqttProtocol 
mqttProtocol, boolean insecureSkipVerify) {
+        try {
+            Security.addProvider(new BouncyCastleProvider());
+
+
+            TrustManager[] trustManagers;
+            if (insecureSkipVerify) {
+                trustManagers = createInsecureTrustManager();
+            } else {
+
+                String formatCaCert = 
CertificateFormatter.formatCertificateChain(mqttProtocol.getCaCert());
+                KeyStore trustStore = createMergedTrustStore(formatCaCert);
+                TrustManagerFactory tmf = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+                tmf.init(trustStore);
+                trustManagers = tmf.getTrustManagers();
+            }
+
+            SSLContext sslContext = 
SSLContext.getInstance(mqttProtocol.getTlsVersion());
+            sslContext.init(null, trustManagers, null);
+
+            return sslContext.getSocketFactory();
+        } catch (Exception e) {
+            throw new RuntimeException("Fails to SSL initialize: " + 
e.getMessage(), e);
+        }
+    }
+
+    private static TrustManager[] createInsecureTrustManager() {
+        return new TrustManager[]{
+                new X509TrustManager() {
+                    public void checkClientTrusted(X509Certificate[] chain, 
String authType) {
+                    }
+
+                    public void checkServerTrusted(X509Certificate[] chain, 
String authType) {
+                    }
+
+                    public X509Certificate[] getAcceptedIssuers() {
+                        return new X509Certificate[0];
+                    }
+                }
+        };
+    }
+
+    private static KeyStore createMergedTrustStore(String caCertPem) throws 
Exception {
+        KeyStore mergedKs = KeyStore.getInstance(KeyStore.getDefaultType());
+        mergedKs.load(null, null);
+
+
+        TrustManagerFactory systemTmf = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        systemTmf.init((KeyStore) null);
+        X509TrustManager systemTm = (X509TrustManager) 
systemTmf.getTrustManagers()[0];
+
+        int systemIndex = 1;
+        for (X509Certificate cert : systemTm.getAcceptedIssuers()) {
+            mergedKs.setCertificateEntry("system-ca-" + systemIndex++, cert);
+        }
+
+
+        if (caCertPem != null && !caCertPem.isEmpty()) {
+            try (InputStream caIn = new 
ByteArrayInputStream(caCertPem.getBytes())) {
+                CertificateFactory cf = 
CertificateFactory.getInstance("X.509");
+                Collection<? extends Certificate> customCerts = 
cf.generateCertificates(caIn);
+
+                int customIndex = 1;
+                for (Certificate cert : customCerts) {
+                    mergedKs.setCertificateEntry("custom-ca-" + customIndex++, 
cert);
+                }
+            }
+        }
+
+        return mergedKs;
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectTest.java
 
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectTest.java
index 9d48eca044..588cccc86d 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectTest.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectTest.java
@@ -17,108 +17,91 @@
 
 package org.apache.hertzbeat.collector.collect.mqtt;
 
-import com.hivemq.client.mqtt.MqttVersion;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
 import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
 import org.apache.hertzbeat.common.entity.job.Metrics;
 import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol;
-import org.apache.hertzbeat.common.entity.message.CollectRep;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.ArrayList;
-
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
 /**
  * Test case for {@link MqttCollectImpl}
  */
-public class MqttCollectTest {
+class MqttCollectTest {
     private MqttCollectImpl mqttCollect;
     private Metrics metrics;
-    private CollectRep.MetricsData.Builder builder;
+    private MqttProtocol.MqttProtocolBuilder mqttBuilder;
 
     @BeforeEach
-    public void setup() {
+    void setup() {
         mqttCollect = new MqttCollectImpl();
-        MqttProtocol mqtt = MqttProtocol.builder().build();
-        metrics = Metrics.builder()
-                .mqtt(mqtt)
-                .build();
-        builder = CollectRep.MetricsData.newBuilder();
+        metrics = new Metrics();
+
+        // Initialize base MQTT parameters for test cases
+        mqttBuilder = MqttProtocol.builder()
+                .host("example.com")
+                .port("1883")
+                .protocol("mqtt")
+                .timeout("5000")
+                .keepalive("60");
     }
 
-    @Test
-    void preCheck() {
-        // host is empty
-        assertThrows(IllegalArgumentException.class, () -> {
-            mqttCollect.preCheck(metrics);
-        });
-
-        // port is empty
-        assertThrows(IllegalArgumentException.class, () -> {
-            MqttProtocol mqtt = MqttProtocol.builder().build();
-            mqtt.setHost("example.com");
-            metrics.setMqtt(mqtt);
-            mqttCollect.preCheck(metrics);
-        });
-
-        // protocol version is empty
-        assertThrows(IllegalArgumentException.class, () -> {
-            MqttProtocol mqtt = MqttProtocol.builder().build();
-            mqtt.setHost("example.com");
-            mqtt.setPort("1883");
-            metrics.setMqtt(mqtt);
-            mqttCollect.preCheck(metrics);
-        });
+    // Region: preCheck validation tests
 
-        // everything is ok
-        assertDoesNotThrow(() -> {
-            MqttProtocol mqtt = MqttProtocol.builder().build();
-            mqtt.setHost("example.com");
-            mqtt.setPort("1883");
-            metrics.setMqtt(mqtt);
-            mqtt.setProtocolVersion("3.1.1");
-            mqttCollect.preCheck(metrics);
-        });
+    @Test
+    // Verify preCheck throws exception when host is missing
+    void preCheckShouldThrowWhenHostMissing() {
+        metrics.setMqtt(mqttBuilder.host("").build());
+        assertThrows(IllegalArgumentException.class, () -> 
mqttCollect.preCheck(metrics));
     }
 
     @Test
-    void supportProtocol() {
-        Assertions.assertEquals(DispatchConstants.PROTOCOL_MQTT, 
mqttCollect.supportProtocol());
+    // Verify preCheck throws exception when port is missing
+    void preCheckShouldThrowWhenPortMissing() {
+        metrics.setMqtt(mqttBuilder.port("").build());
+        assertThrows(IllegalArgumentException.class, () -> 
mqttCollect.preCheck(metrics));
     }
 
     @Test
-    void collect() {
-        // with version 3.1.1
-        assertDoesNotThrow(() -> {
-            MqttProtocol mqtt = MqttProtocol.builder().build();
-            mqtt.setHost("example.com");
-            mqtt.setPort("1883");
-            mqtt.setClientId("clientid");
-            mqtt.setTimeout("1");
-            mqtt.setProtocolVersion(MqttVersion.MQTT_3_1_1.name());
-
-            metrics.setMqtt(mqtt);
-            metrics.setAliasFields(new ArrayList<>());
-
-            mqttCollect.collect(builder, metrics);
-        });
+    // Verify preCheck throws exception when MQTTS mutual auth is enabled but 
CA cert is missing
+    void preCheckShouldThrowWhenMqttsMutualAuthMissingCerts() {
+        metrics.setMqtt(mqttBuilder
+                .protocol("mqtts")
+                .enableMutualAuth("true")
+                .caCert("")
+                .clientCert("client.crt")
+                .clientKey("client.key")
+                .build());
+        assertThrows(IllegalArgumentException.class, () -> 
mqttCollect.preCheck(metrics));
+    }
 
-        
-        assertDoesNotThrow(() -> {
-            MqttProtocol mqtt = MqttProtocol.builder().build();
-            mqtt.setHost("example.com");
-            mqtt.setPort("1883");
-            mqtt.setClientId("clientid");
-            mqtt.setTimeout("1");
-            mqtt.setProtocolVersion(MqttVersion.MQTT_5_0.name());
+    @Test
+    // Verify preCheck succeeds with valid standard MQTT parameters
+    void preCheckShouldSucceedWithValidMqttParams() {
+        metrics.setMqtt(mqttBuilder.build());
+        assertDoesNotThrow(() -> mqttCollect.preCheck(metrics));
+    }
 
-            metrics.setMqtt(mqtt);
-            metrics.setAliasFields(new ArrayList<>());
+    @Test
+    // Verify preCheck succeeds with valid MQTTS parameters including mutual 
authentication
+    void preCheckShouldSucceedWithValidMqttsMutualAuth() {
+        metrics.setMqtt(mqttBuilder
+                .protocol("mqtts")
+                .enableMutualAuth("true")
+                .caCert("ca.pem")
+                .clientCert("client.crt")
+                .clientKey("client.key")
+                .build());
+        assertDoesNotThrow(() -> mqttCollect.preCheck(metrics));
+    }
+    // End region
 
-            mqttCollect.collect(builder, metrics);
-        });
+    @Test
+    // Verify supportProtocol method returns correct MQTT constant
+    void supportProtocolShouldReturnMqttConstant() {
+        assertEquals(DispatchConstants.PROTOCOL_MQTT, 
mqttCollect.supportProtocol());
     }
 }
diff --git 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java
 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java
index 6db798b4df..d0d1fa7dd9 100644
--- 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java
+++ 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java
@@ -33,49 +33,73 @@ import org.apache.commons.lang3.StringUtils;
 public class MqttProtocol implements CommonRequestProtocol, Protocol {
 
     /**
-     * ip address or domain name of the peer host
+     * mqtt client id
+     */
+    private String clientId;
+    /**
+     * mqtt username
+     */
+    private String username;
+    /**
+     * mqtt password
+     */
+    private String password;
+    /**
+     * mqtt host
      */
     private String host;
-
     /**
-     * peer host port
+     * mqtt port
      */
     private String port;
-
     /**
-     * username
+     * mqtt protocol version
+     * MQTT,MQTTS
      */
-    private String username;
-
+    private String protocol;
     /**
-     * password
+     * mqtt connect timeout
+     * the maximum time to wait for a connection to be established
      */
-    private String password;
-
+    private String timeout;
     /**
-     * time out period
+     * mqtt keepalive
+     * between ping requests to the broker to keep the connection alive
      */
-    private String timeout;
-
+    private String keepalive;
     /**
-     * client id
+     * mqtt topic name
      */
-    private String clientId;
-
+    private String topic;
     /**
-     * message used to test whether the mqtt connection can be pushed normally
+     * mqtt publish message
      */
     private String testMessage;
-
     /**
-     * protocol version of mqtt
+     * mqtt tls version
+     * TLSv1.2, TLSv1.3
      */
-    private String protocolVersion;
-
+    private String tlsVersion;
     /**
-     * monitor topic
+     * mqtt tls insecure skip verify server certificate
      */
-    private String topic;
+    private String insecureSkipVerify;
+    /**
+     * mqtt tls ca cert
+     */
+    private String caCert;
+    /**
+     * mqtt tls enable mutual auth
+     */
+    private String enableMutualAuth;
+    /**
+     * mqtt tls client cert
+     */
+    private String clientCert;
+    /**
+     * mqtt tls client key
+     */
+    private String clientKey;
 
     /**
      * Determine whether authentication is required
@@ -85,11 +109,4 @@ public class MqttProtocol implements CommonRequestProtocol, 
Protocol {
         return StringUtils.isNotBlank(this.username) && 
StringUtils.isNotBlank(this.password);
     }
 
-    /**
-     * Determine whether you need to test whether messages can be pushed 
normally
-     * @return turn if it has test message
-     */
-    public boolean testPublish(){
-        return StringUtils.isNotBlank(this.testMessage);
-    }
 }
diff --git a/hertzbeat-manager/src/main/resources/define/app-mqtt.yml 
b/hertzbeat-manager/src/main/resources/define/app-mqtt.yml
index 3cfbe758a7..81b1bd7247 100644
--- a/hertzbeat-manager/src/main/resources/define/app-mqtt.yml
+++ b/hertzbeat-manager/src/main/resources/define/app-mqtt.yml
@@ -13,15 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# The monitoring type category:service-application service monitoring 
db-database monitoring mid-middleware custom-custom monitoring os-operating 
system monitoring
+
 category: service
-# The monitoring type eg: linux windows tomcat mysql aws...
 app: mqtt
-# The app api i18n name
 name:
   zh-CN: MQTT 连接
   en-US: MQTT Connection
-# The description and help of this monitoring type
 help:
   zh-CN: HertzBeat 对 MQTT 连接进行监测。<br>您可以点击 “<i>新建 MQTT 连接</i>” 
并进行配置,或者选择“<i>更多操作</i>”,导入已有配置。
   en-US: HertzBeat monitors MQTT connections. <br>You can click "<i>New MQTT 
connection</i>" and configure it, or select "<i>More actions</i>" to import an 
existing configuration.
@@ -29,83 +26,121 @@ help:
 helpLink:
   zh-CN: https://hertzbeat.apache.org/zh-cn/docs/help/mqtt
   en-US: https://hertzbeat.apache.org/docs/help/mqtt
-# Input params define for monitoring(render web ui by the definition)
 params:
-  # field-param field key
+
+  - field: clientId
+    name:
+      zh-CN: 客户端ID
+      en-US: Client Id
+    type: text
+    defaultValue: hertzbeat-mqtt-client
+    required: true
+  - field: username
+    name:
+      zh-CN: 用户名
+      en-US: Username
+    type: text
+    required: false
+  - field: password
+    name:
+      zh-CN: 密码
+      en-US: Password
+    type: password
+    required: false
+
   - field: host
-    # name-param field display i18n name
     name:
       zh-CN: MQTT的Host
       en-US: Target Host
-    # type-param field type(most mapping the html input type)
     type: host
-    # required-true or false
     required: true
-  # field-param field key
   - field: port
-    # name-param field display i18n name
     name:
       zh-CN: 端口
       en-US: Port
-    # type-param field type(most mapping the html input type)
     type: number
-    # when type is number, range is required
     range: '[0,65535]'
-    # required-true or false
     required: true
-    # default value 1883
     defaultValue: 1883
-  - field: protocolVersion
+  - field: protocol
     name:
-      zh-CN: 协议版本
-      en-US: Protocol version
+      zh-CN: 连接协议
+      en-US: Protocol
     type: radio
     options:
-      - label: MQTT 3.1.1
-        value: MQTT_3_1_1
-      - label: MQTT 5.0
-        value: MQTT_5_0
+      - label: MQTT
+        value: MQTT
+      - label: MQTTS
+        value: MQTTS
     required: true
-    defaultValue: MQTT_3_1_1
-  # field-param field key
+    defaultValue: MQTT
+
   - field: timeout
-    # name-param field display i18n name
     name:
       zh-CN: 连接超时时间(ms)
       en-US: Connect Timeout(ms)
-    # type-param field type(most mapping the html input type)
     type: number
-    # when type is number, range is required
     range: '[0,100000]'
-    # required-true or false
     required: true
-    # default value 6000
-    defaultValue: 6000
-  # field-param field key
-  - field: username
+    defaultValue: 10000
+  - field: keepalive
     name:
-      zh-CN: 用户名
-      en-US: Username
-    type: text
-    hide: true
-    # required-true or false
+      zh-CN: 心跳检测时间(s)
+      en-US: Keep Alive(s)
+    type: number
+    range: '[0,100000]'
+    required: true
+    defaultValue: 30
+
+  - field: tlsVersion
+    name:
+      zh-CN: TLS版本
+      en-US: TLS Version
+    type: radio
+    options:
+      - label: TLSv1.2
+        value: TLSv1.2
+      - label: TLSv1.3
+        value: TLSv1.3
+    defaultValue: TLSv1.2
     required: false
-  - field: password
+    hide: true
+  - field: insecureSkipVerify
     name:
-      zh-CN: 密码
-      en-US: Password
+      zh-CN: 跳过证书验证
+      en-US: Skip Certificate Verification
+    type: boolean
+    defaultValue: false
+    hide: true
+  - field: caCert
+    name:
+      zh-CN: CA证书
+      en-US: CA Certificate
     type: text
+    required: false
     hide: true
-    # required-true or false
+  - field: enableMutualAuth
+    name:
+      zh-CN: 双向认证
+      en-US: Enable Mutual Auth
+    type: boolean
+    defaultValue: false
+    hide: true
+  - field: clientCert
+    name:
+      zh-CN: 客户端证书
+      en-US: Client Certificate
+    type: text
     required: false
-  - field: clientId
+    hide: true
+  - field: clientKey
     name:
-      zh-CN: 客户端ID
-      en-US: Client Id
+      zh-CN: 客户端私钥
+      en-US: Client Private Key
     type: text
-    defaultValue: hertzbeat-mqtt-client
-    # required-true or false
-    required: true
+    required: false
+    hide: true
+
 
   - field: topic
     name:
@@ -119,17 +154,12 @@ params:
       en-US: Test message
     type: text
     required: false
-# collect metrics config list
 metrics:
-  # metrics - summary
   - name: summary
     i18n:
       zh-CN: 概要
       en-US: Summary
-    # metrics scheduling priority(0->127)->(high->low), metrics with the same 
priority will be scheduled in parallel
-    # priority 0's metrics is availability metrics, it will be scheduled 
first, only availability metrics collect success will the scheduling continue
     priority: 0
-    # field-metric name, type-metric type(0-number,1-string), unit-metric 
unit('%','ms','MB'), label-whether it is a metrics label field
     fields:
       - field: responseTime
         type: 0
@@ -137,37 +167,41 @@ metrics:
         i18n:
           zh-CN: 响应时间
           en-US: Response Time
-      - field: canDescribe
+      - field: canSubscribe
         type: 1
         i18n:
-          zh-CN: 正常订阅
-          en-US: Normal subscription
+          zh-CN: 订阅状态
+          en-US: Normal subscribe
       - field: canPublish
         type: 1
         i18n:
-          zh-CN: 正常推送
+          zh-CN: 发布状态
           en-US: Normal publish
-    # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, 
sdk
+      - field: canReceive
+        type: 1
+        i18n:
+          zh-CN: 接收数据
+          en-US: Receive data
+      - field: canUnSubscribe
+        type: 1
+        i18n:
+          zh-CN: 取消订阅状态
+          en-US: Normal unsubscribe
     protocol: mqtt
-    # Specific collection configuration when protocol is telnet protocol
     mqtt:
-      # telnet host
+      clientId: ^_^clientId^_^
+      username: ^_^username^_^
+      password: ^_^password^_^
       host: ^_^host^_^
-      # port
       port: ^_^port^_^
-      # timeout
+      protocol: ^_^protocol^_^
       timeout: ^_^timeout^_^
-      # email
+      keepalive: ^_^keepalive^_^
+      tlsVersion: ^_^tlsVersion^_^
+      insecureSkipVerify: ^_^insecureSkipVerify^_^
+      caCert: ^_^caCert^_^
+      enableMutualAuth: ^_^enableMutualAuth^_^
+      clientCert: ^_^clientCert^_^
+      clientKey: ^_^clientKey^_^
       topic: ^_^topic^_^
-      # clientId
-      clientId: ^_^clientId^_^
-      # protocolVersion
-      protocolVersion: ^_^protocolVersion^_^
-      # username
-      username: ^_^username^_^
-      # password
-      password: ^_^password^_^
-      # testMessage
       testMessage: ^_^testMessage^_^
-
-


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@hertzbeat.apache.org
For additional commands, e-mail: notifications-h...@hertzbeat.apache.org

Reply via email to