[camel-kafka-connector] 02/02: Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sinks

2021-05-19 Thread valdar
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 893bd7a94839ad1a9740c438bc8cb2f23c146e21
Author: Andrea Tarocchi 
AuthorDate: Wed May 19 10:46:13 2021 +0200

Related to #423 resolved a problem with marshal/unmarshal after fixin 
https://issues.apache.org/jira/browse/CAMEL-16551 for sinks
---
 .../utils/CamelKafkaConnectMain.java   | 25 +++---
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 036375b..0871307 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -230,12 +230,12 @@ public class CamelKafkaConnectMain extends SimpleMain {
 
 //dataformats
 if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
-
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
+
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"marshal", marshallDataFormat);
+
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"marshal", marshallDataFormat);
 }
 if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
-
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
+
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshal", unmarshallDataFormat);
+
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshal", unmarshallDataFormat);
 }
 
 //aggregator
@@ -310,8 +310,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
 .templateParameter("fromUrl")
 .templateParameter("errorHandler", 
"ckcErrorHandler")
 
-.templateParameter("marshall", "dummyDataformat")
-.templateParameter("unmarshall", "dummyDataformat")
+.templateParameter("marshal", "dummyDataformat")
+.templateParameter("unmarshal", "dummyDataformat")
 
 //TODO: change 
CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
 .templateParameter("aggregationStrategy", 
CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -326,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
 ProcessorDefinition rdInTemplateSource = 
rtdSource.from("{{fromUrl}}")
 .errorHandler(new 
ErrorHandlerBuilderRef("{{errorHandler}}"));
 if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-rdInTemplateSource = 
rdInTemplateSource.marshal("{{marshall}}");
+rdInTemplateSource = 
rdInTemplateSource.marshal("{{marshal}}");
 }
 if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-rdInTemplateSource = 
rdInTemplateSource.unmarshal("{{unmarshall}}");
+rdInTemplateSource = 
rdInTemplateSource.unmarshal("{{unmarshal}}");
 }
 
 if (getContext().getRegistry().lookupByName("aggregate") 
!= null) {
@@ -351,9 +351,8 @@ public class CamelKafkaConnectMain extends SimpleMain {
 RouteTemplateDefinition rtdSink = routeTemplate("ckcSink")
 .templateParameter("toUrl")
 .templateParameter("errorHandler", 
"ckcErrorHandler")
-//TODO: enable or delete these parameters once 
https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-//.templateParameter("marshall", "dummyDataformat")
-//.templateParameter("unmarshall", 
"dummyDataformat")
+.templateParameter("marshal", "dummyDataformat")
+.templateParameter("unmarshal", "dummyDataformat")
 
 //TODO: change 
CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
 .templateParameter("aggregationStrategy", 

[camel-kafka-connector] 02/02: Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551

2021-05-16 Thread valdar
This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e0a4d6a8d53b3a9261b9987b3544cdda59f427d3
Author: Andrea Tarocchi 
AuthorDate: Sat May 15 07:59:12 2021 +0200

Related to #423 resolved a problem with marshal/unmarshal after fixin 
https://issues.apache.org/jira/browse/CAMEL-16551
---
 .../utils/CamelKafkaConnectMain.java   | 28 +++---
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  5 
 2 files changed, 14 insertions(+), 19 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 6e7dbdf..ba272d6 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -229,15 +229,15 @@ public class CamelKafkaConnectMain extends SimpleMain {
 camelProperties.putAll(props);
 
 //TODO: enable or delete these parameters once 
https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-////dataformats
-//if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-//
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
-//
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
-//}
-//if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-//
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
-//
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
-//}
+//dataformats
+if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
+
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
+}
+if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
+
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
+}
 
 //aggregator
 if (!ObjectHelper.isEmpty(aggregationSize)) {
@@ -310,9 +310,9 @@ public class CamelKafkaConnectMain extends SimpleMain {
 RouteTemplateDefinition rtdSource = 
routeTemplate("ckcSource")
 .templateParameter("fromUrl")
 .templateParameter("errorHandler", 
"ckcErrorHandler")
-//TODO: enable or delete these parameters once 
https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-//.templateParameter("marshall", "dummyDataformat")
-//.templateParameter("unmarshall", 
"dummyDataformat")
+
+.templateParameter("marshall", "dummyDataformat")
+.templateParameter("unmarshall", "dummyDataformat")
 
 //TODO: change 
CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
 .templateParameter("aggregationStrategy", 
CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -327,10 +327,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
 ProcessorDefinition rdInTemplateSource = 
rtdSource.from("{{fromUrl}}")
 .errorHandler(new 
ErrorHandlerBuilderRef("{{errorHandler}}"));
 if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-rdInTemplateSource = 
rdInTemplateSource.marshal(marshallDataFormat);
+rdInTemplateSource = 
rdInTemplateSource.marshal("{{marshall}}");
 }
 if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-rdInTemplateSource = 
rdInTemplateSource.unmarshal(unmarshallDataFormat);
+rdInTemplateSource = 
rdInTemplateSource.unmarshal("{{unmarshall}}");
 }
 
 if (getContext().getRegistry().lookupByName("aggregate") 
!= null) {
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index b1271ac..36ae9e2 100644
---