This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch kamelets in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 68682b197701887e96071f85363fbac52dbac93f Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Wed May 19 10:46:13 2021 +0200 Related to #423 resolved a problem with marshal/unmarshal after fixing https://issues.apache.org/jira/browse/CAMEL-16551 for sinks --- .../utils/CamelKafkaConnectMain.java | 25 +++++++++++----------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 036375b..0871307 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -230,12 +230,12 @@ public class CamelKafkaConnectMain extends SimpleMain { //dataformats if (!ObjectHelper.isEmpty(marshallDataFormat)) { - camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); - camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat); } if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { - camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); - camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat); } //aggregator @@ -310,8 +310,8 @@ public class CamelKafkaConnectMain extends SimpleMain { 
.templateParameter("fromUrl") .templateParameter("errorHandler", "ckcErrorHandler") - .templateParameter("marshall", "dummyDataformat") - .templateParameter("unmarshall", "dummyDataformat") + .templateParameter("marshal", "dummyDataformat") + .templateParameter("unmarshal", "dummyDataformat") //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy? .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME) @@ -326,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain { ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}") .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}")); if (!ObjectHelper.isEmpty(marshallDataFormat)) { - rdInTemplateSource = rdInTemplateSource.marshal("{{marshall}}"); + rdInTemplateSource = rdInTemplateSource.marshal("{{marshal}}"); } if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { - rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshall}}"); + rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshal}}"); } if (getContext().getRegistry().lookupByName("aggregate") != null) { @@ -351,9 +351,8 @@ public class CamelKafkaConnectMain extends SimpleMain { RouteTemplateDefinition rtdSink = routeTemplate("ckcSink") .templateParameter("toUrl") .templateParameter("errorHandler", "ckcErrorHandler") - //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved -// .templateParameter("marshall", "dummyDataformat") -// .templateParameter("unmarshall", "dummyDataformat") + .templateParameter("marshal", "dummyDataformat") + .templateParameter("unmarshal", "dummyDataformat") //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy? 
.templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME) @@ -368,10 +367,10 @@ public class CamelKafkaConnectMain extends SimpleMain { ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source") .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}")); if (!ObjectHelper.isEmpty(marshallDataFormat)) { - rdInTemplateSink = rdInTemplateSink.marshal(marshallDataFormat); + rdInTemplateSink = rdInTemplateSink.marshal("{{marshal}}"); } if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { - rdInTemplateSink = rdInTemplateSink.unmarshal(unmarshallDataFormat); + rdInTemplateSink = rdInTemplateSink.unmarshal("{{unmarshal}}"); } if (getContext().getRegistry().lookupByName("aggregate") != null) {