[ https://issues.apache.org/jira/browse/KAFKA-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596214#comment-16596214 ]
ASF GitHub Bot commented on KAFKA-7134:
---------------------------------------

rajinisivaram closed pull request #5415: KAFKA-7134: KafkaLog4jAppender - Appender exceptions are propagated t…
URL: https://github.com/apache/kafka/pull/5415

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 9b4610d0b22..b13e94856df 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1235,6 +1235,7 @@ project(':log4j-appender') {
     testCompile project(':clients').sourceSets.test.output
     testCompile libs.junit
+    testCompile libs.easymock
   }
 
   javadoc {
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index 6ddaf929a6d..0ae7dcada93 100644
--- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -35,6 +35,7 @@
 import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
@@ -64,10 +65,12 @@
     private String saslKerberosServiceName;
     private String clientJaasConfPath;
     private String kerb5ConfPath;
+    private Integer maxBlockMs;
 
     private int retries = Integer.MAX_VALUE;
     private int requiredNumAcks = 1;
     private int deliveryTimeoutMs = 120000;
+    private boolean ignoreExceptions = true;
     private boolean syncSend;
     private Producer<byte[], byte[]> producer;
 
@@ -123,6 +126,14 @@ public void setTopic(String topic) {
         this.topic = topic;
     }
 
+    public boolean getIgnoreExceptions() {
+        return ignoreExceptions;
+    }
+
+    public void setIgnoreExceptions(boolean ignoreExceptions) {
+        this.ignoreExceptions = ignoreExceptions;
+    }
+
     public boolean getSyncSend() {
         return syncSend;
     }
@@ -203,6 +214,14 @@ public String getKerb5ConfPath() {
         return kerb5ConfPath;
     }
 
+    public int getMaxBlockMs() {
+        return maxBlockMs;
+    }
+
+    public void setMaxBlockMs(int maxBlockMs) {
+        this.maxBlockMs = maxBlockMs;
+    }
+
     @Override
     public void activateOptions() {
         // check for config parameter validity
@@ -242,6 +261,9 @@ public void activateOptions() {
                 System.setProperty("java.security.krb5.conf", kerb5ConfPath);
             }
         }
+        if (maxBlockMs != null) {
+            props.put(MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        }
 
         props.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         props.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
@@ -259,12 +281,14 @@ protected void append(LoggingEvent event) {
         String message = subAppend(event);
         LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
         Future<RecordMetadata> response = producer.send(
-            new ProducerRecord<byte[], byte[]>(topic, message.getBytes(StandardCharsets.UTF_8)));
+            new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8)));
         if (syncSend) {
             try {
                 response.get();
             } catch (InterruptedException | ExecutionException ex) {
-                throw new RuntimeException(ex);
+                if (!ignoreExceptions)
+                    throw new RuntimeException(ex);
+                LogLog.debug("Exception while getting response", ex);
             }
         }
     }
diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
index 34be2e93c88..d5342e8f9e3 100644
--- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
+++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
@@ -16,18 +16,31 @@
  */
 package org.apache.kafka.log4jappender;
 
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.helpers.LogLog;
+import org.easymock.EasyMock;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
 
 public class KafkaLog4jAppenderTest {
-    Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class);
+    private Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class);
+
+    @Before
+    public void setup() {
+        LogLog.setInternalDebugging(true);
+    }
 
     @Test
     public void testKafkaLog4jConfigs() {
@@ -66,22 +79,103 @@ public void testKafkaLog4jConfigs() {
     @Test
-    public void testLog4jAppends() throws UnsupportedEncodingException {
-        PropertyConfigurator.configure(getLog4jConfig());
+    public void testLog4jAppends() {
+        PropertyConfigurator.configure(getLog4jConfig(false));
 
         for (int i = 1; i <= 5; ++i) {
             logger.error(getMessage(i));
         }
         Assert.assertEquals(
-            5, ((MockKafkaLog4jAppender) (logger.getRootLogger().getAppender("KAFKA"))).getHistory().size());
+            5, (getMockKafkaLog4jAppender()).getHistory().size());
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testLog4jAppendsWithSyncSendAndSimulateProducerFailShouldThrowException() {
+        Properties props = getLog4jConfig(true);
+        props.put("log4j.appender.KAFKA.IgnoreExceptions", "false");
+        PropertyConfigurator.configure(props);
+
+        MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
+        replaceProducerWithMocked(mockKafkaLog4jAppender, false);
+
+        logger.error(getMessage(0));
+    }
+
+    @Test
+    public void testLog4jAppendsWithSyncSendWithoutIgnoringExceptionsShouldNotThrowException() {
+        Properties props = getLog4jConfig(true);
+        props.put("log4j.appender.KAFKA.IgnoreExceptions", "false");
+        PropertyConfigurator.configure(props);
+
+        MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
+        replaceProducerWithMocked(mockKafkaLog4jAppender, true);
+
+        logger.error(getMessage(0));
+    }
+
+    @Test
+    public void testLog4jAppendsWithRealProducerConfigWithSyncSendShouldNotThrowException() {
+        Properties props = getLog4jConfigWithRealProducer(true);
+        PropertyConfigurator.configure(props);
+
+        logger.error(getMessage(0));
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testLog4jAppendsWithRealProducerConfigWithSyncSendAndNotIgnoringExceptionsShouldThrowException() {
+        Properties props = getLog4jConfigWithRealProducer(false);
+        PropertyConfigurator.configure(props);
+
+        logger.error(getMessage(0));
     }
 
-    private byte[] getMessage(int i) throws UnsupportedEncodingException {
-        return ("test_" + i).getBytes("UTF-8");
+    private void replaceProducerWithMocked(MockKafkaLog4jAppender mockKafkaLog4jAppender, boolean success) {
+        @SuppressWarnings("unchecked")
+        MockProducer<byte[], byte[]> producer = EasyMock.niceMock(MockProducer.class);
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> futureMock = EasyMock.niceMock(Future.class);
+        try {
+            if (!success)
+                EasyMock.expect(futureMock.get())
+                    .andThrow(new ExecutionException("simulated timeout", new TimeoutException()));
+        } catch (InterruptedException | ExecutionException e) {
+            // just mocking
+        }
+        EasyMock.expect(producer.send(EasyMock.anyObject())).andReturn(futureMock);
+        EasyMock.replay(producer, futureMock);
+        // reconfiguring mock appender
+        mockKafkaLog4jAppender.setKafkaProducer(producer);
+        mockKafkaLog4jAppender.activateOptions();
+    }
+
+    private MockKafkaLog4jAppender getMockKafkaLog4jAppender() {
+        return (MockKafkaLog4jAppender) Logger.getRootLogger().getAppender("KAFKA");
+    }
+
+    private byte[] getMessage(int i) {
+        return ("test_" + i).getBytes(StandardCharsets.UTF_8);
+    }
+
+    private Properties getLog4jConfigWithRealProducer(boolean ignoreExceptions) {
+        Properties props = new Properties();
+        props.put("log4j.rootLogger", "INFO, KAFKA");
+        props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
+        props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+        props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
+        props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.2:9093");
+        props.put("log4j.appender.KAFKA.Topic", "test-topic");
+        props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
+        props.put("log4j.appender.KAFKA.SyncSend", "true");
+        // setting producer timeout (max.block.ms) to be low
+        props.put("log4j.appender.KAFKA.maxBlockMs", "10");
+        // ignoring exceptions
+        props.put("log4j.appender.KAFKA.IgnoreExceptions", Boolean.toString(ignoreExceptions));
+        props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+        return props;
     }
 
-    private Properties getLog4jConfig() {
+    private Properties getLog4jConfig(boolean syncSend) {
         Properties props = new Properties();
         props.put("log4j.rootLogger", "INFO, KAFKA");
         props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.MockKafkaLog4jAppender");
@@ -90,7 +184,7 @@ private Properties getLog4jConfig() {
         props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.1:9093");
         props.put("log4j.appender.KAFKA.Topic", "test-topic");
         props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
-        props.put("log4j.appender.KAFKA.SyncSend", "false");
+        props.put("log4j.appender.KAFKA.SyncSend", Boolean.toString(syncSend));
         props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
         return props;
     }
diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
index 8040be4ccc6..a9eb5fb0721 100644
--- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
+++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
@@ -34,6 +34,10 @@
         return mockProducer;
     }
 
+    void setKafkaProducer(MockProducer<byte[], byte[]> producer) {
+        this.mockProducer = producer;
+    }
+
     @Override
     protected void append(LoggingEvent event) {
         if (super.getProducer() == null) {
@@ -42,7 +46,7 @@ protected void append(LoggingEvent event) {
         super.append(event);
     }
 
-    protected List<ProducerRecord<byte[], byte[]>> getHistory() {
+    List<ProducerRecord<byte[], byte[]>> getHistory() {
         return mockProducer.history();
     }
 }

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaLog4jAppender - Appender exceptions are propagated to caller
> -----------------------------------------------------------------
>
>                 Key: KAFKA-7134
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7134
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: venkata praveen
>            Assignee: Andras Katona
>            Priority: Major
>
> KafkaLog4jAppender exceptions are propagated to the caller when Kafka is down, slow, or
> otherwise unavailable, which can crash the calling application. Ideally the appender
> should log and ignore the exception, or offer an option to either ignore or rethrow
> exceptions, like the 'ignoreExceptions' property of the Log4j 2 KafkaAppender
> (https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
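
Editor's note: for readers who want to try the options added by this PR, below is a minimal
sketch of how an application could configure the appender so that producer failures surface
to the caller instead of being swallowed. It reuses the property names from the PR's test
configuration (BrokerList, Topic, RequiredNumAcks, SyncSend, IgnoreExceptions, maxBlockMs);
the broker address, topic name, timeout value, and the KafkaAppenderConfigExample class are
illustrative assumptions, not part of the patch.

import java.util.Properties;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class KafkaAppenderConfigExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("log4j.rootLogger", "INFO, KAFKA");
        props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
        props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
        props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
        props.put("log4j.appender.KAFKA.BrokerList", "localhost:9092");   // assumed broker address
        props.put("log4j.appender.KAFKA.Topic", "app-logs");              // assumed topic name
        props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
        // Wait for the send result so failures are visible to the logging thread.
        props.put("log4j.appender.KAFKA.SyncSend", "true");
        // New with this PR: rethrow send failures instead of ignoring them (default is true).
        props.put("log4j.appender.KAFKA.IgnoreExceptions", "false");
        // New with this PR: bound how long the producer may block; maps to max.block.ms.
        props.put("log4j.appender.KAFKA.maxBlockMs", "10000");            // assumed timeout value
        PropertyConfigurator.configure(props);

        Logger logger = Logger.getLogger(KafkaAppenderConfigExample.class);
        try {
            logger.error("something went wrong");
        } catch (RuntimeException e) {
            // With SyncSend=true and IgnoreExceptions=false, a failed or timed-out send is
            // wrapped in a RuntimeException and propagated here, as exercised by the PR's
            // test with a real producer config and IgnoreExceptions=false.
            System.err.println("Log delivery to Kafka failed: " + e.getCause());
        }
    }
}

Leaving IgnoreExceptions at its default of true gives the behaviour requested in the ticket:
the appender records the failure via LogLog.debug and the calling application is unaffected.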