[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-03-01 Thread via GitHub


dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1121810375


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
      * @param length The number of bytes to write
      * @throws IOException For any errors writing to the output
      */
-    public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
         if (buffer.hasArray()) {
             out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
         } else {
-            int pos = buffer.position();
-            for (int i = pos; i < length + pos; i++)
-                out.writeByte(buffer.get(i));
+            Channels.newChannel(out).write(buffer);

Review Comment:
   Ping @ijuma / @Hangleton, are there any blockers to getting this merged?
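
   For readers skimming the thread: the hunk above replaces a byte-at-a-time loop with a single bulk channel write for non-array-backed buffers. A minimal standalone check (hypothetical demo code, not part of the PR) showing the two paths produce identical bytes for a direct buffer:

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   import java.util.Arrays;

   public class WriteToDemo {
       public static void main(String[] args) throws IOException {
           // A direct buffer has no backing array, so it takes the branch this PR changes.
           ByteBuffer src = ByteBuffer.allocateDirect(1024);
           for (int i = 0; i < 1024; i++) src.put((byte) i);
           src.flip();

           // Old path: one DataOutput.writeByte() call per byte, via absolute get(i).
           ByteArrayOutputStream oldOut = new ByteArrayOutputStream();
           DataOutputStream oldDos = new DataOutputStream(oldOut);
           for (int i = src.position(); i < src.limit(); i++)
               oldDos.writeByte(src.get(i));

           // New path: a single bulk write through a stream-backed channel.
           ByteArrayOutputStream newOut = new ByteArrayOutputStream();
           Channels.newChannel(newOut).write(src.duplicate());

           System.out.println(Arrays.equals(oldOut.toByteArray(), newOut.toByteArray())); // true
       }
   }
   ```

   The duplicate() call keeps the demo's source buffer position intact; channel writes advance the position, a point discussed further down-thread.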






[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-02-13 Thread via GitHub


dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1104589735



Review Comment:
   This is producer code, with no compression (the data is pre-encrypted, so compression would be useless anyway), and the message size is 1-2 kB.






[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-01-31 Thread via GitHub


dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1091834852



Review Comment:
   Sure. In a particular workload, this code path accounted for about 30% of CPU usage in flamegraphs; after a local patch it is now 2-3%.
   
   This hasn't been discussed on the dev list; it's just an attempt to upstream a small performance improvement.
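   
   The numbers above come from internal flamegraphs, but the two code paths are easy to compare in isolation. A hypothetical JMH sketch (names and sizes assumed, not from the PR) exercising a ~2 kB direct buffer, matching the message size described up-thread:
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;

   @State(Scope.Thread)
   public class WriteToBenchmark {
       private ByteBuffer direct;

       @Setup
       public void setup() {
           // ~2 kB direct buffer; direct buffers take the non-hasArray() branch.
           direct = ByteBuffer.allocateDirect(2048);
           while (direct.hasRemaining()) direct.put((byte) 7);
           direct.flip();
       }

       @Benchmark
       public byte[] byteByByteLoop() throws IOException {
           ByteArrayOutputStream sink = new ByteArrayOutputStream(2048);
           DataOutputStream out = new DataOutputStream(sink);
           // Old path: one virtual call and one bounds check per byte.
           for (int i = direct.position(); i < direct.limit(); i++)
               out.writeByte(direct.get(i));
           return sink.toByteArray();
       }

       @Benchmark
       public byte[] bulkChannelWrite() throws IOException {
           ByteArrayOutputStream sink = new ByteArrayOutputStream(2048);
           // New path: the stream-backed channel copies in large chunks.
           Channels.newChannel(sink).write(direct.duplicate());
           return sink.toByteArray();
       }
   }
   ```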






[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-01-30 Thread via GitHub


dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1090733865



Review Comment:
   Per 1): This parameter is always buffer.remaining(), so I've cleaned up the call sites and removed the parameter.
   
   Per 2): Yes, it's substantial. The reason is that WritableByteChannelImpl writes in 8 KiB chunks when feasible, instead of 1-byte chunks: https://github.com/AdoptOpenJDK/openjdk-jdk8u/blob/2544d2a351eca1a3d62276f969dd2d95e4a4d2b6/jdk/src/share/classes/java/nio/channels/Channels.java#L442
   
   Unfortunately I can't share benchmarks to demonstrate this, as they're from a production application and were collected with internal tooling.
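   
   In outline, the linked stream-backed channel drains the buffer through a reusable transfer array. A simplified sketch of that JDK logic (not the actual source; synchronization and some bookkeeping omitted):
   
   ```java
   import java.io.IOException;
   import java.io.OutputStream;
   import java.nio.ByteBuffer;

   // Simplified sketch of java.nio.channels.Channels' stream-backed channel:
   // instead of one stream call per byte, it drains the buffer through a
   // reusable 8 KiB transfer array, so a 2 kB message needs a single write.
   final class ChunkedChannelSketch {
       private static final int TRANSFER_SIZE = 8192;
       private byte[] buf = new byte[0];
       private final OutputStream out;

       ChunkedChannelSketch(OutputStream out) { this.out = out; }

       public int write(ByteBuffer src) throws IOException {
           int len = src.remaining();
           int totalWritten = 0;
           while (totalWritten < len) {
               int bytesToWrite = Math.min(len - totalWritten, TRANSFER_SIZE);
               if (buf.length < bytesToWrite)
                   buf = new byte[bytesToWrite];
               src.get(buf, 0, bytesToWrite);   // bulk copy out of the (possibly direct) buffer
               out.write(buf, 0, bytesToWrite); // one stream call per chunk, not per byte
               totalWritten += bytesToWrite;
           }
           return totalWritten;
       }
   }
   ```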






[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-01-27 Thread via GitHub


dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1089108841



Review Comment:
   1) Yes, per the docs of WritableByteChannel:
   
   ```
   Writes a sequence of bytes to this channel from the given buffer.
   An attempt is made to write up to r bytes to the channel, where r is the number of bytes remaining in the buffer, that is, src.remaining(), at the moment this method is invoked.
   
   Suppose that a byte sequence of length n is written, where 0 <= n <= r. This byte sequence will be transferred from the buffer starting at index p, where p is the buffer's position at the moment this method is invoked; the index of the last byte written will be p + n - 1. Upon return the buffer's position will be equal to p + n; its limit will not have changed.
   ```
   
   2) Good point, although I don't think this has an effect on any of the 4 current users (DefaultRecord.writeTo and LegacyRecord.writeTo, for writing key and value). I've added a defensive call to asReadOnlyBuffer.
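   
   To illustrate both points with a small hypothetical demo: a channel write consumes the source buffer's position, while writing through asReadOnlyBuffer() (an independent view with its own position and limit) leaves the caller's buffer untouched:
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   import java.nio.channels.WritableByteChannel;

   public class PositionDemo {
       public static void main(String[] args) throws IOException {
           ByteBuffer buffer = ByteBuffer.allocateDirect(16);
           while (buffer.hasRemaining()) buffer.put((byte) 1);
           buffer.flip();

           WritableByteChannel ch = Channels.newChannel(new ByteArrayOutputStream());

           // Writing the buffer itself consumes it: position advances to the limit.
           ByteBuffer consumed = buffer.duplicate();
           ch.write(consumed);
           System.out.println(consumed.remaining()); // 0 -- nothing left to re-read

           // Writing a read-only view leaves the caller's buffer untouched,
           // since the view carries its own independent position and limit.
           ch.write(buffer.asReadOnlyBuffer());
           System.out.println(buffer.remaining());   // 16 -- caller's position unchanged
       }
   }
   ```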


