This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 71a01426c6b8766498c540b57b61d7d3b242a009
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Wed May 19 10:46:13 2021 +0200

    Related to #423: resolved a problem with marshal/unmarshal after fixing
https://issues.apache.org/jira/browse/CAMEL-16551 for sinks
---
 .../utils/CamelKafkaConnectMain.java               | 25 +++++++++++-----------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 036375b..0871307 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -230,12 +230,12 @@ public class CamelKafkaConnectMain extends SimpleMain {
 
             //dataformats
             if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
-                
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
+                
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"marshal", marshallDataFormat);
+                
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"marshal", marshallDataFormat);
             }
             if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
-                
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
+                
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshal", unmarshallDataFormat);
+                
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshal", unmarshallDataFormat);
             }
 
             //aggregator
@@ -310,8 +310,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
                             .templateParameter("fromUrl")
                             .templateParameter("errorHandler", 
"ckcErrorHandler")
 
-                            .templateParameter("marshall", "dummyDataformat")
-                            .templateParameter("unmarshall", "dummyDataformat")
+                            .templateParameter("marshal", "dummyDataformat")
+                            .templateParameter("unmarshal", "dummyDataformat")
 
                             //TODO: change 
CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
                             .templateParameter("aggregationStrategy", 
CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -326,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     ProcessorDefinition<?> rdInTemplateSource = 
rtdSource.from("{{fromUrl}}")
                             .errorHandler(new 
ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSource = 
rdInTemplateSource.marshal("{{marshall}}");
+                        rdInTemplateSource = 
rdInTemplateSource.marshal("{{marshal}}");
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSource = 
rdInTemplateSource.unmarshal("{{unmarshall}}");
+                        rdInTemplateSource = 
rdInTemplateSource.unmarshal("{{unmarshal}}");
                     }
 
                     if (getContext().getRegistry().lookupByName("aggregate") 
!= null) {
@@ -351,9 +351,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
                             .templateParameter("toUrl")
                             .templateParameter("errorHandler", 
"ckcErrorHandler")
-                            //TODO: enable or delete these parameters once 
https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-//                            .templateParameter("marshall", "dummyDataformat")
-//                            .templateParameter("unmarshall", 
"dummyDataformat")
+                            .templateParameter("marshal", "dummyDataformat")
+                            .templateParameter("unmarshal", "dummyDataformat")
 
                             //TODO: change 
CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
                             .templateParameter("aggregationStrategy", 
CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -368,10 +367,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     ProcessorDefinition<?> rdInTemplateSink = 
rtdSink.from("kamelet:source")
                             .errorHandler(new 
ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSink = 
rdInTemplateSink.marshal(marshallDataFormat);
+                        rdInTemplateSink = 
rdInTemplateSink.marshal("{{marshal}}");
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSink = 
rdInTemplateSink.unmarshal(unmarshallDataFormat);
+                        rdInTemplateSink = 
rdInTemplateSink.unmarshal("{{unmarshal}}");
                     }
 
                     if (getContext().getRegistry().lookupByName("aggregate") 
!= null) {

Reply via email to