[jira] [Commented] (KAFKA-6430) Improve Kafka GZip compression performance

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16367993#comment-16367993
 ] 

ASF GitHub Bot commented on KAFKA-6430:
---

ijuma closed pull request #4537: KAFKA-6430: Add buffer between Java data 
stream and gzip stream
URL: https://github.com/apache/kafka/pull/4537
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 16d6e01f097..9b3bfc45983 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,6 +20,8 @@
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.invoke.MethodHandle;
@@ -49,8 +51,10 @@ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSu
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
             try {
-                // GZIPOutputStream has a default buffer size of 512 bytes, which is too small
-                return new GZIPOutputStream(buffer, 8 * 1024);
+                // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to
+                // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small
+                // number of bytes to write (potentially a single byte)
+                return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }
@@ -59,7 +63,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         @Override
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new GZIPInputStream(new ByteBufferInputStream(buffer));
+                // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
+                // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
+                // number of bytes (potentially a single byte)
+                return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
+                        16 * 1024);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }
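
For context, a minimal, self-contained Java sketch of the same wrapping follows. It substitutes ByteArrayOutputStream/ByteArrayInputStream for Kafka's ByteBufferOutputStream/ByteBufferInputStream purely so it can run outside the Kafka code base (that substitution is an assumption of the sketch, not part of the patch); the buffer sizes match the patch: an 8 KB gzip buffer and a 16 KB Buffered* layer.

{code}
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class BufferedGzipRoundTrip {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();

        // Compression path: the 16 KB BufferedOutputStream coalesces the small
        // per-field writes before they reach the gzip (JNI) layer.
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(new GZIPOutputStream(sink, 8 * 1024), 16 * 1024))) {
            out.writeByte(2);             // e.g. magic byte
            out.writeLong(123456789L);    // e.g. offset or timestamp
            out.writeInt(42);             // e.g. key length
        }

        // Decompression path: single-byte reads are served from the in-memory
        // buffer instead of going through the inflater for every call.
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(
                        new GZIPInputStream(new ByteArrayInputStream(sink.toByteArray()), 8 * 1024),
                        16 * 1024))) {
            System.out.println(in.readByte() + " " + in.readLong() + " " + in.readInt());
        }
    }
}
{code}

With the Buffered* layer in place, the per-field writes and reads issued by DataOutputStream/DataInputStream hit an in-memory buffer instead of triggering a native deflate/inflate call each time.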


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


[jira] [Commented] (KAFKA-6430) Improve Kafka GZip compression performance

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354798#comment-16354798
 ] 

ASF GitHub Bot commented on KAFKA-6430:
---

ying-zheng opened a new pull request #4537: KAFKA-6430: Add buffer between Java 
data stream and gzip stream
URL: https://github.com/apache/kafka/pull/4537
 
 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-6430) Improve Kafka GZip compression performance

2018-01-07 Thread Ying Zheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315507#comment-16315507
 ] 

Ying Zheng commented on KAFKA-6430:
---

[~tedyu]
Gotcha!
Thank you!

> Improve Kafka GZip compression performance
> --
>
> Key: KAFKA-6430
> URL: https://issues.apache.org/jira/browse/KAFKA-6430
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Reporter: Ying Zheng
>Priority: Minor
>
> To compress messages, Kafka uses DataOutputStream on top of GZIPOutputStream:
>   new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
> To decompress messages, Kafka uses DataInputStream on top of GZIPInputStream:
>new DataInputStream(new GZIPInputStream(buffer));
> This is very straightforward, but actually inefficient. For each message, in 
> addition to the key and value data, Kafka has to write about 30 metadata bytes 
> (the exact number varies slightly across Kafka versions), including the magic 
> byte, checksum, timestamp, offset, key length, value length, etc. For each of 
> these bytes, Java's DataOutputStream has to call write(byte) once. Here is the 
> awkward writeInt() method in DataOutputStream, which writes the 4 bytes 
> separately in big-endian order:
> {code}
> public final void writeInt(int v) throws IOException {
> out.write((v >>> 24) & 0xFF);
> out.write((v >>> 16) & 0xFF);
> out.write((v >>>  8) & 0xFF);
> out.write((v >>>  0) & 0xFF);
> incCount(4);
> }
> {code}
> Unfortunately, GZIPOutputStream does not implement the write(byte) method. 
> Instead, it only provides a write(byte[], offset, len) method, which calls 
> the corresponding JNI zlib function. The write(byte) calls from 
> DataOutputStream are translated into write(byte[], offset, len) calls in a 
> very inefficient way: (Oracle JDK 1.8 code)
> {code}
> class DeflaterOutputStream {
> public void write(int b) throws IOException {
> byte[] buf = new byte[1];
> buf[0] = (byte)(b & 0xff);
> write(buf, 0, 1);
> }
> public void write(byte[] b, int off, int len) throws IOException {
> if (def.finished()) {
> throw new IOException("write beyond end of stream");
> }
> if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
> throw new IndexOutOfBoundsException();
> } else if (len == 0) {
> return;
> }
> if (!def.finished()) {
> def.setInput(b, off, len);
> while (!def.needsInput()) {
> deflate();
> }
> }
> }
> }
> class GZIPOutputStream extends DeflaterOutputStream {
> public synchronized void write(byte[] buf, int off, int len)
> throws IOException
> {
> super.write(buf, off, len);
> crc.update(buf, off, len);
> }
> }
> class Deflater {
> private native int deflateBytes(long addr, byte[] b, int off, int len, int 
> flush);
> }
> class CRC32 {
> public void update(byte[] b, int off, int len) {
> if (b == null) {
> throw new NullPointerException();
> }
> if (off < 0 || len < 0 || off > b.length - len) {
> throw new ArrayIndexOutOfBoundsException();
> }
> crc = updateBytes(crc, b, off, len);
> }
> private native static int updateBytes(int crc, byte[] b, int off, int 
> len);
> }
> {code}
> For each metadata byte, the code above has to allocate a one-byte array, 
> acquire several locks, and call two native JNI methods (Deflater.deflateBytes 
> and CRC32.updateBytes). Each Kafka message has about 30 such metadata bytes.
> The call stack of Deflater.deflateBytes():
> DeflaterOutputStream.write(int b) -> GZIPOutputStream.write(byte[] buf, int off, 
> int len) -> DeflaterOutputStream.write(byte[] b, int off, int len) -> 
> DeflaterOutputStream.deflate() -> Deflater.deflate(byte[] b, int off, int len) 
> -> Deflater.deflate(byte[] b, int off, int len, int flush) -> 
> Deflater.deflateBytes(long addr, byte[] b, int off, int len, int flush)
> The call stack of CRC32.updateBytes():
> DeflaterOutputStream.write(int b) -> GZIPOutputStream.write(byte[] buf, int off, 
> int len) -> CRC32.update(byte[] b, int off, int len) -> 
> CRC32.updateBytes(int crc, byte[] b, int off, int len)
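
To make the per-byte cost described above concrete, here is a hedged, self-contained sketch. The CountingOutputStream helper is hypothetical (not part of Kafka or the JDK); it simply counts how many write() calls reach the underlying stream. When that underlying stream is a GZIPOutputStream, each such call means a native deflate plus a CRC update.

{code}
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical helper: counts calls (not bytes) that reach the wrapped stream.
class CountingOutputStream extends FilterOutputStream {
    int calls = 0;
    CountingOutputStream(OutputStream out) { super(out); }
    @Override public void write(int b) throws IOException { calls++; out.write(b); }
    @Override public void write(byte[] b, int off, int len) throws IOException { calls++; out.write(b, off, len); }
}

public class WriteCallCount {
    public static void main(String[] args) throws IOException {
        // Unbuffered: DataOutputStream.writeInt() forwards each of the 4 bytes separately.
        CountingOutputStream direct = new CountingOutputStream(new ByteArrayOutputStream());
        new DataOutputStream(direct).writeInt(42);

        // Buffered: the 4 bytes are coalesced into a single array write on flush.
        CountingOutputStream buffered = new CountingOutputStream(new ByteArrayOutputStream());
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(buffered, 8 * 1024));
        out.writeInt(42);
        out.flush();

        System.out.println(direct.calls + " vs " + buffered.calls);  // prints "4 vs 1"
    }
}
{code}

The unbuffered path forwards each of the four bytes to the sink separately, while the intermediate buffer collapses them into a single array write; with gzip underneath, that is the difference between four JNI round trips and one.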
> At Uber, we found that adding a small buffer between DataOutputStream and 
> GZIPOutputStream speeds up Kafka GZip compression by about 60% on average.
> {code}
> -return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
> +return new DataOutputStream(new BufferedOutputStream(new GZIPOutputStream(buffer, bufferSize), 1 << 14));
> {code}
> The similar issue also exists in GZip 
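
The ~60% figure above is workload dependent. The rough sketch below is an assumed micro-benchmark (not the measurement Uber ran, and not JMH, so treat any numbers as indicative only): it compresses the same stream of small, header-sized writes with and without a 16 KB buffer in front of GZIPOutputStream, which is exactly the change shown above.

{code}
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

public class GzipBufferBenchSketch {
    // Writes ~30 bytes of header-like fields per iteration, roughly like Kafka message metadata.
    static long run(boolean buffered) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        OutputStream gz = new GZIPOutputStream(sink, 8 * 1024);
        if (buffered) {
            gz = new BufferedOutputStream(gz, 1 << 14);   // the 16 KB buffer from the fix
        }
        byte[] value = new byte[5];
        long start = System.nanoTime();
        try (DataOutputStream out = new DataOutputStream(gz)) {
            for (int i = 0; i < 1_000_000; i++) {
                out.writeByte(2);        // magic
                out.writeLong(i);        // offset
                out.writeLong(0L);       // timestamp
                out.writeInt(8);         // key length
                out.writeInt(5);         // value length
                out.write(value);        // payload
            }
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) throws IOException {
        System.out.printf("unbuffered: %d ms%n", run(false) / 1_000_000);
        System.out.printf("buffered:   %d ms%n", run(true) / 1_000_000);
    }
}
{code}

The buffered run should be noticeably faster because deflate and CRC JNI calls happen once per filled buffer instead of once per write() call; the exact ratio will differ from the 60% quoted above depending on hardware, JDK version, and message shape.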

[jira] [Commented] (KAFKA-6430) Improve Kafka GZip compression performance

2018-01-07 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315500#comment-16315500
 ] 

Ted Yu commented on KAFKA-6430:
---

Once you send out a pull request, a committer will assign this to you.


[jira] [Commented] (KAFKA-6430) Improve Kafka GZip compression performance

2018-01-07 Thread Ying Zheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315497#comment-16315497
 ] 

Ying Zheng commented on KAFKA-6430:
---

@Ted Yu, Thank you! BTW, do you know how I can assign this ticket to myself?


[jira] [Commented] (KAFKA-6430) Improve Kafka GZip compression performance

2018-01-07 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315495#comment-16315495
 ] 

Ted Yu commented on KAFKA-6430:
---

bq. Here is the awkward writeInt() method in DataInputStream

I guess you meant DataOutputStream instead of DataInputStream

Please wrap the code snippet in \{code\} tags to make it more readable.

> Improve Kafka GZip compression performance
> --
>
> Key: KAFKA-6430
> URL: https://issues.apache.org/jira/browse/KAFKA-6430
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Reporter: Ying Zheng
>Priority: Minor
>
> To compress messages, Kafka uses DataOutputStream on top of GZIPOutputStream:
>   new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
> To decompress messages, Kafka uses DataInputStream on top of GZIPInputStream:
>new DataInputStream(new GZIPInputStream(buffer));
> This is very straight forward, but actually inefficient. For each message, in 
> addition to the key and value data, Kafka has to write about 30 some metadata 
> bytes (slightly varies in different Kafka version), including magic byte, 
> checksum, timestamp, offset, key length, value length etc. For each of these 
> bytes, java DataOutputStream has to call write(byte) once. Here is the 
> awkward writeInt() method in DataInputStream, which writes 4 bytes separately 
> in big-endian order. 
> public final void writeInt(int v) throws IOException {
> out.write((v >>> 24) & 0xFF);
> out.write((v >>> 16) & 0xFF);
> out.write((v >>>  8) & 0xFF);
> out.write((v >>>  0) & 0xFF);
> incCount(4);
> }
> Unfortunately, GZIPOutputStream does not implement the write(byte) method. 
> Instead, it only provides a write(byte[], offset, len) method, which calls 
> the corresponding JNI zlib function. The write(byte) calls from 
> DataOutputStream are translated into write(byte[], offset, len) calls in a 
> very inefficient way: (Oracle JDK 1.8 code)
> class DeflaterOutputStream {
> public void write(int b) throws IOException {
> byte[] buf = new byte[1];
> buf[0] = (byte)(b & 0xff);
> write(buf, 0, 1);
> }
> public void write(byte[] b, int off, int len) throws IOException {
> if (def.finished()) {
> throw new IOException("write beyond end of stream");
> }
> if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
> throw new IndexOutOfBoundsException();
> } else if (len == 0) {
> return;
> }
> if (!def.finished()) {
> def.setInput(b, off, len);
> while (!def.needsInput()) {
> deflate();
> }
> }
> }
> }
> class GZIPOutputStream extends DeflaterOutputStream {
> public synchronized void write(byte[] buf, int off, int len)
> throws IOException
> {
> super.write(buf, off, len);
> crc.update(buf, off, len);
> }
> }
> class Deflater {
> private native int deflateBytes(long addr, byte[] b, int off, int len, int 
> flush);
> }
> class CRC32 {
> public void update(byte[] b, int off, int len) {
> if (b == null) {
> throw new NullPointerException();
> }
> if (off < 0 || len < 0 || off > b.length - len) {
> throw new ArrayIndexOutOfBoundsException();
> }
> crc = updateBytes(crc, b, off, len);
> }
> private native static int updateBytes(int crc, byte[] b, int off, int 
> len);
> }
> For each meta data byte, the code above has to allocate 1 single byte array, 
> acquire several locks, call two native JNI methods (Deflater.deflateBytes and 
> CRC32.updateBytes). In each Kafka message, there are about 30 some meta data 
> bytes.
> The call stack of Deflater.deflateBytes():
> DeflaterOutputStream.public void write(int b) -> 
> GZIPOutputStream.write(byte[] buf, int off, int len) -> 
> DeflaterOutputStream.write(byte[] b, int off, int len) -> 
> DeflaterOutputStream.deflate() -> Deflater.deflate(byte[] b, int off, int 
> len) -> Deflater.deflate(byte[] b, int off, int len, int flush) -> 
> Deflater.deflateBytes(long addr, byte[] b, int off, int len, int flush)
> The call stack of CRC32.updateBytes():
> DeflaterOutputStream.public void write(int b) -> 
> GZIPOutputStream.write(byte[] buf, int off, int len) -> CRC32.update(byte[] 
> b, int off, int len) -> CRC32.updateBytes(int crc, byte[] b, int off, int len)
> At Uber, we found that adding a small buffer between DataOutputStream and 
> GZIPOutputStream can speed up Kafka GZip compression speed by about 60% in 
> average.
>  -return new DataOutputStream(new 
> GZIPOutputStream(buffer, bufferSize));
> +return new DataOutputStream(new