[GitHub] [kafka] fvaleri commented on pull request #13136: KAFKA-14582: Move JmxTool to tools

2023-01-20 Thread GitBox


fvaleri commented on PR #13136:
URL: https://github.com/apache/kafka/pull/13136#issuecomment-1398707514

   Output example:
   
   ```sh
   $ bin/kafka-run-class.sh org.apache.kafka.tools.JmxCommand \
       --jmx-url service:jmx:rmi:///jndi/rmi://:/jmxrmi \
       --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
       --attributes FifteenMinuteRate,FiveMinuteRate \
       --date-format "MMdd-hh:mm:ss" \
       --reporting-interval 1000 --report-format tsv
   Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://:/jmxrmi
   time 20230120-06:23:14
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate 0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate 0.0
   time 20230120-06:23:15
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate 0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate 0.0
   time 20230120-06:23:16
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate 0.0
   kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate 0.0
   ^C
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on pull request #13060: KAFKA-14559: Fix JMX tool to handle the object names with wild cards and optional attributes

2023-01-20 Thread GitBox


kamalcph commented on PR #13060:
URL: https://github.com/apache/kafka/pull/13060#issuecomment-1398673057

   @ijuma @showuon 
   Please take a look when you get a chance.





[GitHub] [kafka] kamalcph commented on pull request #13059: MINOR: KafkaConfig should not expose internal config when queried for non-internal values

2023-01-20 Thread GitBox


kamalcph commented on PR #13059:
URL: https://github.com/apache/kafka/pull/13059#issuecomment-1398670571

   @ijuma @showuon 
   
   Can you please merge the patch?
   





[GitHub] [kafka] fvaleri opened a new pull request, #13136: KAFKA-14582: Move JmxTool to tools

2023-01-20 Thread GitBox


fvaleri opened a new pull request, #13136:
URL: https://github.com/apache/kafka/pull/13136

   This PR is based on https://github.com/apache/kafka/pull/13131.
   
   This class is also used by the system tests, so I need to check if the 
replacement works fine there too.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-20 Thread GitBox


fvaleri commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1082712140


##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+    /**
+     * Check if there are no options or `--help` option from command line.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @return true on matching the help check condition
+     */
+    public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) {
+        return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt);
+    }
+
+    /**
+     * Check if there is `--version` option from command line.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @return true on matching the version check condition
+     */
+    public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) {
+        return commandOpts.options.has(commandOpts.versionOpt);
+    }
+
+    /**
+     * Check and print help message if there is no options or `--help` option
+     * from command line; if `--version` is specified on the command line,
+     * print version information and exit.
+     * NOTE: The function name is not strictly speaking correct anymore
+     * as it also checks whether the version needs to be printed, but
+     * refactoring this would have meant changing all command line tools
+     * and unnecessarily increased the blast radius of this change.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @param message Message to display on successful check
+     */
+    public static void printHelpAndExitIfNeeded(CommandDefaultOptions commandOpts, String message) {
+        if (isPrintHelpNeeded(commandOpts)) {
+            printUsageAndDie(commandOpts.parser, message);
+        }
+        if (isPrintVersionNeeded(commandOpts)) {
+            printVersionAndDie();
+        }
+    }
+
+    /**
+     * Check that all the listed options are present.
+     */
+    public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec<?>... requiredList) {
+        for (OptionSpec<?> arg : requiredList) {
+            if (!options.has(arg)) {
+                printUsageAndDie(parser, String.format("Missing required argument \"%s\"", arg));
+            }
+        }
+    }
+
+    /**
+     * Check that none of the listed options are present.
+     */
+    public static void checkInvalidArgs(OptionParser parser,
+                                        OptionSet options,
+                                        OptionSpec<?> usedOption,
+                                        OptionSpec<?>... invalidOptions) {
+        if (options.has(usedOption)) {
+            for (OptionSpec<?> arg : invalidOptions) {
+                if (options.has(arg)) {
+                    printUsageAndDie(parser, String.format("Option \"%s\" can't be used with option \"%s\"", usedOption, arg));
+                }
+            }
+        }
+    }
+
+    /**
+     * Check that none of the listed options are present.
+     */
+    public static void checkInvalidArgs(OptionParser parser,
+                                        OptionSet options,
+                                        OptionSpec<?> usedOption,
+                                        Set<OptionSpec<?>> invalidOptions) {
+        OptionSpec<?>[] array = new OptionSpec<?>[invalidOptions.size()];
+        invalidOptions.toArray(array);
+        checkInvalidArgs(parser, options, usedOption, array);
+    }
+
+    /**
+     * Check that none of the listed options are present with the combination of used options.
+     */
+    public static void 

[GitHub] [kafka] clolov commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-20 Thread GitBox


clolov commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1082663006


##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+    /**
+     * Check if there are no options or `--help` option from command line.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @return true on matching the help check condition
+     */
+    public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) {
+        return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt);
+    }
+
+    /**
+     * Check if there is `--version` option from command line.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @return true on matching the version check condition
+     */
+    public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) {
+        return commandOpts.options.has(commandOpts.versionOpt);
+    }
+
+    /**
+     * Check and print help message if there is no options or `--help` option
+     * from command line; if `--version` is specified on the command line,
+     * print version information and exit.
+     * NOTE: The function name is not strictly speaking correct anymore
+     * as it also checks whether the version needs to be printed, but
+     * refactoring this would have meant changing all command line tools
+     * and unnecessarily increased the blast radius of this change.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @param message Message to display on successful check
+     */
+    public static void printHelpAndExitIfNeeded(CommandDefaultOptions commandOpts, String message) {
+        if (isPrintHelpNeeded(commandOpts)) {
+            printUsageAndDie(commandOpts.parser, message);
+        }
+        if (isPrintVersionNeeded(commandOpts)) {
+            printVersionAndDie();
+        }
+    }
+
+    /**
+     * Check that all the listed options are present.
+     */
+    public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec<?>... requiredList) {
+        for (OptionSpec<?> arg : requiredList) {
+            if (!options.has(arg)) {
+                printUsageAndDie(parser, String.format("Missing required argument \"%s\"", arg));
+            }
+        }
+    }
+
+    /**
+     * Check that none of the listed options are present.
+     */
+    public static void checkInvalidArgs(OptionParser parser,
+                                        OptionSet options,
+                                        OptionSpec<?> usedOption,
+                                        OptionSpec<?>... invalidOptions) {
+        if (options.has(usedOption)) {
+            for (OptionSpec<?> arg : invalidOptions) {
+                if (options.has(arg)) {
+                    printUsageAndDie(parser, String.format("Option \"%s\" can't be used with option \"%s\"", usedOption, arg));
+                }
+            }
+        }
+    }
+
+    /**
+     * Check that none of the listed options are present.
+     */
+    public static void checkInvalidArgs(OptionParser parser,
+                                        OptionSet options,
+                                        OptionSpec<?> usedOption,
+                                        Set<OptionSpec<?>> invalidOptions) {
+        OptionSpec<?>[] array = new OptionSpec<?>[invalidOptions.size()];
+        invalidOptions.toArray(array);
+        checkInvalidArgs(parser, options, usedOption, array);
+    }
+
+    /**
+     * Check that none of the listed options are present with the combination of used options.
+     */
+    public static void 

[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082623877


##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   OK, that's fine.






[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082622801


##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   The PR says:
   
   > we pushed the skipping of key/value logic to zstd-jni implementation 
instead of using the one provided by BufferedInputStream






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082615267


##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   This will be picked up in a separate follow-up PR: https://issues.apache.org/jira/browse/KAFKA-14634
   
   I decided to keep it separate to introduce fewer changes at a time and measure their impact.






[GitHub] [kafka] ijuma commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


ijuma commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1398459548

   One more thing: when it comes to the testing, can we include the case where 
the batches have a single 10 byte message?
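   
   For illustration, a minimal sketch of such a case, assuming the existing `MemoryRecords` and `SimpleRecord` helpers (the payload and assertion are illustrative, not from the PR):
   
   ```java
   import java.nio.ByteBuffer;
   import org.apache.kafka.common.record.CompressionType;
   import org.apache.kafka.common.record.MemoryRecords;
   import org.apache.kafka.common.record.Record;
   import org.apache.kafka.common.record.SimpleRecord;
   
   class SingleSmallMessageCase {
       // Build a compressed batch holding exactly one 10-byte record, then
       // iterate it so the decompression path under discussion is exercised.
       static void run() {
           MemoryRecords records = MemoryRecords.withRecords(
                   CompressionType.ZSTD,
                   new SimpleRecord("0123456789".getBytes())); // single 10-byte value
           for (Record record : records.records()) {
               ByteBuffer value = record.value();
               if (value.remaining() != 10)
                   throw new AssertionError("expected a 10-byte value");
           }
       }
   }
   ```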





[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082588469


##
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##
@@ -62,10 +68,11 @@ public void release(ByteBuffer buffer) {
             }
         };
 
-        // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
-        // in cases where the caller reads a small number of bytes (potentially a single byte).
-        return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer),
-                bufferPool), 16 * 1024);
+        // We do not use an intermediate buffer to store the decompressed data as a result of JNI read() calls using
+        // `ZstdInputStreamNoFinalizer` here. Every read() call to `DataInputStream` will be a JNI call and the
+        // caller is expected to balance the tradeoff between reading large amount of data vs. making multiple JNI
+        // calls.
+        return new DataInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), bufferPool));

Review Comment:
   Two questions:
   1. Why do we wrap into DataInputStream?
   2. Have we checked that there are no workloads where we end up doing too many JNI calls?



##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -47,6 +47,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return new ByteBufferInputStream(buffer);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   What's the meaning of this for an uncompressed stream?



##
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##
@@ -26,21 +26,25 @@
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public class ZstdFactory {
+    /**
+     * Default compression level
+     */
+    private static final int DEFAULT_COMPRESSION_LEVEL = 3;

Review Comment:
   Since this is unrelated, do we have to include it as part of this PR?



##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   We decided not to get this info from the zstd library?



##
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
     public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
     }
 
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
             // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   I thought we wanted to call the underlying skipBytes API versus doing the 
skipping by reading into a skip buffer. I don't see that change. What am I 
missing?
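   
   For context, a sketch of the skipBytes-based alternative being referred to, assuming a hypothetical helper (not code from the PR); `DataInputStream.skipBytes` may skip fewer bytes than requested, so it has to be looped:
   
   ```java
   import java.io.DataInputStream;
   import java.io.EOFException;
   import java.io.IOException;
   
   class SkipHelper {
       // Hypothetical helper: let the stream skip length-delimited fields
       // itself instead of reading them into a scratch buffer.
       static void skipFully(DataInputStream in, int bytesToSkip) throws IOException {
           while (bytesToSkip > 0) {
               int skipped = in.skipBytes(bytesToSkip);
               if (skipped == 0)
                   throw new EOFException("Ran out of data while skipping");
               bytesToSkip -= skipped;
           }
       }
   }
   ```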






[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1398428117

   @ijuma please review when you get a chance since you already have context 
about this code change.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082366873


##
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##
@@ -26,21 +26,25 @@
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public class ZstdFactory {
+    /**
+     * Default compression level
+     */
+    private static final int DEFAULT_COMPRESSION_LEVEL = 3;

Review Comment:
   FYI reviewer
   
   This change is a no-op since we are already using the default value of 3 when no value is provided. It has been made to make it explicit in Kafka code that we are using a compression level of 3 with zstd.
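   
   To make the no-op concrete, a before/after sketch, assuming zstd-jni's `ZstdOutputStreamNoFinalizer` constructor that takes a compression level:
   
   ```java
   import java.io.IOException;
   import java.io.OutputStream;
   import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
   
   class ZstdLevelSketch {
       private static final int DEFAULT_COMPRESSION_LEVEL = 3;
   
       // Before: zstd-jni silently applies its own default level (3).
       static OutputStream implicitLevel(OutputStream out) throws IOException {
           return new ZstdOutputStreamNoFinalizer(out);
       }
   
       // After: the same level, but spelled out in Kafka code.
       static OutputStream explicitLevel(OutputStream out) throws IOException {
           return new ZstdOutputStreamNoFinalizer(out, DEFAULT_COMPRESSION_LEVEL);
       }
   }
   ```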






[GitHub] [kafka] clolov commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

2023-01-20 Thread GitBox


clolov commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1398172190

   Thank you very much for the review and merge!





[GitHub] [kafka] divijvaidya opened a new pull request, #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


divijvaidya opened a new pull request, #13135:
URL: https://github.com/apache/kafka/pull/13135

   This covers two JIRAs https://issues.apache.org/jira/browse/KAFKA-14632 and 
https://issues.apache.org/jira/browse/KAFKA-14633 
   
   ## Background 
   ![Screenshot 2023-01-19 at 18 27 
45](https://user-images.githubusercontent.com/71267/213521204-bb3228ed-7d21-4e07-a520-697ea6fcc0ed.png)
   Currently, we use 2 intermediate buffers while handling decompressed data 
(one of size 2KB and another of size 16KB). These buffers are (de)allocated 
once per batch. 
   
   The impact of this was visible in a flamegraph analysis for a compressed workload, where we observed that 75% of CPU during `appendAsLeader()` is taken up by `ValidateMessagesAndAssignOffsets`.
   
   ![Screenshot 2023-01-20 at 10 41 
08](https://user-images.githubusercontent.com/71267/213664252-389eaf3d-b8aa-465b-b010-db1024663d6f.png)
   
   
   ## Change
   With this PR:
   1. we reduce the number of intermediate buffers from 2 to 1. This removes one point of data copy. Note that this removed data copy occurred in chunks of 2KB at a time, multiple times. This is achieved by getting rid of `BufferedInputStream` and moving to `DataInputStream`. This change has only been made for `zstd` and `gzip`.
   2. we use a thread-local buffer pool for both the buffers involved in the process of decompression (see the sketch at the end of this description). This change impacts all compression types.
   3. we pushed the skipping of key/value logic to the zstd-jni implementation instead of using the one provided by `BufferedInputStream`.
   
   After the change, the above buffer allocation looks as follows:
   ![Screenshot 2023-01-19 at 18 28 
14](https://user-images.githubusercontent.com/71267/213525653-917ac5ee-810a-435e-bf84-c97d6b76005e.png)
   
   ## Results
   After this change, a JMH benchmark for `ValidateMessagesAndAssignOffsets` demonstrated 10-50% increased throughput across all compression types. The improvement is most prominent when thread-cached buffer pools are used, with 1-2% regression in some limited scenarios.
   
   When buffer pools are not used (NO_CACHING in the results), we observed GZIP having 10% better performance in some cases, with 1-4% regression in some other scenarios. Overall, without the buffer pools, the upside of this code change is limited to single-digit improvements in certain scenarios.
   
   
   Detailed results from the JMH benchmark are available here: [benchmark-jira.xlsx](https://github.com/apache/kafka/files/10465049/benchmark-jira.xlsx)
   
   
   ## Testing
   - Sanity testing using the existing unit test to ensure that we don't impact 
correctness.
   - JMH benchmarks for all compression types to ensure that we did not regress 
other compression types.
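   
   As referenced in the change list above, a minimal sketch of the buffer pooling pattern, using Kafka's `BufferSupplier` (the 16KB size mirrors the zstd output chunk discussed in review; the processing body is elided):
   
   ```java
   import java.nio.ByteBuffer;
   import org.apache.kafka.common.utils.BufferSupplier;
   
   class PooledSkipBuffer {
       // BufferSupplier.create() caches released buffers by requested size,
       // so per-batch decompression reuses memory instead of allocating a
       // fresh scratch buffer every time.
       static void processBatch(BufferSupplier pool) {
           ByteBuffer skipBuffer = pool.get(16 * 1024);
           try {
               // ... read decompressed bytes into skipBuffer, skipping keys/values ...
           } finally {
               pool.release(skipBuffer); // hand the buffer back for the next batch
           }
       }
   }
   ```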
   
   





[GitHub] [kafka] ableegoldman opened a new pull request, #13134: MINOR: fix flaky StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription

2023-01-20 Thread GitBox


ableegoldman opened a new pull request, #13134:
URL: https://github.com/apache/kafka/pull/13134

   This test is supposed to be a sanity check that rebalancing with a large 
number of partitions/consumers won't start to take obscenely long or approach 
the `max.poll.interval.ms` -- bumping up the timeout by another 30s still feels 
very reasonable considering the test is for 1 million partitions





[GitHub] [kafka] hzh0425 opened a new pull request, #13133: MINOR: Fix some typos in remote.metadata.storage

2023-01-19 Thread GitBox


hzh0425 opened a new pull request, #13133:
URL: https://github.com/apache/kafka/pull/13133

   Fix some typos in storage module / remote.metadata.storage
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] dajac merged pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-19 Thread GitBox


dajac merged PR #13112:
URL: https://github.com/apache/kafka/pull/13112





[GitHub] [kafka] showuon commented on a diff in pull request #13099: KAFKA-14604: avoid SASL session expiration time overflowed when calculation

2023-01-19 Thread GitBox


showuon commented on code in PR #13099:
URL: https://github.com/apache/kafka/pull/13099#discussion_r1082162567


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1496,4 +1496,32 @@ public static String replaceSuffix(String str, String oldSuffix, String newSuffi
             throw new IllegalArgumentException("Expected string to end with " + oldSuffix + " but string is " + str);
         return str.substring(0, str.length() - oldSuffix.length()) + newSuffix;
     }
+
+    public static long zeroIfNegative(long value) {
+        return Math.max(0L, value);
+    }
+
+    // returns the sum of a and b unless it would overflow, in which case it returns Long.MAX_VALUE
+    public static long saturatedAdd(long a, long b) {
+        long result = Long.MAX_VALUE;
+        try {
+            result = Math.addExact(a, b);
+        } catch (ArithmeticException e) {
+            log.info("The sum of {} and {} is overflowed, set to Long.MAX_VALUE", a, b);

Review Comment:
   Hmm. OK. Let's throw an exception. 
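   
   A sketch of the direction agreed on above, assuming the overflow case should fail fast instead of clamping (the method name and exception type are illustrative):
   
   ```java
   // Variant of saturatedAdd that throws on overflow rather than logging
   // and returning Long.MAX_VALUE, per the review discussion above.
   public static long addExactOrThrow(long a, long b) {
       try {
           return Math.addExact(a, b);
       } catch (ArithmeticException e) {
           throw new IllegalArgumentException(
               String.format("Overflow while adding %d and %d", a, b), e);
       }
   }
   ```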






[GitHub] [kafka] yufeiyan1220 commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown

2023-01-19 Thread GitBox


yufeiyan1220 commented on PR #13125:
URL: https://github.com/apache/kafka/pull/13125#issuecomment-1397979098

   > Seems like we aren't particularly consistent at removing these metrics and 
sensors, fetcher would be another example. Mind making the clean up more 
comprehensive?
   
   Never mind. I think it would be better to make the metric removal logic consistent. Let me take some time to look for these cases.





[GitHub] [kafka] philipnee commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown

2023-01-19 Thread GitBox


philipnee commented on PR #13125:
URL: https://github.com/apache/kafka/pull/13125#issuecomment-1397952270

   Seems like we aren't particularly consistent at removing these metrics and 
sensors, fetcher would be another example.  Mind making the clean up more 
comprehensive?





[GitHub] [kafka] ableegoldman opened a new pull request, #13132: MINOR: fix warnings in Streams javadocs

2023-01-19 Thread GitBox


ableegoldman opened a new pull request, #13132:
URL: https://github.com/apache/kafka/pull/13132

   While working on the 3.4 release I noticed we've built up an embarrassingly 
long list of warnings within the Streams javadocs. It's unavoidable for some 
links to break as the source code changes, but let's reset back to a good state 
before the list gets even longer





[GitHub] [kafka] mjsax merged pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets

2023-01-19 Thread GitBox


mjsax merged PR #13129:
URL: https://github.com/apache/kafka/pull/13129





[GitHub] [kafka] yufeiyan1220 commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown

2023-01-19 Thread GitBox


yufeiyan1220 commented on PR #13125:
URL: https://github.com/apache/kafka/pull/13125#issuecomment-1397837046

   > Thanks for the PR and the issue @yufeiyan1220 - I wonder if the clean up 
is necessary, as the metrics will be closed upon the client closing. Willing to 
hear what others say.
   
   I found that problem when I tried to create and close consumers frequently. Just like the test case I have made in `KafkaConsumerTest.java`, these consumer coordinator metrics are still present if I do not close these metrics explicitly. I think most users might not create and close consumers frequently, but there is a potential for a memory leak.
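   
   A hypothetical sketch of the explicit cleanup being described, using `Metrics.removeSensor` (the sensor names here are illustrative):
   
   ```java
   import org.apache.kafka.common.metrics.Metrics;
   
   class CoordinatorMetricsCleanup {
       // On consumer close, drop coordinator sensors explicitly so repeated
       // create/close cycles do not accumulate registered metrics.
       static void removeSensors(Metrics metrics) {
           metrics.removeSensor("commit-latency"); // illustrative names
           metrics.removeSensor("join-latency");
           metrics.removeSensor("sync-latency");
       }
   }
   ```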
 





[GitHub] [kafka] Gerrrr commented on pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets

2023-01-19 Thread GitBox


Gerrrr commented on PR #13129:
URL: https://github.com/apache/kafka/pull/13129#issuecomment-1397815859

   @hachikuji @jolshan I don't have Kafka committer privileges. Can you please 
commit?





[GitHub] [kafka] Gerrrr commented on a diff in pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets

2023-01-19 Thread GitBox


Gerrrr commented on code in PR #13129:
URL: https://github.com/apache/kafka/pull/13129#discussion_r1081881848


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -318,7 +318,8 @@ public class ProducerConfig extends AbstractConfig {
 
     /** <code>transaction.timeout.ms</code> */
     public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
-    public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." +
+    public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in milliseconds that a transaction will remain open before the coordinator proactively aborts it. " +
+        "The start of the transaction is set at the time that the first partition is added to it. " +
         "If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with a InvalidTxnTimeoutException error.";

Review Comment:
   Done in 
[a1119c1](https://github.com/apache/kafka/pull/13129/commits/a1119c16f5d8c3cedf554d62e8112085b2f6)






[GitHub] [kafka] hachikuji commented on a diff in pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets

2023-01-19 Thread GitBox


hachikuji commented on code in PR #13129:
URL: https://github.com/apache/kafka/pull/13129#discussion_r1081878202


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -318,7 +318,8 @@ public class ProducerConfig extends AbstractConfig {
 
     /** <code>transaction.timeout.ms</code> */
     public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
-    public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." +
+    public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in milliseconds that a transaction will remain open before the coordinator proactively aborts it. " +
+        "The start of the transaction is set at the time that the first partition is added to it. " +
         "If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with a InvalidTxnTimeoutException error.";

Review Comment:
   nit: could we put `<code>` blocks around `transaction.max.timeout.ms` while we're in here?






[GitHub] [kafka] Gerrrr commented on a diff in pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets

2023-01-19 Thread GitBox


Gerrrr commented on code in PR #13129:
URL: https://github.com/apache/kafka/pull/13129#discussion_r1081848092


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -319,6 +319,7 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>transaction.timeout.ms</code> */
     public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
     public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." +
+        "The transaction status update happens on the first producer send, on adding new partitions to the transaction, and on commit. " +

Review Comment:
   Thank you for the review! Updated in 
[420991c](https://github.com/apache/kafka/pull/13129/commits/420991c39c40a508ec6b7e87c3755597a67441e9).






[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-19 Thread GitBox


dajac commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1081788435


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -557,25 +557,25 @@ private[group] class GroupCoordinatorAdapter(
   }
 
   override def onElection(
-    partitionIndex: Int,
-    partitionLeaderEpoch: Int
+    consumerOffsetsPartitionIndex: Int,
+    consumerOffsetsPartitionLeaderEpoch: Int
   ): Unit = {
-    coordinator.onElection(partitionIndex, partitionLeaderEpoch)
+    coordinator.onElection(consumerOffsetsPartitionIndex, consumerOffsetsPartitionLeaderEpoch)
   }
 
   override def onResignation(
-    partitionIndex: Int,
-    partitionLeaderEpoch: OptionalInt
+    consumerOffsetsPartitionIndex: Int,
+    consumerOffsetsPartitionLeaderEpoch: OptionalInt
   ): Unit = {
-    coordinator.onResignation(partitionIndex, partitionLeaderEpoch)
+    coordinator.onResignation(consumerOffsetsPartitionIndex, consumerOffsetsPartitionLeaderEpoch)
   }
 
-  override def offsetsTopicConfigs(): Properties = {
+  override def consumerOffsetsTopicConfigs(): Properties = {
     coordinator.offsetsTopicConfigs
   }
 
-  override def startup(groupMetadataTopicPartitionCount: IntSupplier): Unit = {
-    coordinator.startup(() => groupMetadataTopicPartitionCount.getAsInt)
+  override def startup(consumerOffsetsPartitionCount: IntSupplier): Unit = {

Review Comment:
   I don't think that we can use `groupMetadataTopic` as prefix everywhere because in onElection/onResignation we really want to express the partition index. However, we could use `groupMetadata` as prefix in those cases.
   
   I updated the PR, let me know what you think.






[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-19 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1081688123


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -557,25 +557,25 @@ private[group] class GroupCoordinatorAdapter(
   }
 
   override def onElection(
-    partitionIndex: Int,
-    partitionLeaderEpoch: Int
+    consumerOffsetsPartitionIndex: Int,
+    consumerOffsetsPartitionLeaderEpoch: Int
   ): Unit = {
-    coordinator.onElection(partitionIndex, partitionLeaderEpoch)
+    coordinator.onElection(consumerOffsetsPartitionIndex, consumerOffsetsPartitionLeaderEpoch)
   }
 
   override def onResignation(
-    partitionIndex: Int,
-    partitionLeaderEpoch: OptionalInt
+    consumerOffsetsPartitionIndex: Int,
+    consumerOffsetsPartitionLeaderEpoch: OptionalInt
   ): Unit = {
-    coordinator.onResignation(partitionIndex, partitionLeaderEpoch)
+    coordinator.onResignation(consumerOffsetsPartitionIndex, consumerOffsetsPartitionLeaderEpoch)
   }
 
-  override def offsetsTopicConfigs(): Properties = {
+  override def consumerOffsetsTopicConfigs(): Properties = {
     coordinator.offsetsTopicConfigs
   }
 
-  override def startup(groupMetadataTopicPartitionCount: IntSupplier): Unit = {
-    coordinator.startup(() => groupMetadataTopicPartitionCount.getAsInt)
+  override def startup(consumerOffsetsPartitionCount: IntSupplier): Unit = {

Review Comment:
   sorry to go back and forth on this one, but I wonder, since we call it "GROUP_METADATA_TOPIC_NAME" in Topic.java, if we should actually just call it groupMetadataTopic here (and above). It's still more specific than "partition" in the above cases.






[GitHub] [kafka] jolshan commented on a diff in pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets

2023-01-19 Thread GitBox


jolshan commented on code in PR #13129:
URL: https://github.com/apache/kafka/pull/13129#discussion_r1081661935


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -319,6 +319,7 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>transaction.timeout.ms</code> */
     public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
     public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." +
+        "The transaction status update happens on the first producer send, on adding new partitions to the transaction, and on commit. " +

Review Comment:
   Ah -- I think I also had this confusion since there are notions of 
expiration and timeout. But yes, looking at your links I see that timeout is 
affected by the start timestamp only, while expiration is affected by the other 
components mentioned in the current description (adding partitions etc).






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-01-19 Thread GitBox


jeffkbkim commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1081656322


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - UNKNOWN_MEMBER_ID
+  // - FENCED_MEMBER_EPOCH
+  // - UNSUPPORTED_ASSIGNOR
+  // - UNRELEASED_INSTANCE_ID
+  // - GROUP_MAX_SIZE_REACHED
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error" },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The top-level error message, or null if there was no error." },
+    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
+    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+      "about": "The member epoch." },
+    { "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+",
+      "about": "True if the member should compute the assignment for the group." },
+    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
+      "about": "The heartbeat interval in milliseconds." },
+    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "null if not provided; the assignment otherwise.", "fields": [
+      { "name": "Error", "type": "int8", "versions": "0+",
+        "about": "The assigned error." },
+      { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
+        "about": "The partitions assigned to the member that can be used immediately." },
+      { "name": "PendingTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
+        "about": "The partitions assigned to the member that cannot be used because they are not released by their former owners yet." },
+      { "name": "MetadataVersion", "type": "int16", "versions": "0+",
+        "about": "The version of the metadata." },
+      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
+        "about": "The assigned metadata." }
+    ]}
+  ],
+  "commonStructs": [

Review Comment:
   aha - thanks!






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-01-19 Thread GitBox


jeffkbkim commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1081655338


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - UNKNOWN_MEMBER_ID
+  // - FENCED_MEMBER_EPOCH
+  // - UNSUPPORTED_ASSIGNOR
+  // - UNRELEASED_INSTANCE_ID
+  // - GROUP_MAX_SIZE_REACHED

Review Comment:
   yes






[GitHub] [kafka] hachikuji commented on a diff in pull request #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets

2023-01-19 Thread GitBox


hachikuji commented on code in PR #13129:
URL: https://github.com/apache/kafka/pull/13129#discussion_r1081629522


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -319,6 +319,7 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>transaction.timeout.ms</code> */
     public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
     public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." +
+        "The transaction status update happens on the first producer send, on adding new partitions to the transaction, and on commit. " +

Review Comment:
   I think this configuration is a bit misleading as currently documented. It looks to me like the implementation computes the timeout from the start of the transaction (i.e. the first call to `AddPartitionsToTxn`). We use `txnStartTimestamp` for this purpose here: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L134. This value is only updated on the initial transition to the `Ongoing` state: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala#L331.
   
   I'd suggest we rephrase this configuration to something like this:
   > The maximum amount of time in milliseconds that a transaction will remain open before the coordinator proactively aborts. The start of the transaction is set at the time that the first partition is added to it. If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with a InvalidTxnTimeoutException error.






[GitHub] [kafka] cadonna merged pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

2023-01-19 Thread GitBox


cadonna merged PR #12821:
URL: https://github.com/apache/kafka/pull/12821


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

2023-01-19 Thread GitBox


cadonna commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1397370205

   Build failures are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.clients.consumer.KafkaConsumerTest.testReturnRecordsDuringRebalance()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStart
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13119: KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging

2023-01-19 Thread GitBox


kirktrue commented on code in PR #13119:
URL: https://github.com/apache/kafka/pull/13119#discussion_r1081589480


##
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java:
##
@@ -240,6 +240,9 @@ static String handleOutput(final HttpURLConnection con) 
throws IOException {
 int responseCode = con.getResponseCode();
 log.debug("handleOutput - responseCode: {}", responseCode);
 
+// NOTE: the contents of the response should not be logged so that we 
don't leak any
+// sensitive data.
+// TODO: is it OK to log the error response body and/or its formatted 
version?

Review Comment:
   @smjn: Thanks for referencing the RFC and section. I updated the comments to 
include the link. Thanks!
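
   For illustration, a minimal sketch of the safe pattern (this is not the 
actual Kafka implementation; it uses JDK-only types and a hypothetical class 
name):
   
   ```java
   import java.io.IOException;
   import java.io.InputStream;
   import java.net.HttpURLConnection;
   import java.nio.charset.StandardCharsets;
   import java.util.logging.Logger;
   
   final class SafeHttpLogging {
       private static final Logger LOG = Logger.getLogger(SafeHttpLogging.class.getName());
   
       // Log only non-sensitive metadata (the status code); never the body,
       // since a successful OAuth token response contains the access token.
       static String handleOutputSafely(HttpURLConnection con) throws IOException {
           int responseCode = con.getResponseCode();
           LOG.fine("handleOutput - responseCode: " + responseCode);
           try (InputStream is = con.getInputStream()) {
               // Deliberately not logged: may contain secrets.
               return new String(is.readAllBytes(), StandardCharsets.UTF_8);
           }
       }
   }
   ```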



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-19 Thread GitBox


dajac commented on PR #13112:
URL: https://github.com/apache/kafka/pull/13112#issuecomment-1397338100

   @jolshan Updated the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-19 Thread GitBox


dajac commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1081583343


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##
@@ -215,4 +223,86 @@ CompletableFuture deleteOffsets(
 OffsetDeleteRequestData request,
 BufferSupplier bufferSupplier
 );
+
+/**
+ * Return the partition index for the given Group.
+ *
+ * @param groupId   The group id.
+ *
+ * @return The partition index.
+ */
+int partitionFor(String groupId);
+
+/**
+ * Commit or abort the pending transactional offsets for the given 
partitions.
+ *
+ * @param producerIdThe producer id.
+ * @param partitionsThe partitions.
+ * @param transactionResult The result of the transaction.
+ */
+void onTransactionCompleted(
+long producerId,
+Iterable partitions,
+TransactionResult transactionResult
+);
+
+/**
+ * Delete the provided partitions' offsets.
+ *
+ * @param topicPartitions   The deleted partitions.
+ * @param bufferSupplierThe buffer supplier tight to the request 
thread.
+ */
+void onPartitionsDeleted(
+List topicPartitions,
+BufferSupplier bufferSupplier
+);
+
+/**
+ * Group coordinator is now the leader for the given partition at the
+ * given leader epoch. It should load cached state from the partition
+ * and begin handling requests for groups mapped to it.
+ *
+ * @param partitionIndexThe partition index.
+ * @param partitionLeaderEpoch  The leader epoch of the partition.

Review Comment:
   That's right. It is all about the `__consumer_offsets` topic. 
`coordinatorEpoch` is the leader epoch of the partition. I used leader epoch 
here because it is a bit clearer to me.
   
   Let me update those names to be consistent. Good point.
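
   For readers skimming the thread, the conventional group-to-partition mapping 
looks roughly like this (the partition count and hashing details are 
illustrative assumptions, not the exact internals):
   
   ```java
   // Illustrative assumption: groups map to a __consumer_offsets partition by
   // hashing the group id modulo the offsets topic's partition count.
   static int partitionFor(String groupId) {
       int offsetsTopicPartitionCount = 50; // assumed default of offsets.topic.num.partitions
       return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
   }
   ```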



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13119: KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging

2023-01-19 Thread GitBox


kirktrue commented on code in PR #13119:
URL: https://github.com/apache/kafka/pull/13119#discussion_r1081575409


##
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java:
##
@@ -240,6 +240,9 @@ static String handleOutput(final HttpURLConnection con) 
throws IOException {
 int responseCode = con.getResponseCode();
 log.debug("handleOutput - responseCode: {}", responseCode);
 
+// NOTE: the contents of the response should not be logged so that we 
don't leak any
+// sensitive data.
+// TODO: is it OK to log the error response body and/or its formatted 
version?

Review Comment:
   @smjn - None that I'm aware of. A security review brought it to my 
attention, so I'm addressing it preemptively. The main fix is to remove the 
logging of the successful response, which would otherwise write the access 
token to the logs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-19 Thread GitBox


dajac commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1081571516


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3448,7 +3445,7 @@ class KafkaApisTest {
 val expectedJoinGroupResponse = new JoinGroupResponseData()
   .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
   .setMemberId("member")
-  .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
+  .setProtocolName(if (version >= 7) null else 
kafka.coordinator.group.GroupCoordinator.NoProtocol)

Review Comment:
   Yes, we should, but I wanted to do this separately because I don't know yet 
where to put them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-19 Thread GitBox


dajac commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1081571061


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -511,4 +532,53 @@ class GroupCoordinatorAdapter(
 
 future
   }
+
+  override def partitionFor(groupId: String): Int = {
+coordinator.partitionFor(groupId)
+  }
+
+  override def onTransactionCompleted(
+producerId: Long,
+partitions: java.lang.Iterable[TopicPartition],
+transactionResult: TransactionResult
+  ): Unit = {
+coordinator.scheduleHandleTxnCompletion(
+  producerId,
+  partitions.asScala,
+  transactionResult
+)
+  }
+
+  override def onPartitionsDeleted(
+topicPartitions: util.List[TopicPartition],
+bufferSupplier: BufferSupplier
+  ): Unit = {
+coordinator.handleDeletedPartitions(topicPartitions.asScala, 
RequestLocal(bufferSupplier))

Review Comment:
   We cannot use RequestLocal in the interface because it is part of core. 
Therefore, we have to pass the underlying buffer supplier and build the 
RequestLocal object here. Note that we did this for the other APIs that we 
have merged so far.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-19 Thread GitBox


fvaleri commented on PR #13131:
URL: https://github.com/apache/kafka/pull/13131#issuecomment-1397311675

   Take a look at `DumpLogSegments` to see how `CommandDefaultOptions` is used. 
It is much better than building the option list at the start of the 
`main/execute` method.
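
   Roughly, the pattern looks like this (tool and option names are made up for 
illustration; the package of `CommandDefaultOptions` depends on where the 
migration lands, so its import is shown as a comment):
   
   ```java
   import joptsimple.OptionSpec;
   // import ...CommandDefaultOptions; // kafka.utils today, the new module later
   
   final class MyToolOptions extends CommandDefaultOptions {
       final OptionSpec<String> topicOpt;
   
       MyToolOptions(String[] args) {
           super(args);
           // Declare options in the constructor so parsing and the built-in
           // help/version handling stay in one place.
           topicOpt = parser.accepts("topic", "The topic to operate on.")
                   .withRequiredArg()
                   .describedAs("topic name")
                   .ofType(String.class);
           options = parser.parse(args);
       }
   }
   ```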


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #8431: MINOR: Rename description of flatMapValues transformation

2023-01-19 Thread GitBox


bbejeck commented on PR #8431:
URL: https://github.com/apache/kafka/pull/8431#issuecomment-1397212359

   cherry-picked to 3.4


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #13130: Fix upgrade compatibility issue from older versions to 3.4

2023-01-19 Thread GitBox


mumrah merged PR #13130:
URL: https://github.com/apache/kafka/pull/13130


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #8431: MINOR: Rename description of flatMapValues transformation

2023-01-19 Thread GitBox


bbejeck commented on PR #8431:
URL: https://github.com/apache/kafka/pull/8431#issuecomment-1397198456

   Hi @maseiler, can you do the same PR for the  [3.3 version of the streams 
developer 
guide](https://github.com/apache/kafka-site/blob/asf-site/33/streams/developer-guide/dsl-api.html)?
 
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #8431: MINOR: Rename description of flatMapValues transformation

2023-01-19 Thread GitBox


bbejeck commented on PR #8431:
URL: https://github.com/apache/kafka/pull/8431#issuecomment-1397167523

   Thanks for the contribution @maseiler !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #8431: MINOR: Rename description of flatMapValues transformation

2023-01-19 Thread GitBox


bbejeck commented on PR #8431:
URL: https://github.com/apache/kafka/pull/8431#issuecomment-1397166959

   Merged #8431 into trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck merged pull request #8431: MINOR: Rename description of flatMapValues transformation

2023-01-19 Thread GitBox


bbejeck merged PR #8431:
URL: https://github.com/apache/kafka/pull/8431


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] maseiler commented on pull request #8431: MINOR: Rename description of flatMapValues transformation

2023-01-19 Thread GitBox


maseiler commented on PR #8431:
URL: https://github.com/apache/kafka/pull/8431#issuecomment-1397133912

   @mjsax, I resolved the merge conflict and rebased it onto the latest version.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

2023-01-19 Thread GitBox


clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1081305288


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##
@@ -187,151 +137,107 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
 // Thrown stateMgr exception will not be wrapped.
 assertEquals("state manager failed to close", thrown.getMessage());
 
-ctrl.verify();
+// The unlock logic should still be executed.
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerThrowsExceptionWhenDirty() {
-expect(stateManager.taskId()).andReturn(taskId);
-
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-stateDirectory.unlock(taskId);
-
-ctrl.checkOrder(true);
-ctrl.replay();
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
+doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
 assertThrows(
 ProcessorStateException.class,
 () -> StateManagerUtil.closeStateManager(
 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerWithStateStoreWipeOut() {
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
-
+final InOrder inOrder = inOrder(stateManager, stateDirectory);
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 // The `baseDir` will be accessed when attempting to delete the state 
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-stateDirectory.unlock(taskId);
-expectLastCall();
-
-ctrl.checkOrder(true);
-ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
 StateManagerUtil.closeStateManager(logger,
 "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-ctrl.verify();
+inOrder.verify(stateManager).close();
+inOrder.verify(stateDirectory).unlock(taskId);
+verifyNoMoreInteractions(stateManager, stateDirectory);
 }
 
 @Test
-public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
+public void  shouldStillWipeStateStoresIfCloseThrowsException() {
 final File randomFile = new File("/random/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
-expect(stateManager.baseDir()).andReturn(randomFile);
 
-Utils.delete(randomFile);
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
+doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
+when(stateManager.baseDir()).thenReturn(randomFile);
 
-stateDirectory.unlock(taskId);
-expectLastCall();
+try (MockedStatic utils = mockStatic(Utils.class)) {
+assertThrows(ProcessorStateException.class, () ->
+StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+}
 
-ctrl.checkOrder(true);
-ctrl.replay();
-
-replayAll();
-
-assertThrows(ProcessorStateException.class, () ->
-StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
-public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
+public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
 final File unknownFile = new File("/unknown/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
-
-expect(stateManager.baseDir()).andReturn(unknownFile);
-
-Utils.delete(unknownFile);
-expectLastCall().andThrow(new IOException("Deletion failed"));
+ 

[GitHub] [kafka] cadonna merged pull request #12818: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2023-01-19 Thread GitBox


cadonna merged PR #12818:
URL: https://github.com/apache/kafka/pull/12818


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12818: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2023-01-19 Thread GitBox


cadonna commented on PR #12818:
URL: https://github.com/apache/kafka/pull/12818#issuecomment-1397015212

   Build failures are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / 
kafka.controller.ControllerIntegrationTest.testPartitionReassignmentToBrokerWithOfflineLogDir()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, 
useInlinePem=false
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testSnapshotSaveAndLoad()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools

2023-01-19 Thread GitBox


fvaleri commented on code in PR #13127:
URL: https://github.com/apache/kafka/pull/13127#discussion_r1081230571


##
build.gradle:
##
@@ -1757,6 +1757,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+implementation project(':core')

Review Comment:
   @cadonna yes, there is some confusion on which tools need to be migrated. 
For example, `DumpLogSegments` has multiple dependencies on `core` classes, so 
it should be excluded, but it was added as a sub-task. Checking with @mimaison 
how we want to proceed with these. All the rest can be migrated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-19 Thread GitBox


fvaleri commented on PR #13131:
URL: https://github.com/apache/kafka/pull/13131#issuecomment-1396923697

   @clolov @vamossagar12 @tinaselenge 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-19 Thread GitBox


fvaleri commented on PR #13131:
URL: https://github.com/apache/kafka/pull/13131#issuecomment-1396922639

   @mimaison @ijuma 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri opened a new pull request, #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-19 Thread GitBox


fvaleri opened a new pull request, #13131:
URL: https://github.com/apache/kafka/pull/13131

   These classes are required by most commands, so they must be migrated first.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

2023-01-19 Thread GitBox


cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1081170724


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##
@@ -187,151 +137,107 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
 // Thrown stateMgr exception will not be wrapped.
 assertEquals("state manager failed to close", thrown.getMessage());
 
-ctrl.verify();
+// The unlock logic should still be executed.
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerThrowsExceptionWhenDirty() {
-expect(stateManager.taskId()).andReturn(taskId);
-
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-stateDirectory.unlock(taskId);
-
-ctrl.checkOrder(true);
-ctrl.replay();
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
+doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
 assertThrows(
 ProcessorStateException.class,
 () -> StateManagerUtil.closeStateManager(
 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerWithStateStoreWipeOut() {
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
-
+final InOrder inOrder = inOrder(stateManager, stateDirectory);
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 // The `baseDir` will be accessed when attempting to delete the state 
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-stateDirectory.unlock(taskId);
-expectLastCall();
-
-ctrl.checkOrder(true);
-ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
 StateManagerUtil.closeStateManager(logger,
 "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-ctrl.verify();
+inOrder.verify(stateManager).close();
+inOrder.verify(stateDirectory).unlock(taskId);
+verifyNoMoreInteractions(stateManager, stateDirectory);
 }
 
 @Test
-public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
+public void  shouldStillWipeStateStoresIfCloseThrowsException() {
 final File randomFile = new File("/random/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
-expect(stateManager.baseDir()).andReturn(randomFile);
 
-Utils.delete(randomFile);
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
+doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
+when(stateManager.baseDir()).thenReturn(randomFile);
 
-stateDirectory.unlock(taskId);
-expectLastCall();
+try (MockedStatic utils = mockStatic(Utils.class)) {
+assertThrows(ProcessorStateException.class, () ->
+StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+}
 
-ctrl.checkOrder(true);
-ctrl.replay();
-
-replayAll();
-
-assertThrows(ProcessorStateException.class, () ->
-StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
-public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
+public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
 final File unknownFile = new File("/unknown/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
-
-expect(stateManager.baseDir()).andReturn(unknownFile);
-
-Utils.delete(unknownFile);
-expectLastCall().andThrow(new IOException("Deletion failed"));
+

[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-19 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1081145849


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -712,8 +733,16 @@ KafkaBasedLog 
setupAndCreateKafkaBasedLog(String topic, final Wo
 }
 
 private void sendPrivileged(String key, byte[] value) {
+sendPrivileged(key, value, null);
+}
+
+private void sendPrivileged(String key, byte[] value, Callback 
callback) {
 if (!usesFencableWriter) {
-configLog.send(key, value);

Review Comment:
   I've gone ahead and made the changes to convert the 
`KafkaConfigBackingStore` APIs to be synchronous even when EOS is disabled 
(thus making the behavior more in line with the EOS case). This simplifies 
things significantly while also providing consistency w.r.t. error handling 
across all the `KafkaConfigBackingStore` APIs, without making invasive changes.



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
 
 try {
 fencableProducer.beginTransaction();
-fencableProducer.send(new ProducerRecord<>(topic, key, value));
+fencableProducer.send(new ProducerRecord<>(topic, key, value), 
(metadata, exception) -> {

Review Comment:
   I've removed the usage of the producer callback here since we're moving to 
synchronous usage of producer send in the non-EOS case as well (aside from the 
earlier point that it doesn't really make sense to handle errors via both a 
callback and `commitTransaction`). The behavior of surfacing exceptions 
synchronously is now similar in both cases: one through calling `get()` on the 
future returned from `Producer::send`, the other through 
`Producer::commitTransaction`.
   
   > Another option for the above issue could be changing the exception mapper 
to concatenate all the exception messages from the exception chain.
   
   Yet another option for this could be to simply append a "Check the worker 
logs for more details on the error" to the top-level exception's message in the 
REST API response (the worker logs will have the entire exception chain). 
Thoughts?
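
   For illustration, a minimal sketch of the synchronous path (not the actual 
Connect code; the timeout value is a placeholder):
   
   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.connect.errors.ConnectException;
   
   final class SyncConfigTopicWriter {
       // Block on the returned future so send failures surface to the caller,
       // mirroring how commitTransaction() surfaces them on the EOS path.
       static void sendSynchronously(Producer<String, byte[]> producer,
                                     String topic, String key, byte[] value) {
           try {
               producer.send(new ProducerRecord<>(topic, key, value))
                       .get(30, TimeUnit.SECONDS); // placeholder timeout
           } catch (ExecutionException e) {
               throw new ConnectException("Failed to write to the config topic", e.getCause());
           } catch (TimeoutException e) {
               throw new ConnectException("Timed out writing to the config topic", e);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new ConnectException("Interrupted while writing to the config topic", e);
           }
       }
   }
   ```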



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

2023-01-19 Thread GitBox


clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1081139070


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##
@@ -187,151 +137,107 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
 // Thrown stateMgr exception will not be wrapped.
 assertEquals("state manager failed to close", thrown.getMessage());
 
-ctrl.verify();
+// The unlock logic should still be executed.
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerThrowsExceptionWhenDirty() {
-expect(stateManager.taskId()).andReturn(taskId);
-
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-stateDirectory.unlock(taskId);
-
-ctrl.checkOrder(true);
-ctrl.replay();
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
+doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
 assertThrows(
 ProcessorStateException.class,
 () -> StateManagerUtil.closeStateManager(
 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerWithStateStoreWipeOut() {
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
-
+final InOrder inOrder = inOrder(stateManager, stateDirectory);
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 // The `baseDir` will be accessed when attempting to delete the state 
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-stateDirectory.unlock(taskId);
-expectLastCall();
-
-ctrl.checkOrder(true);
-ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
 StateManagerUtil.closeStateManager(logger,
 "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-ctrl.verify();
+inOrder.verify(stateManager).close();
+inOrder.verify(stateDirectory).unlock(taskId);
+verifyNoMoreInteractions(stateManager, stateDirectory);
 }
 
 @Test
-public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
+public void  shouldStillWipeStateStoresIfCloseThrowsException() {
 final File randomFile = new File("/random/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
-expect(stateManager.baseDir()).andReturn(randomFile);
 
-Utils.delete(randomFile);
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
+doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
+when(stateManager.baseDir()).thenReturn(randomFile);
 
-stateDirectory.unlock(taskId);
-expectLastCall();
+try (MockedStatic utils = mockStatic(Utils.class)) {
+assertThrows(ProcessorStateException.class, () ->
+StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+}
 
-ctrl.checkOrder(true);
-ctrl.replay();
-
-replayAll();
-
-assertThrows(ProcessorStateException.class, () ->
-StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
-public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
+public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
 final File unknownFile = new File("/unknown/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
-
-expect(stateManager.baseDir()).andReturn(unknownFile);
-
-Utils.delete(unknownFile);
-expectLastCall().andThrow(new IOException("Deletion failed"));
+ 

[GitHub] [kafka] fvaleri commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools

2023-01-19 Thread GitBox


fvaleri commented on code in PR #13127:
URL: https://github.com/apache/kafka/pull/13127#discussion_r1081057443


##
build.gradle:
##
@@ -1757,6 +1757,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+implementation project(':core')

Review Comment:
   Yes, I'm working on https://issues.apache.org/jira/browse/KAFKA-14628 and 
I'm really close to opening the PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools

2023-01-19 Thread GitBox


vamossagar12 commented on code in PR #13127:
URL: https://github.com/apache/kafka/pull/13127#discussion_r1081021244


##
build.gradle:
##
@@ -1757,6 +1757,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+implementation project(':core')

Review Comment:
   > I think there they talk about the dependencies of the tools' test module, 
not of the tools per se.
   
   Oh OK. Got it. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools

2023-01-19 Thread GitBox


cadonna commented on code in PR #13127:
URL: https://github.com/apache/kafka/pull/13127#discussion_r1081015656


##
build.gradle:
##
@@ -1757,6 +1757,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+implementation project(':core')

Review Comment:
   Ah, great! Thanks for sharing! Good to know that people are aware!
   
   > Also, not sure if this comment from Ismael has any relevance here: 
https://github.com/apache/kafka/pull/13095#issuecomment-1376021193?
   
   I think there they talk about the dependencies of the tools' test module, 
not of the tools per se.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools

2023-01-19 Thread GitBox


vamossagar12 commented on code in PR #13127:
URL: https://github.com/apache/kafka/pull/13127#discussion_r1081010418


##
build.gradle:
##
@@ -1757,6 +1757,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+implementation project(':core')

Review Comment:
   Also, not sure if this comment from Ismael has any relevance here: 
https://github.com/apache/kafka/pull/13095#issuecomment-1376021193?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools

2023-01-19 Thread GitBox


vamossagar12 commented on code in PR #13127:
URL: https://github.com/apache/kafka/pull/13127#discussion_r1081009344


##
build.gradle:
##
@@ -1757,6 +1757,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+implementation project(':core')

Review Comment:
   Thanks @clolov, I was about to type the same thing :D So this PR, and I 
guess your PR too, are blocked until https://github.com/apache/kafka/pull/13122 
is merged?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools

2023-01-19 Thread GitBox


clolov commented on code in PR #13127:
URL: https://github.com/apache/kafka/pull/13127#discussion_r1081007952


##
build.gradle:
##
@@ -1757,6 +1757,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+implementation project(':core')

Review Comment:
   Hello, hello, chipping in to say this is the same conversation we had with 
@fvaleri in https://github.com/apache/kafka/pull/13122. There should be a 
separate PR soon containing changes for CommandLineUtils (et al.) to unblock 
all the JIRA tickets depending on it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools

2023-01-19 Thread GitBox


cadonna commented on code in PR #13127:
URL: https://github.com/apache/kafka/pull/13127#discussion_r1081000404


##
build.gradle:
##
@@ -1757,6 +1757,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+implementation project(':core')

Review Comment:
   Ticket https://issues.apache.org/jira/browse/KAFKA-14525 (the parent of 
https://issues.apache.org/jira/browse/KAFKA-14586) says 
   
   > tools that don't require access to `core` classes and communicate via the 
kafka protocol (typically by using the client classes) should be moved to the 
`tools` module.
   
   This addition contradicts that requirement.
   
   As far as I see, `kafka.utils.CommandLineUtils` is the only dependency on 
`core`. Is that true? Would it be possible to move 
`kafka.utils.CommandLineUtils` to a different module? Or should we even get rid 
of that dependency completely by rewriting it in Java and putting it in a 
different module?
   
   https://issues.apache.org/jira/browse/KAFKA-14576 should have a similar 
issue, since the console consumer should also be moved to tools and it has a 
dependency on `kafka.utils.CommandLineUtils`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

2023-01-19 Thread GitBox


cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1080946697


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##
@@ -187,151 +137,107 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
 // Thrown stateMgr exception will not be wrapped.
 assertEquals("state manager failed to close", thrown.getMessage());
 
-ctrl.verify();
+// The unlock logic should still be executed.
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerThrowsExceptionWhenDirty() {
-expect(stateManager.taskId()).andReturn(taskId);
-
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-stateDirectory.unlock(taskId);
-
-ctrl.checkOrder(true);
-ctrl.replay();
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
+doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
 assertThrows(
 ProcessorStateException.class,
 () -> StateManagerUtil.closeStateManager(
 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerWithStateStoreWipeOut() {
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
-
+final InOrder inOrder = inOrder(stateManager, stateDirectory);
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 // The `baseDir` will be accessed when attempting to delete the state 
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-stateDirectory.unlock(taskId);
-expectLastCall();
-
-ctrl.checkOrder(true);
-ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
 StateManagerUtil.closeStateManager(logger,
 "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-ctrl.verify();
+inOrder.verify(stateManager).close();
+inOrder.verify(stateDirectory).unlock(taskId);
+verifyNoMoreInteractions(stateManager, stateDirectory);
 }
 
 @Test
-public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
+public void  shouldStillWipeStateStoresIfCloseThrowsException() {
 final File randomFile = new File("/random/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
-expect(stateManager.baseDir()).andReturn(randomFile);
 
-Utils.delete(randomFile);
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
+doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
+when(stateManager.baseDir()).thenReturn(randomFile);
 
-stateDirectory.unlock(taskId);
-expectLastCall();
+try (MockedStatic utils = mockStatic(Utils.class)) {
+assertThrows(ProcessorStateException.class, () ->
+StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+}
 
-ctrl.checkOrder(true);
-ctrl.replay();
-
-replayAll();
-
-assertThrows(ProcessorStateException.class, () ->
-StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
-public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
+public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
 final File unknownFile = new File("/unknown/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
-
-expect(stateManager.baseDir()).andReturn(unknownFile);
-
-Utils.delete(unknownFile);
-expectLastCall().andThrow(new IOException("Deletion failed"));
+

[GitHub] [kafka] cadonna commented on pull request #12739: Replace EasyMock and PowerMock with Mockito | TimeOrderedCachingPersistentWindowStoreTest

2023-01-19 Thread GitBox


cadonna commented on PR #12739:
URL: https://github.com/apache/kafka/pull/12739#issuecomment-1396582822

   @shekhar-rajak Do you have any updates for this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13127: Kafka 14586: Moving StreamResetter to tools

2023-01-18 Thread GitBox


vamossagar12 commented on PR #13127:
URL: https://github.com/apache/kafka/pull/13127#issuecomment-1396439952

   Adding @fvaleri. There are checkstyle issues, which I will fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests

2023-01-18 Thread GitBox


philipnee commented on code in PR #13021:
URL: https://github.com/apache/kafka/pull/13021#discussion_r1080743797


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+public class CommitRequestManager implements RequestManager {
+private final Queue stagedCommits;
+// TODO: We will need to refactor the subscriptionState
+private final SubscriptionState subscriptionState;
+private final Logger log;
+private final Optional autoCommitState;

Review Comment:
   It makes sense to use a MaxValue as well; my counterargument is that 
explicitly disabling autoCommitState makes the logic more straightforward.
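
   A small sketch of what I mean (names and fields are illustrative, not the 
actual CommitRequestManager internals):
   
   ```java
   import java.util.Optional;
   
   final class AutoCommitSketch {
       // An empty Optional means auto-commit is disabled outright, rather than
       // encoding "disabled" as a sentinel interval such as Long.MAX_VALUE.
       private final Optional<AutoCommitState> autoCommitState;
   
       AutoCommitSketch(boolean enableAutoCommit, long intervalMs) {
           this.autoCommitState = enableAutoCommit
                   ? Optional.of(new AutoCommitState(intervalMs))
                   : Optional.empty();
       }
   
       void maybeAutoCommit(long nowMs) {
           // No sentinel comparison needed: absence of state means "off".
           autoCommitState.ifPresent(state -> {
               if (state.shouldCommitAt(nowMs)) {
                   // sendAutoCommitRequest(); // hypothetical request hook
               }
           });
       }
   
       static final class AutoCommitState {
           private final long intervalMs;
           private long nextCommitMs;
   
           AutoCommitState(long intervalMs) {
               this.intervalMs = intervalMs;
           }
   
           boolean shouldCommitAt(long nowMs) {
               if (nowMs >= nextCommitMs) {
                   nextCommitMs = nowMs + intervalMs;
                   return true;
               }
               return false;
           }
       }
   }
   ```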



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] akhileshchg commented on a diff in pull request #13130: Fix upgrade compatibility issue from older versions to 3.4

2023-01-18 Thread GitBox


akhileshchg commented on code in PR #13130:
URL: https://github.com/apache/kafka/pull/13130#discussion_r1080739852


##
core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala:
##
@@ -157,7 +157,8 @@ case class MetaProperties(
 object BrokerMetadataCheckpoint extends Logging {
   def getBrokerMetadataAndOfflineDirs(
 logDirs: collection.Seq[String],
-ignoreMissing: Boolean
+ignoreMissing: Boolean,
+kraftMode: Boolean = false

Review Comment:
   Makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13130: Fix upgrade compatibility issue from older versions to 3.4

2023-01-18 Thread GitBox


ijuma commented on code in PR #13130:
URL: https://github.com/apache/kafka/pull/13130#discussion_r1080736930


##
core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala:
##
@@ -157,7 +157,8 @@ case class MetaProperties(
 object BrokerMetadataCheckpoint extends Logging {
   def getBrokerMetadataAndOfflineDirs(
 logDirs: collection.Seq[String],
-ignoreMissing: Boolean
+ignoreMissing: Boolean,
+kraftMode: Boolean = false

Review Comment:
   Default arguments like this make it easy to miss places that should be 
updated. Better to be explicit.
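   
   As a hedged Java analogue of the concern (names hypothetical): a 
defaulted/overloaded variant lets old call sites compile unchanged and silently 
keep the old behavior, while a single explicit signature forces every caller to 
choose:
   
   ```java
   public class CheckpointReaderSketch {
       // Explicit: every caller must state which mode it runs in.
       static void read(String logDir, boolean ignoreMissing, boolean kraftMode) {
           System.out.printf("dir=%s ignoreMissing=%b kraftMode=%b%n",
                   logDir, ignoreMissing, kraftMode);
       }
   
       // Defaulted (what the review argues against): existing call sites keep
       // compiling and silently get kraftMode=false even where that is wrong.
       static void read(String logDir, boolean ignoreMissing) {
           read(logDir, ignoreMissing, false);
       }
   
       public static void main(String[] args) {
           read("/tmp/kafka-logs", true);        // implicitly kraftMode=false
           read("/tmp/kafka-logs", true, true);  // explicit
       }
   }
   ```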



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] akhileshchg opened a new pull request, #13130: Fix upgrade compatibility issue from older versions to 3.4

2023-01-18 Thread GitBox


akhileshchg opened a new pull request, #13130:
URL: https://github.com/apache/kafka/pull/13130

   3.4 introduced a change that requires cluster.id to be present in 
meta.properties if the file is available. This information is not persisted by 
brokers in old versions (< 0.10), so on upgrade the check fails and halts 
broker start-up. Fixed the check so that cluster.id is not required in 
ZooKeeper mode on upgrade.
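   
   For illustration, roughly what a meta.properties file written by an old 
ZooKeeper-mode broker looks like (values are hypothetical); note that there is 
no cluster.id line for the new check to find:
   
   ```
   version=0
   broker.id=1
   ```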
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Gerrrr opened a new pull request, #13129: KAFKA-14638: Elaborate when transaction.timeout.ms resets

2023-01-18 Thread GitBox


Gerrrr opened a new pull request, #13129:
URL: https://github.com/apache/kafka/pull/13129

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests

2023-01-18 Thread GitBox


hachikuji commented on code in PR #13021:
URL: https://github.com/apache/kafka/pull/13021#discussion_r1080689650


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+public class CommitRequestManager implements RequestManager {
+private final Queue<StagedCommit> stagedCommits;
+// TODO: We will need to refactor the subscriptionState
+private final SubscriptionState subscriptionState;
+private final Logger log;
+private final Optional<AutoCommitState> autoCommitState;
+private final Optional<CoordinatorRequestManager> coordinatorRequestManager;
+private final GroupStateManager groupState;
+
+public CommitRequestManager(
+final Time time,
+final LogContext logContext,
+final SubscriptionState subscriptionState,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final GroupStateManager groupState) {
+this.log = logContext.logger(getClass());
+this.stagedCommits = new LinkedList<>();
+if (config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+final long autoCommitInterval =
+
Integer.toUnsignedLong(config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+this.autoCommitState = Optional.of(new AutoCommitState(time, 
autoCommitInterval));
+} else {
+this.autoCommitState = Optional.empty();
+}
+this.coordinatorRequestManager = 
Optional.ofNullable(coordinatorRequestManager);
+this.groupState = groupState;
+this.subscriptionState = subscriptionState;
+}
+
+// Visible for testing
+CommitRequestManager(
+final Time time,
+final LogContext logContext,
+final SubscriptionState subscriptionState,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final GroupStateManager groupState,
+final AutoCommitState autoCommitState) {
+this.log = logContext.logger(getClass());
+this.subscriptionState = subscriptionState;
+this.coordinatorRequestManager = 
Optional.ofNullable(coordinatorRequestManager);
+this.groupState = groupState;
+this.autoCommitState = Optional.ofNullable(autoCommitState);
+this.stagedCommits = new LinkedList<>();
+}
+
+/**
+ * Poll for the commit request if there's any. The function will also try 
to autocommit, if enabled.
+ *
+ * @param currentTimeMs
+ * @return
+ */
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (!coordinatorRequestManager.isPresent()) {
+return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>());
+}
+
+maybeAutoCommit(currentTimeMs);
+
+if (stagedCommits.isEmpty()) {
+return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>());
+}
+
+List<NetworkClientDelegate.UnsentRequest> unsentCommitRequests =
+

[GitHub] [kafka] philipnee commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown

2023-01-18 Thread GitBox


philipnee commented on PR #13125:
URL: https://github.com/apache/kafka/pull/13125#issuecomment-1396238469

   Thanks for the PR and the issue @yufeiyan1220 - I wonder if the cleanup is 
necessary, as the metrics will be closed when the client closes. I'm willing to 
hear what others say.
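   
   For context, a minimal sketch against `org.apache.kafka.common.metrics.Metrics` 
showing the two cleanup paths being weighed (the sensor name is hypothetical):
   
   ```java
   import org.apache.kafka.common.metrics.Metrics;
   
   public class SensorCleanupSketch {
       public static void main(String[] args) {
           Metrics metrics = new Metrics();
           metrics.sensor("commit-latency");
   
           // Explicit per-sensor cleanup, as the PR proposes for the coordinator:
           metrics.removeSensor("commit-latency");
   
           // Versus relying on close() to tear everything down with the client:
           metrics.close();
       }
   }
   ```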


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1080666149


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##
@@ -215,4 +223,86 @@ CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
 OffsetDeleteRequestData request,
 BufferSupplier bufferSupplier
 );
+
+/**
+ * Return the partition index for the given Group.
+ *
+ * @param groupId   The group id.
+ *
+ * @return The partition index.
+ */
+int partitionFor(String groupId);
+
+/**
+ * Commit or abort the pending transactional offsets for the given 
partitions.
+ *
+ * @param producerIdThe producer id.
+ * @param partitionsThe partitions.
+ * @param transactionResult The result of the transaction.
+ */
+void onTransactionCompleted(
+long producerId,
+Iterable<TopicPartition> partitions,
+TransactionResult transactionResult
+);
+
+/**
+ * Delete the provided partitions' offsets.
+ *
+ * @param topicPartitions   The deleted partitions.
+ * @param bufferSupplierThe buffer supplier tight to the request 
thread.
+ */
+void onPartitionsDeleted(
+List<TopicPartition> topicPartitions,
+BufferSupplier bufferSupplier
+);
+
+/**
+ * Group coordinator is now the leader for the given partition at the
+ * given leader epoch. It should load cached state from the partition
+ * and begin handling requests for groups mapped to it.
+ *
+ * @param partitionIndexThe partition index.
+ * @param partitionLeaderEpoch  The leader epoch of the partition.

Review Comment:
   It seems like in some places we avoid mentioning consumer offsets, but in 
other places we do mention the topic partitions. (see below for the configs)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1080665363


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##
@@ -215,4 +223,86 @@ CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
 OffsetDeleteRequestData request,
 BufferSupplier bufferSupplier
 );
+
+/**
+ * Return the partition index for the given Group.
+ *
+ * @param groupId   The group id.
+ *
+ * @return The partition index.
+ */
+int partitionFor(String groupId);
+
+/**
+ * Commit or abort the pending transactional offsets for the given 
partitions.
+ *
+ * @param producerIdThe producer id.
+ * @param partitionsThe partitions.
+ * @param transactionResult The result of the transaction.
+ */
+void onTransactionCompleted(
+long producerId,
+Iterable<TopicPartition> partitions,
+TransactionResult transactionResult
+);
+
+/**
+ * Delete the provided partitions' offsets.
+ *
+ * @param topicPartitions   The deleted partitions.
+ * @param bufferSupplierThe buffer supplier tight to the request 
thread.
+ */
+void onPartitionsDeleted(
+List<TopicPartition> topicPartitions,
+BufferSupplier bufferSupplier
+);
+
+/**
+ * Group coordinator is now the leader for the given partition at the
+ * given leader epoch. It should load cached state from the partition
+ * and begin handling requests for groups mapped to it.
+ *
+ * @param partitionIndexThe partition index.
+ * @param partitionLeaderEpoch  The leader epoch of the partition.

Review Comment:
   Few questions -- is this the coordinator's state topic partition, i.e. 
__consumer_offsets, or a different partition?
   
   I also noticed that the current group coordinator implementation calls the 
second parameter "coordinatorEpoch" -- is that equivalent to the leader epoch 
of the partition it's managing?
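   
   For context, `partitionFor` in the diff above maps a group id onto one 
partition of the coordinator's state topic. A hedged sketch of the classic 
mapping; the sign-bit masking and the 50-partition default are assumptions 
based on the long-standing implementation:
   
   ```java
   public class PartitionForSketch {
       // Masking the sign bit avoids the Math.abs(Integer.MIN_VALUE) pitfall.
       static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
           return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
       }
   
       public static void main(String[] args) {
           System.out.println(partitionFor("my-group", 50));
       }
   }
   ```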



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1080662718


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##
@@ -215,4 +223,86 @@ CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
 OffsetDeleteRequestData request,
 BufferSupplier bufferSupplier
 );
+
+/**
+ * Return the partition index for the given Group.
+ *
+ * @param groupId   The group id.
+ *
+ * @return The partition index.
+ */
+int partitionFor(String groupId);
+
+/**
+ * Commit or abort the pending transactional offsets for the given 
partitions.
+ *
+ * @param producerIdThe producer id.
+ * @param partitionsThe partitions.
+ * @param transactionResult The result of the transaction.
+ */
+void onTransactionCompleted(
+long producerId,
+Iterable<TopicPartition> partitions,
+TransactionResult transactionResult
+);
+
+/**
+ * Delete the provided partitions' offsets.

Review Comment:
   Is it clearer to say "Remove the provided deleted partitions' offsets"? 
   I guess the parameter already explains that these are deleted partitions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1080661020


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3448,7 +3445,7 @@ class KafkaApisTest {
 val expectedJoinGroupResponse = new JoinGroupResponseData()
   .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
   .setMemberId("member")
-  .setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
+  .setProtocolName(if (version >= 7) null else 
kafka.coordinator.group.GroupCoordinator.NoProtocol)

Review Comment:
   Did we want to move these constants too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] erichaagdev opened a new pull request, #13128: MINOR: Define a root project name in the Gradle settings file

2023-01-18 Thread GitBox


erichaagdev opened a new pull request, #13128:
URL: https://github.com/apache/kafka/pull/13128

   It is a good practice to always define a root project name. However, this 
change is specifically being made to address a build caching miss as a result 
of not having the root project name defined.
   
   The `aggregatedJavadoc` task takes the root project name as an input. When 
executing the build from two uniquely named directories, the second execution 
of `aggregatedJavadoc` is re-executed rather than pulled from the cache, 
because the computed build cache key differs.
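   
   A minimal sketch of the fix in `settings.gradle` (shown only to illustrate 
setting the root project name; the real file also declares all the subprojects):
   
   ```
   rootProject.name = 'kafka'
   ```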
   
   Here are the steps to reproduce the issue:
   
   1. Remove the contents of the Gradle local build cache with `rm -rf 
~/.gradle/caches/build-cache-1`. WARNING: This is a destructive action, 
consider renaming the directory instead if you need to preserve the local build 
cache.
   2. Clone and execute the project with `git clone --depth 1 
g...@github.com:apache/kafka.git first_build && (cd first_build && ./gradlew 
aggregatedJavadoc --build-cache)`.
   3. Clone and execute the project again with `git clone --depth 1 
g...@github.com:apache/kafka.git second_build && (cd second_build && ./gradlew 
aggregatedJavadoc --build-cache --scan)`.
   4. Agree to the Gradle Terms of Service when prompted to publish a Build 
Scan.
   5. We can see on the Performance > Task execution screen that [a cacheable 
task](https://scans.gradle.com/s/7bnq7gb4ykmdk/timeline?cacheability=cacheable=success,failed=longest)
 has been executed. We can click the Cacheable link to find [the 
culprit](https://scans.gradle.com/s/7bnq7gb4ykmdk/timeline?cacheability=cacheable=success,failed=longest#lhlgorbepbsx4).
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1080653448


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -511,4 +532,53 @@ class GroupCoordinatorAdapter(
 
 future
   }
+
+  override def partitionFor(groupId: String): Int = {
+coordinator.partitionFor(groupId)
+  }
+
+  override def onTransactionCompleted(
+producerId: Long,
+partitions: java.lang.Iterable[TopicPartition],
+transactionResult: TransactionResult
+  ): Unit = {
+coordinator.scheduleHandleTxnCompletion(
+  producerId,
+  partitions.asScala,
+  transactionResult
+)
+  }
+
+  override def onPartitionsDeleted(
+topicPartitions: util.List[TopicPartition],
+bufferSupplier: BufferSupplier
+  ): Unit = {
+coordinator.handleDeletedPartitions(topicPartitions.asScala, 
RequestLocal(bufferSupplier))

Review Comment:
   also any reason for the change from request local as a parameter to creating 
it here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #12922: KAFKA-14397; Don't reset producer sequence number after delivery timeout

2023-01-18 Thread GitBox


hachikuji commented on PR #12922:
URL: https://github.com/apache/kafka/pull/12922#issuecomment-1396203599

   Note: I'm holding off on merging this patch because, while writing the test 
case that Justine suggested, I realized the code was not correctly handling the 
case where the next in-line batch fails due to retries or timeouts. Basically, 
we do not reset the next expected sequence number and some other state. I'll 
try to work on this in the next couple of weeks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rishiraj88 commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


rishiraj88 commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1080573968


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int,
*/
   private def maybeUpdateCoordinatorEpoch(
 partitionId: Int,
-epochOpt: Option[Int]
+epochOpt: OptionalInt
   ): Boolean = {
 val updatedEpoch = epochForPartitionId.compute(partitionId, (_, 
currentEpoch) => {
   if (currentEpoch == null) {
-epochOpt.map(Int.box).orNull
+if (epochOpt.isPresent) epochOpt.getAsInt
+else null
   } else {
-epochOpt match {
-  case Some(epoch) if epoch > currentEpoch => epoch
-  case _ => currentEpoch
-}
+if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) 
epochOpt.getAsInt
+else currentEpoch
   }
 })
-epochOpt.forall(_ == updatedEpoch)
+if(epochOpt.isPresent) {

Review Comment:
   The point made by @jolshan looks helpful. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1080547019


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int,
*/
   private def maybeUpdateCoordinatorEpoch(
 partitionId: Int,
-epochOpt: Option[Int]
+epochOpt: OptionalInt
   ): Boolean = {
 val updatedEpoch = epochForPartitionId.compute(partitionId, (_, 
currentEpoch) => {
   if (currentEpoch == null) {
-epochOpt.map(Int.box).orNull
+if (epochOpt.isPresent) epochOpt.getAsInt
+else null
   } else {
-epochOpt match {
-  case Some(epoch) if epoch > currentEpoch => epoch
-  case _ => currentEpoch
-}
+if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) 
epochOpt.getAsInt
+else currentEpoch
   }
 })
-epochOpt.forall(_ == updatedEpoch)
+if(epochOpt.isPresent) {

Review Comment:
   Yeah. I was only suggesting it because the other provides a few more methods 
(like orElse, which lets you return null instead of an int), but yeah, probably 
not worth changing.
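   
   For illustration, a small self-contained example of the method differences 
being discussed:
   
   ```java
   import java.util.Optional;
   import java.util.OptionalInt;
   
   public class OptionalFlavors {
       public static void main(String[] args) {
           Optional<Integer> boxed = Optional.of(5);
           OptionalInt primitive = OptionalInt.of(5);
   
           // Optional<Integer> offers map/filter and can fall back to null:
           Integer maybeNull = boxed.map(epoch -> epoch + 1).orElse(null);
   
           // OptionalInt avoids boxing but has no map(); callers branch on
           // isPresent()/getAsInt(), and orElse() only accepts a plain int:
           int value = primitive.isPresent() ? primitive.getAsInt() + 1 : -1;
   
           System.out.println(maybeNull + " " + value);
       }
   }
   ```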



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-01-18 Thread GitBox


ijuma commented on PR #13040:
URL: https://github.com/apache/kafka/pull/13040#issuecomment-1387745094

   @satishd No worries, it happens to all of us. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


dajac commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1074022302


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -310,9 +307,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
   && partitionState.deletePartition) {
 val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-  Some(partitionState.leaderEpoch)

Review Comment:
   The entire group coordinator in scala will disappear at some point, 
including the adapter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


dajac commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1073982833


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int,
*/
   private def maybeUpdateCoordinatorEpoch(
 partitionId: Int,
-epochOpt: Option[Int]
+epochOpt: OptionalInt
   ): Boolean = {
 val updatedEpoch = epochForPartitionId.compute(partitionId, (_, 
currentEpoch) => {
   if (currentEpoch == null) {
-epochOpt.map(Int.box).orNull
+if (epochOpt.isPresent) epochOpt.getAsInt
+else null
   } else {
-epochOpt match {
-  case Some(epoch) if epoch > currentEpoch => epoch
-  case _ => currentEpoch
-}
+if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) 
epochOpt.getAsInt
+else currentEpoch
   }
 })
-epochOpt.forall(_ == updatedEpoch)
+if(epochOpt.isPresent) {

Review Comment:
   OptionalInt seems better here because we effectively pass an int, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1073964326


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int,
*/
   private def maybeUpdateCoordinatorEpoch(
 partitionId: Int,
-epochOpt: Option[Int]
+epochOpt: OptionalInt
   ): Boolean = {
 val updatedEpoch = epochForPartitionId.compute(partitionId, (_, 
currentEpoch) => {
   if (currentEpoch == null) {
-epochOpt.map(Int.box).orNull
+if (epochOpt.isPresent) epochOpt.getAsInt
+else null
   } else {
-epochOpt match {
-  case Some(epoch) if epoch > currentEpoch => epoch
-  case _ => currentEpoch
-}
+if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) 
epochOpt.getAsInt
+else currentEpoch
   }
 })
-epochOpt.forall(_ == updatedEpoch)
+if(epochOpt.isPresent) {

Review Comment:
   There are minor method differences between OptionalInt and Optional<Integer>. 
That was my point. But maybe not big enough to change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


dajac commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1073960778


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int,
*/
   private def maybeUpdateCoordinatorEpoch(
 partitionId: Int,
-epochOpt: Option[Int]
+epochOpt: OptionalInt
   ): Boolean = {
 val updatedEpoch = epochForPartitionId.compute(partitionId, (_, 
currentEpoch) => {
   if (currentEpoch == null) {
-epochOpt.map(Int.box).orNull
+if (epochOpt.isPresent) epochOpt.getAsInt
+else null
   } else {
-epochOpt match {
-  case Some(epoch) if epoch > currentEpoch => epoch
-  case _ => currentEpoch
-}
+if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) 
epochOpt.getAsInt
+else currentEpoch
   }
 })
-epochOpt.forall(_ == updatedEpoch)
+if(epochOpt.isPresent) {

Review Comment:
   > I also wonder if there is a cleaner way to write the above methods with 
the OptionalInt. But perhaps not.
   
   This is the best I came up with. 
   
   > I see the options here are limited. Did you consider using java optionals 
here too? I suppose those are a bit more heavyweight and maybe still don't 
provide much better methods.
   
   I don't understand your point here. We already use java optionals here now.



##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -511,4 +532,53 @@ class GroupCoordinatorAdapter(
 
 future
   }
+
+  override def partitionFor(groupId: String): Int = {
+coordinator.partitionFor(groupId)
+  }
+
+  override def onTransactionCompleted(

Review Comment:
   I found the old name a bit weird. I have also tried to name all these 
"callbacks" similarly. They all start with `on` now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


dajac commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1073958844


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -511,4 +532,53 @@ class GroupCoordinatorAdapter(
 
 future
   }
+
+  override def partitionFor(groupId: String): Int = {
+coordinator.partitionFor(groupId)
+  }
+
+  override def onTransactionCompleted(

Review Comment:
   I found the old name a bit weird.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1073948248


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int,
*/
   private def maybeUpdateCoordinatorEpoch(
 partitionId: Int,
-epochOpt: Option[Int]
+epochOpt: OptionalInt
   ): Boolean = {
 val updatedEpoch = epochForPartitionId.compute(partitionId, (_, 
currentEpoch) => {
   if (currentEpoch == null) {
-epochOpt.map(Int.box).orNull
+if (epochOpt.isPresent) epochOpt.getAsInt
+else null
   } else {
-epochOpt match {
-  case Some(epoch) if epoch > currentEpoch => epoch
-  case _ => currentEpoch
-}
+if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) 
epochOpt.getAsInt
+else currentEpoch
   }
 })
-epochOpt.forall(_ == updatedEpoch)
+if(epochOpt.isPresent) {

Review Comment:
   I see the options here are limited. Did you consider using java optionals 
here too? I suppose those are a bit more heavyweight and maybe still don't 
provide much better methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-18 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1073945942


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int,
*/
   private def maybeUpdateCoordinatorEpoch(
 partitionId: Int,
-epochOpt: Option[Int]
+epochOpt: OptionalInt
   ): Boolean = {
 val updatedEpoch = epochForPartitionId.compute(partitionId, (_, 
currentEpoch) => {
   if (currentEpoch == null) {
-epochOpt.map(Int.box).orNull
+if (epochOpt.isPresent) epochOpt.getAsInt
+else null
   } else {
-epochOpt match {
-  case Some(epoch) if epoch > currentEpoch => epoch
-  case _ => currentEpoch
-}
+if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) 
epochOpt.getAsInt
+else currentEpoch
   }
 })
-epochOpt.forall(_ == updatedEpoch)
+if(epochOpt.isPresent) {

Review Comment:
   nit: spacing.
   
   I also wonder if there is a cleaner way to write the above methods with the 
OptionalInt. But perhaps not.
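   
   For illustration, the same compute-and-compare pattern as a self-contained 
Java sketch; it mirrors the Scala above, including the final 
`epochOpt.forall(_ == updatedEpoch)` check:
   
   ```java
   import java.util.OptionalInt;
   import java.util.concurrent.ConcurrentHashMap;
   
   public class EpochTrackerSketch {
       private final ConcurrentHashMap<Integer, Integer> epochForPartitionId =
               new ConcurrentHashMap<>();
   
       boolean maybeUpdateEpoch(int partitionId, OptionalInt epochOpt) {
           Integer updated = epochForPartitionId.compute(partitionId, (id, current) -> {
               if (current == null) {
                   // First observation: store the epoch if one was supplied.
                   return epochOpt.isPresent() ? epochOpt.getAsInt() : null;
               }
               // Otherwise keep the maximum epoch seen so far.
               return (epochOpt.isPresent() && epochOpt.getAsInt() > current)
                       ? epochOpt.getAsInt() : current;
           });
           // True when no epoch was supplied, or the supplied epoch won.
           return !epochOpt.isPresent()
                   || (updated != null && updated == epochOpt.getAsInt());
       }
   }
   ```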



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


