This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new ddd2e40921a CAMEL-20754: Support for MQTT 5 Publish Properties (#14175) ddd2e40921a is described below commit ddd2e40921afb570a1dd78c34bd495bfc2b83ac2 Author: Henning Sudbrock <git...@hsudbrock.de> AuthorDate: Fri May 17 12:34:34 2024 +0200 CAMEL-20754: Support for MQTT 5 Publish Properties (#14175) Adds a message header for MQTT 5 Publish Properties to the Paho MQTT 5 component. This enables support for the MQTT message headers introduced with MQTT 5 (like, e.g., content-type, response-topic, message-expiry-interval, or user properties; cf. section 3.3.2.3 of the MQTT 5 spec, https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html). When consuming a message, the contents of the Camel message header are taken from the properties of the incoming MQT message; when producing an MQTT message, the properties are taken from the Camel message header if available. --- .../camel/catalog/components/paho-mqtt5.json | 3 +- .../camel/component/paho/mqtt5/paho-mqtt5.json | 3 +- .../component/paho/mqtt5/PahoMqtt5Constants.java | 5 ++++ .../component/paho/mqtt5/PahoMqtt5Consumer.java | 1 + .../component/paho/mqtt5/PahoMqtt5Producer.java | 4 +++ .../integration/PahoMqtt5ComponentMqtt5IT.java | 34 ++++++++++++++++++++++ .../dsl/PahoMqtt5EndpointBuilderFactory.java | 14 +++++++++ 7 files changed, 62 insertions(+), 2 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json index 54161f7f611..1038353b6a3 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json @@ -61,7 +61,8 @@ "CamelMqttQoS": { "index": 1, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The quality of service of the incoming message.", "constantName": "org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#MQTT_QOS" }, "CamelPahoMqtt5Qos": { "index": 2, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The client quality of service level (0-2).", "constantName": "org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_QOS" }, "CamelPahoMqtt5Retained": { "index": 3, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Retain option.", "constantName": "org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_RETAINED" }, - "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of topic to override and send to instead of topic specified on endpoint.", "constantName": "org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC" } + "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of topic to override and send to instead of topic specified on endpoint.", "constantName": "org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC" }, + "CamelPahoMqtt5MsgProperties": { "index": 5, "kind": "header", "displayName": "", "group": "producer", "label": "consumer,producer", "required": false, "javaType": "org.eclipse.paho.mqttv5.common.packet.MqttProperties", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Consumer: The properties set on the incoming message. Producer: The properties to be set on the outgoing message.", "constantName": "org.apache.camel.component.paho.mqtt5. [...] }, "properties": { "topic": { "index": 0, "kind": "path", "displayName": "Topic", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Name of the topic" }, diff --git a/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json b/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json index 54161f7f611..1038353b6a3 100644 --- a/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json +++ b/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json @@ -61,7 +61,8 @@ "CamelMqttQoS": { "index": 1, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The quality of service of the incoming message.", "constantName": "org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#MQTT_QOS" }, "CamelPahoMqtt5Qos": { "index": 2, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The client quality of service level (0-2).", "constantName": "org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_QOS" }, "CamelPahoMqtt5Retained": { "index": 3, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Retain option.", "constantName": "org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_RETAINED" }, - "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of topic to override and send to instead of topic specified on endpoint.", "constantName": "org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC" } + "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of topic to override and send to instead of topic specified on endpoint.", "constantName": "org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC" }, + "CamelPahoMqtt5MsgProperties": { "index": 5, "kind": "header", "displayName": "", "group": "producer", "label": "consumer,producer", "required": false, "javaType": "org.eclipse.paho.mqttv5.common.packet.MqttProperties", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Consumer: The properties set on the incoming message. Producer: The properties to be set on the outgoing message.", "constantName": "org.apache.camel.component.paho.mqtt5. [...] }, "properties": { "topic": { "index": 0, "kind": "path", "displayName": "Topic", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Name of the topic" }, diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java index 3317fa364b1..4e95fe474de 100644 --- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java +++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java @@ -47,6 +47,11 @@ public final class PahoMqtt5Constants { javaType = "String") public static final String CAMEL_PAHO_OVERRIDE_TOPIC = CAMEL_PAHO + "OverrideTopic"; + @Metadata(label = "consumer,producer", + description = "Consumer: The properties set on the incoming message. Producer: The properties to be set on the outgoing message.", + javaType = "org.eclipse.paho.mqttv5.common.packet.MqttProperties") + public static final String CAMEL_PAHO_MSG_PROPERTIES = CAMEL_PAHO + "MsgProperties"; + private PahoMqtt5Constants() { } } diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java index 224c070134d..140736a7c25 100644 --- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java +++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java @@ -152,6 +152,7 @@ public class PahoMqtt5Consumer extends DefaultConsumer { paho.setBody(mqttMessage.getPayload()); paho.setHeader(PahoMqtt5Constants.MQTT_TOPIC, topic); paho.setHeader(PahoMqtt5Constants.MQTT_QOS, mqttMessage.getQos()); + paho.setHeader(PahoMqtt5Constants.CAMEL_PAHO_MSG_PROPERTIES, mqttMessage.getProperties()); exchange.setIn(paho); return exchange; diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java index 6b26d8ce612..9a407742893 100644 --- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java +++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java @@ -21,6 +21,7 @@ import org.apache.camel.support.DefaultProducer; import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,11 +46,14 @@ public class PahoMqtt5Producer extends DefaultProducer { getEndpoint().getConfiguration().getQos(), Integer.class); boolean retained = exchange.getIn().getHeader(PahoMqtt5Constants.CAMEL_PAHO_MSG_RETAINED, getEndpoint().getConfiguration().isRetained(), Boolean.class); + MqttProperties properties + = exchange.getIn().getHeader(PahoMqtt5Constants.CAMEL_PAHO_MSG_PROPERTIES, MqttProperties.class); byte[] payload = exchange.getIn().getBody(byte[].class); MqttMessage message = new MqttMessage(payload); message.setQos(qos); message.setRetained(retained); + message.setProperties(properties); LOG.debug("Publishing to topic: {}, qos: {}, retrained: {}", topic, qos, retained); client.publish(topic, message); diff --git a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java index 3940d785951..bb8e87a1afd 100644 --- a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java +++ b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java @@ -17,6 +17,7 @@ package org.apache.camel.component.paho.mqtt5.integration; import java.nio.charset.StandardCharsets; +import java.util.List; import org.apache.camel.EndpointInject; import org.apache.camel.Exchange; @@ -28,9 +29,12 @@ import org.apache.camel.component.paho.mqtt5.PahoMqtt5Endpoint; import org.apache.camel.component.paho.mqtt5.PahoMqtt5Message; import org.apache.camel.component.paho.mqtt5.PahoMqtt5Persistence; import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import static org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants.CAMEL_PAHO_MSG_PROPERTIES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -114,6 +118,28 @@ public class PahoMqtt5ComponentMqtt5IT extends PahoMqtt5ITSupport { mock.assertIsSatisfied(); } + @Test + public void shouldSendAndReadMessagePropertiesFromMqtt() throws InterruptedException { + // Given + String msg = "msg"; + MqttProperties publishedMqttProperties = createMqttProperties( + "text/plain", + "some-response-topic", + List.of(new UserProperty("key1", "value1"), new UserProperty("key2", "value2"))); + mock.expectedBodiesReceived(msg); + + // When + template.sendBodyAndHeader("direct:test", msg, CAMEL_PAHO_MSG_PROPERTIES, publishedMqttProperties); + + // Then + mock.assertIsSatisfied(); + MqttProperties receivedMqttProperties + = mock.getExchanges().get(0).getIn().getHeader(CAMEL_PAHO_MSG_PROPERTIES, MqttProperties.class); + assertEquals(receivedMqttProperties.getContentType(), publishedMqttProperties.getContentType()); + assertEquals(receivedMqttProperties.getResponseTopic(), publishedMqttProperties.getResponseTopic()); + assertEquals(receivedMqttProperties.getUserProperties(), publishedMqttProperties.getUserProperties()); + } + @Test public void shouldNotReadMessageFromUnregisteredTopic() throws InterruptedException { // Given @@ -189,4 +215,12 @@ public class PahoMqtt5ComponentMqtt5IT extends PahoMqtt5ITSupport { mock.assertIsSatisfied(); } + private MqttProperties createMqttProperties(String contentType, String responseTopic, List<UserProperty> userProperties) { + MqttProperties properties = new MqttProperties(); + properties.setContentType(contentType); + properties.setResponseTopic(responseTopic); + properties.setUserProperties(userProperties); + return properties; + } + } diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java index 5fc5855ddbf..a8c6feaae6b 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java @@ -3241,6 +3241,20 @@ public interface PahoMqtt5EndpointBuilderFactory { public String pahoMqtt5OverrideTopic() { return "CamelPahoMqtt5OverrideTopic"; } + /** + * Consumer: The properties set on the incoming message. Producer: The + * properties to be set on the outgoing message. + * + * The option is a: {@code + * org.eclipse.paho.mqttv5.common.packet.MqttProperties} type. + * + * Group: producer + * + * @return the name of the header {@code PahoMqtt5MsgProperties}. + */ + public String pahoMqtt5MsgProperties() { + return "CamelPahoMqtt5MsgProperties"; + } } static PahoMqtt5EndpointBuilder endpointBuilder(String componentName, String path) { class PahoMqtt5EndpointBuilderImpl extends AbstractEndpointBuilder implements PahoMqtt5EndpointBuilder, AdvancedPahoMqtt5EndpointBuilder {