This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5527e2f  Add contentLogLevel task property
     new 569cc3e  Merge pull request #193 from fvaleri/log-payload
5527e2f is described below

commit 5527e2fd9d46c1293cf531e233ca9cf43385704b
Author: Federico Valeri <fvaleri@localhost>
AuthorDate: Wed May 6 09:59:37 2020 +0200

    Add contentLogLevel task property
---
 core/pom.xml                                       |   5 +
 .../kafkaconnector/CamelSinkConnectorConfig.java   |   8 +-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |   4 +-
 .../kafkaconnector/CamelSourceConnectorConfig.java |  10 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |  38 ++--
 .../camel/kafkaconnector/utils/TaskHelper.java     |  42 +++++
 .../camel/kafkaconnector/CamelSinkTaskTest.java    |   3 +
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |   3 +
 .../camel/kafkaconnector/utils/TaskHelperTest.java | 200 +++++++++++++++++----
 parent/pom.xml                                     |   7 +
 10 files changed, 267 insertions(+), 53 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 91f8ec0..f082a9c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -89,6 +89,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-ext</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-api</artifactId>
             <scope>test</scope>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index c353c58..666661b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector;
 
 import java.util.Map;
 
+import org.apache.camel.LoggingLevel;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -42,10 +43,15 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
     public static final String TOPIC_CONF = "topics";
     public static final String TOPIC_DOC = "A list of topics to use as input 
for this connector";
 
+    public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT = 
LoggingLevel.OFF.toString();
+    public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_CONF = 
"camel.sink.contentLogLevel";
+    public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DOC = "Log level 
for the record's content (default: " + CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT + 
"). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF.";
+
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, 
Importance.HIGH, CAMEL_SINK_URL_DOC)
         .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, 
CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
-        .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, 
CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC);
+        .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, 
CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC)
+        .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, 
CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, 
CAMEL_SINK_CONTENT_LOG_LEVEL_DOC);
 
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> 
parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index bd0870b..15ccd2b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -110,6 +110,7 @@ public class CamelSinkTask extends SinkTask {
     @Override
     public void put(Collection<SinkRecord> sinkRecords) {
         for (SinkRecord record : sinkRecords) {
+            TaskHelper.logRecordContent(LOG, record, config);
             Map<String, Object> headers = new HashMap<String, Object>();
             Exchange exchange = new 
DefaultExchange(producer.getCamelContext());
             headers.put(KAFKA_RECORD_KEY_HEADER, record.key());
@@ -123,7 +124,8 @@ public class CamelSinkTask extends SinkTask {
             }
             exchange.getMessage().setHeaders(headers);
             exchange.getMessage().setBody(record.value());
-            LOG.debug("Sending {} to {}", exchange, LOCAL_URL);
+
+            LOG.debug("Sending exchange {} to {}", exchange.getExchangeId(), 
LOCAL_URL);
             producer.send(LOCAL_URL, exchange);
         }
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 34e2495..01d9fd3 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector;
 
 import java.util.Map;
 
+import org.apache.camel.LoggingLevel;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -60,13 +61,17 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
 
     public static final Boolean 
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT = true;
     public static final String 
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF = 
"camel.source.pollingConsumerBlockWhenFull";
-    public static final String 
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC = " Whether to block any 
producer if the internal queue is full.";
+    public static final String 
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC = "Whether to block any 
producer if the internal queue is full.";
 
     public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT = null;
     public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF = 
"camel.source.camelMessageHeaderKey";
     public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The name 
of a camel message header containing an unique key that can be used as a Kafka 
message key."
           +  " If this is not specified, then the Kafka message will not have 
a key.";
 
+    public static final String CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT = 
LoggingLevel.OFF.toString();
+    public static final String CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF = 
"camel.source.contentLogLevel";
+    public static final String CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC = "Log level 
for the record's content (default: " + CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT + 
"). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF.";
+
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT, 
Importance.HIGH, CAMEL_SOURCE_URL_DOC)
         .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, 
CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
@@ -77,7 +82,8 @@ public class CamelSourceConnectorConfig extends AbstractConfig {
         .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, 
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, 
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
         .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, 
Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, 
Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
         .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, 
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, 
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
-        .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, 
CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC);
+        .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, 
CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC)
+        .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, 
CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, 
CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC);
 
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> 
parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index c9123ed..c8333ed 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -87,10 +87,8 @@ public class CamelSourceTask extends SourceTask {
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
                 remoteUrl = 
TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
-                                                actualProps,
-                                                
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
-                                                
CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
-                                                
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
+                        actualProps, 
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
+                        CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, 
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
 
             cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, 
unmarshaller, camelContext);
@@ -113,35 +111,41 @@ public class CamelSourceTask extends SourceTask {
 
         List<SourceRecord> records = new ArrayList<>();
 
-        while (collectedRecords < maxBatchPollSize && 
(Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
+        while (collectedRecords < maxBatchPollSize
+                && (Instant.now().toEpochMilli() - startPollEpochMilli) < 
maxPollDuration) {
             Exchange exchange = consumer.receiveNoWait();
 
             if (exchange != null) {
-                LOG.debug("Received exchange with");
-                LOG.debug("\t from endpoint: {}", exchange.getFromEndpoint());
-                LOG.debug("\t exchange id: {}", exchange.getExchangeId());
-                LOG.debug("\t message id: {}", 
exchange.getMessage().getMessageId());
-                LOG.debug("\t message body: {}", 
exchange.getMessage().getBody());
-                LOG.debug("\t message headers: {}", 
exchange.getMessage().getHeaders());
-                LOG.debug("\t message properties: {}", 
exchange.getProperties());
+                LOG.debug("Received Exchange {} with Message {} from Endpoint 
{}", exchange.getExchangeId(),
+                        exchange.getMessage().getMessageId(), 
exchange.getFromEndpoint());
 
                 // TODO: see if there is a better way to use sourcePartition 
an sourceOffset
-                Map<String, String> sourcePartition = 
Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
+                Map<String, String> sourcePartition = 
Collections.singletonMap("filename",
+                        exchange.getFromEndpoint().toString());
                 Map<String, String> sourceOffset = 
Collections.singletonMap("position", exchange.getExchangeId());
 
-                final Object messageHeaderKey = camelMessageHeaderKey != null 
? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
+                final Object messageHeaderKey = camelMessageHeaderKey != null
+                        ? 
exchange.getMessage().getHeader(camelMessageHeaderKey)
+                        : null;
                 final Object messageBodyValue = 
exchange.getMessage().getBody();
 
-                final Schema messageKeySchema = messageHeaderKey != null ? 
SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
-                final Schema messageBodySchema = messageBodyValue != null ? 
SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
+                final Schema messageKeySchema = messageHeaderKey != null
+                        ? 
SchemaHelper.buildSchemaBuilderForType(messageHeaderKey)
+                        : null;
+                final Schema messageBodySchema = messageBodyValue != null
+                        ? 
SchemaHelper.buildSchemaBuilderForType(messageBodyValue)
+                        : null;
 
-                SourceRecord record = new SourceRecord(sourcePartition, 
sourceOffset, topic, messageKeySchema, messageHeaderKey, messageBodySchema, 
messageBodyValue);
+                SourceRecord record = new SourceRecord(sourcePartition, 
sourceOffset, topic, messageKeySchema,
+                        messageHeaderKey, messageBodySchema, messageBodyValue);
                 if (exchange.getMessage().hasHeaders()) {
                     setAdditionalHeaders(record, 
exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                 }
                 if (exchange.hasProperties()) {
                     setAdditionalHeaders(record, exchange.getProperties(), 
PROPERTY_CAMEL_PREFIX);
                 }
+
+                TaskHelper.logRecordContent(LOG, record, config);
                 records.add(record);
                 collectedRecords++;
             } else {
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index c623662..672f13b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -22,7 +22,14 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.camel.LoggingLevel;
 import org.apache.camel.catalog.RuntimeCamelCatalog;
+import org.apache.camel.kafkaconnector.CamelSinkConnectorConfig;
+import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
 
 public final class TaskHelper {
 
@@ -93,4 +100,39 @@ public final class TaskHelper {
         }
         return false;
     }
+
+    public static <CFG extends AbstractConfig> void logRecordContent(Logger 
logger, ConnectRecord<?> record, CFG config) {
+        if (logger != null && record != null && config != null) {
+            // do not log record's content by default, as it may contain 
sensitive information
+            LoggingLevel level = LoggingLevel.OFF;
+            try {
+                final String key = (record instanceof SourceRecord)
+                    ? 
CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF
+                    : 
CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF;
+                level = 
LoggingLevel.valueOf(config.getString(key).toUpperCase());
+            } catch (Exception e) {
+                logger.warn("Invalid value for contentLogLevel property");
+            }
+            switch (level) {
+                case TRACE:
+                    logger.trace(record.toString());
+                    break;
+                case DEBUG:
+                    logger.debug(record.toString());
+                    break;
+                case INFO:
+                    logger.info(record.toString());
+                    break;
+                case WARN:
+                    logger.warn(record.toString());
+                    break;
+                case ERROR:
+                    logger.error(record.toString());
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 24cd03c..4941741 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -56,6 +57,8 @@ public class CamelSinkTaskTest {
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(LoggingLevel.OFF.toString(), 
sinkTask.getCamelSinkConnectorConfig(props)
+            
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
 
         sinkTask.stop();
     }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 33807ee..3032cba 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -20,6 +20,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.LoggingLevel;
 import org.apache.camel.ProducerTemplate;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -56,6 +57,8 @@ public class CamelSourceTaskTest {
 
         assertEquals(size, poll.size());
         assertEquals(TOPIC_NAME, poll.get(0).topic());
+        assertEquals(LoggingLevel.OFF.toString(), 
sourceTask.getCamelSourceConnectorConfig(props)
+            
.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF));
 
         sourceTask.stop();
     }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
index 3cfe5ed..bc0f537 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
@@ -20,16 +20,24 @@ import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.LoggingLevel;
 import org.apache.camel.catalog.RuntimeCamelCatalog;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.ext.LoggerWrapper;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-
 public class TaskHelperTest {
 
     @Test
@@ -41,13 +49,17 @@ public class TaskHelperTest {
 
     @Test
     public void testMergePropertiesDefaultAreAdded() {
-        Map<String, String> defaults = new HashMap<String, String>() {{
+        Map<String, String> defaults = new HashMap<String, String>() {
+            {
                 put("property", "defaultValue");
-            }};
+            }
+        };
 
-        Map<String, String> loaded = new HashMap<String, String>() {{
+        Map<String, String> loaded = new HashMap<String, String>() {
+            {
                 put("anotherProperty", "loadedValue");
-            }};
+            }
+        };
 
         Map result = TaskHelper.mergeProperties(defaults, loaded);
 
@@ -59,13 +71,17 @@ public class TaskHelperTest {
 
     @Test
     public void testMergePropertiesLoadedHavePrecedence() {
-        Map<String, String> defaults = new HashMap<String, String>() {{
+        Map<String, String> defaults = new HashMap<String, String>() {
+            {
                 put("property", "defaultValue");
-            }};
+            }
+        };
 
-        Map<String, String> loaded = new HashMap<String, String>() {{
+        Map<String, String> loaded = new HashMap<String, String>() {
+            {
                 put("property", "loadedValue");
-            }};
+            }
+        };
 
         Map result = TaskHelper.mergeProperties(defaults, loaded);
 
@@ -75,17 +91,21 @@ public class TaskHelperTest {
 
     @Test
     public void testMergePropertiesLoadedHavePrecedenceWithPrefixFiltering() {
-        Map<String, String> defaults = new HashMap<String, String>() {{
+        Map<String, String> defaults = new HashMap<String, String>() {
+            {
                 put("property", "defaultValue");
                 put("camel.component.x.objectProperty", 
"#class:my.package.MyClass");
                 put("camel.component.x.objectProperty.field", "defaultValue");
-            }};
+            }
+        };
 
-        Map<String, String> loaded = new HashMap<String, String>() {{
+        Map<String, String> loaded = new HashMap<String, String>() {
+            {
                 put("camel.component.x.objectProperty", 
"#class:my.package.MyOtherClass");
                 put("camel.component.x.objectProperty.anotherField", 
"loadedValue");
                 put("camel.component.x.normalProperty", "loadedValue");
-            }};
+            }
+        };
 
         Map result = TaskHelper.mergeProperties(defaults, loaded);
 
@@ -98,10 +118,12 @@ public class TaskHelperTest {
 
     @Test
     public void testCreateEndpointOptionsFromProperties() {
-        Map<String, String> props = new HashMap<String, String>() {{
+        Map<String, String> props = new HashMap<String, String>() {
+            {
                 put("prefix.key1", "value1");
                 put("notprefix.key2", "value2");
-            }};
+            }
+        };
 
         String result = TaskHelper.createEndpointOptionsFromProperties(props, 
"prefix.");
 
@@ -110,10 +132,12 @@ public class TaskHelperTest {
 
     @Test
     public void testCreateEndpointOptionsFromPropertiesConcatenation() {
-        Map<String, String> props = new HashMap<String, String>() {{
+        Map<String, String> props = new HashMap<String, String>() {
+            {
                 put("prefix.key1", "value1");
                 put("prefix.key2", "value2");
-            }};
+            }
+        };
 
         String result = TaskHelper.createEndpointOptionsFromProperties(props, 
"prefix.");
 
@@ -122,10 +146,12 @@ public class TaskHelperTest {
 
     @Test
     public void testCreateEndpointOptionsFromPropertiesEmpty() {
-        Map<String, String> props = new HashMap<String, String>() {{
+        Map<String, String> props = new HashMap<String, String>() {
+            {
                 put("prefix.key1", "value1");
                 put("notprefix.key2", "value2");
-            }};
+            }
+        };
 
         String result = TaskHelper.createEndpointOptionsFromProperties(props, 
"anotherprefix");
 
@@ -134,10 +160,12 @@ public class TaskHelperTest {
 
     @Test
     public void testCreateUrlPathFromProperties() {
-        Map<String, String> props = new HashMap<String, String>() {{
+        Map<String, String> props = new HashMap<String, String>() {
+            {
                 put("prefix.key1", "value1");
                 put("notprefix.key2", "value2");
-            }};
+            }
+        };
 
         String result = TaskHelper.createUrlPathFromProperties(props, 
"prefix.");
 
@@ -146,10 +174,12 @@ public class TaskHelperTest {
 
     @Test
     public void testCreateUrlPathFromPropertiesConcatenation() {
-        Map<String, String> props = new HashMap<String, String>() {{
+        Map<String, String> props = new HashMap<String, String>() {
+            {
                 put("prefix.key1", "value1");
                 put("prefix.key2", "value2");
-            }};
+            }
+        };
 
         String result = TaskHelper.createUrlPathFromProperties(props, 
"prefix.");
 
@@ -158,10 +188,12 @@ public class TaskHelperTest {
 
     @Test
     public void testCreateUrlPathFromPropertiesEmpty() {
-        Map<String, String> props = new HashMap<String, String>() {{
+        Map<String, String> props = new HashMap<String, String>() {
+            {
                 put("prefix.key1", "value1");
                 put("notprefix.key2", "value2");
-            }};
+            }
+        };
 
         String result = TaskHelper.createUrlPathFromProperties(props, 
"anotherprefix");
 
@@ -170,10 +202,12 @@ public class TaskHelperTest {
 
     @Test
     public void testBuildUrl() {
-        Map<String, String> props = new HashMap<String, String>() {{
+        Map<String, String> props = new HashMap<String, String>() {
+            {
                 put("prefix.key1", "value1");
                 put("anotherPrefix.key2", "value2");
-            }};
+            }
+        };
 
         String result = TaskHelper.buildUrl(props, "test", "prefix.", 
"anotherPrefix.");
 
@@ -184,23 +218,125 @@ public class TaskHelperTest {
     public void testBuildUrlWithRuntimeCatalog() throws URISyntaxException {
         DefaultCamelContext dcc = new DefaultCamelContext();
         RuntimeCamelCatalog rcc = 
dcc.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
-        Map<String, String> props = new HashMap<String, String>() {{
+        Map<String, String> props = new HashMap<String, String>() {
+            {
                 put("camel.source.path.name", "test");
                 put("camel.source.endpoint.synchronous", "true");
-            }};
+            }
+        };
 
         String result = TaskHelper.buildUrl(rcc, props, "direct", 
"camel.source.endpoint.", "camel.source.path.");
 
         assertEquals("direct:test?synchronous=true", result);
 
-        props = new HashMap<String, String>() {{
+        props = new HashMap<String, String>() {
+            {
                 put("camel.source.path.port", "8080");
                 put("camel.source.path.keyspace", "test");
                 put("camel.source.path.hosts", "localhost");
-            }};
+            }
+        };
 
         result = TaskHelper.buildUrl(rcc, props, "cql", 
"camel.source.endpoint.", "camel.source.path.");
 
         assertEquals("cql:localhost:8080/test", result);
     }
-}
\ No newline at end of file
+
+    private CamelSourceConnectorConfig getSourceConnectorConfig(String 
logLevel) {
+        return new 
CamelSourceConnectorConfig(CamelSourceConnectorConfig.conf(),
+            
Collections.singletonMap(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF,
 logLevel));
+    }
+
+    @Test
+    public void testlogRecordContent() {
+        String partName = "abc123";
+        Logger logger = new 
MyLogger(LoggerFactory.getLogger(TaskHelperTest.class), null);
+        SourceRecord record = new 
SourceRecord(Collections.singletonMap("partition", partName),
+            Collections.singletonMap("offset", "0"), null, null, null, null);
+        Queue<String> logEvents = ((MyLogger)logger).getEvents();
+
+        String offLevel = LoggingLevel.OFF.toString();
+        TaskHelper.logRecordContent(logger,  record, 
getSourceConnectorConfig(offLevel));
+        assertNull(logEvents.poll());
+
+        String traceLevel = LoggingLevel.TRACE.toString();
+        TaskHelper.logRecordContent(logger,  record, 
getSourceConnectorConfig(traceLevel));
+        assertTrue(logEvents.peek().contains(traceLevel) && 
logEvents.poll().contains(partName));
+
+        String debugLevel = LoggingLevel.DEBUG.toString();
+        TaskHelper.logRecordContent(logger,  record, 
getSourceConnectorConfig(debugLevel));
+        assertTrue(logEvents.peek().contains(debugLevel) && 
logEvents.poll().contains(partName));
+
+        String infoLevel = LoggingLevel.INFO.toString();
+        TaskHelper.logRecordContent(logger,  record, 
getSourceConnectorConfig(infoLevel));
+        assertTrue(logEvents.peek().contains(infoLevel) && 
logEvents.poll().contains(partName));
+
+        String warnLevel = LoggingLevel.WARN.toString();
+        TaskHelper.logRecordContent(logger,  record, 
getSourceConnectorConfig(warnLevel));
+        assertTrue(logEvents.peek().contains(warnLevel) && 
logEvents.poll().contains(partName));
+
+        String errorLevel = LoggingLevel.ERROR.toString();
+        TaskHelper.logRecordContent(logger,  record, 
getSourceConnectorConfig(errorLevel));
+        assertTrue(logEvents.peek().contains(errorLevel) && 
logEvents.poll().contains(partName));
+
+        TaskHelper.logRecordContent(null, record, 
getSourceConnectorConfig(debugLevel));
+        assertNull(logEvents.poll());
+
+        TaskHelper.logRecordContent(logger,  null, 
getSourceConnectorConfig(debugLevel));
+        assertNull(logEvents.poll());
+
+        TaskHelper.logRecordContent(logger,  record, null);
+        assertNull(logEvents.poll());
+
+        String invalidLevel = "NOLOG";
+        TaskHelper.logRecordContent(logger, record, 
getSourceConnectorConfig(invalidLevel));
+        assertTrue(logEvents.poll().contains(warnLevel));
+
+        TaskHelper.logRecordContent(logger, record, 
getSourceConnectorConfig(null));
+        assertTrue(logEvents.poll().contains(warnLevel));
+    }
+
+    class MyLogger extends LoggerWrapper {
+        private Queue<String> events = new ConcurrentLinkedQueue<String>();
+
+        public MyLogger(Logger logger, String fqcn) {
+            super(logger, fqcn);
+        }
+
+        public Queue<String> getEvents() {
+            return events;
+        }
+
+        private void log(LoggingLevel level, String msg) {
+            StringBuilder sb = new StringBuilder()
+                .append(level).append(" ").append(msg);
+            events.add(sb.toString());
+        }
+
+        @Override
+        public void trace(String msg) {
+            log(LoggingLevel.TRACE, msg);
+        }
+
+        @Override
+        public void debug(String msg) {
+            log(LoggingLevel.DEBUG, msg);
+        }
+
+        @Override
+        public void info(String msg) {
+            log(LoggingLevel.INFO, msg);
+        }
+
+        @Override
+        public void warn(String msg) {
+            log(LoggingLevel.WARN, msg);
+        }
+
+        @Override
+        public void error(String msg) {
+            log(LoggingLevel.ERROR, msg);
+        }
+    }
+
+}
diff --git a/parent/pom.xml b/parent/pom.xml
index 3c76418..9212b67 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -32,6 +32,7 @@
         <junit.version>5.6.2</junit.version>
         <camel.version>3.3.0</camel.version>
         <jackson.version>2.10.3</jackson.version>
+        <slf4j.version>1.7.30</slf4j.version>
         <log4j2.version>2.8.2</log4j2.version>
         <version.commons-io>2.6</version.commons-io>
         <version.java>1.8</version.java>
@@ -192,6 +193,12 @@
                 <scope>test</scope>
             </dependency>
             <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-ext</artifactId>
+                <version>${slf4j.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.logging.log4j</groupId>
                 <artifactId>log4j-api</artifactId>
                 <version>${log4j2.version}</version>

Reply via email to