This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 5527e2f Add contentLogLevel task property new 569cc3e Merge pull request #193 from fvaleri/log-payload 5527e2f is described below commit 5527e2fd9d46c1293cf531e233ca9cf43385704b Author: Federico Valeri <fvaleri@localhost> AuthorDate: Wed May 6 09:59:37 2020 +0200 Add contentLogLevel task property --- core/pom.xml | 5 + .../kafkaconnector/CamelSinkConnectorConfig.java | 8 +- .../apache/camel/kafkaconnector/CamelSinkTask.java | 4 +- .../kafkaconnector/CamelSourceConnectorConfig.java | 10 +- .../camel/kafkaconnector/CamelSourceTask.java | 38 ++-- .../camel/kafkaconnector/utils/TaskHelper.java | 42 +++++ .../camel/kafkaconnector/CamelSinkTaskTest.java | 3 + .../camel/kafkaconnector/CamelSourceTaskTest.java | 3 + .../camel/kafkaconnector/utils/TaskHelperTest.java | 200 +++++++++++++++++---- parent/pom.xml | 7 + 10 files changed, 267 insertions(+), 53 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 91f8ec0..f082a9c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -89,6 +89,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-ext</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <scope>test</scope> diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index c353c58..666661b 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector; import java.util.Map; +import org.apache.camel.LoggingLevel; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -42,10 +43,15 @@ public class CamelSinkConnectorConfig extends AbstractConfig { public static final String TOPIC_CONF = "topics"; public static final String TOPIC_DOC = "A list of topics to use as input for this connector"; + public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT = LoggingLevel.OFF.toString(); + public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_CONF = "camel.sink.contentLogLevel"; + public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DOC = "Log level for the record's content (default: " + CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT + "). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF."; + private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC) .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC) - .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC); + .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC) + .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC); public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index bd0870b..15ccd2b 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -110,6 +110,7 @@ public class CamelSinkTask extends SinkTask { @Override public void put(Collection<SinkRecord> sinkRecords) { for (SinkRecord record : sinkRecords) { + TaskHelper.logRecordContent(LOG, record, config); Map<String, Object> headers = new HashMap<String, Object>(); Exchange exchange = new DefaultExchange(producer.getCamelContext()); headers.put(KAFKA_RECORD_KEY_HEADER, record.key()); @@ -123,7 +124,8 @@ public class CamelSinkTask extends SinkTask { } exchange.getMessage().setHeaders(headers); exchange.getMessage().setBody(record.value()); - LOG.debug("Sending {} to {}", exchange, LOCAL_URL); + + LOG.debug("Sending exchange {} to {}", exchange.getExchangeId(), LOCAL_URL); producer.send(LOCAL_URL, exchange); } } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index 34e2495..01d9fd3 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector; import java.util.Map; +import org.apache.camel.LoggingLevel; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -60,13 +61,17 @@ public class CamelSourceConnectorConfig extends AbstractConfig { public static final Boolean CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT = true; public static final String CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF = "camel.source.pollingConsumerBlockWhenFull"; - public static final String CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC = " Whether to block any producer if the internal queue is full."; + public static final String CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC = "Whether to block any producer if the internal queue is full."; public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT = null; public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF = "camel.source.camelMessageHeaderKey"; public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The name of a camel message header containing an unique key that can be used as a Kafka message key." + " If this is not specified, then the Kafka message will not have a key."; + public static final String CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT = LoggingLevel.OFF.toString(); + public static final String CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF = "camel.source.contentLogLevel"; + public static final String CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC = "Log level for the record's content (default: " + CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT + "). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF."; + private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_URL_DOC) .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC) @@ -77,7 +82,8 @@ public class CamelSourceConnectorConfig extends AbstractConfig { .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC) .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC) .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC) - .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC); + .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC) + .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC); public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index c9123ed..c8333ed 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -87,10 +87,8 @@ public class CamelSourceTask extends SourceTask { CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(), - actualProps, - config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), - CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, - CAMEL_SOURCE_PATH_PROPERTIES_PREFIX); + actualProps, config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), + CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX); } cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, unmarshaller, camelContext); @@ -113,35 +111,41 @@ public class CamelSourceTask extends SourceTask { List<SourceRecord> records = new ArrayList<>(); - while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) { + while (collectedRecords < maxBatchPollSize + && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) { Exchange exchange = consumer.receiveNoWait(); if (exchange != null) { - LOG.debug("Received exchange with"); - LOG.debug("\t from endpoint: {}", exchange.getFromEndpoint()); - LOG.debug("\t exchange id: {}", exchange.getExchangeId()); - LOG.debug("\t message id: {}", exchange.getMessage().getMessageId()); - LOG.debug("\t message body: {}", exchange.getMessage().getBody()); - LOG.debug("\t message headers: {}", exchange.getMessage().getHeaders()); - LOG.debug("\t message properties: {}", exchange.getProperties()); + LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), + exchange.getMessage().getMessageId(), exchange.getFromEndpoint()); // TODO: see if there is a better way to use sourcePartition an sourceOffset - Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString()); + Map<String, String> sourcePartition = Collections.singletonMap("filename", + exchange.getFromEndpoint().toString()); Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId()); - final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null; + final Object messageHeaderKey = camelMessageHeaderKey != null + ? exchange.getMessage().getHeader(camelMessageHeaderKey) + : null; final Object messageBodyValue = exchange.getMessage().getBody(); - final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null; - final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null; + final Schema messageKeySchema = messageHeaderKey != null + ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) + : null; + final Schema messageBodySchema = messageBodyValue != null + ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) + : null; - SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue); + SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, messageKeySchema, + messageHeaderKey, messageBodySchema, messageBodyValue); if (exchange.getMessage().hasHeaders()) { setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX); } if (exchange.hasProperties()) { setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX); } + + TaskHelper.logRecordContent(LOG, record, config); records.add(record); collectedRecords++; } else { diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java index c623662..672f13b 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java @@ -22,7 +22,14 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import org.apache.camel.LoggingLevel; import org.apache.camel.catalog.RuntimeCamelCatalog; +import org.apache.camel.kafkaconnector.CamelSinkConnectorConfig; +import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; public final class TaskHelper { @@ -93,4 +100,39 @@ public final class TaskHelper { } return false; } + + public static <CFG extends AbstractConfig> void logRecordContent(Logger logger, ConnectRecord<?> record, CFG config) { + if (logger != null && record != null && config != null) { + // do not log record's content by default, as it may contain sensitive information + LoggingLevel level = LoggingLevel.OFF; + try { + final String key = (record instanceof SourceRecord) + ? CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF + : CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF; + level = LoggingLevel.valueOf(config.getString(key).toUpperCase()); + } catch (Exception e) { + logger.warn("Invalid value for contentLogLevel property"); + } + switch (level) { + case TRACE: + logger.trace(record.toString()); + break; + case DEBUG: + logger.debug(record.toString()); + break; + case INFO: + logger.info(record.toString()); + break; + case WARN: + logger.warn(record.toString()); + break; + case ERROR: + logger.error(record.toString()); + break; + default: + break; + } + } + } + } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 24cd03c..4941741 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.camel.ConsumerTemplate; import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.sink.SinkRecord; @@ -56,6 +57,8 @@ public class CamelSinkTaskTest { Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); sinkTask.stop(); } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 33807ee..3032cba 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.camel.LoggingLevel; import org.apache.camel.ProducerTemplate; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.source.SourceRecord; @@ -56,6 +57,8 @@ public class CamelSourceTaskTest { assertEquals(size, poll.size()); assertEquals(TOPIC_NAME, poll.get(0).topic()); + assertEquals(LoggingLevel.OFF.toString(), sourceTask.getCamelSourceConnectorConfig(props) + .getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF)); sourceTask.stop(); } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java index 3cfe5ed..bc0f537 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java @@ -20,16 +20,24 @@ import java.net.URISyntaxException; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.LoggingLevel; import org.apache.camel.catalog.RuntimeCamelCatalog; import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig; +import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.ext.LoggerWrapper; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; - public class TaskHelperTest { @Test @@ -41,13 +49,17 @@ public class TaskHelperTest { @Test public void testMergePropertiesDefaultAreAdded() { - Map<String, String> defaults = new HashMap<String, String>() {{ + Map<String, String> defaults = new HashMap<String, String>() { + { put("property", "defaultValue"); - }}; + } + }; - Map<String, String> loaded = new HashMap<String, String>() {{ + Map<String, String> loaded = new HashMap<String, String>() { + { put("anotherProperty", "loadedValue"); - }}; + } + }; Map result = TaskHelper.mergeProperties(defaults, loaded); @@ -59,13 +71,17 @@ public class TaskHelperTest { @Test public void testMergePropertiesLoadedHavePrecedence() { - Map<String, String> defaults = new HashMap<String, String>() {{ + Map<String, String> defaults = new HashMap<String, String>() { + { put("property", "defaultValue"); - }}; + } + }; - Map<String, String> loaded = new HashMap<String, String>() {{ + Map<String, String> loaded = new HashMap<String, String>() { + { put("property", "loadedValue"); - }}; + } + }; Map result = TaskHelper.mergeProperties(defaults, loaded); @@ -75,17 +91,21 @@ public class TaskHelperTest { @Test public void testMergePropertiesLoadedHavePrecedenceWithPrefixFiltering() { - Map<String, String> defaults = new HashMap<String, String>() {{ + Map<String, String> defaults = new HashMap<String, String>() { + { put("property", "defaultValue"); put("camel.component.x.objectProperty", "#class:my.package.MyClass"); put("camel.component.x.objectProperty.field", "defaultValue"); - }}; + } + }; - Map<String, String> loaded = new HashMap<String, String>() {{ + Map<String, String> loaded = new HashMap<String, String>() { + { put("camel.component.x.objectProperty", "#class:my.package.MyOtherClass"); put("camel.component.x.objectProperty.anotherField", "loadedValue"); put("camel.component.x.normalProperty", "loadedValue"); - }}; + } + }; Map result = TaskHelper.mergeProperties(defaults, loaded); @@ -98,10 +118,12 @@ public class TaskHelperTest { @Test public void testCreateEndpointOptionsFromProperties() { - Map<String, String> props = new HashMap<String, String>() {{ + Map<String, String> props = new HashMap<String, String>() { + { put("prefix.key1", "value1"); put("notprefix.key2", "value2"); - }}; + } + }; String result = TaskHelper.createEndpointOptionsFromProperties(props, "prefix."); @@ -110,10 +132,12 @@ public class TaskHelperTest { @Test public void testCreateEndpointOptionsFromPropertiesConcatenation() { - Map<String, String> props = new HashMap<String, String>() {{ + Map<String, String> props = new HashMap<String, String>() { + { put("prefix.key1", "value1"); put("prefix.key2", "value2"); - }}; + } + }; String result = TaskHelper.createEndpointOptionsFromProperties(props, "prefix."); @@ -122,10 +146,12 @@ public class TaskHelperTest { @Test public void testCreateEndpointOptionsFromPropertiesEmpty() { - Map<String, String> props = new HashMap<String, String>() {{ + Map<String, String> props = new HashMap<String, String>() { + { put("prefix.key1", "value1"); put("notprefix.key2", "value2"); - }}; + } + }; String result = TaskHelper.createEndpointOptionsFromProperties(props, "anotherprefix"); @@ -134,10 +160,12 @@ public class TaskHelperTest { @Test public void testCreateUrlPathFromProperties() { - Map<String, String> props = new HashMap<String, String>() {{ + Map<String, String> props = new HashMap<String, String>() { + { put("prefix.key1", "value1"); put("notprefix.key2", "value2"); - }}; + } + }; String result = TaskHelper.createUrlPathFromProperties(props, "prefix."); @@ -146,10 +174,12 @@ public class TaskHelperTest { @Test public void testCreateUrlPathFromPropertiesConcatenation() { - Map<String, String> props = new HashMap<String, String>() {{ + Map<String, String> props = new HashMap<String, String>() { + { put("prefix.key1", "value1"); put("prefix.key2", "value2"); - }}; + } + }; String result = TaskHelper.createUrlPathFromProperties(props, "prefix."); @@ -158,10 +188,12 @@ public class TaskHelperTest { @Test public void testCreateUrlPathFromPropertiesEmpty() { - Map<String, String> props = new HashMap<String, String>() {{ + Map<String, String> props = new HashMap<String, String>() { + { put("prefix.key1", "value1"); put("notprefix.key2", "value2"); - }}; + } + }; String result = TaskHelper.createUrlPathFromProperties(props, "anotherprefix"); @@ -170,10 +202,12 @@ public class TaskHelperTest { @Test public void testBuildUrl() { - Map<String, String> props = new HashMap<String, String>() {{ + Map<String, String> props = new HashMap<String, String>() { + { put("prefix.key1", "value1"); put("anotherPrefix.key2", "value2"); - }}; + } + }; String result = TaskHelper.buildUrl(props, "test", "prefix.", "anotherPrefix."); @@ -184,23 +218,125 @@ public class TaskHelperTest { public void testBuildUrlWithRuntimeCatalog() throws URISyntaxException { DefaultCamelContext dcc = new DefaultCamelContext(); RuntimeCamelCatalog rcc = dcc.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(); - Map<String, String> props = new HashMap<String, String>() {{ + Map<String, String> props = new HashMap<String, String>() { + { put("camel.source.path.name", "test"); put("camel.source.endpoint.synchronous", "true"); - }}; + } + }; String result = TaskHelper.buildUrl(rcc, props, "direct", "camel.source.endpoint.", "camel.source.path."); assertEquals("direct:test?synchronous=true", result); - props = new HashMap<String, String>() {{ + props = new HashMap<String, String>() { + { put("camel.source.path.port", "8080"); put("camel.source.path.keyspace", "test"); put("camel.source.path.hosts", "localhost"); - }}; + } + }; result = TaskHelper.buildUrl(rcc, props, "cql", "camel.source.endpoint.", "camel.source.path."); assertEquals("cql:localhost:8080/test", result); } -} \ No newline at end of file + + private CamelSourceConnectorConfig getSourceConnectorConfig(String logLevel) { + return new CamelSourceConnectorConfig(CamelSourceConnectorConfig.conf(), + Collections.singletonMap(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, logLevel)); + } + + @Test + public void testlogRecordContent() { + String partName = "abc123"; + Logger logger = new MyLogger(LoggerFactory.getLogger(TaskHelperTest.class), null); + SourceRecord record = new SourceRecord(Collections.singletonMap("partition", partName), + Collections.singletonMap("offset", "0"), null, null, null, null); + Queue<String> logEvents = ((MyLogger)logger).getEvents(); + + String offLevel = LoggingLevel.OFF.toString(); + TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(offLevel)); + assertNull(logEvents.poll()); + + String traceLevel = LoggingLevel.TRACE.toString(); + TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(traceLevel)); + assertTrue(logEvents.peek().contains(traceLevel) && logEvents.poll().contains(partName)); + + String debugLevel = LoggingLevel.DEBUG.toString(); + TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(debugLevel)); + assertTrue(logEvents.peek().contains(debugLevel) && logEvents.poll().contains(partName)); + + String infoLevel = LoggingLevel.INFO.toString(); + TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(infoLevel)); + assertTrue(logEvents.peek().contains(infoLevel) && logEvents.poll().contains(partName)); + + String warnLevel = LoggingLevel.WARN.toString(); + TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(warnLevel)); + assertTrue(logEvents.peek().contains(warnLevel) && logEvents.poll().contains(partName)); + + String errorLevel = LoggingLevel.ERROR.toString(); + TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(errorLevel)); + assertTrue(logEvents.peek().contains(errorLevel) && logEvents.poll().contains(partName)); + + TaskHelper.logRecordContent(null, record, getSourceConnectorConfig(debugLevel)); + assertNull(logEvents.poll()); + + TaskHelper.logRecordContent(logger, null, getSourceConnectorConfig(debugLevel)); + assertNull(logEvents.poll()); + + TaskHelper.logRecordContent(logger, record, null); + assertNull(logEvents.poll()); + + String invalidLevel = "NOLOG"; + TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(invalidLevel)); + assertTrue(logEvents.poll().contains(warnLevel)); + + TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(null)); + assertTrue(logEvents.poll().contains(warnLevel)); + } + + class MyLogger extends LoggerWrapper { + private Queue<String> events = new ConcurrentLinkedQueue<String>(); + + public MyLogger(Logger logger, String fqcn) { + super(logger, fqcn); + } + + public Queue<String> getEvents() { + return events; + } + + private void log(LoggingLevel level, String msg) { + StringBuilder sb = new StringBuilder() + .append(level).append(" ").append(msg); + events.add(sb.toString()); + } + + @Override + public void trace(String msg) { + log(LoggingLevel.TRACE, msg); + } + + @Override + public void debug(String msg) { + log(LoggingLevel.DEBUG, msg); + } + + @Override + public void info(String msg) { + log(LoggingLevel.INFO, msg); + } + + @Override + public void warn(String msg) { + log(LoggingLevel.WARN, msg); + } + + @Override + public void error(String msg) { + log(LoggingLevel.ERROR, msg); + } + } + +} diff --git a/parent/pom.xml b/parent/pom.xml index 3c76418..9212b67 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -32,6 +32,7 @@ <junit.version>5.6.2</junit.version> <camel.version>3.3.0</camel.version> <jackson.version>2.10.3</jackson.version> + <slf4j.version>1.7.30</slf4j.version> <log4j2.version>2.8.2</log4j2.version> <version.commons-io>2.6</version.commons-io> <version.java>1.8</version.java> @@ -192,6 +193,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-ext</artifactId> + <version>${slf4j.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${log4j2.version}</version>