[camel-kafka-connector] 02/02: Related to #423 resolved a problem with marshal/unmarshal after fixing https://issues.apache.org/jira/browse/CAMEL-16551 for sinks
This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch kamelets in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git commit 893bd7a94839ad1a9740c438bc8cb2f23c146e21 Author: Andrea Tarocchi AuthorDate: Wed May 19 10:46:13 2021 +0200 Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sinks --- .../utils/CamelKafkaConnectMain.java | 25 +++--- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 036375b..0871307 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -230,12 +230,12 @@ public class CamelKafkaConnectMain extends SimpleMain { //dataformats if (!ObjectHelper.isEmpty(marshallDataFormat)) { - camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); - camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat); } if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { - camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); - camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX 
+ "unmarshal", unmarshallDataFormat); } //aggregator @@ -310,8 +310,8 @@ public class CamelKafkaConnectMain extends SimpleMain { .templateParameter("fromUrl") .templateParameter("errorHandler", "ckcErrorHandler") -.templateParameter("marshall", "dummyDataformat") -.templateParameter("unmarshall", "dummyDataformat") +.templateParameter("marshal", "dummyDataformat") +.templateParameter("unmarshal", "dummyDataformat") //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy? .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME) @@ -326,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain { ProcessorDefinition rdInTemplateSource = rtdSource.from("{{fromUrl}}") .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}")); if (!ObjectHelper.isEmpty(marshallDataFormat)) { -rdInTemplateSource = rdInTemplateSource.marshal("{{marshall}}"); +rdInTemplateSource = rdInTemplateSource.marshal("{{marshal}}"); } if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { -rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshall}}"); +rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshal}}"); } if (getContext().getRegistry().lookupByName("aggregate") != null) { @@ -351,9 +351,8 @@ public class CamelKafkaConnectMain extends SimpleMain { RouteTemplateDefinition rtdSink = routeTemplate("ckcSink") .templateParameter("toUrl") .templateParameter("errorHandler", "ckcErrorHandler") -//TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved -//.templateParameter("marshall", "dummyDataformat") -//.templateParameter("unmarshall", "dummyDataformat") +.templateParameter("marshal", "dummyDataformat") +.templateParameter("unmarshal", "dummyDataformat") //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy? .templateParameter("aggregationStrategy",
[camel-kafka-connector] 02/02: Related to #423 resolved a problem with marshal/unmarshal after fixing https://issues.apache.org/jira/browse/CAMEL-16551
This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch kamelets in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git commit e0a4d6a8d53b3a9261b9987b3544cdda59f427d3 Author: Andrea Tarocchi AuthorDate: Sat May 15 07:59:12 2021 +0200 Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 --- .../utils/CamelKafkaConnectMain.java | 28 +++--- .../camel/kafkaconnector/CamelSourceTaskTest.java | 5 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 6e7dbdf..ba272d6 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -229,15 +229,15 @@ public class CamelKafkaConnectMain extends SimpleMain { camelProperties.putAll(props); //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved -////dataformats -//if (!ObjectHelper.isEmpty(marshallDataFormat)) { -// camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); -// camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); -//} -//if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { -// camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); -// camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); -//} +//dataformats +if (!ObjectHelper.isEmpty(marshallDataFormat)) { + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); + 
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); +} +if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); +} //aggregator if (!ObjectHelper.isEmpty(aggregationSize)) { @@ -310,9 +310,9 @@ public class CamelKafkaConnectMain extends SimpleMain { RouteTemplateDefinition rtdSource = routeTemplate("ckcSource") .templateParameter("fromUrl") .templateParameter("errorHandler", "ckcErrorHandler") -//TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved -//.templateParameter("marshall", "dummyDataformat") -//.templateParameter("unmarshall", "dummyDataformat") + +.templateParameter("marshall", "dummyDataformat") +.templateParameter("unmarshall", "dummyDataformat") //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy? 
.templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME) @@ -327,10 +327,10 @@ public class CamelKafkaConnectMain extends SimpleMain { ProcessorDefinition rdInTemplateSource = rtdSource.from("{{fromUrl}}") .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}")); if (!ObjectHelper.isEmpty(marshallDataFormat)) { -rdInTemplateSource = rdInTemplateSource.marshal(marshallDataFormat); +rdInTemplateSource = rdInTemplateSource.marshal("{{marshall}}"); } if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { -rdInTemplateSource = rdInTemplateSource.unmarshal(unmarshallDataFormat); +rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshall}}"); } if (getContext().getRegistry().lookupByName("aggregate") != null) { diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index b1271ac..36ae9e2 100644 ---