Gary, can you please check your IDE settings for line endings? It is really difficult to see what changed if the whole file is marked as modified in the commit mail.
I suspect this is because your IDE converts the Unix line endings (LF) to Windows line endings (CRLF) or something similar. On Sun, Aug 21, 2016 at 9:30 AM, <[email protected]> wrote: > Repository: logging-log4j2 > Updated Branches: > refs/heads/master 1aa3c3e24 -> f1b61bddf > > > Use Log4jThread to name the thread. > > Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo > Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/ > commit/f1b61bdd > Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/f1b61bdd > Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/f1b61bdd > > Branch: refs/heads/master > Commit: f1b61bddf640ecc5d2c80ea190ff7332577cb787 > Parents: 1aa3c3e > Author: Gary Gregory <[email protected]> > Authored: Sat Aug 20 17:30:16 2016 -0700 > Committer: Gary Gregory <[email protected]> > Committed: Sat Aug 20 17:30:16 2016 -0700 > > ---------------------------------------------------------------------- > .../core/appender/mom/kafka/KafkaManager.java | 185 +++++++++---------- > 1 file changed, 92 insertions(+), 93 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/ > f1b61bdd/log4j-core/src/main/java/org/apache/logging/log4j/ > core/appender/mom/kafka/KafkaManager.java > ---------------------------------------------------------------------- > diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/ > appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/ > org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java > index 4e4a09c..d535e02 100644 > --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/ > appender/mom/kafka/KafkaManager.java > +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/ > appender/mom/kafka/KafkaManager.java > @@ -1,93 +1,92 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one or more > - * contributor license agreements. 
See the NOTICE file distributed with > - * this work for additional information regarding copyright ownership. > - * The ASF licenses this file to You under the Apache license, Version 2.0 > - * (the "License"); you may not use this file except in compliance with > - * the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > - * See the license for the specific language governing permissions and > - * limitations under the license. > - */ > - > -package org.apache.logging.log4j.core.appender.mom.kafka; > - > -import java.util.Properties; > -import java.util.concurrent.ExecutionException; > -import java.util.concurrent.TimeUnit; > -import java.util.concurrent.TimeoutException; > - > -import org.apache.kafka.clients.producer.Producer; > -import org.apache.kafka.clients.producer.ProducerRecord; > -import org.apache.logging.log4j.core.appender.AbstractManager; > -import org.apache.logging.log4j.core.config.Property; > -import org.apache.logging.log4j.core.util.Log4jThread; > - > -public class KafkaManager extends AbstractManager { > - > - public static final String DEFAULT_TIMEOUT_MILLIS = "30000"; > - > - /** > - * package-private access for testing. > - */ > - static KafkaProducerFactory producerFactory = new > DefaultKafkaProducerFactory(); > - > - private final Properties config = new Properties(); > - private Producer<byte[], byte[]> producer = null; > - private final int timeoutMillis; > - > - private final String topic; > - > - public KafkaManager(final String name, final String topic, final > Property[] properties) { > - super(name); > - this.topic = topic; > - config.setProperty("key.serializer", "org.apache.kafka.common. 
> serialization.ByteArraySerializer"); > - config.setProperty("value.serializer", "org.apache.kafka.common. > serialization.ByteArraySerializer"); > - config.setProperty("batch.size", "0"); > - for (final Property property : properties) { > - config.setProperty(property.getName(), property.getValue()); > - } > - this.timeoutMillis = Integer.parseInt(config.getProperty(" > timeout.ms", DEFAULT_TIMEOUT_MILLIS)); > - } > - > - @Override > - public void releaseSub() { > - if (producer != null) { > - // This thread is a workaround for this Kafka issue: > https://issues.apache.org/jira/browse/KAFKA-1660 > - final Thread closeThread = new Log4jThread(new Runnable() { > - @Override > - public void run() { > - producer.close(); > - } > - }); > - closeThread.setName("KafkaManager-CloseThread"); > - closeThread.setDaemon(true); // avoid blocking JVM shutdown > - closeThread.start(); > - try { > - closeThread.join(timeoutMillis); > - } catch (final InterruptedException ignore) { > - // ignore > - } > - } > - } > - > - public void send(final byte[] msg) throws ExecutionException, > InterruptedException, TimeoutException { > - if (producer != null) { > - producer.send(new ProducerRecord<byte[], byte[]>(topic, > msg)).get(timeoutMillis, TimeUnit.MILLISECONDS); > - } > - } > - > - public void startup() { > - producer = producerFactory.newKafkaProducer(config); > - } > - > - public String getTopic() { > - return topic; > - } > - > -} > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache license, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. 
You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the license for the specific language governing permissions and > + * limitations under the license. > + */ > + > +package org.apache.logging.log4j.core.appender.mom.kafka; > + > +import java.util.Properties; > +import java.util.concurrent.ExecutionException; > +import java.util.concurrent.TimeUnit; > +import java.util.concurrent.TimeoutException; > + > +import org.apache.kafka.clients.producer.Producer; > +import org.apache.kafka.clients.producer.ProducerRecord; > +import org.apache.logging.log4j.core.appender.AbstractManager; > +import org.apache.logging.log4j.core.config.Property; > +import org.apache.logging.log4j.core.util.Log4jThread; > + > +public class KafkaManager extends AbstractManager { > + > + public static final String DEFAULT_TIMEOUT_MILLIS = "30000"; > + > + /** > + * package-private access for testing. > + */ > + static KafkaProducerFactory producerFactory = new > DefaultKafkaProducerFactory(); > + > + private final Properties config = new Properties(); > + private Producer<byte[], byte[]> producer = null; > + private final int timeoutMillis; > + > + private final String topic; > + > + public KafkaManager(final String name, final String topic, final > Property[] properties) { > + super(name); > + this.topic = topic; > + config.setProperty("key.serializer", "org.apache.kafka.common. > serialization.ByteArraySerializer"); > + config.setProperty("value.serializer", "org.apache.kafka.common. 
> serialization.ByteArraySerializer"); > + config.setProperty("batch.size", "0"); > + for (final Property property : properties) { > + config.setProperty(property.getName(), property.getValue()); > + } > + this.timeoutMillis = Integer.parseInt(config.getProperty(" > timeout.ms", DEFAULT_TIMEOUT_MILLIS)); > + } > + > + @Override > + public void releaseSub() { > + if (producer != null) { > + // This thread is a workaround for this Kafka issue: > https://issues.apache.org/jira/browse/KAFKA-1660 > + final Thread closeThread = new Log4jThread(new Runnable() { > + @Override > + public void run() { > + producer.close(); > + } > + }, "KafkaManager-CloseThread"); > + closeThread.setDaemon(true); // avoid blocking JVM shutdown > + closeThread.start(); > + try { > + closeThread.join(timeoutMillis); > + } catch (final InterruptedException ignore) { > + // ignore > + } > + } > + } > + > + public void send(final byte[] msg) throws ExecutionException, > InterruptedException, TimeoutException { > + if (producer != null) { > + producer.send(new ProducerRecord<byte[], byte[]>(topic, > msg)).get(timeoutMillis, TimeUnit.MILLISECONDS); > + } > + } > + > + public void startup() { > + producer = producerFactory.newKafkaProducer(config); > + } > + > + public String getTopic() { > + return topic; > + } > + > +} > >
