Author: markt Date: Mon Jul 3 19:22:47 2017 New Revision: 1800708 URL: http://svn.apache.org/viewvc?rev=1800708&view=rev Log: Fix https://bz.apache.org/bugzilla/show_bug.cgi?id=51513 Add support for the compressionMinSize attribute to the GzipInterceptor, add optional statistics collection and expose the Interceptor over JMX. Based on a patch by Christian Stöber.
Added: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java (with props) Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties tomcat/trunk/webapps/docs/changelog.xml Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java?rev=1800708&r1=1800707&r2=1800708&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java Mon Jul 3 19:22:47 2017 @@ -20,6 +20,8 @@ package org.apache.catalina.tribes.group import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -36,38 +38,108 @@ import org.apache.juli.logging.LogFactor /** * @version 1.0 */ -public class GzipInterceptor extends ChannelInterceptorBase { +public class GzipInterceptor extends ChannelInterceptorBase implements GzipInterceptorMBean { private static final Log log = LogFactory.getLog(GzipInterceptor.class); protected static final StringManager sm = StringManager.getManager(GzipInterceptor.class); public static final int DEFAULT_BUFFER_SIZE = 2048; + public static final int DEFAULT_OPTION_COMPRESSION_ENABLE = 0x0100; + + private int compressionMinSize = 0; + private volatile boolean statsEnabled = false; + private int interval = 0; + + // Stats + private final AtomicInteger count = new AtomicInteger(); + private final AtomicInteger countCompressedTX = new AtomicInteger(); + private final AtomicInteger countUncompressedTX = new AtomicInteger(); + private final AtomicInteger countCompressedRX = new AtomicInteger(); + private final AtomicInteger countUncompressedRX = new AtomicInteger(); + private final AtomicLong sizeTX = new AtomicLong(); + private final AtomicLong compressedSizeTX = new AtomicLong(); + private final AtomicLong uncompressedSizeTX = new AtomicLong(); + private final AtomicLong sizeRX = new AtomicLong(); + private final AtomicLong compressedSizeRX = new AtomicLong(); + private final AtomicLong uncompressedSizeRX = new AtomicLong(); + + + public GzipInterceptor() { + setOptionFlag(DEFAULT_OPTION_COMPRESSION_ENABLE); + } + @Override - public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) + throws ChannelException { try { - byte[] data = compress(msg.getMessage().getBytes()); + byte[] data = msg.getMessage().getBytes(); + if (statsEnabled) { + sizeTX.addAndGet(data.length); + } + + if (data.length > compressionMinSize) { + data = compress(data); + // Set the flag that indicates that the message is compressed + msg.setOptions(msg.getOptions() | getOptionFlag()); + if (statsEnabled) { + countCompressedTX.incrementAndGet(); + compressedSizeTX.addAndGet(data.length); + } + } else if (statsEnabled){ + countUncompressedTX.incrementAndGet(); + uncompressedSizeTX.addAndGet(data.length); + } + msg.getMessage().trim(msg.getMessage().getLength()); msg.getMessage().append(data,0,data.length); super.sendMessage(destination, msg, payload); + + int currentCount = count.incrementAndGet(); + if (statsEnabled && interval > 0 && currentCount % interval == 0) { + report(); + } } catch ( IOException x ) { log.error(sm.getString("gzipInterceptor.compress.failed")); throw new ChannelException(x); } } + @Override public void messageReceived(ChannelMessage msg) { try { - byte[] data = decompress(msg.getMessage().getBytes()); + byte[] data = msg.getMessage().getBytes(); + if ((msg.getOptions() & getOptionFlag()) > 0) { + if (statsEnabled) { + countCompressedRX.incrementAndGet(); + compressedSizeRX.addAndGet(data.length); + } + // Message was compressed + data = decompress(data); + } else if (statsEnabled) { + countUncompressedRX.incrementAndGet(); + uncompressedSizeRX.addAndGet(data.length); + } + + if (statsEnabled) { + sizeRX.addAndGet(data.length); + } + msg.getMessage().trim(msg.getMessage().getLength()); msg.getMessage().append(data,0,data.length); super.messageReceived(msg); + + int currentCount = count.incrementAndGet(); + if (statsEnabled && interval > 0 && currentCount % interval == 0) { + report(); + } } catch ( IOException x ) { log.error(sm.getString("gzipInterceptor.decompress.failed"),x); } } + public static byte[] compress(byte[] data) throws IOException { ByteArrayOutputStream bout = new ByteArrayOutputStream(); GZIPOutputStream gout = new GZIPOutputStream(bout); @@ -77,6 +149,7 @@ public class GzipInterceptor extends Cha return bout.toByteArray(); } + /** * @param data Data to decompress * @return Decompressed data @@ -95,4 +168,134 @@ public class GzipInterceptor extends Cha } return bout.toByteArray(); } + + + @Override + public void report() { + log.info(sm.getString("gzipInterceptor.report", Integer.valueOf(getCount()), + Integer.valueOf(getCountCompressedTX()), Integer.valueOf(getCountUncompressedTX()), + Integer.valueOf(getCountCompressedRX()), Integer.valueOf(getCountUncompressedRX()), + Long.valueOf(getSizeTX()), Long.valueOf(getCompressedSizeTX()), + Long.valueOf(getUncompressedSizeTX()), + Long.valueOf(getSizeRX()), Long.valueOf(getCompressedSizeRX()), + Long.valueOf(getUncompressedSizeRX()))); + } + + + @Override + public int getCompressionMinSize() { + return compressionMinSize; + } + + + @Override + public void setCompressionMinSize(int compressionMinSize) { + this.compressionMinSize = compressionMinSize; + } + + + @Override + public boolean getStatsEnabled() { + return statsEnabled; + } + + + @Override + public void setStatsEnabled(boolean statsEnabled) { + this.statsEnabled = statsEnabled; + } + + + @Override + public int getInterval() { + return interval; + } + + + @Override + public void setInterval(int interval) { + this.interval = interval; + } + + + @Override + public int getCount() { + return count.get(); + } + + + @Override + public int getCountCompressedTX() { + return countCompressedTX.get(); + } + + + @Override + public int getCountUncompressedTX() { + return countUncompressedTX.get(); + } + + + @Override + public int getCountCompressedRX() { + return countCompressedRX.get(); + } + + + @Override + public int getCountUncompressedRX() { + return countUncompressedRX.get(); + } + + + @Override + public long getSizeTX() { + return sizeTX.get(); + } + + + @Override + public long getCompressedSizeTX() { + return compressedSizeTX.get(); + } + + + @Override + public long getUncompressedSizeTX() { + return uncompressedSizeTX.get(); + } + + + @Override + public long getSizeRX() { + return sizeRX.get(); + } + + + @Override + public long getCompressedSizeRX() { + return compressedSizeRX.get(); + } + + + @Override + public long getUncompressedSizeRX() { + return uncompressedSizeRX.get(); + } + + + @Override + public void reset() { + count.set(0); + countCompressedTX.set(0); + countUncompressedTX.set(0); + countCompressedRX.set(0); + countUncompressedRX.set(0); + sizeTX.set(0); + compressedSizeTX.set(0); + uncompressedSizeTX.set(0); + sizeRX.set(0); + compressedSizeRX.set(0); + uncompressedSizeRX.set(0); + } } Added: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java?rev=1800708&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java (added) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java Mon Jul 3 19:22:47 2017 @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.group.interceptors; + +public interface GzipInterceptorMBean { + + // Config + public int getOptionFlag(); + public void setOptionFlag(int optionFlag); + + /** + * @return the minimum payload size for compression to be enabled. + */ + public int getCompressionMinSize(); + /** + * Set the minimum payload size for compression to be enabled. A value of + * zero or less means compression will always be used. If not explicitly + * configured, a default of zero will be used. + * + * @param compressionMinSize The new minimum payload size + */ + public void setCompressionMinSize(int compressionMinSize); + + /** + * @return {@code true} if the interceptor is configured to collect + * statistics, otherwise {@code false} + */ + public boolean getStatsEnabled(); + /** + * Configure whether the interceptor collects statistics. + * + * @param statsEnabled {@code true} to enable statistics collections, + * otherwise {@code false} + */ + public void setStatsEnabled(boolean statsEnabled); + + /** + * @return If statistics collection is enabled, the number of messages + * between statistics reports being written to the log. + */ + public int getInterval(); + /** + * If statistics collection is enabled, set the number of messages between + * statistics reports being written to the log. A value of zero or less + * means no statistics reports are written. + * + * @param interval The new interval between reports + */ + public void setInterval(int interval); + + // Stats + public int getCount(); + public int getCountCompressedTX(); + public int getCountUncompressedTX(); + public int getCountCompressedRX(); + public int getCountUncompressedRX(); + public long getSizeTX(); + public long getCompressedSizeTX(); + public long getUncompressedSizeTX(); + public long getSizeRX(); + public long getCompressedSizeRX(); + public long getUncompressedSizeRX(); + public void reset(); + public void report(); +} Propchange: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties?rev=1800708&r1=1800707&r2=1800708&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties Mon Jul 3 19:22:47 2017 @@ -19,6 +19,19 @@ fragmentationInterceptor.heartbeat.faile fragmentationInterceptor.fragments.missing=Fragments are missing. gzipInterceptor.compress.failed=Unable to compress byte contents gzipInterceptor.decompress.failed=Unable to decompress byte contents +gzipInterceptor.report=GZip Interceptor Report[\ + \n\tTotal Messages: {0}\ + \n\tTx Messages Compressed: {1}\ + \n\tTx Messages Uncompressed: {2}\ + \n\tRx Messages Compressed: {3}\ + \n\tRx Messages Uncompressed: {4}\ + \n\tTotal Tx bytes: {5}\ + \n\tCompressed Tx bytes: {6}\ + \n\tUncompressed Tx bytes: {7}\ + \n\tTotal Rx bytes: {8}\ + \n\tCompressed Rx bytes: {9}\ + \n\tUncompressed Rx bytes: {10}\ + \n] messageDispatchInterceptor.queue.full=Asynchronous queue is full, reached its limit of [{0}] bytes, current:[{1}] bytes. messageDispatchInterceptor.unableAdd.queue=Unable to add the message to the async queue, queue bug? messageDispatchInterceptor.warning.optionflag=Warning, you are overriding the asynchronous option flag, this will disable the Channel.SEND_OPTIONS_ASYNCHRONOUS that other apps might use. @@ -54,7 +67,7 @@ throughputInterceptor.report=ThroughputI \n\tSent:{2} MB (application)\ \n\tTime:{3} seconds\ \n\tTx Speed:{4} MB/sec (total)\ - \n\tTxSpeed:{5} MB/sec (application)\ + \n\tTx Speed:{5} MB/sec (application)\ \n\tError Msg:{6}\ \n\tRx Msg:{7} messages\ \n\tRx Speed:{8} MB/sec (since 1st msg)\ Modified: tomcat/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1800708&r1=1800707&r2=1800708&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/changelog.xml (original) +++ tomcat/trunk/webapps/docs/changelog.xml Mon Jul 3 19:22:47 2017 @@ -138,6 +138,12 @@ <subsection name="Tribes"> <changelog> <add> + <bug>51513</bug>: Add support for the <code>compressionMinSize</code> + attribute to the <code>GzipInterceptor</code>, add optional statistics + collection and expose the Interceptor over JMX. Based on a patch by + Christian Stöber. (markt) + </add> + <add> <bug>61127</bug>Allow human-readable names for channelSendOptions and mapSendOptions. Patch provided by Igal Sapir. (schultz) </add> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org