Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-29 Thread via GitHub


chia7712 merged PR #15589:
URL: https://github.com/apache/kafka/pull/15589


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-28 Thread via GitHub


chiacyu commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r154312


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -204,16 +202,14 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
 try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+ ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {

Review Comment:
   Done. Thanks for the reminder.






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-28 Thread via GitHub


chiacyu commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1543121911


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
##
@@ -132,9 +133,9 @@ public void testCompressDecompress(CompressionType 
compressionType) throws IOExc
 } else {
 assertArrayEquals(testString, compressed);
 }
-
 ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, 
compressionType);
+byte[] actualResult = Utils.toArray(decompressed);

Review Comment:
   Done. Thanks for the reminder.






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-27 Thread via GitHub


AndrewJSchofield commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1542082131


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -204,16 +202,14 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
 try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+ ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {

Review Comment:
   One tiny, tiny comment. The indentation of this line is out by 1 space. 
Apart from that, lgtm.






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-27 Thread via GitHub


apoorvmittal10 commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1541573651


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
##
@@ -132,9 +133,9 @@ public void testCompressDecompress(CompressionType 
compressionType) throws IOExc
 } else {
 assertArrayEquals(testString, compressed);
 }
-
 ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, 
compressionType);
+byte[] actualResult = Utils.toArray(decompressed);

Review Comment:
   Can we please move the conversion after `assertNotNull(decompressed);`, 
since `Utils.toArray` dereferences `decompressed`?






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-26 Thread via GitHub


chia7712 commented on PR #15589:
URL: https://github.com/apache/kafka/pull/15589#issuecomment-2021800318

   @apoorvmittal10 Could you please take a look at this PR if your queue is 
not full :)





Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-26 Thread via GitHub


chiacyu commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1539400840


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
##
@@ -132,9 +133,9 @@ public void testCompressDecompress(CompressionType 
compressionType) throws IOExc
 } else {
 assertArrayEquals(testString, compressed);
 }
-
 ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, 
compressionType);
+byte[] actualResult = Utils.toArray(decompressed, testString.length);

Review Comment:
   Got it, thanks!






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-26 Thread via GitHub


chia7712 commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1538726557


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
##
@@ -132,9 +133,9 @@ public void testCompressDecompress(CompressionType 
compressionType) throws IOExc
 } else {
 assertArrayEquals(testString, compressed);
 }
-
 ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, 
compressionType);
+byte[] actualResult = Utils.toArray(decompressed, testString.length);

Review Comment:
   `byte[] actualResult = Utils.toArray(decompressed);`
   
   We should not pass the size, so that the test also checks the `flip` 
(i.e. that the returned buffer is ready to read).
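
To illustrate why an explicit length can hide a missing `flip`, here is a minimal sketch. `copyRemaining` and `produce` are hypothetical helpers written for this example; `copyRemaining` is only assumed to behave like a `toArray`-style helper that copies the bytes between position and limit, and it is not Kafka's `Utils.toArray` itself.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class ToArraySketch {

    /** Hypothetical stand-in for a toArray-style helper: copies bytes from position to limit. */
    static byte[] copyRemaining(ByteBuffer buffer) {
        byte[] dest = new byte[buffer.remaining()];
        buffer.duplicate().get(dest); // duplicate() leaves the caller's position untouched
        return dest;
    }

    /** Hypothetical producer: writes the payload into a larger buffer, optionally flipping it. */
    static ByteBuffer produce(byte[] payload, boolean flip) {
        ByteBuffer buffer = ByteBuffer.allocate(payload.length + 8);
        buffer.put(payload);
        if (flip) {
            buffer.flip();
        }
        return buffer;
    }

    public static void main(String[] args) {
        byte[] payload = "metrics".getBytes(StandardCharsets.US_ASCII);

        // Slicing the backing array with a known length ignores position and limit,
        // so it matches even though the buffer was never flipped.
        ByteBuffer unflipped = produce(payload, false);
        byte[] sliced = Arrays.copyOfRange(unflipped.array(), 0, payload.length);
        System.out.println("explicit length, no flip: " + Arrays.equals(payload, sliced));

        // Reading position..limit exposes the missing flip: it sees the unwritten tail.
        System.out.println("remaining, no flip:       " + Arrays.equals(payload, copyRemaining(unflipped)));

        // After a flip, position..limit covers exactly the written bytes.
        System.out.println("remaining, flipped:       " + Arrays.equals(payload, copyRemaining(produce(payload, true))));
    }
}
```

A test that relies on the position-to-limit view therefore fails when the producer forgets to flip, which is the check the comment above asks for.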






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-25 Thread via GitHub


chiacyu commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1537633908


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
##
@@ -132,9 +132,9 @@ public void testCompressDecompress(CompressionType 
compressionType) throws IOExc
 } else {
 assertArrayEquals(testString, compressed);
 }
-
 ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, 
compressionType);
+byte[] actualResult = Arrays.copyOfRange(decompressed.array(), 0, 
testString.length);

Review Comment:
   Sounds good. Will do. Thanks!






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-25 Thread via GitHub


chia7712 commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1537626874


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
##
@@ -132,9 +132,9 @@ public void testCompressDecompress(CompressionType 
compressionType) throws IOExc
 } else {
 assertArrayEquals(testString, compressed);
 }
-
 ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, 
compressionType);
+byte[] actualResult = Arrays.copyOfRange(decompressed.array(), 0, 
testString.length);

Review Comment:
   How about using `Utils.toArray`?






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-24 Thread via GitHub


chia7712 commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1536848215


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -204,16 +202,14 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
 try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+ ByteBufferOutputStream out = new ByteBufferOutputStream(1)) {

Review Comment:
   Yep, we can initialize the buffer with a larger size. Maybe we can use the 
same size (512 bytes) as line #192.






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-24 Thread via GitHub


brandboat commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1536846618


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -204,16 +202,14 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
 try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+ ByteBufferOutputStream out = new ByteBufferOutputStream(1)) {

Review Comment:
   `1` is a little too small, I think. To avoid too many `expandBuffer` 
invocations, maybe we can use 32 (the default buffer size in 
`ByteArrayOutputStream`) or 64? @chia7712 WDYT?
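
As a rough illustration of the concern, the sketch below counts how often a growable buffer would have to reallocate for different initial capacities. The doubling growth policy, the byte-at-a-time writes, and the 4096-byte total are assumptions chosen for this example; they are not taken from Kafka's `ByteBufferOutputStream`.

```java
final class ExpansionCountSketch {

    /**
     * Counts reallocations needed to write totalBytes one byte at a time,
     * assuming the buffer doubles whenever it runs out of room.
     */
    static int expansions(int initialCapacity, int totalBytes) {
        int capacity = initialCapacity;
        int count = 0;
        for (int written = 0; written < totalBytes; written++) {
            if (written + 1 > capacity) {
                // Assumed policy: double, but never less than what is needed right now.
                capacity = Math.max(capacity * 2, written + 1);
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        for (int initial : new int[] {1, 32, 512}) {
            System.out.println("initial=" + initial + " expansions=" + expansions(initial, 4096));
        }
    }
}
```

Under these assumptions, writing 4096 bytes takes 12 expansions from a capacity of 1, 7 from 32, and 3 from 512. A less aggressive growth policy would widen that gap.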






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-24 Thread via GitHub


chiacyu commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1536831206


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -203,20 +201,19 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
-try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
 byte[] bytes = new byte[data.capacity() * 2];
 int nRead;
 while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-out.write(bytes, 0, nRead);
+try (ByteBufferOutputStream out = new 
ByteBufferOutputStream(nRead)) {

Review Comment:
   The helper function already comes with [unit tests](https://github.com/apache/kafka/blob/bf9a27fefdb3d93c7a510f871433c4c9e07de71a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java#L126).
 






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-24 Thread via GitHub


chiacyu commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1536831097


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -203,20 +201,19 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
-try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
 byte[] bytes = new byte[data.capacity() * 2];
 int nRead;
 while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-out.write(bytes, 0, nRead);
+try (ByteBufferOutputStream out = new 
ByteBufferOutputStream(nRead)) {
+out.write(bytes, 0, nRead);
+return out.buffer();

Review Comment:
   Done. Thanks!






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-24 Thread via GitHub


chia7712 commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1536830823


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -204,17 +202,16 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
 try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
-byte[] bytes = new byte[data.capacity() * 2];
-int nRead;
-while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-out.write(bytes, 0, nRead);
-}
-
-out.flush();
-return ByteBuffer.wrap(out.toByteArray());
-} catch (IOException e) {
+ ByteBufferOutputStream out = new ByteBufferOutputStream(1)) {
+byte[] bytes = new byte[data.capacity() * 2];

Review Comment:
   Could you please remove unnecessary indent?






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-24 Thread via GitHub


chiacyu commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1536789689


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -203,17 +201,13 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
-try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
 byte[] bytes = new byte[data.capacity() * 2];
-int nRead;
-while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+int nRead = in.read(bytes, 0, bytes.length);

Review Comment:
   Thanks for the reminder. Since initializing `ByteBufferOutputStream` 
requires a capacity, I use `totalReads` to record the total number of bytes read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-24 Thread via GitHub


chia7712 commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1536789605


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -203,17 +201,17 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
-try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
 byte[] bytes = new byte[data.capacity() * 2];
 int nRead;
+int totalReads = 0;
 while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-out.write(bytes, 0, nRead);
+totalReads+=nRead;

Review Comment:
   You don't need to do this, since `ByteBufferOutputStream` can expand its 
inner buffer.
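
For readers unfamiliar with the idea, here is a minimal sketch of an output stream that grows its backing `ByteBuffer` on demand, so the caller never has to know the total size up front. `GrowingByteBufferStream` is a hypothetical class written for this example, and its doubling policy is an assumption; it is not Kafka's `ByteBufferOutputStream` implementation.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class GrowableBufferSketch {

    /** Hypothetical growable stream: reallocates a bigger buffer when a write does not fit. */
    static final class GrowingByteBufferStream extends OutputStream {
        private ByteBuffer buffer;

        GrowingByteBufferStream(int initialCapacity) {
            this.buffer = ByteBuffer.allocate(initialCapacity);
        }

        @Override
        public void write(int b) {
            ensureRemaining(1);
            buffer.put((byte) b);
        }

        @Override
        public void write(byte[] bytes, int off, int len) {
            ensureRemaining(len);
            buffer.put(bytes, off, len);
        }

        ByteBuffer buffer() {
            return buffer;
        }

        private void ensureRemaining(int needed) {
            if (needed > buffer.remaining()) {
                // Assumed policy: double the capacity, or grow to exactly fit if that is larger.
                int newCapacity = Math.max(buffer.capacity() * 2, buffer.position() + needed);
                ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
                buffer.flip();      // expose the bytes written so far
                bigger.put(buffer); // copy them; bigger's position now equals the old position
                buffer = bigger;
            }
        }
    }

    /** Writes the payload in small chunks, then reads back everything that was written. */
    static byte[] writeInChunks(byte[] payload, int initialCapacity, int chunkSize) {
        GrowingByteBufferStream out = new GrowingByteBufferStream(initialCapacity);
        for (int off = 0; off < payload.length; off += chunkSize) {
            out.write(payload, off, Math.min(chunkSize, payload.length - off));
        }
        ByteBuffer result = out.buffer();
        result.flip();
        byte[] copy = new byte[result.remaining()];
        result.get(copy);
        return copy;
    }

    public static void main(String[] args) {
        byte[] payload = "telemetry payload".getBytes(StandardCharsets.US_ASCII);
        byte[] back = writeInChunks(payload, 1, 4);
        System.out.println(new String(back, StandardCharsets.US_ASCII));
    }
}
```

Because the stream grows itself, a running total such as `totalReads` is not needed just to size the buffer.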






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-24 Thread via GitHub


chia7712 commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1536789020


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -203,20 +201,19 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
-try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
 byte[] bytes = new byte[data.capacity() * 2];
 int nRead;
 while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-out.write(bytes, 0, nRead);
+try (ByteBufferOutputStream out = new 
ByteBufferOutputStream(nRead)) {

Review Comment:
   Could we use a single try-with-resources block to release both `in` and `out`?
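
A minimal sketch of what declaring both resources in one `try` header looks like. `Tracked` is a hypothetical resource used only to record the close order; the real code would declare the `InputStream` and the output stream in its place.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoResourcesSketch {

    /** Hypothetical resource that records its name when closed. */
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Tracked(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    /** Declares two resources in one try header and returns the order of events. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Tracked in = new Tracked("in", log);
             Tracked out = new Tracked("out", log)) {
            log.add("body");
        }
        return log;
    }

    public static void main(String[] args) {
        // Resources are closed in reverse declaration order once the body finishes.
        System.out.println(run());
    }
}
```

Both resources are released even if the body throws, and no nested `try` is needed inside the read loop.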



##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -203,20 +201,19 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
-try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
 byte[] bytes = new byte[data.capacity() * 2];
 int nRead;
 while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
-out.write(bytes, 0, nRead);
+try (ByteBufferOutputStream out = new 
ByteBufferOutputStream(nRead)) {
+out.write(bytes, 0, nRead);
+return out.buffer();

Review Comment:
   Should we call `flip` before returning it?
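
For context, a small sketch of what `flip` changes on a plain `java.nio.ByteBuffer`. The helper names `writeFive` and `state` are hypothetical and exist only for this example.

```java
import java.nio.ByteBuffer;

final class FlipSketch {

    /** Returns {position, limit} so the buffer state is easy to print or compare. */
    static int[] state(ByteBuffer buffer) {
        return new int[] {buffer.position(), buffer.limit()};
    }

    /** Writes five bytes into a 16-byte buffer and returns it without flipping. */
    static ByteBuffer writeFive() {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put(new byte[] {1, 2, 3, 4, 5});
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = writeFive();
        // After writing: position sits past the data, limit is still the capacity.
        System.out.println("before flip: position=" + buffer.position() + " limit=" + buffer.limit());

        buffer.flip();
        // After flip: limit marks the end of the data and position is back at the start.
        System.out.println("after flip:  position=" + buffer.position() + " limit=" + buffer.limit());
    }
}
```

A reader that consumes from position to limit only sees the written bytes once the buffer has been flipped.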






Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-24 Thread via GitHub


chia7712 commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1536784932


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -203,17 +201,13 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
-try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create())) {
 byte[] bytes = new byte[data.capacity() * 2];
-int nRead;
-while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+int nRead = in.read(bytes, 0, bytes.length);

Review Comment:
   We can't remove the while loop, since `InputStream#read` does NOT 
guarantee reading ALL the data at once. Please take a look at the API spec: 
https://docs.oracle.com/javase/8/docs/api/java/io/InputStream.html#read-byte:A-int-int-
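
The point can be sketched with a stream that deliberately returns short reads. `TrickleInputStream` is a hypothetical class written for this example (it caps every bulk read at 3 bytes); a real decompressing stream may return short reads for its own reasons.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class ReadLoopSketch {

    /** Hypothetical stream that returns at most 3 bytes per bulk read. */
    static final class TrickleInputStream extends InputStream {
        private final InputStream delegate;

        TrickleInputStream(byte[] data) {
            this.delegate = new ByteArrayInputStream(data);
        }

        @Override
        public int read() throws IOException {
            return delegate.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return delegate.read(b, off, Math.min(len, 3));
        }
    }

    /** A single read call: returns however many bytes the stream chose to deliver. */
    static int readOnce(InputStream in, byte[] dest) {
        try {
            return in.read(dest, 0, dest.length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Loops until read returns -1 (end of stream), collecting every chunk. */
    static byte[] readAll(InputStream in) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[16];
        int nRead;
        try {
            while ((nRead = in.read(chunk, 0, chunk.length)) != -1) {
                out.write(chunk, 0, nRead);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = "0123456789".getBytes(StandardCharsets.US_ASCII);
        int single = readOnce(new TrickleInputStream(payload), new byte[16]);
        byte[] all = readAll(new TrickleInputStream(payload));
        System.out.println("single read: " + single + " bytes, loop: " + all.length + " bytes");
    }
}
```

Even though the destination array is large enough for the whole payload, the single call returns only part of it; only the loop recovers everything.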


