This is an automated email from the ASF dual-hosted git repository. shenghang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push: new af3eabdd1c [mqtt] refact the MQTT based on the Paho SDK and support both unidirectional and bidirectional MQTT over TLS connections. (#3474) af3eabdd1c is described below commit af3eabdd1c6ecfa9d474a67492815713333643e4 Author: yy549159265 <40821310+yy549159...@users.noreply.github.com> AuthorDate: Fri Jul 11 13:36:10 2025 +0800 [mqtt] refact the MQTT based on the Paho SDK and support both unidirectional and bidirectional MQTT over TLS connections. (#3474) Signed-off-by: yy549159265 <40821310+yy549159...@users.noreply.github.com> Co-authored-by: aias00 <liuhon...@apache.org> Co-authored-by: Logic <zqr10...@dromara.org> Co-authored-by: Calvin <zhengqi...@apache.org> Co-authored-by: tomsun28 <tomsu...@outlook.com> --- .github/workflows/backend-build-test.yml | 9 + .../hertzbeat-collector-basic/pom.xml | 15 +- .../collect/mqtt/CertificateFormatter.java | 195 ++++++++++++ .../collector/collect/mqtt/MqttCollectImpl.java | 338 ++++++++++++--------- .../collector/collect/mqtt/MqttSslFactory.java | 186 ++++++++++++ .../collector/collect/mqtt/MqttCollectTest.java | 137 ++++----- .../common/entity/job/protocol/MqttProtocol.java | 77 +++-- .../src/main/resources/define/app-mqtt.yml | 180 ++++++----- 8 files changed, 815 insertions(+), 322 deletions(-) diff --git a/.github/workflows/backend-build-test.yml b/.github/workflows/backend-build-test.yml index daf5678c92..1c8976375f 100644 --- a/.github/workflows/backend-build-test.yml +++ b/.github/workflows/backend-build-test.yml @@ -48,6 +48,15 @@ jobs: - name: Build with Maven run: mvnd clean -B package -Prelease -Dmaven.test.skip=false --file pom.xml + - name: Upload test reports + if: failure() + uses: actions/upload-artifact@v4 + with: + name: test-reports-${{ github.run_id }} + path: | + **/target/surefire-reports + **/target/failsafe-reports + - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v4.0.1 with: diff --git a/hertzbeat-collector/hertzbeat-collector-basic/pom.xml b/hertzbeat-collector/hertzbeat-collector-basic/pom.xml index df5ea8044a..e418cde5aa 100644 --- a/hertzbeat-collector/hertzbeat-collector-basic/pom.xml +++ b/hertzbeat-collector/hertzbeat-collector-basic/pom.xml @@ -33,7 +33,7 @@ <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <mqtt.version>1.3.3</mqtt.version> + <mqtt.version>1.2.5</mqtt.version> </properties> <dependencies> @@ -140,10 +140,19 @@ </dependency> <!-- mqtt --> <dependency> - <groupId>com.hivemq</groupId> - <artifactId>hivemq-mqtt-client</artifactId> + <groupId>org.eclipse.paho</groupId> + <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>${mqtt.version}</version> </dependency> + <!--Bouncy Castle--> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcpkix-jdk15on</artifactId> + <version>1.68</version> + </dependency> + + + <!--plc--> <dependency> <groupId>org.apache.plc4x</groupId> diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/CertificateFormatter.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/CertificateFormatter.java new file mode 100644 index 0000000000..312d35d94e --- /dev/null +++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/CertificateFormatter.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.collector.collect.mqtt; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Formats the private key and certificate, supporting concatenation of multiple certificates in PEM format. + */ +public class CertificateFormatter { + + public static String formatCertificateChain(String input) { + if (input == null || input.trim().isEmpty()) { + return input; + } + + String normalized = normalizeInput(input); + + List<String> certificates = extractCertificates(normalized); + + if (certificates.isEmpty()) { + return formatAsSingleCertificate(normalized); + } + + StringBuilder formattedChain = new StringBuilder(); + for (String cert : certificates) { + if (cert.trim().isEmpty()) continue; + + String formatted = formatPemBlock(cert); + formattedChain.append(formatted).append("\n"); + } + + return formattedChain.toString().trim(); + } + + private static String normalizeInput(String input) { + return input + .replace("\r\n", "\n") + .replace("\r", "\n") + .replaceAll("\\s*\\\\n\\s*", "\n") + .replaceAll("(?m)^\\s+|\\s+$", "") + .trim(); + } + + private static List<String> extractCertificates(String input) { + List<String> certificates = new ArrayList<>(); + String regex = "(-----BEGIN\\s+[\\w\\s]+?-----)[\\s\\S]*?(-----END\\s+[\\w\\s]+?-----)"; + + + Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE); + Matcher matcher = pattern.matcher(input); + + int lastEnd = 0; + while (matcher.find()) { + + if (matcher.start() > lastEnd) { + String gap = input.substring(lastEnd, matcher.start()); + if (!gap.trim().isEmpty()) { + certificates.add(gap); + } + } + + certificates.add(matcher.group()); + lastEnd = matcher.end(); + } + + + if (lastEnd < input.length()) { + certificates.add(input.substring(lastEnd)); + } + + return certificates; + } + + private static String formatPemBlock(String block) { + try { + Pattern pattern = Pattern.compile( + "(-----BEGIN\\s+[\\w\\s]+?-----)(.*?)(-----END\\s+[\\w\\s]+?-----)", + Pattern.DOTALL | Pattern.CASE_INSENSITIVE + ); + + Matcher matcher = pattern.matcher(block); + if (matcher.find()) { + String header = matcher.group(1).trim(); + String body = matcher.group(2); + String footer = matcher.group(3).trim(); + + + if (body == null) body = ""; + + String cleanBody = body + .replaceAll("\\s", "") + .replaceAll("\"", "") + .trim(); + + + if (cleanBody.isEmpty() && body != null && !body.trim().isEmpty()) { + + cleanBody = body.replaceAll("[^a-zA-Z0-9+/=]", "").trim(); + } + + String formattedBody = formatBase64Body(cleanBody); + + return header + "\n" + formattedBody + "\n" + footer; + } else { + + return formatAsCertificate(block); + } + } catch (Exception e) { + + return block; + } + } + + private static String formatAsCertificate(String content) { + + String cleanContent = content.replaceAll("[^a-zA-Z0-9+/=]", "").trim(); + + if (cleanContent.isEmpty()) { + return content; + } + + + String formattedBody = formatBase64Body(cleanContent); + + + if (cleanContent.toLowerCase().contains("private")) { + if (cleanContent.startsWith("MII") || cleanContent.length() > 1000) { + return "-----BEGIN PRIVATE KEY-----\n" + formattedBody + "\n-----END PRIVATE KEY-----"; + } else { + return "-----BEGIN RSA PRIVATE KEY-----\n" + formattedBody + "\n-----END RSA PRIVATE KEY-----"; + } + } else { + return "-----BEGIN CERTIFICATE-----\n" + formattedBody + "\n-----END CERTIFICATE-----"; + } + } + + private static String formatAsSingleCertificate(String input) { + String cleanContent = input.replaceAll("[^a-zA-Z0-9+/=]", "").trim(); + return formatAsCertificate(cleanContent); + } + + private static String formatBase64Body(String body) { + + StringBuilder formatted = new StringBuilder(); + int index = 0; + while (index < body.length()) { + int end = Math.min(index + 64, body.length()); + formatted.append(body.substring(index, end)); + if (end < body.length()) { + formatted.append("\n"); + } + index = end; + } + return formatted.toString().trim(); + } + + public static String formatPrivateKey(String input) { + if (input == null || input.trim().isEmpty()) { + return input; + } + + + String normalized = normalizeInput(input); + + + if (isPemEncapsulated(normalized)) { + return formatPemBlock(normalized); + } + + return formatAsCertificate(normalized); + } + + private static boolean isPemEncapsulated(String block) { + return block.contains("-----BEGIN") && block.contains("-----END"); + } +} diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java index 8bf96cc019..befd864209 100644 --- a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java +++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java @@ -17,26 +17,7 @@ package org.apache.hertzbeat.collector.collect.mqtt; -import com.hivemq.client.mqtt.MqttVersion; -import com.hivemq.client.mqtt.datatypes.MqttQos; -import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; -import com.hivemq.client.mqtt.mqtt3.Mqtt3Client; -import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder; -import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck; -import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; -import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; -import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder; -import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Consumer; -import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; import org.apache.hertzbeat.collector.collect.AbstractCollect; import org.apache.hertzbeat.collector.constants.CollectorConstants; @@ -46,13 +27,27 @@ import org.apache.hertzbeat.common.entity.job.Metrics; import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol; import org.apache.hertzbeat.common.entity.message.CollectRep; import org.apache.hertzbeat.common.entity.message.CollectRep.MetricsData.Builder; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttToken; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import org.springframework.util.StopWatch; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + /** - * collect mqtt metrics + * collect mqtt metrics using Eclipse Paho */ public class MqttCollectImpl extends AbstractCollect { @@ -61,138 +56,224 @@ public class MqttCollectImpl extends AbstractCollect { private static final Logger logger = LoggerFactory.getLogger(MqttCollectImpl.class); + @Override + public String supportProtocol() { + return DispatchConstants.PROTOCOL_MQTT; + } + @Override public void preCheck(Metrics metrics) throws IllegalArgumentException { MqttProtocol mqttProtocol = metrics.getMqtt(); Assert.hasText(mqttProtocol.getHost(), "MQTT protocol host is required"); Assert.hasText(mqttProtocol.getPort(), "MQTT protocol port is required"); - Assert.hasText(mqttProtocol.getProtocolVersion(), "MQTT protocol version is required"); + + if ("mqtts".equalsIgnoreCase(mqttProtocol.getProtocol())) { + if (Boolean.parseBoolean(mqttProtocol.getEnableMutualAuth())) { + Assert.hasText(mqttProtocol.getCaCert(), "CA certificate is required for mutual auth"); + Assert.hasText(mqttProtocol.getClientCert(), "Client certificate is required for mutual auth"); + Assert.hasText(mqttProtocol.getClientKey(), "Client private key is required for mutual auth"); + } + } } @Override public void collect(Builder builder, Metrics metrics) { - MqttProtocol mqtt = metrics.getMqtt(); - String protocolVersion = mqtt.getProtocolVersion(); - MqttVersion mqttVersion = MqttVersion.valueOf(protocolVersion); - if (mqttVersion == MqttVersion.MQTT_3_1_1) { - collectWithVersion3(metrics, builder); - } else if (mqttVersion == MqttVersion.MQTT_5_0) { - collectWithVersion5(metrics, builder); + MqttProtocol mqttProtocol = metrics.getMqtt(); + Map<Object, String> data = new HashMap<>(); + + try { + MqttAsyncClient client = buildMqttClient(mqttProtocol); + long responseTime = connectClient(client, mqttProtocol); + testSubscribeAndPublish(client, mqttProtocol, data); + convertToMetricsData(builder, metrics, responseTime, data); + client.disconnect(); + } catch (Exception e) { + logger.error("MQTT collection error: {}", e.getMessage(), e); + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("Collection failed: " + e.getMessage()); } } - @Override - public String supportProtocol() { - return DispatchConstants.PROTOCOL_MQTT; + private MqttAsyncClient buildMqttClient(MqttProtocol protocol) throws Exception { + String clientId = protocol.getClientId(); + + String serverUri = String.format("%s://%s:%s", + StringUtils.equals(protocol.getProtocol(), "MQTT") ? "tcp" : "ssl", + protocol.getHost(), + protocol.getPort()); + + MqttClientPersistence persistence = new MemoryPersistence(); + + return new MqttAsyncClient(serverUri, clientId, persistence); } - /** - * collecting data of MQTT 5 - */ - private void collectWithVersion5(Metrics metrics, Builder builder) { - MqttProtocol mqttProtocol = metrics.getMqtt(); - Map<Object, String> data = new HashMap<>(); - Mqtt5AsyncClient client = buildMqtt5Client(mqttProtocol); - long responseTime = connectClient(client, mqtt5AsyncClient -> { - CompletableFuture<Mqtt5ConnAck> connectFuture = mqtt5AsyncClient.connect(); - try { - connectFuture.get(Long.parseLong(mqttProtocol.getTimeout()), TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - builder.setCode(CollectRep.Code.FAIL); - builder.setMsg(getErrorMessage(e.getMessage())); + private long connectClient(MqttAsyncClient client, MqttProtocol protocol) throws Exception { + MqttConnectOptions connOpts = new MqttConnectOptions(); + + if (protocol.hasAuth()) { + connOpts.setUserName(protocol.getUsername()); + connOpts.setPassword(protocol.getPassword().toCharArray()); + } + + connOpts.setKeepAliveInterval(Integer.parseInt(protocol.getKeepalive())); + connOpts.setConnectionTimeout(Integer.parseInt(protocol.getTimeout()) / 1000); + connOpts.setCleanSession(true); + connOpts.setAutomaticReconnect(false); + if ("mqtts".equalsIgnoreCase(protocol.getProtocol())) { + boolean insecureSkipVerify = Boolean.parseBoolean(protocol.getInsecureSkipVerify()); + if (insecureSkipVerify) { + connOpts.setHttpsHostnameVerificationEnabled(false); } - }); - testDescribeAndPublish5(client, mqttProtocol, data); - convertToMetricsData(builder, metrics, responseTime, data); - client.disconnect(); + if (Boolean.parseBoolean(protocol.getEnableMutualAuth())) { + connOpts.setSocketFactory(MqttSslFactory.getMslSocketFactory(protocol, insecureSkipVerify)); + } else { + connOpts.setSocketFactory(MqttSslFactory.getSslSocketFactory(protocol, insecureSkipVerify)); + } + } + + + StopWatch connectWatch = new StopWatch(); + connectWatch.start(); + + client.connect(connOpts).waitForCompletion(Long.parseLong(protocol.getTimeout())); + connectWatch.stop(); + return connectWatch.getTotalTimeMillis(); } + /** - * collecting data of MQTT 3.1.1 + * Test MQTT subscribe and publish capabilities */ - private void collectWithVersion3(Metrics metrics, Builder builder) { - MqttProtocol mqttProtocol = metrics.getMqtt(); - Map<Object, String> data = new HashMap<>(); - Mqtt3AsyncClient client = buildMqtt3Client(mqttProtocol); - long responseTime = connectClient(client, mqtt3AsyncClient -> { - CompletableFuture<Mqtt3ConnAck> connectFuture = mqtt3AsyncClient.connect(); - try { - connectFuture.get(Long.parseLong(mqttProtocol.getTimeout()), TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - builder.setCode(CollectRep.Code.FAIL); - builder.setMsg(getErrorMessage(e.getMessage())); + private void testSubscribeAndPublish(MqttAsyncClient client, MqttProtocol protocol, Map<Object, String> data) { + + // 1 test subscribe + if (StringUtils.isNotBlank(protocol.getTopic())) { + String subscribe = testSubscribe(client, protocol.getTopic()); + if (StringUtils.isBlank(subscribe)) { + data.put("canSubscribe", "Subscription successful"); + } else { + data.put("canSubscribe", String.format("Subscription failed: %s", subscribe)); } - }); - testDescribeAndPublish3(client, mqttProtocol, data); - convertToMetricsData(builder, metrics, responseTime, data); - client.disconnect(); - } - private void testDescribeAndPublish3(Mqtt3AsyncClient client, MqttProtocol mqttProtocol, Map<Object, String> data) { - data.put("canDescribe", test(() -> { - client.subscribeWith().topicFilter(mqttProtocol.getTopic()).qos(MqttQos.AT_LEAST_ONCE).send(); - client.unsubscribeWith().topicFilter(mqttProtocol.getTopic()).send(); - }, "subscribe").toString()); - - data.put("canPublish", !mqttProtocol.testPublish() ? Boolean.FALSE.toString() : test(() -> { - client.publishWith().topic(mqttProtocol.getTopic()) - .payload(mqttProtocol.getTestMessage().getBytes(StandardCharsets.UTF_8)) - .qos(MqttQos.AT_LEAST_ONCE).send(); - data.put("canPublish", Boolean.TRUE.toString()); - }, "publish").toString()); - } + } else { + data.put("canSubscribe", "No topic, subscription test skipped"); + } + + + // 2 test publish + if (StringUtils.isNotBlank(protocol.getTestMessage())) { + String publish = testPublish(client, protocol.getTopic(), protocol.getTestMessage()); + if (StringUtils.isBlank(publish)) { + data.put("canPublish", "Message published successfully"); + + // 3 test receive message + String receivedData = getReceivedData(client, protocol.getTopic()); + data.put("canReceive", receivedData); + } else { + data.put("canPublish", String.format("Message publishing failed: %s", publish)); + data.put("canReceive", "Message reception skipped due to failed publish"); + } + } else { + data.put("canPublish", "No test message, publish test skipped"); + data.put("canReceive", "No test message, receive test skipped"); + } + - private void testDescribeAndPublish5(Mqtt5AsyncClient client, MqttProtocol mqttProtocol, Map<Object, String> data) { - data.put("canDescribe", test(() -> { - client.subscribeWith().topicFilter(mqttProtocol.getTopic()).qos(MqttQos.AT_LEAST_ONCE).send(); - client.unsubscribeWith().topicFilter(mqttProtocol.getTopic()).send(); - }, "subscribe").toString()); - - data.put("canPublish", !mqttProtocol.testPublish() ? Boolean.FALSE.toString() : test(() -> { - client.publishWith().topic(mqttProtocol.getTopic()) - .payload(mqttProtocol.getTestMessage().getBytes(StandardCharsets.UTF_8)) - .qos(MqttQos.AT_LEAST_ONCE).send(); - data.put("canPublish", Boolean.TRUE.toString()); - }, "publish").toString()); + // 4 test unsubscribe + if (StringUtils.isNotBlank(protocol.getTopic())) { + String subscribe = testUnSubscribe(client, protocol.getTopic()); + if (StringUtils.isBlank(subscribe)) { + data.put("canUnSubscribe", "Unsubscription successful"); + } else { + data.put("canUnSubscribe", String.format("Unsubscription failed: %s", subscribe)); + } + } else { + data.put("canUnSubscribe", "No topic, unsubscription test skipped"); + } } - private Mqtt5AsyncClient buildMqtt5Client(MqttProtocol mqttProtocol) { - Mqtt5ClientBuilder mqtt5ClientBuilder = Mqtt5Client.builder() - .serverHost(mqttProtocol.getHost()) - .identifier(mqttProtocol.getClientId()) - .serverPort(Integer.parseInt(mqttProtocol.getPort())); + private String getReceivedData(MqttAsyncClient client, String topic) { + final CountDownLatch latch = new CountDownLatch(1); + final StringBuilder messageHolder = new StringBuilder(); + + + client.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + latch.countDown(); + } - if (mqttProtocol.hasAuth()) { - mqtt5ClientBuilder.simpleAuth().username(mqttProtocol.getUsername()) - .password(mqttProtocol.getPassword().getBytes(StandardCharsets.UTF_8)) - .applySimpleAuth(); + @Override + public void messageArrived(String arrivedTopic, MqttMessage message) { + + if (topic.equals(arrivedTopic)) { + messageHolder.append(new String(message.getPayload())); + latch.countDown(); + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + }); + + try { + boolean received = latch.await(5, TimeUnit.SECONDS); + if (messageHolder.length() > 0) { + return messageHolder.toString(); + } else if (!received) { + return "Message reception timed out after 5 seconds"; + } else { + return "No valid message received"; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return e.getMessage(); + } finally { + client.setCallback(null); } - return mqtt5ClientBuilder.buildAsync(); } - private Mqtt3AsyncClient buildMqtt3Client(MqttProtocol mqttProtocol) { + private String testSubscribe(MqttAsyncClient client, String topic) { + try { + IMqttToken subToken = client.subscribe(topic, 1); + subToken.waitForCompletion(5000); + return ""; + } catch (MqttException e) { + logger.warn("MQTT subscribe test failed: {}", e.getMessage()); + return e.getMessage(); + } + } + + private String testPublish(MqttAsyncClient client, String topic, String message) { + try { + MqttMessage mqttMessage = new MqttMessage(message.getBytes()); + mqttMessage.setQos(1); - Mqtt3ClientBuilder mqtt3ClientBuilder = Mqtt3Client.builder() - .serverHost(mqttProtocol.getHost()) - .identifier(mqttProtocol.getClientId()) - .serverPort(Integer.parseInt(mqttProtocol.getPort())); + IMqttToken pubToken = client.publish(topic, mqttMessage); + pubToken.waitForCompletion(5000); - if (mqttProtocol.hasAuth()) { - mqtt3ClientBuilder.simpleAuth().username(mqttProtocol.getUsername()) - .password(mqttProtocol.getPassword().getBytes(StandardCharsets.UTF_8)) - .applySimpleAuth(); + return ""; + } catch (MqttException e) { + logger.warn("MQTT publish test failed: {}", e.getMessage()); + return e.getMessage(); } - return mqtt3ClientBuilder.buildAsync(); } - public <T> long connectClient(T client, Consumer<T> connect) { - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - connect.accept(client); - stopWatch.stop(); - return stopWatch.getTotalTimeMillis(); + private String testUnSubscribe(MqttAsyncClient client, String topic) { + try { + IMqttToken unsubToken = client.unsubscribe(topic); + unsubToken.waitForCompletion(5000); + return ""; + } catch (MqttException e) { + logger.warn("MQTT unsubscribe test failed: {}", e.getMessage()); + return e.getMessage(); + } } + /** + * Convert collected data to MetricsData + */ private void convertToMetricsData(Builder builder, Metrics metrics, long responseTime, Map<Object, String> data) { CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); for (String column : metrics.getAliasFields()) { @@ -207,25 +288,4 @@ public class MqttCollectImpl extends AbstractCollect { builder.addValueRow(valueRowBuilder.build()); } - private Boolean test(Runnable runnable, String operationName) { - try { - runnable.run(); - return true; - } catch (Exception e) { - logger.error("{} fail", operationName, e); - } - return false; - } - - private String getErrorMessage(String errorMessage) { - if (StringUtils.isBlank(errorMessage)) { - return "connect failed"; - } - String[] split = errorMessage.split(":"); - if (split.length > 1) { - return Arrays.stream(split).skip(1).collect(Collectors.joining(":")); - } - return errorMessage; - } - } diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttSslFactory.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttSslFactory.java new file mode 100644 index 0000000000..3c78bf8fc3 --- /dev/null +++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttSslFactory.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.collector.collect.mqtt; + +import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol; +import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.openssl.PEMKeyPair; +import org.bouncycastle.openssl.PEMParser; +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; + +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.StringReader; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.Security; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.util.Collection; + +/** + * Support MQTT SSL Factory + */ +public class MqttSslFactory { + + /** + * Get MSL Socket Factory + */ + public static SSLSocketFactory getMslSocketFactory(MqttProtocol mqttProtocol, boolean insecureSkipVerify) { + try { + Security.addProvider(new BouncyCastleProvider()); + + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + ks.load(null, null); + + Certificate[] chain = null; + if (mqttProtocol.getClientCert() != null && !mqttProtocol.getClientCert().isEmpty()) { + String formatClientCert = CertificateFormatter.formatCertificateChain(mqttProtocol.getClientCert()); + try (InputStream certIn = new ByteArrayInputStream(formatClientCert.getBytes())) { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + Collection<? extends Certificate> certs = cf.generateCertificates(certIn); + chain = certs.toArray(new Certificate[0]); + } + } + + PrivateKey privateKey; + if (mqttProtocol.getClientKey() != null && !mqttProtocol.getClientKey().isEmpty()) { + String formatClientKey = CertificateFormatter.formatPrivateKey(mqttProtocol.getClientKey()); + try (PEMParser pemParser = new PEMParser(new StringReader(formatClientKey))) { + JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC"); + Object object = pemParser.readObject(); + + if (object instanceof PEMKeyPair) { + privateKey = converter.getPrivateKey(((PEMKeyPair) object).getPrivateKeyInfo()); + } else if (object instanceof PrivateKeyInfo) { + privateKey = converter.getPrivateKey((PrivateKeyInfo) object); + } else { + throw new IllegalArgumentException("Unsupported private key type"); + } + + ks.setKeyEntry("private-key", privateKey, "".toCharArray(), chain); + } + } + + TrustManager[] trustManagers; + if (insecureSkipVerify) { + trustManagers = createInsecureTrustManager(); + } else { + String formatCaCert = CertificateFormatter.formatCertificateChain(mqttProtocol.getCaCert()); + KeyStore trustStore = createMergedTrustStore(formatCaCert); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(trustStore); + trustManagers = tmf.getTrustManagers(); + } + + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, "".toCharArray()); + + + SSLContext context = SSLContext.getInstance(mqttProtocol.getTlsVersion()); + context.init(kmf.getKeyManagers(), trustManagers, null); + + return context.getSocketFactory(); + } catch (Exception e) { + throw new RuntimeException("Fails to SSL initialize: " + e.getMessage(), e); + } + } + + /** + * Get SSL Socket Factory + */ + public static SSLSocketFactory getSslSocketFactory(MqttProtocol mqttProtocol, boolean insecureSkipVerify) { + try { + Security.addProvider(new BouncyCastleProvider()); + + + TrustManager[] trustManagers; + if (insecureSkipVerify) { + trustManagers = createInsecureTrustManager(); + } else { + + String formatCaCert = CertificateFormatter.formatCertificateChain(mqttProtocol.getCaCert()); + KeyStore trustStore = createMergedTrustStore(formatCaCert); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(trustStore); + trustManagers = tmf.getTrustManagers(); + } + + SSLContext sslContext = SSLContext.getInstance(mqttProtocol.getTlsVersion()); + sslContext.init(null, trustManagers, null); + + return sslContext.getSocketFactory(); + } catch (Exception e) { + throw new RuntimeException("Fails to SSL initialize: " + e.getMessage(), e); + } + } + + private static TrustManager[] createInsecureTrustManager() { + return new TrustManager[]{ + new X509TrustManager() { + public void checkClientTrusted(X509Certificate[] chain, String authType) { + } + + public void checkServerTrusted(X509Certificate[] chain, String authType) { + } + + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } + }; + } + + private static KeyStore createMergedTrustStore(String caCertPem) throws Exception { + KeyStore mergedKs = KeyStore.getInstance(KeyStore.getDefaultType()); + mergedKs.load(null, null); + + + TrustManagerFactory systemTmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + systemTmf.init((KeyStore) null); + X509TrustManager systemTm = (X509TrustManager) systemTmf.getTrustManagers()[0]; + + int systemIndex = 1; + for (X509Certificate cert : systemTm.getAcceptedIssuers()) { + mergedKs.setCertificateEntry("system-ca-" + systemIndex++, cert); + } + + + if (caCertPem != null && !caCertPem.isEmpty()) { + try (InputStream caIn = new ByteArrayInputStream(caCertPem.getBytes())) { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + Collection<? extends Certificate> customCerts = cf.generateCertificates(caIn); + + int customIndex = 1; + for (Certificate cert : customCerts) { + mergedKs.setCertificateEntry("custom-ca-" + customIndex++, cert); + } + } + } + + return mergedKs; + } +} diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectTest.java b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectTest.java index 9d48eca044..588cccc86d 100644 --- a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectTest.java +++ b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectTest.java @@ -17,108 +17,91 @@ package org.apache.hertzbeat.collector.collect.mqtt; -import com.hivemq.client.mqtt.MqttVersion; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + import org.apache.hertzbeat.collector.dispatch.DispatchConstants; import org.apache.hertzbeat.common.entity.job.Metrics; import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol; -import org.apache.hertzbeat.common.entity.message.CollectRep; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.ArrayList; - -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertThrows; - /** * Test case for {@link MqttCollectImpl} */ -public class MqttCollectTest { +class MqttCollectTest { private MqttCollectImpl mqttCollect; private Metrics metrics; - private CollectRep.MetricsData.Builder builder; + private MqttProtocol.MqttProtocolBuilder mqttBuilder; @BeforeEach - public void setup() { + void setup() { mqttCollect = new MqttCollectImpl(); - MqttProtocol mqtt = MqttProtocol.builder().build(); - metrics = Metrics.builder() - .mqtt(mqtt) - .build(); - builder = CollectRep.MetricsData.newBuilder(); + metrics = new Metrics(); + + // Initialize base MQTT parameters for test cases + mqttBuilder = MqttProtocol.builder() + .host("example.com") + .port("1883") + .protocol("mqtt") + .timeout("5000") + .keepalive("60"); } - @Test - void preCheck() { - // host is empty - assertThrows(IllegalArgumentException.class, () -> { - mqttCollect.preCheck(metrics); - }); - - // port is empty - assertThrows(IllegalArgumentException.class, () -> { - MqttProtocol mqtt = MqttProtocol.builder().build(); - mqtt.setHost("example.com"); - metrics.setMqtt(mqtt); - mqttCollect.preCheck(metrics); - }); - - // protocol version is empty - assertThrows(IllegalArgumentException.class, () -> { - MqttProtocol mqtt = MqttProtocol.builder().build(); - mqtt.setHost("example.com"); - mqtt.setPort("1883"); - metrics.setMqtt(mqtt); - mqttCollect.preCheck(metrics); - }); + // Region: preCheck validation tests - // everything is ok - assertDoesNotThrow(() -> { - MqttProtocol mqtt = MqttProtocol.builder().build(); - mqtt.setHost("example.com"); - mqtt.setPort("1883"); - metrics.setMqtt(mqtt); - mqtt.setProtocolVersion("3.1.1"); - mqttCollect.preCheck(metrics); - }); + @Test + // Verify preCheck throws exception when host is missing + void preCheckShouldThrowWhenHostMissing() { + metrics.setMqtt(mqttBuilder.host("").build()); + assertThrows(IllegalArgumentException.class, () -> mqttCollect.preCheck(metrics)); } @Test - void supportProtocol() { - Assertions.assertEquals(DispatchConstants.PROTOCOL_MQTT, mqttCollect.supportProtocol()); + // Verify preCheck throws exception when port is missing + void preCheckShouldThrowWhenPortMissing() { + metrics.setMqtt(mqttBuilder.port("").build()); + assertThrows(IllegalArgumentException.class, () -> mqttCollect.preCheck(metrics)); } @Test - void collect() { - // with version 3.1.1 - assertDoesNotThrow(() -> { - MqttProtocol mqtt = MqttProtocol.builder().build(); - mqtt.setHost("example.com"); - mqtt.setPort("1883"); - mqtt.setClientId("clientid"); - mqtt.setTimeout("1"); - mqtt.setProtocolVersion(MqttVersion.MQTT_3_1_1.name()); - - metrics.setMqtt(mqtt); - metrics.setAliasFields(new ArrayList<>()); - - mqttCollect.collect(builder, metrics); - }); + // Verify preCheck throws exception when MQTTS mutual auth is enabled but CA cert is missing + void preCheckShouldThrowWhenMqttsMutualAuthMissingCerts() { + metrics.setMqtt(mqttBuilder + .protocol("mqtts") + .enableMutualAuth("true") + .caCert("") + .clientCert("client.crt") + .clientKey("client.key") + .build()); + assertThrows(IllegalArgumentException.class, () -> mqttCollect.preCheck(metrics)); + } - - assertDoesNotThrow(() -> { - MqttProtocol mqtt = MqttProtocol.builder().build(); - mqtt.setHost("example.com"); - mqtt.setPort("1883"); - mqtt.setClientId("clientid"); - mqtt.setTimeout("1"); - mqtt.setProtocolVersion(MqttVersion.MQTT_5_0.name()); + @Test + // Verify preCheck succeeds with valid standard MQTT parameters + void preCheckShouldSucceedWithValidMqttParams() { + metrics.setMqtt(mqttBuilder.build()); + assertDoesNotThrow(() -> mqttCollect.preCheck(metrics)); + } - metrics.setMqtt(mqtt); - metrics.setAliasFields(new ArrayList<>()); + @Test + // Verify preCheck succeeds with valid MQTTS parameters including mutual authentication + void preCheckShouldSucceedWithValidMqttsMutualAuth() { + metrics.setMqtt(mqttBuilder + .protocol("mqtts") + .enableMutualAuth("true") + .caCert("ca.pem") + .clientCert("client.crt") + .clientKey("client.key") + .build()); + assertDoesNotThrow(() -> mqttCollect.preCheck(metrics)); + } + // End region - mqttCollect.collect(builder, metrics); - }); + @Test + // Verify supportProtocol method returns correct MQTT constant + void supportProtocolShouldReturnMqttConstant() { + assertEquals(DispatchConstants.PROTOCOL_MQTT, mqttCollect.supportProtocol()); } } diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java index 6db798b4df..d0d1fa7dd9 100644 --- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java +++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java @@ -33,49 +33,73 @@ import org.apache.commons.lang3.StringUtils; public class MqttProtocol implements CommonRequestProtocol, Protocol { /** - * ip address or domain name of the peer host + * mqtt client id + */ + private String clientId; + /** + * mqtt username + */ + private String username; + /** + * mqtt password + */ + private String password; + /** + * mqtt host */ private String host; - /** - * peer host port + * mqtt port */ private String port; - /** - * username + * mqtt protocol version + * MQTT,MQTTS */ - private String username; - + private String protocol; /** - * password + * mqtt connect timeout + * the maximum time to wait for a connection to be established */ - private String password; - + private String timeout; /** - * time out period + * mqtt keepalive + * between ping requests to the broker to keep the connection alive */ - private String timeout; - + private String keepalive; /** - * client id + * mqtt topic name */ - private String clientId; - + private String topic; /** - * message used to test whether the mqtt connection can be pushed normally + * mqtt publish message */ private String testMessage; - /** - * protocol version of mqtt + * mqtt tls version + * TLSv1.2, TLSv1.3 */ - private String protocolVersion; - + private String tlsVersion; /** - * monitor topic + * mqtt tls insecure skip verify server certificate */ - private String topic; + private String insecureSkipVerify; + /** + * mqtt tls ca cert + */ + private String caCert; + /** + * mqtt tls enable mutual auth + */ + private String enableMutualAuth; + /** + * mqtt tls client cert + */ + private String clientCert; + /** + * mqtt tls client key + */ + private String clientKey; /** * Determine whether authentication is required @@ -85,11 +109,4 @@ public class MqttProtocol implements CommonRequestProtocol, Protocol { return StringUtils.isNotBlank(this.username) && StringUtils.isNotBlank(this.password); } - /** - * Determine whether you need to test whether messages can be pushed normally - * @return turn if it has test message - */ - public boolean testPublish(){ - return StringUtils.isNotBlank(this.testMessage); - } } diff --git a/hertzbeat-manager/src/main/resources/define/app-mqtt.yml b/hertzbeat-manager/src/main/resources/define/app-mqtt.yml index 3cfbe758a7..81b1bd7247 100644 --- a/hertzbeat-manager/src/main/resources/define/app-mqtt.yml +++ b/hertzbeat-manager/src/main/resources/define/app-mqtt.yml @@ -13,15 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -# The monitoring type category:service-application service monitoring db-database monitoring mid-middleware custom-custom monitoring os-operating system monitoring + category: service -# The monitoring type eg: linux windows tomcat mysql aws... app: mqtt -# The app api i18n name name: zh-CN: MQTT 连接 en-US: MQTT Connection -# The description and help of this monitoring type help: zh-CN: HertzBeat 对 MQTT 连接进行监测。<br>您可以点击 “<i>新建 MQTT 连接</i>” 并进行配置,或者选择“<i>更多操作</i>”,导入已有配置。 en-US: HertzBeat monitors MQTT connections. <br>You can click "<i>New MQTT connection</i>" and configure it, or select "<i>More actions</i>" to import an existing configuration. @@ -29,83 +26,121 @@ help: helpLink: zh-CN: https://hertzbeat.apache.org/zh-cn/docs/help/mqtt en-US: https://hertzbeat.apache.org/docs/help/mqtt -# Input params define for monitoring(render web ui by the definition) params: - # field-param field key + + - field: clientId + name: + zh-CN: 客户端ID + en-US: Client Id + type: text + defaultValue: hertzbeat-mqtt-client + required: true + - field: username + name: + zh-CN: 用户名 + en-US: Username + type: text + required: false + - field: password + name: + zh-CN: 密码 + en-US: Password + type: password + required: false + - field: host - # name-param field display i18n name name: zh-CN: MQTT的Host en-US: Target Host - # type-param field type(most mapping the html input type) type: host - # required-true or false required: true - # field-param field key - field: port - # name-param field display i18n name name: zh-CN: 端口 en-US: Port - # type-param field type(most mapping the html input type) type: number - # when type is number, range is required range: '[0,65535]' - # required-true or false required: true - # default value 1883 defaultValue: 1883 - - field: protocolVersion + - field: protocol name: - zh-CN: 协议版本 - en-US: Protocol version + zh-CN: 连接协议 + en-US: Protocol type: radio options: - - label: MQTT 3.1.1 - value: MQTT_3_1_1 - - label: MQTT 5.0 - value: MQTT_5_0 + - label: MQTT + value: MQTT + - label: MQTTS + value: MQTTS required: true - defaultValue: MQTT_3_1_1 - # field-param field key + defaultValue: MQTT + - field: timeout - # name-param field display i18n name name: zh-CN: 连接超时时间(ms) en-US: Connect Timeout(ms) - # type-param field type(most mapping the html input type) type: number - # when type is number, range is required range: '[0,100000]' - # required-true or false required: true - # default value 6000 - defaultValue: 6000 - # field-param field key - - field: username + defaultValue: 10000 + - field: keepalive name: - zh-CN: 用户名 - en-US: Username - type: text - hide: true - # required-true or false + zh-CN: 心跳检测时间(s) + en-US: Keep Alive(s) + type: number + range: '[0,100000]' + required: true + defaultValue: 30 + + - field: tlsVersion + name: + zh-CN: TLS版本 + en-US: TLS Version + type: radio + options: + - label: TLSv1.2 + value: TLSv1.2 + - label: TLSv1.3 + value: TLSv1.3 + defaultValue: TLSv1.2 required: false - - field: password + hide: true + - field: insecureSkipVerify name: - zh-CN: 密码 - en-US: Password + zh-CN: 跳过证书验证 + en-US: Skip Certificate Verification + type: boolean + defaultValue: false + hide: true + - field: caCert + name: + zh-CN: CA证书 + en-US: CA Certificate type: text + required: false hide: true - # required-true or false + - field: enableMutualAuth + name: + zh-CN: 双向认证 + en-US: Enable Mutual Auth + type: boolean + defaultValue: false + hide: true + - field: clientCert + name: + zh-CN: 客户端证书 + en-US: Client Certificate + type: text required: false - - field: clientId + hide: true + - field: clientKey name: - zh-CN: 客户端ID - en-US: Client Id + zh-CN: 客户端私钥 + en-US: Client Private Key type: text - defaultValue: hertzbeat-mqtt-client - # required-true or false - required: true + required: false + hide: true + - field: topic name: @@ -119,17 +154,12 @@ params: en-US: Test message type: text required: false -# collect metrics config list metrics: - # metrics - summary - name: summary i18n: zh-CN: 概要 en-US: Summary - # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel - # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue priority: 0 - # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field fields: - field: responseTime type: 0 @@ -137,37 +167,41 @@ metrics: i18n: zh-CN: 响应时间 en-US: Response Time - - field: canDescribe + - field: canSubscribe type: 1 i18n: - zh-CN: 正常订阅 - en-US: Normal subscription + zh-CN: 订阅状态 + en-US: Normal subscribe - field: canPublish type: 1 i18n: - zh-CN: 正常推送 + zh-CN: 发布状态 en-US: Normal publish - # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, sdk + - field: canReceive + type: 1 + i18n: + zh-CN: 接收数据 + en-US: Receive data + - field: canUnSubscribe + type: 1 + i18n: + zh-CN: 取消订阅状态 + en-US: Normal unsubscribe protocol: mqtt - # Specific collection configuration when protocol is telnet protocol mqtt: - # telnet host + clientId: ^_^clientId^_^ + username: ^_^username^_^ + password: ^_^password^_^ host: ^_^host^_^ - # port port: ^_^port^_^ - # timeout + protocol: ^_^protocol^_^ timeout: ^_^timeout^_^ - # email + keepalive: ^_^keepalive^_^ + tlsVersion: ^_^tlsVersion^_^ + insecureSkipVerify: ^_^insecureSkipVerify^_^ + caCert: ^_^caCert^_^ + enableMutualAuth: ^_^enableMutualAuth^_^ + clientCert: ^_^clientCert^_^ + clientKey: ^_^clientKey^_^ topic: ^_^topic^_^ - # clientId - clientId: ^_^clientId^_^ - # protocolVersion - protocolVersion: ^_^protocolVersion^_^ - # username - username: ^_^username^_^ - # password - password: ^_^password^_^ - # testMessage testMessage: ^_^testMessage^_^ - - --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@hertzbeat.apache.org For additional commands, e-mail: notifications-h...@hertzbeat.apache.org