This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new ddd2e40921a CAMEL-20754: Support for MQTT 5 Publish Properties (#14175)
ddd2e40921a is described below

commit ddd2e40921afb570a1dd78c34bd495bfc2b83ac2
Author: Henning Sudbrock <git...@hsudbrock.de>
AuthorDate: Fri May 17 12:34:34 2024 +0200

    CAMEL-20754: Support for MQTT 5 Publish Properties (#14175)
    
    Adds a message header for MQTT 5 Publish Properties to the Paho MQTT 5 
component. This enables support for the MQTT message headers introduced with 
MQTT 5 (like, e.g., content-type, response-topic, message-expiry-interval, or 
user properties; cf. section 3.3.2.3 of the MQTT 5 spec, 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html).
    
    When consuming a message, the contents of the Camel message header are 
taken from the properties of the incoming MQT message; when producing an MQTT 
message, the properties are taken from the Camel message header if available.
---
 .../camel/catalog/components/paho-mqtt5.json       |  3 +-
 .../camel/component/paho/mqtt5/paho-mqtt5.json     |  3 +-
 .../component/paho/mqtt5/PahoMqtt5Constants.java   |  5 ++++
 .../component/paho/mqtt5/PahoMqtt5Consumer.java    |  1 +
 .../component/paho/mqtt5/PahoMqtt5Producer.java    |  4 +++
 .../integration/PahoMqtt5ComponentMqtt5IT.java     | 34 ++++++++++++++++++++++
 .../dsl/PahoMqtt5EndpointBuilderFactory.java       | 14 +++++++++
 7 files changed, 62 insertions(+), 2 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json
index 54161f7f611..1038353b6a3 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json
@@ -61,7 +61,8 @@
     "CamelMqttQoS": { "index": 1, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The quality of service of the incoming 
message.", "constantName": 
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#MQTT_QOS" },
     "CamelPahoMqtt5Qos": { "index": 2, "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The client quality of service level (0-2).", 
"constantName": 
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_QOS" },
     "CamelPahoMqtt5Retained": { "index": 3, "kind": "header", "displayName": 
"", "group": "producer", "label": "producer", "required": false, "javaType": 
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Retain option.", "constantName": 
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_RETAINED"
 },
-    "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The name of topic to override and send 
to instead of topic specified on endpoint.", "constantName": 
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC"
 }
+    "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The name of topic to override and send 
to instead of topic specified on endpoint.", "constantName": 
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC"
 },
+    "CamelPahoMqtt5MsgProperties": { "index": 5, "kind": "header", 
"displayName": "", "group": "producer", "label": "consumer,producer", 
"required": false, "javaType": 
"org.eclipse.paho.mqttv5.common.packet.MqttProperties", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": 
"Consumer: The properties set on the incoming message. Producer: The properties 
to be set on the outgoing message.", "constantName": 
"org.apache.camel.component.paho.mqtt5. [...]
   },
   "properties": {
     "topic": { "index": 0, "kind": "path", "displayName": "Topic", "group": 
"common", "label": "", "required": true, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Name of the topic" },
diff --git 
a/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json
 
b/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json
index 54161f7f611..1038353b6a3 100644
--- 
a/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json
+++ 
b/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json
@@ -61,7 +61,8 @@
     "CamelMqttQoS": { "index": 1, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The quality of service of the incoming 
message.", "constantName": 
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#MQTT_QOS" },
     "CamelPahoMqtt5Qos": { "index": 2, "kind": "header", "displayName": "", 
"group": "producer", "label": "producer", "required": false, "javaType": 
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The client quality of service level (0-2).", 
"constantName": 
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_QOS" },
     "CamelPahoMqtt5Retained": { "index": 3, "kind": "header", "displayName": 
"", "group": "producer", "label": "producer", "required": false, "javaType": 
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "Retain option.", "constantName": 
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_RETAINED"
 },
-    "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The name of topic to override and send 
to instead of topic specified on endpoint.", "constantName": 
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC"
 }
+    "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The name of topic to override and send 
to instead of topic specified on endpoint.", "constantName": 
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC"
 },
+    "CamelPahoMqtt5MsgProperties": { "index": 5, "kind": "header", 
"displayName": "", "group": "producer", "label": "consumer,producer", 
"required": false, "javaType": 
"org.eclipse.paho.mqttv5.common.packet.MqttProperties", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": 
"Consumer: The properties set on the incoming message. Producer: The properties 
to be set on the outgoing message.", "constantName": 
"org.apache.camel.component.paho.mqtt5. [...]
   },
   "properties": {
     "topic": { "index": 0, "kind": "path", "displayName": "Topic", "group": 
"common", "label": "", "required": true, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Name of the topic" },
diff --git 
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java
 
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java
index 3317fa364b1..4e95fe474de 100644
--- 
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java
+++ 
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java
@@ -47,6 +47,11 @@ public final class PahoMqtt5Constants {
               javaType = "String")
     public static final String CAMEL_PAHO_OVERRIDE_TOPIC = CAMEL_PAHO + 
"OverrideTopic";
 
+    @Metadata(label = "consumer,producer",
+              description = "Consumer: The properties set on the incoming 
message. Producer: The properties to be set on the outgoing message.",
+              javaType = 
"org.eclipse.paho.mqttv5.common.packet.MqttProperties")
+    public static final String CAMEL_PAHO_MSG_PROPERTIES = CAMEL_PAHO + 
"MsgProperties";
+
     private PahoMqtt5Constants() {
     }
 }
diff --git 
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
 
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
index 224c070134d..140736a7c25 100644
--- 
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
+++ 
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
@@ -152,6 +152,7 @@ public class PahoMqtt5Consumer extends DefaultConsumer {
         paho.setBody(mqttMessage.getPayload());
         paho.setHeader(PahoMqtt5Constants.MQTT_TOPIC, topic);
         paho.setHeader(PahoMqtt5Constants.MQTT_QOS, mqttMessage.getQos());
+        paho.setHeader(PahoMqtt5Constants.CAMEL_PAHO_MSG_PROPERTIES, 
mqttMessage.getProperties());
 
         exchange.setIn(paho);
         return exchange;
diff --git 
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java
 
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java
index 6b26d8ce612..9a407742893 100644
--- 
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java
+++ 
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java
@@ -21,6 +21,7 @@ import org.apache.camel.support.DefaultProducer;
 import org.eclipse.paho.mqttv5.client.MqttClient;
 import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
 import org.eclipse.paho.mqttv5.common.MqttMessage;
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,11 +46,14 @@ public class PahoMqtt5Producer extends DefaultProducer {
                 getEndpoint().getConfiguration().getQos(), Integer.class);
         boolean retained = 
exchange.getIn().getHeader(PahoMqtt5Constants.CAMEL_PAHO_MSG_RETAINED,
                 getEndpoint().getConfiguration().isRetained(), Boolean.class);
+        MqttProperties properties
+                = 
exchange.getIn().getHeader(PahoMqtt5Constants.CAMEL_PAHO_MSG_PROPERTIES, 
MqttProperties.class);
         byte[] payload = exchange.getIn().getBody(byte[].class);
 
         MqttMessage message = new MqttMessage(payload);
         message.setQos(qos);
         message.setRetained(retained);
+        message.setProperties(properties);
 
         LOG.debug("Publishing to topic: {}, qos: {}, retrained: {}", topic, 
qos, retained);
         client.publish(topic, message);
diff --git 
a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java
 
b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java
index 3940d785951..bb8e87a1afd 100644
--- 
a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java
+++ 
b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.paho.mqtt5.integration;
 
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
@@ -28,9 +29,12 @@ import 
org.apache.camel.component.paho.mqtt5.PahoMqtt5Endpoint;
 import org.apache.camel.component.paho.mqtt5.PahoMqtt5Message;
 import org.apache.camel.component.paho.mqtt5.PahoMqtt5Persistence;
 import org.eclipse.paho.mqttv5.common.MqttMessage;
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
+import org.eclipse.paho.mqttv5.common.packet.UserProperty;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants.CAMEL_PAHO_MSG_PROPERTIES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -114,6 +118,28 @@ public class PahoMqtt5ComponentMqtt5IT extends 
PahoMqtt5ITSupport {
         mock.assertIsSatisfied();
     }
 
+    @Test
+    public void shouldSendAndReadMessagePropertiesFromMqtt() throws 
InterruptedException {
+        // Given
+        String msg = "msg";
+        MqttProperties publishedMqttProperties = createMqttProperties(
+                "text/plain",
+                "some-response-topic",
+                List.of(new UserProperty("key1", "value1"), new 
UserProperty("key2", "value2")));
+        mock.expectedBodiesReceived(msg);
+
+        // When
+        template.sendBodyAndHeader("direct:test", msg, 
CAMEL_PAHO_MSG_PROPERTIES, publishedMqttProperties);
+
+        // Then
+        mock.assertIsSatisfied();
+        MqttProperties receivedMqttProperties
+                = 
mock.getExchanges().get(0).getIn().getHeader(CAMEL_PAHO_MSG_PROPERTIES, 
MqttProperties.class);
+        assertEquals(receivedMqttProperties.getContentType(), 
publishedMqttProperties.getContentType());
+        assertEquals(receivedMqttProperties.getResponseTopic(), 
publishedMqttProperties.getResponseTopic());
+        assertEquals(receivedMqttProperties.getUserProperties(), 
publishedMqttProperties.getUserProperties());
+    }
+
     @Test
     public void shouldNotReadMessageFromUnregisteredTopic() throws 
InterruptedException {
         // Given
@@ -189,4 +215,12 @@ public class PahoMqtt5ComponentMqtt5IT extends 
PahoMqtt5ITSupport {
         mock.assertIsSatisfied();
     }
 
+    private MqttProperties createMqttProperties(String contentType, String 
responseTopic, List<UserProperty> userProperties) {
+        MqttProperties properties = new MqttProperties();
+        properties.setContentType(contentType);
+        properties.setResponseTopic(responseTopic);
+        properties.setUserProperties(userProperties);
+        return properties;
+    }
+
 }
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java
index 5fc5855ddbf..a8c6feaae6b 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java
@@ -3241,6 +3241,20 @@ public interface PahoMqtt5EndpointBuilderFactory {
         public String pahoMqtt5OverrideTopic() {
             return "CamelPahoMqtt5OverrideTopic";
         }
+        /**
+         * Consumer: The properties set on the incoming message. Producer: The
+         * properties to be set on the outgoing message.
+         * 
+         * The option is a: {@code
+         * org.eclipse.paho.mqttv5.common.packet.MqttProperties} type.
+         * 
+         * Group: producer
+         * 
+         * @return the name of the header {@code PahoMqtt5MsgProperties}.
+         */
+        public String pahoMqtt5MsgProperties() {
+            return "CamelPahoMqtt5MsgProperties";
+        }
     }
     static PahoMqtt5EndpointBuilder endpointBuilder(String componentName, 
String path) {
         class PahoMqtt5EndpointBuilderImpl extends AbstractEndpointBuilder 
implements PahoMqtt5EndpointBuilder, AdvancedPahoMqtt5EndpointBuilder {

Reply via email to