Re: [PR] KAFKA-16045 Fix flaky testMigrateTopicDeletions [kafka]

2024-04-29 Thread via GitHub


chia7712 commented on PR #15082:
URL: https://github.com/apache/kafka/pull/15082#issuecomment-2084345952

   @mumrah Do you still work on this?





Re: [PR] KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value [kafka]

2024-04-29 Thread via GitHub


mjsax commented on PR #15615:
URL: https://github.com/apache/kafka/pull/15615#issuecomment-2084340094

   @AyoubOm -- Thanks for the PR! I did not forget about it; I just have not found time yet to take a look. I still plan to get this merged for the 3.8 release... Sorry for the long wait, and thanks for being patient!





Re: [PR] KAFKA-12317: fix documentation [kafka]

2024-04-29 Thread via GitHub


mjsax commented on PR #15689:
URL: https://github.com/apache/kafka/pull/15689#issuecomment-2084338816

   Thanks for the PR @florin-akermann. I agree with @AyoubOm that both statements you intend to remove are still correct? It seems the stream-stream join section did not talk about null keys, and thus left/outer join do not need an update? It might be worth updating the stream-stream inner join row to point out that null-key records are dropped?
   
   I also just checked the FK left join, and it still says (this statement is missing for the inner join though):
   > Records for which the foreignKeyExtractor produces null are ignored and do not trigger a join. If you want to join with null foreign keys, use a suitable sentinel value to do so (i.e. "NULL" for a String field, or -1 for an auto-incrementing integer field).
   
   It seems https://github.com/apache/kafka/pull/14107 removed it incorrectly for the inner join instead of the left join?
   
   It might be worth going over the docs once more to double-check (I made a pass, but might have missed something).
   





Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-29 Thread via GitHub


chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1584100563


##
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java:
##
@@ -106,6 +106,7 @@ public List<Extension> getAdditionalExtensions() {
             },
             (AfterTestExecutionCallback) context -> clusterShim.stop(),
             new ClusterInstanceParameterResolver(clusterShim),
+            new GenericParameterResolver<>(clusterShim, ZkClusterInstance.class),

Review Comment:
   This change has merge conflicts. Also, it seems to me this could encourage developers to rely on a specific type of `ClusterInstance`. I agree there are some test cases requiring a specific cluster type, but they can cast the `ClusterInstance` to either `zk` or `Raft`, so it seems to me that enabling injection is overkill.
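   
   For example, a minimal self-contained sketch of that casting pattern (the types below are illustrative stand-ins, not the real `ClusterInstance` hierarchy):
   ```java
   // Illustrative stand-ins for the real ClusterInstance hierarchy.
   interface ClusterInstance { String bootstrapServers(); }

   class ZkClusterInstance implements ClusterInstance {
       public String bootstrapServers() { return "localhost:9092"; }
       // zk-only helper that the generic interface does not expose
       String zkConnect() { return "localhost:2181"; }
   }

   public class CastingSketch {
       public static void main(String[] args) {
           final ClusterInstance cluster = new ZkClusterInstance(); // injected generically
           if (cluster instanceof ZkClusterInstance zk) {           // cast only where needed
               System.out.println(zk.zkConnect());
           }
       }
   }
   ```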






Re: [PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-29 Thread via GitHub


chia7712 commented on code in PR #15808:
URL: https://github.com/apache/kafka/pull/15808#discussion_r1584095279


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -1086,4 +1087,10 @@ class RemoteIndexCacheTest {
   case e @ (_ : NoSuchFileException | _ : UncheckedIOException) => Optional.empty()
 }
   }
+
+  private def numThreadsRunning(threadNamePrefix: String, isDaemon: Boolean): mutable.Set[Thread] = {

Review Comment:
   We can fix the build error by rewriting this function via a Java lambda. For example:
   ```scala
   private def numThreadsRunning(threadNamePrefix: String, isDaemon: Boolean): java.util.Set[Thread] = {
     Thread.getAllStackTraces.keySet.stream().filter { t =>
       isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
     }.collect(Collectors.toSet)
   }
   ```



##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -23,25 +23,26 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.RemoteIndexCache.{DIR_NAME, Entry, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteDeletedSuffixIndexFileName, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
+import org.apache.kafka.storage.internals.log.RemoteIndexCache._
+import org.apache.kafka.storage.internals.log._
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.any
-import org.mockito.invocation.InvocationOnMock
 import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.{File, FileInputStream, IOException, PrintWriter, UncheckedIOException}
+import java.io._

Review Comment:
   We should avoid using `_` since it could cause naming conflicts.
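   
   For example, a minimal Java illustration of the same risk (the `java.util.List` vs `java.awt.List` clash is just a stock example):
   ```java
   // With an explicit import the compiler pins down exactly one List;
   // wildcard imports of both java.util.* and java.awt.* would make the
   // bare name "List" ambiguous.
   import java.util.List;

   public class ExplicitImports {
       public static void main(String[] args) {
           final List<String> names = List.of("a", "b");
           System.out.println(names);
       }
   }
   ```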






Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]

2024-04-29 Thread via GitHub


mjsax closed pull request #14446: KIP-759: new DSL operation on KStreams 
interface
URL: https://github.com/apache/kafka/pull/14446





Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]

2024-04-29 Thread via GitHub


mjsax commented on PR #14446:
URL: https://github.com/apache/kafka/pull/14446#issuecomment-2084319053

   Closing this PR. Replaced by #15740.





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-29 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842229#comment-17842229
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

Cool. You can find details on the wiki: [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] – if you don't get to it, that's also totally ok; Sophie mentioned that she might also be able to pick it up (but most likely not for the 3.8 release...)

(Btw: the wiki account creation via self-service is currently broken, but we 
can create an account manually if you don't have one.)

Would you be interested in doing a PR in the meantime to update the JavaDocs to fix them?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]

2024-04-29 Thread via GitHub


mjsax commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-2084303833

   @ashmeet13 -- any updates on this PR?





[jira] [Updated] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16644:

Description: 
We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the tests use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduced bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 

  was:
We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the test use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduces bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 


> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  





[jira] [Updated] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16644:

Summary: FK join emits duplicate tombstone on left-side delete  (was: FK 
join emit duplicate tombstone on left-side delete)

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the test use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduces bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  





[jira] [Created] (KAFKA-16644) FK join emit duplicate tombstone on left-side delete

2024-04-29 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16644:
---

 Summary: FK join emit duplicate tombstone on left-side delete
 Key: KAFKA-16644
 URL: https://issues.apache.org/jira/browse/KAFKA-16644
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Matthias J. Sax


We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the test use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduces bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 





Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2024-04-29 Thread via GitHub


mjsax commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1584066077


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -109,108 +111,102 @@ public void init(final ProcessorContext<KO, SubscriptionWrapper<K>> context) {
 
         @Override
         public void process(final Record<K, Change<V>> record) {
+            // clear cached hash from previous record
+            recordHash = null;
             // drop out-of-order records from versioned tables (cf. KIP-914)
             if (useVersionedSemantics && !record.value().isLatest) {
                 LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
                 droppedRecordsSensor.record();
                 return;
             }
+            if (leftJoin) {
+                leftJoinInstructions(record);
+            } else {
+                defaultJoinInstructions(record);
+            }
+        }
 
-            final long[] currentHash = record.value().newValue == null ?
-                null :
-                Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-            final int partition = context().recordMetadata().get().partition();
+        private void leftJoinInstructions(final Record<K, Change<V>> record) {
             if (record.value().oldValue != null) {
                 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+                final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+                if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+                }
+                forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);

Review Comment:
   @florin-akermann @wcarlson5 -- Seems we introduced a bug here.
   
   Filed: https://issues.apache.org/jira/browse/KAFKA-16644 
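   
   A minimal self-contained sketch of why a duplicate like this can slip past the tests (per the Jira: they verify the output topic via a `Map`, which deduplicates by key, instead of a `List`):
   ```java
   import java.util.AbstractMap.SimpleEntry;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class MapVsListDedup {
       public static void main(String[] args) {
           // Two tombstones (null values) for the same key, as in KAFKA-16644.
           final List<Map.Entry<String, String>> output = List.of(
               new SimpleEntry<>("k1", null),
               new SimpleEntry<>("k1", null));
           final Map<String, String> asMap = new LinkedHashMap<>();
           output.forEach(e -> asMap.put(e.getKey(), e.getValue()));
           System.out.println(output.size()); // 2 -- the List exposes the duplicate
           System.out.println(asMap.size());  // 1 -- the Map hides it
       }
   }
   ```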






[jira] [Resolved] (KAFKA-16467) Add README to docs folder

2024-04-29 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16467.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Add README to docs folder
> -
>
> Key: KAFKA-16467
> URL: https://issues.apache.org/jira/browse/KAFKA-16467
> Project: Kafka
>  Issue Type: Improvement
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
> Fix For: 3.8.0
>
>
> We don't have a guide in project root folder or docs folder to show how to 
> run local website. It's good to provide a way to run document with kafka-site 
> repository.
>  
> Option 1: Add links to wiki page 
> [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes]
>  and 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. 
> Option 2: Show how to run the document within container. For example: moving 
> `site-docs` from kafka to kafka-site repository and run `./start-preview.sh`.





Re: [PR] KAFKA-16467: Add README to docs folder [kafka]

2024-04-29 Thread via GitHub


showuon merged PR #15664:
URL: https://github.com/apache/kafka/pull/15664





[jira] [Resolved] (KAFKA-16627) Remove ClusterConfig parameter in BeforeEach and AfterEach

2024-04-29 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16627.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Remove ClusterConfig parameter in BeforeEach and AfterEach
> --
>
> Key: KAFKA-16627
> URL: https://issues.apache.org/jira/browse/KAFKA-16627
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Minor
> Fix For: 3.8.0
>
>
> In the past we modify configs like server broker properties by modifying the 
> ClusterConfig reference passed to BeforeEach and AfterEach based on the 
> requirements of the tests.
> While after KAFKA-16560, the ClusterConfig become immutable, modify the 
> ClusterConfig reference no longer reflects any changes to the test cluster. 
> Then pass ClusterConfig to BeforeEach and AfterEach become redundant. We 
> should remove this behavior.





Re: [PR] KAFKA-16627: Remove ClusterConfig parameter in BeforeEach and AfterEach [kafka]

2024-04-29 Thread via GitHub


chia7712 merged PR #15824:
URL: https://github.com/apache/kafka/pull/15824





Re: [PR] MINOR: Replaced Utils.join() with JDK API. [kafka]

2024-04-29 Thread via GitHub


chia7712 commented on code in PR #15823:
URL: https://github.com/apache/kafka/pull/15823#discussion_r1583986944


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1051,13 +1051,13 @@ public static ValidString in(String... validStrings) {
         public void ensureValid(String name, Object o) {
             String s = (String) o;
             if (!validStrings.contains(s)) {
-                throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
+                throw new ConfigException(name, o, "String must be one of: " + String.join(",", validStrings));

Review Comment:
   `,` -> `, `
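   
   For reference, a tiny runnable sketch of the difference (the old `Utils.join(validStrings, ", ")` used a comma plus space):
   ```java
   import java.util.List;

   public class JoinSeparator {
       public static void main(String[] args) {
           final List<String> validStrings = List.of("gzip", "snappy", "lz4");
           System.out.println(String.join(",", validStrings));  // gzip,snappy,lz4
           System.out.println(String.join(", ", validStrings)); // gzip, snappy, lz4
       }
   }
   ```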



##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1079,12 +1079,12 @@ public static CaseInsensitiveValidString in(String... validStrings) {
         public void ensureValid(String name, Object o) {
             String s = (String) o;
             if (s == null || !validStrings.contains(s.toUpperCase(Locale.ROOT))) {
-                throw new ConfigException(name, o, "String must be one of (case insensitive): " + Utils.join(validStrings, ", "));
+                throw new ConfigException(name, o, "String must be one of (case insensitive): " + String.join(",", validStrings));
             }
         }
 
         public String toString() {
-            return "(case insensitive) [" + Utils.join(validStrings, ", ") + "]";
+            return "(case insensitive) [" + String.join(",", validStrings) + "]";

Review Comment:
   ditto



##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1051,13 +1051,13 @@ public static ValidString in(String... validStrings) {
         public void ensureValid(String name, Object o) {
             String s = (String) o;
             if (!validStrings.contains(s)) {
-                throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
+                throw new ConfigException(name, o, "String must be one of: " + String.join(",", validStrings));
             }
 
         }
 
         public String toString() {
-            return "[" + Utils.join(validStrings, ", ") + "]";
+            return "[" + String.join(",", validStrings) + "]";

Review Comment:
   ditto



##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1205,7 +1205,8 @@ public void ensureValid(String name, Object value) {
             }
 
             if (!foundIllegalCharacters.isEmpty()) {
-                throw new ConfigException(name, value, "String may not contain control sequences but had the following ASCII chars: " + Utils.join(foundIllegalCharacters, ", "));
+                throw new ConfigException(name, value, "String may not contain control sequences but had the following ASCII chars: " +
+                    foundIllegalCharacters.stream().map(Object::toString).collect(Collectors.joining(",")));

Review Comment:
   ditto



##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1079,12 +1079,12 @@ public static CaseInsensitiveValidString in(String... validStrings) {
         public void ensureValid(String name, Object o) {
             String s = (String) o;
             if (s == null || !validStrings.contains(s.toUpperCase(Locale.ROOT))) {
-                throw new ConfigException(name, o, "String must be one of (case insensitive): " + Utils.join(validStrings, ", "));

Review Comment:
   ditto






Re: [PR] KAFKA-12572: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-04-29 Thread via GitHub


chia7712 commented on PR #10428:
URL: https://github.com/apache/kafka/pull/10428#issuecomment-2083932644

   @dongjinleekr Do you have free time to fix the conflicts? I'd like to review 
it!





[jira] [Commented] (KAFKA-16643) Add ModifierOrder checkstyle rule

2024-04-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842195#comment-17842195
 ] 

Chia-Ping Tsai commented on KAFKA-16643:


There is a Jira for it: https://issues.apache.org/jira/browse/KAFKA-10787

Maybe we can discuss that in that Jira.

> Add ModifierOrder checkstyle rule
> -
>
> Key: KAFKA-16643
> URL: https://issues.apache.org/jira/browse/KAFKA-16643
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Greg Harris
>Priority: Minor
>
> Checkstyle offers the ModifierOrder rule: 
> [https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html] that 
> Kafka violates in a lot of places. We should decide if this is a checkstyle 
> rule we should be following or not, and potentially enable it moving forward.





[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-04-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842192#comment-17842192
 ] 

Chia-Ping Tsai commented on KAFKA-16639:


{quote}
I took this because I wasn't sure if you wanted me to look into it.  Please go 
ahead assign it back.  We would love to review it. Thanks. Chia-Ping Tsai 
{quote}

Sorry, I just wanted to make sure there is someone who can work on a fix for it, so we won't end up with duplicate PRs. Thanks for your response (and reviews in the future). We will prepare the patch ASAP.

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is a in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  





[jira] [Commented] (KAFKA-16643) Add ModifierOrder checkstyle rule

2024-04-29 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842190#comment-17842190
 ] 

Greg Harris commented on KAFKA-16643:
-

I think that might be a different checkstyle rule: [https://checkstyle.sourceforge.io/checks/imports/importorder.html]. This one is for the modifiers on variables/methods, etc. (static, final, abstract, and so on).
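
For a concrete example of what ModifierOrder flags (a minimal sketch; the rule enforces the JLS-recommended modifier order, e.g. static before final):
{code:java}
public class ModifierOrderExample {
    // flagged by ModifierOrder: 'final static' deviates from the
    // JLS-recommended order
    // final static int BAD = 1;

    // compliant: 'static' comes before 'final'
    static final int GOOD = 1;
}
{code}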

> Add ModifierOrder checkstyle rule
> -
>
> Key: KAFKA-16643
> URL: https://issues.apache.org/jira/browse/KAFKA-16643
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Greg Harris
>Priority: Minor
>
> Checkstyle offers the ModifierOrder rule: 
> [https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html] that 
> Kafka violates in a lot of places. We should decide if this is a checkstyle 
> rule we should be following or not, and potentially enable it moving forward.





Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-29 Thread via GitHub


chia7712 merged PR #15788:
URL: https://github.com/apache/kafka/pull/15788





Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-29 Thread via GitHub


chia7712 commented on PR #15788:
URL: https://github.com/apache/kafka/pull/15788#issuecomment-2083898472

   ```
   ./gradlew cleanTest :streams:test --tests 
SlidingWindowedKStreamIntegrationTest.shouldRestoreAfterJoinRestart :tools:test 
--tests MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful 
--tests MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful --tests 
ListConsumerGroupTest.testListGroupCommandConsumerProtocol :storage:test 
--tests TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates --tests 
TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout 
:metadata:test --tests QuorumControllerTest.testBootstrapZkMigrationRecord 
--tests QuorumControllerTest.testNoOpRecordWriteAfterTimeout :trogdor:test 
--tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated 
:connect:mirror:test --tests 
IdentityReplicationIntegrationTest.testSyncTopicConfigs --tests 
MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs --tests 
MirrorConnectorsIntegrationTransactionsTest.testReplicateSourceDefault --tests 
MirrorConnectorsIntegrationExactlyOnceTest.testSyncTopicConfigs :core:test --tests ConsumerBounceTest.testClose --tests 
ConsumerBounceTest.testConsumptionWithBrokerFailures --tests 
PlaintextConsumerTest.testSimpleConsumption --tests 
PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords
 --tests PlaintextAdminIntegrationTest.testDescribeConfigsForLog4jLogLevels 
--tests PlaintextConsumerTest.testPartitionsForAutoCreate --tests 
SaslMultiMechanismConsumerTest.testCoordinatorFailover --tests 
ControllerIntegrationTest.testTopicIdPersistsThroughControllerRestart --tests 
ZooKeeperClientTest.testBlockOnRequestCompletionFromStateChangeHandler --tests 
ReplicationQuotasTest.shouldBootstrapTwoBrokersWithFollowerThrottle
   ```
   I don't observe any related errors and they pass locally. Will merge it.





[jira] [Commented] (KAFKA-16643) Add ModifierOrder checkstyle rule

2024-04-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842189#comment-17842189
 ] 

Chia-Ping Tsai commented on KAFKA-16643:


I have seen a lot of conflicts caused by inconsistent import order...

> Add ModifierOrder checkstyle rule
> -
>
> Key: KAFKA-16643
> URL: https://issues.apache.org/jira/browse/KAFKA-16643
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Greg Harris
>Priority: Minor
>
> Checkstyle offers the ModifierOrder rule: 
> [https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html] that 
> Kafka violates in a lot of places. We should decide if this is a checkstyle 
> rule we should be following or not, and potentially enable it moving forward.





[jira] [Commented] (KAFKA-16643) Add ModifierOrder checkstyle rule

2024-04-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842188#comment-17842188
 ] 

Chia-Ping Tsai commented on KAFKA-16643:


+10 to this jira!

> Add ModifierOrder checkstyle rule
> -
>
> Key: KAFKA-16643
> URL: https://issues.apache.org/jira/browse/KAFKA-16643
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Greg Harris
>Priority: Minor
>
> Checkstyle offers the ModifierOrder rule: 
> [https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html] that 
> Kafka violates in a lot of places. We should decide if this is a checkstyle 
> rule we should be following or not, and potentially enable it moving forward.





[jira] [Comment Edited] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-04-29 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842187#comment-17842187
 ] 

Philip Nee edited comment on KAFKA-16639 at 4/29/24 11:55 PM:
--

hey - sorry. I took this because I wasn't sure if you wanted me to look into it. Please go ahead and assign it back. We would love to review it. Thanks. [~chia7712]


was (Author: JIRAUSER283568):
hey - sorry.  I took this because I wasn't sure if you wanted me to look into 
it.  Please go ahead assign it back.  We would love to review it. THanks.

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is a in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  





[jira] [Resolved] (KAFKA-15897) Flaky Test: testWrongIncarnationId() – kafka.server.ControllerRegistrationManagerTest

2024-04-29 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-15897.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Flaky Test: testWrongIncarnationId() – 
> kafka.server.ControllerRegistrationManagerTest
> -
>
> Key: KAFKA-15897
> URL: https://issues.apache.org/jira/browse/KAFKA-15897
> Project: Kafka
>  Issue Type: Test
>Reporter: Apoorv Mittal
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.8.0
>
>
> Build run: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/21/tests/
>  
> {code:java}
> org.opentest4j.AssertionFailedError: expected: <(false,1,0)> but was: 
> <(true,0,0)>Stacktraceorg.opentest4j.AssertionFailedError: expected: 
> <(false,1,0)> but was: <(true,0,0)>at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>at 
> app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)  
> at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)  
> at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)  
> at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)   
>   at 
> app//kafka.server.ControllerRegistrationManagerTest.$anonfun$testWrongIncarnationId$3(ControllerRegistrationManagerTest.scala:228)
>at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>  at 
> app//kafka.server.ControllerRegistrationManagerTest.testWrongIncarnationId(ControllerRegistrationManagerTest.scala:226)
>   at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
> at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
>at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>  at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
>at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
> at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
>  at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>   at 
> 

[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-04-29 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842187#comment-17842187
 ] 

Philip Nee commented on KAFKA-16639:


hey - sorry.  I took this because I wasn't sure if you wanted me to look into 
it.  Please go ahead assign it back.  We would love to review it. THanks.

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is a in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  





Re: [PR] KAFKA-15897: fix ControllerRegistrationManagerTest [kafka]

2024-04-29 Thread via GitHub


chia7712 merged PR #15828:
URL: https://github.com/apache/kafka/pull/15828





Re: [PR] KAFKA-15897: fix ControllerRegistrationManagerTest [kafka]

2024-04-29 Thread via GitHub


chia7712 commented on PR #15828:
URL: https://github.com/apache/kafka/pull/15828#issuecomment-2083890768

   I looped the tests 300 times, and the flakiness is gone. I will merge it to see what happens to our CI :)





[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-04-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842186#comment-17842186
 ] 

Chia-Ping Tsai commented on KAFKA-16639:


[~pnee] I just noticed that you have taken over this Jira. Do you plan to offer a patch for it? If you have no free cycles, we can file a PR for it and then wait for your reviews :)

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is a in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-29 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842185#comment-17842185
 ] 

Sal Sorrentino commented on KAFKA-16514:


Bug, feature gap; /təˈmeɪ.t̬oʊ/, /təˈmɑː.təʊ/. I only know that I followed the programmatic interface provided but could not achieve the advertised results without perusing the source code to find the undocumented secret sauce ;)

Your point IS taken, however. I am happy to create a KIP if that is the best path forward, but I am unfamiliar with the process and will need to do some reading. I don't think there is an urgent need to get this into 3.8, but I think documenting the issue and the workaround is appropriate in the short term.

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





[jira] [Commented] (KAFKA-16637) KIP-848 does not work well

2024-04-29 Thread sanghyeok An (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842184#comment-17842184
 ] 

sanghyeok An commented on KAFKA-16637:
--

[~kirktrue] thanks for your comments. 

I didn't catch the issue you mentioned; I will check it out!

However, providing a non-zero duration still does not work well:

!image-2024-04-30-08-33-06-367.png|width=839,height=289!

I changed my code (to Duration.ofSeconds(1)).

However, the same logs are printed.

!image-2024-04-30-08-33-50-435.png|width=1116,height=139!

 

Is there any workaround? 
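
For context, a minimal reconstruction of the consumer loop in question (a hedged sketch; the real setup lives in the linked Main.java, and group.protocol=consumer is assumed per KIP-848):
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Opt in to the new consumer rebalance protocol (KIP-848).
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic1"));
            while (true) {
                // Poll with a non-zero timeout, as suggested above.
                consumer.poll(Duration.ofSeconds(1))
                        .forEach(r -> System.out.println(r.value()));
            }
        }
    }
}
{code}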

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not works well. 
> You can check my condition.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here





[jira] [Updated] (KAFKA-16637) KIP-848 does not work well

2024-04-29 Thread sanghyeok An (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanghyeok An updated KAFKA-16637:
-
Attachment: image-2024-04-30-08-33-06-367.png

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not works well. 
> You can check my condition.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here





[jira] [Updated] (KAFKA-16637) KIP-848 does not work well

2024-04-29 Thread sanghyeok An (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanghyeok An updated KAFKA-16637:
-
Attachment: image-2024-04-30-08-33-50-435.png

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] a slight change. [kafka]

2024-04-29 Thread via GitHub


gharris1727 commented on PR #15812:
URL: https://github.com/apache/kafka/pull/15812#issuecomment-2083856829

   Hi @gongxuanzhang and thank you for the contribution!
   
   This is actually an issue in a lot of places; I checked with this 
checkstyle.xml rule:
   ```xml
   <module name="ModifierOrder"/>
   ```
   and `./gradlew checkstyleMain checkstyleTest --continue`.
   
   While fixing this in one place is good, it would make sense to try and fix 
this everywhere if we decide to address it at all. I've created 
https://issues.apache.org/jira/browse/KAFKA-16643 for this, and you can take 
this on if you're interested. If so, please see the contributing guide: 
https://kafka.apache.org/contributing.html and join the mailing list and JIRA.
   
   | module | violations |
   |---|---|
   | `:streams:upgrade-system-tests-24:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-22:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-23:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-26:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-28:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-30:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-25:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-31:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-27:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-32:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-36:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-35:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-34:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-33:checkstyleTest` | 1 |
   | `:streams:upgrade-system-tests-37:checkstyleTest` | 1 |
   | `:generator:checkstyleMain` | 2 |
   | `:connect:mirror-client:checkstyleMain` | 1 |
   | `:connect:api:checkstyleMain` | 1 |
   | `:connect:json:checkstyleMain` | 1 |
   | `:server-common:checkstyleMain` | 10 |
   | `:raft:checkstyleMain` | 11 |
   | `:storage:checkstyleMain` | 2 |
   | `:trogdor:checkstyleMain` | 17 |
   | `:server:checkstyleMain` | 122 |
   | `:connect:mirror:checkstyleMain` | 3 |
   | `:connect:test-plugins:checkstyleMain` | 2 |
   | `:tools:checkstyleMain` | 9 |
   | `:storage:storage-api:checkstyleMain` | 20 |
   | `:streams:examples:checkstyleMain` | 5 |
   | `:streams:test-utils:checkstyleMain` | 2 |
   | `:group-coordinator:checkstyleMain` | 46 |
   | `:metadata:checkstyleMain` | 72 |
   | `:raft:checkstyleTest` | 5 |
   | `:connect:runtime:checkstyleMain` | 2 |
   | `:group-coordinator:checkstyleTest` | 10 |
   | `:server-common:checkstyleTest` | 4 |
   | `:trogdor:checkstyleTest` | 3 |
   | `:metadata:checkstyleTest` | 73 |
   | `:streams:test-utils:checkstyleTest` | 12 |
   | `:streams:checkstyleMain` | 57 |
   | `:clients:checkstyleTest` | 87 |
   | `:clients:checkstyleMain` | 122 |
   | `:core:checkstyleMain` | 1 |
   | `:shell:checkstyleMain` | 10 |
   | `:core:checkstyleTest` | 12 |
   | `:shell:checkstyleTest` | 2 |
   | `:jmh-benchmarks:checkstyleMain` | 1 |
   | `:connect:mirror:checkstyleTest` | 4 |
   | `:connect:runtime:checkstyleTest` | 15 |
   | `:streams:checkstyleTest` | 201 |
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16643) Add ModifierOrder checkstyle rule

2024-04-29 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16643:
---

 Summary: Add ModifierOrder checkstyle rule
 Key: KAFKA-16643
 URL: https://issues.apache.org/jira/browse/KAFKA-16643
 Project: Kafka
  Issue Type: Task
  Components: build
Reporter: Greg Harris


Checkstyle offers the ModifierOrder rule: 
[https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html] that 
Kafka violates in a lot of places. We should decide if this is a checkstyle 
rule we should be following or not, and potentially enable it moving forward.
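For illustration, a minimal example of what the rule flags (class and field 
names are made up):
{code:java}
// Compiles fine, but ModifierOrder flags it: the JLS-recommended
// order puts 'public' before 'static'.
public class ModifierOrderExample {
    static public final int BAD = 1;  // flagged: 'static' precedes 'public'
    public static final int GOOD = 2; // canonical order, passes the check
}
{code}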



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-29 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842183#comment-17842183
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

I would not call it a bug, but more like a feature gap. Historically, 
consumer.close() sends a leave group request. For KS, we only added an internal 
config to the consumer to disable sending the leave group request.

Independently, static group membership was added, and a new admin API was added 
to remove a static member from a group. The purpose of this admin feature was 
(IIRC) to allow triggering a rebalance for static groups (which usually run 
with high session timeouts) in the case of a crashed member that won't come 
back again.

Thus, it's two independent features that just don't play nicely with each 
other. In addition, we combine both features with 
KafkaStreams#close(CloseOptions), but given how the APIs are built, it only 
works for static members.

Thus, there is not really a reason; it just happens that it all was implemented 
this way given the historic context.

I am in favor of doing a KIP to add something similar to "CloseOptions" to 
Consumer#close() (independent of static membership or not). [~sal.sorrentino] 
Would you be interested in picking it up and writing a KIP? We might still be 
able to get it into the 3.8 release if we hurry.

[~lianetm], what is the purpose of the -2 code? In the end, without sending any 
request, with a large enough session timeout, no rebalance would be triggered 
anyway? What would change is that we send -2 instead of just not sending any 
leave group request on close()?
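
For reference, a minimal sketch of the admin-side removal mentioned above 
(the group id, instance id, and adminProps are placeholders):
{code:java}
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

// Removes a static member from the group, triggering an immediate rebalance
// without waiting for the (usually long) session timeout to expire.
try (Admin admin = Admin.create(adminProps)) {
    admin.removeMembersFromConsumerGroup(
        "someApplicationId",
        new RemoveMembersFromConsumerGroupOptions(
            Collections.singleton(new MemberToRemove("my-group-instance-id")))
    ).all().get();
}
{code}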

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16637) KIP-848 does not work well

2024-04-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16637:
--
Fix Version/s: 3.8.0

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> I want to test the next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16637) KIP-848 does not work well

2024-04-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16637:
--
Component/s: clients
 consumer

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>
> I want to test the next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16637) KIP-848 does not work well

2024-04-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16637:
--
Labels: kip-848-client-support  (was: )

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
>
> I want to test the next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16637) KIP-848 does not work well

2024-04-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16637:
-

Assignee: Kirk True

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>
> I want to test the next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-04-29 Thread via GitHub


pasharik opened a new pull request, #15830:
URL: https://github.com/apache/kafka/pull/15830

   - Updating `AclCommandTest` to support KRaft
   - Tests which use `AclAuthorizer` are not updated, as they are expected to 
be removed after the full migration to KRaft
   - Changed `AclCommand` console output for adding and removing ACLs.
   In the original implementation, the `listAcls()` method was called directly 
from the `addAcls()` and `removeAcls()` methods, which caused a race condition 
in KRaft mode, so the test became flaky
 - This is quite a big change to our UI, as I understand it. Should it go 
through the KIP process?
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16637) KIP-848 does not work well

2024-04-29 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842160#comment-17842160
 ] 

Lianet Magrans commented on KAFKA-16637:


Hey [~chickenchickenlove], thanks for reporting this. I wonder if you're 
hitting a known issue with the consumer API and timeouts (being fixed with 
https://issues.apache.org/jira/browse/KAFKA-16200 and 
https://issues.apache.org/jira/browse/KAFKA-15974). I tried your code, changing 
only the poll call from what you had, `kafkaConsumer.poll(Duration.ZERO)`, to 
provide a non-zero duration, and it all worked as expected. So I guess it could 
be related to the timeout enforcement issues being fixed on the consumer side. 
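
In other words, something along these lines avoids the hang (a sketch; 
kafkaConsumer is the reporter's already-configured consumer):
{code:java}
// A non-zero poll timeout gives the consumer's background thread time to
// complete its work on each call, unlike poll(Duration.ZERO).
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
    records.forEach(record -> System.out.println(record.value()));
}
{code}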

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Minor
>
> I want to test the next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on PR #15798:
URL: https://github.com/apache/kafka/pull/15798#issuecomment-2083638319

   TODO: This patch is waiting on https://github.com/apache/kafka/pull/15818 to 
make sure the response future is not completed until the records are 
successfully replayed and persisted.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1583740908


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1169,6 +1173,64 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
 }
 }
 
+/**
+ * Validates if the received classic member protocols are supported by the 
group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId  The joining member id.
+ * @param protocolType  The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+private void throwIfClassicProtocolIsNotSupported(
+ConsumerGroup group,
+String memberId,
+String protocolType,
+JoinGroupRequestProtocolCollection protocols
+) {
+if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+}
+}
+
+/**
+ * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+ * All the protocols have the same subscription, so the method picks a 
random one.
+ *
+ * @param protocols The JoinGroupRequestProtocolCollection.
+ * @return The Subscription.
+ */
+private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+JoinGroupRequestProtocolCollection protocols
+) {
+try {
+return ConsumerProtocol.deserializeSubscription(

Review Comment:
   Yeah, that makes sense. I can open another PR once this one is merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-04-29 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1583698116


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -240,27 +185,33 @@ private void emitNonJoinedOuterRecords(
 // There might be an outer record for the other joinSide 
which window has not closed yet
 // We rely on the  
ordering of KeyValueIterator
 final long outerJoinLookBackTimeMs = 
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
-if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
+if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs
+>= sharedTimeTracker.streamTime) {
 if (timestampedKeyAndJoinSide.isLeftSide()) {
-outerJoinLeftWindowOpen = true; // there are no 
more candidates to emit on left-outerJoin-side
+outerJoinLeftWindowOpen =
+true; // there are no more candidates to 
emit on left-outerJoin-side
 } else {
-outerJoinRightWindowOpen = true; // there are no 
more candidates to emit on right-outerJoin-side
+outerJoinRightWindowOpen =
+true; // there are no more candidates to 
emit on right-outerJoin-side
 }
 // We continue with the next outer record
 continue;
 }
-
+
 final K key = timestampedKeyAndJoinSide.getKey();
-final LeftOrRightValue leftOrRightValue = 
next.value;
-final VOut nullJoinedValue = getNullJoinedValue(key, 
leftOrRightValue);
+final LeftOrRightValue leftOrRightValue = 
next.value;
+final VThis thisValue = getThisValue(leftOrRightValue);
+final VOther otherValue = getOtherValue(leftOrRightValue);
+final VOut nullJoinedValue = joiner.apply(key, thisValue, 
otherValue);
 context().forward(
-
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
+
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
 );

Review Comment:
   nit: This could be brought out to a method which depends only on `record` 
and `next`.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##


Review Comment:
   > No more unsafe type casting to be found in this class. Not that much has 
changed in this abstract class. I have moved some code into methods.
   
   Very nice! I really like the new types, and the abstract methods are very 
minimal.
   
   > Side Note: I don't have the correct formatter configured :/ I couldn't 
find any contribution notes to set the correct one.
   
   I'm not aware of a standard formatter either. I remember tweaking the 
IntelliJ default rules slightly, but taking a look at them now I'm not seeing 
evidence of what I changed.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##


Review Comment:
   The left and right classes are perfect in my opinion.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -43,161 +42,106 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin implements ProcessorSupplier {
+abstract class KStreamKStreamJoin implements 
ProcessorSupplier {
 private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
-private final String otherWindowName;
+private final boolean outer;
+private final ValueJoinerWithKey joiner;
 private final long joinBeforeMs;
 private final long joinAfterMs;
 private final long joinGraceMs;
+private final String otherWindowName;
+private final TimeTrackerSupplier sharedTimeTrackerSupplier;
 private final boolean enableSpuriousResultFix;
+private final Optional outerJoinWindowName;
 private final long windowsBeforeMs;
 private final long windowsAfterMs;
 
-private final boolean outer;
-private final boolean isLeftSide;
-private final Optional outerJoinWindowName;
-private final ValueJoinerWithKey joiner;
-
-private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
-KStreamKStreamJoin(final boolean isLeftSide,
-   final String 

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1583691897


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10855,544 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception {
+int minSessionTimeout = 50;
+int maxSessionTimeout = 100;
+String groupId = "group-id";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withClassicGroupMinSessionTimeoutMs(minSessionTimeout)
+.withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout)
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
+.build();
+
+JoinGroupRequestData requestWithSmallSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(minSessionTimeout - 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithSmallSessionTimeout));
+
+JoinGroupRequestData requestWithLargeSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(maxSessionTimeout + 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithLargeSessionTimeout));
+}
+
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() 
{
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testConsumerGroupJoinWithNewDynamicMember() throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+  

[PR] WIP [kafka]

2024-04-29 Thread via GitHub


cadonna opened a new pull request, #15829:
URL: https://github.com/apache/kafka/pull/15829

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1583691897


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10855,544 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception {
+int minSessionTimeout = 50;
+int maxSessionTimeout = 100;
+String groupId = "group-id";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withClassicGroupMinSessionTimeoutMs(minSessionTimeout)
+.withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout)
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
+.build();
+
+JoinGroupRequestData requestWithSmallSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(minSessionTimeout - 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithSmallSessionTimeout));
+
+JoinGroupRequestData requestWithLargeSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(maxSessionTimeout + 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithLargeSessionTimeout));
+}
+
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() 
{
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testConsumerGroupJoinWithNewDynamicMember() throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+  

[jira] [Created] (KAFKA-16642) Update KafkaConsumerTest to show parameters in test lists

2024-04-29 Thread Kirk True (Jira)
Kirk True created KAFKA-16642:
-

 Summary: Update KafkaConsumerTest to show parameters in test lists
 Key: KAFKA-16642
 URL: https://issues.apache.org/jira/browse/KAFKA-16642
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, unit tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


{{KafkaConsumerTest}} was recently updated to make many of its tests 
parameterized to exercise both the {{CLASSIC}} and {{CONSUMER}} group 
protocols. However, in some of the tools in which [lists of tests are 
provided|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.clients.consumer.KafkaConsumerTest=FLAKY],
 say, for analysis, the group protocol information is not exposed. For example, 
one test ({{{}testReturnRecordsDuringRebalance{}}}) is flaky, but it's 
difficult to know at a glance which group protocol is causing the problem 
because the list simply shows:
{quote}{{testReturnRecordsDuringRebalance(GroupProtocol)[1]}}
{quote}
Ideally, it would expose more information, such as:
{quote}{{testReturnRecordsDuringRebalance(GroupProtocol=CONSUMER)}}
{quote}
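A JUnit 5 display-name template would surface the parameter; a sketch (the 
exact placement in {{KafkaConsumerTest}} is an assumption):
{code:java}
import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.apache.kafka.clients.consumer.GroupProtocol;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class DisplayNameExample {
    // The name template makes the enum value part of the reported test name.
    @ParameterizedTest(name = "testReturnRecordsDuringRebalance(GroupProtocol={0})")
    @EnumSource(GroupProtocol.class)
    void testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) {
        assertNotNull(groupProtocol); // placeholder body
    }
}
{code}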



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16642) Update KafkaConsumerTest to show parameters in test lists

2024-04-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16642:
--
Priority: Minor  (was: Major)

> Update KafkaConsumerTest to show parameters in test lists
> -
>
> Key: KAFKA-16642
> URL: https://issues.apache.org/jira/browse/KAFKA-16642
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> {{KafkaConsumerTest}} was recently updated to make many of its tests 
> parameterized to exercise both the {{CLASSIC}} and {{CONSUMER}} group 
> protocols. However, in some of the tools in which [lists of tests are 
> provided|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.clients.consumer.KafkaConsumerTest=FLAKY],
>  say, for analysis, the group protocol information is not exposed. For 
> example, one test ({{{}testReturnRecordsDuringRebalance{}}}) is flaky, but 
> it's difficult to know at a glance which group protocol is causing the 
> problem because the list simply shows:
> {quote}{{testReturnRecordsDuringRebalance(GroupProtocol)[1]}}
> {quote}
> Ideally, it would expose more information, such as:
> {quote}{{testReturnRecordsDuringRebalance(GroupProtocol=CONSUMER)}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-04-29 Thread via GitHub


lianetm commented on PR #15803:
URL: https://github.com/apache/kafka/pull/15803#issuecomment-2083525470

   Thanks for the catch and the fix @AndrewJSchofield, left a nit, but LGTM. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]

2024-04-29 Thread via GitHub


lianetm commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1583641482


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws 
Exception {
 }
 }
 
+/**
+ * Tests that calling {@link Thread#interrupt()} before {@link 
KafkaConsumer#poll(Duration)}
+ * causes {@link InterruptException} to be thrown.
+ */
+@Test
+public void testPollThrowsInterruptExceptionIfInterrupted() {
+consumer = newConsumer();
+final String topicName = "foo";
+final int partition = 3;
+final TopicPartition tp = new TopicPartition(topicName, partition);
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+Map offsets = mkMap(mkEntry(tp, new 
OffsetAndMetadata(1)));
+completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+consumer.assign(singleton(tp));
+
+// interrupt the thread and call poll
+try {
+Thread.currentThread().interrupt();
+assertThrows(InterruptException.class, () -> 
consumer.poll(Duration.ZERO));
+} finally {
+// clear interrupted state again since this thread may be reused 
by JUnit

Review Comment:
   Just to make the comment accurate: I expect we need to clear the interrupted 
flag here so that the assertion further down, which polls again, does not 
throw.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1583634669


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1288,25 +1353,15 @@ private 
CoordinatorResult consumerGr
 
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
 .setClientId(clientId)
 .setClientHost(clientHost)
+.setClassicMemberMetadata(null)

Review Comment:
   Our default value is already null, but the explicit call is necessary: 
`updatedMemberBuilder = new ConsumerGroupMember.Builder(member)` copies the 
existing member, which can have non-null classicMemberMetadata, and we need to 
reset it for members that have changed from the classic protocol to the 
consumer protocol.
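
   To illustrate the copy semantics (a sketch of the surrounding code, not the 
full builder):
   ```java
   // Builder(member) copies every field of the existing member, including a
   // possibly non-null classicMemberMetadata, so it has to be reset explicitly
   // when the member switches to the consumer protocol.
   ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member)
       .setClassicMemberMetadata(null) // drop the classic protocol metadata
       .build();
   ```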



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15342) Considering upgrading to Mockito 5.4.1 or later

2024-04-29 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842126#comment-17842126
 ] 

Greg Harris commented on KAFKA-15342:
-

[~chia7712] I think there may be more value to the integration test suite in 
the future: KAFKA-16242, as the unitTest target can be used for strict CI 
gating to prevent obviously broken builds.

> Considering upgrading to Mockito 5.4.1 or later
> ---
>
> Key: KAFKA-15342
> URL: https://issues.apache.org/jira/browse/KAFKA-15342
> Project: Kafka
>  Issue Type: Task
>  Components: unit tests
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 4.0.0
>
>
> We're currently stuck on Mockito 4.x.y because the 5.x.y line requires Java 
> 11 and, until we begin to work on Kafka 4.0.0, we continue to support Java 8.
> Either directly before, or after releasing Kafka 4.0.0, we should try to 
> upgrade to a version of Mockito on the 5.x.y line.
> If we're able to use a version that includes 
> [https://github.com/mockito/mockito/pull/3078]
>  (which should be included in either a 5.4.1 or 5.5.0 release), we should 
> also revert the change made for 
> https://issues.apache.org/jira/browse/KAFKA-14682, which is just a temporary 
> workaround. Care should be taken that, after reverting that change, unused 
> stubbings are still correctly reported during our CI builds.
> If the effort required to upgrade our Mockito version is too high, we can 
> either downgrade the severity of this ticket, or split it out into separate 
> subtasks for each to-be-upgraded module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on code in PR #15818:
URL: https://github.com/apache/kafka/pull/15818#discussion_r1583572562


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1560,13 +1561,15 @@ private List 
consumerGroupStaticMemberGroupLeave(
  * @param records   The list of records to be applied to the state.
  * @return The append future to be applied.
  */
-private CompletableFuture consumerGroupFenceMember(
+private  CoordinatorResult consumerGroupFenceMember(
 ConsumerGroup group,
 ConsumerGroupMember member,
-List records
+List records,
+T response
 ) {
 if (validateOnlineDowngrade(group, member.memberId())) {
-return convertToClassicGroup(group, member.memberId(), records);
+CompletableFuture appendFuture = 
convertToClassicGroup(group, member.memberId(), records);

Review Comment:
   moved `CoordinatorResult` to `convertToClassicGroup`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]

2024-04-29 Thread via GitHub


dongnuo123 commented on code in PR #15818:
URL: https://github.com/apache/kafka/pull/15818#discussion_r1583571521


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java:
##
@@ -44,8 +44,8 @@ public class CoordinatorResult {
 private final CompletableFuture appendFuture;
 
 /**
- * The boolean indicating whether to replay the records. Without 
specifying.
- * The default value is {@code appendFuture == null}.
+ * The boolean indicating whether to replay the records.
+ * The default value is {@code true} unless specified otherwise.
  */
 private final Boolean replayRecords;

Review Comment:
   Then let's go with `replayRecords ? 1 : 0`. I guess it doesn't matter.
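
   For context, this is the idiom in question (illustrative only; `Flagged` is 
a made-up class):
   ```java
   // Folding a boolean into a hand-rolled hashCode() with the ternary form.
   final class Flagged {
       private final boolean replayRecords;

       Flagged(boolean replayRecords) {
           this.replayRecords = replayRecords;
       }

       @Override
       public boolean equals(Object o) {
           return o instanceof Flagged && ((Flagged) o).replayRecords == replayRecords;
       }

       @Override
       public int hashCode() {
           return replayRecords ? 1 : 0; // Boolean.hashCode(replayRecords) works too
       }
   }
   ```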



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]

2024-04-29 Thread via GitHub


dajac commented on code in PR #15818:
URL: https://github.com/apache/kafka/pull/15818#discussion_r1583565827


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java:
##
@@ -44,8 +44,8 @@ public class CoordinatorResult {
 private final CompletableFuture appendFuture;
 
 /**
- * The boolean indicating whether to replay the records. Without 
specifying.
- * The default value is {@code appendFuture == null}.
+ * The boolean indicating whether to replay the records.
+ * The default value is {@code true} unless specified otherwise.
  */
 private final Boolean replayRecords;

Review Comment:
   I am not sure. The one that I gave was auto-generated by intellij.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-29 Thread via GitHub


kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2083394774

   @cadonna: In most places I removed the use of a `Timer` to calculate the 
deadline. Event classes no longer require a `Timer`; it is the caller who must 
call `CompletableEvent.calculateDeadlineMs()` when creating the event.
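
   Roughly, the new calling convention looks like this (a sketch; the event 
constructor shape and variable names are assumptions):
   ```java
   // The caller computes an absolute deadline once, instead of threading a
   // Timer through the event.
   long deadlineMs = CompletableEvent.calculateDeadlineMs(time, timeout);
   TopicMetadataEvent event = new TopicMetadataEvent(topic, deadlineMs);
   Map<String, List<PartitionInfo>> topicMetadata = applicationEventHandler.addAndGet(event);
   ```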


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-29 Thread via GitHub


gharris1727 commented on PR #15620:
URL: https://github.com/apache/kafka/pull/15620#issuecomment-2083373419

   Test failures appear unrelated, there's a targeted 
RemoteLogMetadataSerdeTest for this logic, and the storage tests appear to pass 
for me locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-29 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1583520696


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -998,7 +958,7 @@ public List partitionsFor(String topic, 
Duration timeout) {
 wakeupTrigger.setActiveTask(topicMetadataEvent.future());
 try {
 Map> topicMetadata =
-applicationEventHandler.addAndGet(topicMetadataEvent, 
timer);
+applicationEventHandler.addAndGet(topicMetadataEvent);

Review Comment:
   Removed use of `Timer` in favor of calculating the deadline from the `time` 
and `timeout` directly.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1107,7 +1067,7 @@ public Map 
offsetsForTimes(Map

Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-29 Thread via GitHub


kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2083370524

   > Here I have a comment, I could not put at the right location in the code:
   > 
   > On line 1362, in `commitSync()` the consumer waits on the `commitFuture` 
with a timer. I think it should not wait on a timer there since we already 
wait on a timer in the background thread.
   
   I agree. What about the timed wait in 
`awaitPendingAsyncCommitsAndExecuteCommitCallbacks()`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation

2024-04-29 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842121#comment-17842121
 ] 

Greg Harris commented on KAFKA-15905:
-

Rather than re-reading the checkpoints topic, we could add source offsets to 
the emitted records, effectively writing each checkpoint twice. The framework 
could then manage the offset consumer on our behalf, simplifying the mirror 
checkpoint task and removing the need for credentials to consume from the 
checkpoints topic.
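
A minimal sketch of this idea, assuming the standard Connect SourceRecord API; 
the offset key names ("group", "topic", "partition", "upstreamOffset", 
"downstreamOffset") and the helper itself are hypothetical, not the actual 
MirrorCheckpointTask code:

{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;

public class CheckpointSourceOffsets {
    // Attach Connect source offsets to an emitted checkpoint record so the
    // framework persists them on the task's behalf; on restart the task could
    // rebuild checkpointsPerConsumerGroup via context.offsetStorageReader()
    // instead of consuming the checkpoints topic itself.
    static SourceRecord withSourceOffsets(String group, String topic, int partition,
                                          long upstreamOffset, long downstreamOffset,
                                          byte[] key, byte[] value,
                                          String checkpointsTopic) {
        Map<String, Object> sourcePartition = new HashMap<>();
        sourcePartition.put("group", group);
        sourcePartition.put("topic", topic);
        sourcePartition.put("partition", partition);

        Map<String, Object> sourceOffset = new HashMap<>();
        sourceOffset.put("upstreamOffset", upstreamOffset);
        sourceOffset.put("downstreamOffset", downstreamOffset);

        return new SourceRecord(sourcePartition, sourceOffset,
                checkpointsTopic, null, null, key, null, value);
    }
}
{code}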

> Restarts of MirrorCheckpointTask should not permanently interrupt offset 
> translation
> 
>
> Key: KAFKA-15905
> URL: https://issues.apache.org/jira/browse/KAFKA-15905
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Priority: Major
>
> Executive summary: When the MirrorCheckpointTask restarts, it loses the state 
> of checkpointsPerConsumerGroup, which limits offset translation to records 
> mirrored after the latest restart.
> For example, if 1000 records are mirrored and the OffsetSyncs are read by 
> MirrorCheckpointTask, the emitted checkpoints are cached, and translation can 
> happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more 
> records are mirrored, translation can happen at the ~1500th record, but no 
> longer at the ~500th record.
> Context:
> Before KAFKA-13659, MM2 made translation decisions based on the 
> incompletely-initialized OffsetSyncStore, and the checkpoint could appear to 
> go backwards temporarily during restarts. To fix this, we forced the 
> OffsetSyncStore to initialize completely before translation could take place, 
> ensuring that the latest OffsetSync had been read, and thus providing the 
> most accurate translation.
> Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. 
> Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to 
> allow for translation of earlier offsets. This came with the caveat that the 
> cache's sparseness allowed translations to go backwards permanently. To 
> prevent this behavior, a cache of the latest Checkpoints was kept in the 
> MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset 
> translation remained restricted to the fully-initialized OffsetSyncStore.
> Effectively, the MirrorCheckpointTask ensures that it translates based on an 
> OffsetSync emitted during its lifetime, to ensure that no previous 
> MirrorCheckpointTask emitted a later sync. If we can read the checkpoints 
> emitted by previous generations of MirrorCheckpointTask, we can still ensure 
> that checkpoints are monotonic, while allowing translation further back in 
> history.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16622) MirrorMaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-29 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842119#comment-17842119
 ] 

Greg Harris commented on KAFKA-16622:
-

To clarify your specific points:

> but the problem here is that if the consumer never fully catches up once, we 
> will never have a checkpoint.

If the consumer never catches up after the most recent MM2 checkpoint task 
restart, it will not have a checkpoint. In the above example, it needed to get 
past .

> If as initial state the OffsetSyncStore.offsetSyncs contained a distribution 
> of OffsetSync rather than just multiple copies of the last OffsetSync, 
> Checkpoints would be computed earlier, I think

This is the effect a solution to KAFKA-15905 would have. If we can restore the 
state of the `checkpointsPerConsumerGroup` variable, then it will be safe to 
keep those offset syncs in the sync store.

> MirrorMaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0, 3.6.2, 3.8.0
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: connect.log.2024-04-26-10.zip, 
> edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup:
> Tested with a standalone single-process MirrorMaker2 mirroring between two 
> single-node kafka clusters (MirrorMaker config attached) with quick refresh 
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 1 records)
> start a slow consumer - e.g. fetching 50records/poll and pausing 1 sec 
> between polls which commits after each poll
> watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
>   --topic source.checkpoints.internal \
>   --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
>--from-beginning
> -> no record appears in the checkpoint topic until the consumer reaches the 
> end of the topic (ie its consumer group lag gets down to 0).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16577: New consumer fails with stop within allotted timeout in consumer_test.py system test [kafka]

2024-04-29 Thread via GitHub


kirktrue commented on PR #15784:
URL: https://github.com/apache/kafka/pull/15784#issuecomment-2083344885

   @cadonna—can you review this PR? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16622) MirrorMaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-29 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842117#comment-17842117
 ] 

Greg Harris commented on KAFKA-16622:
-

Yeah [~ecomar] from the final state of the OffsetSyncStore, this appears to be 
working as intended:

 
{noformat}
[2024-04-26 10:58:44,557] TRACE [MirrorCheckpointConnector|task-0] New sync 
OffsetSync{topicPartition=mytopic-0, upstreamOffset=19998, 
downstreamOffset=19998} applied, new state is 
[19998:19998,19987:19987,19965:19965,19921:19921,19822:19822,19635:19635,19415:19415,18964:18964,18095:18095,16500:16500,:]
 (org.apache.kafka.connect.mirror.OffsetSyncStore:176){noformat}
The gaps are 11, 22, 44, 99, 187, 220, 451, 869, 1595, which follow the 
approximately exponential spacing I would expect. Instead of the ~5 syncs I 
expected there are 9, which is better than I estimated because you have 
offset.lag.max set low.

I would say the title of this issue isn't quite accurate now that we've 
investigated it, as the translation can happen at these intermediate points in 
addition to the end of the topic. If you had a consumer group with offset 19635 
or 19636, that would be translated exactly, but a consumer group with offset 
19700 would translate to 19636 and have some lag/reprocessing. This is 
intentional, as we made a trade-off about memory usage and precision to 
prioritize accuracy in the offset translation algorithm. You can see the 
discussion about this here: 
[https://lists.apache.org/thread/7qzxm1727y8rtrw6ds7t6hltkm55j5po] and here: 
[https://github.com/apache/kafka/pull/13178] to see more motivation for the 
current algorithm.
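
For illustration, a hedged sketch of the floor-based lookup behavior described 
above (not the actual OffsetSyncStore code; the sync data is taken from the 
TRACE line):

{code}
import java.util.Map;
import java.util.TreeMap;

public class FloorTranslation {
    // Translate an upstream consumer offset using the nearest sync at or below
    // it: exact at a sync point, one past the sync's downstream offset otherwise,
    // which is why 19700 lands on 19636 rather than 19635.
    static long translate(TreeMap<Long, Long> syncs, long upstreamOffset) {
        Map.Entry<Long, Long> floor = syncs.floorEntry(upstreamOffset);
        return upstreamOffset == floor.getKey() ? floor.getValue() : floor.getValue() + 1;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> syncs = new TreeMap<>(); // upstream -> downstream
        syncs.put(19635L, 19635L);
        syncs.put(19822L, 19822L);
        syncs.put(19987L, 19987L);
        System.out.println(translate(syncs, 19636L)); // 19636 (exact)
        System.out.println(translate(syncs, 19700L)); // 19636 (conservative)
    }
}
{code}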

I understand your concern though, and you're correct that KAFKA-15905 will help 
for the offsets between 0 and , and KAFKA-16364 will help with offsets 
close to the end of the topic.

I also just opened this ticket: 
https://issues.apache.org/jira/browse/KAFKA-16641 for another improvement I 
thought of. It has a risk of mis-translating offsets for topics with gaps, but 
should be better than the old pre KAFKA-12468 algorithm, so we can discuss if 
it requires a configuration, and maybe it can be included in a KIP with 
KAFKA-16364.

 

> MirrorMaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0, 3.6.2, 3.8.0
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: connect.log.2024-04-26-10.zip, 
> edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup:
> Tested with a standalone single-process MirrorMaker2 mirroring between two 
> single-node kafka clusters (MirrorMaker config attached) with quick refresh 
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 1 records)
> start a slow consumer - e.g. fetching 50records/poll and pausing 1 sec 
> between polls which commits after each poll
> watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
>   --topic source.checkpoints.internal \
>   --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
>--from-beginning
> -> no record appears in the checkpoint topic until the consumer reaches the 
> end of the topic (ie its consumer group lag gets down to 0).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-29 Thread via GitHub


hachikuji commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1583479619


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##
@@ -807,4 +809,62 @@ private static void writeSnapshotFooterRecord(
 builder.appendSnapshotFooterMessage(timestamp, 
snapshotFooterRecord);
 }
 }
+
+public static MemoryRecords withKRaftVersionRecord(
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+ByteBuffer buffer,
+KRaftVersionRecord kraftVersionRecord
+) {
+writeKRaftVersionRecord(buffer, initialOffset, timestamp, leaderEpoch, 
kraftVersionRecord);
+buffer.flip();
+return MemoryRecords.readableRecords(buffer);
+}
+
+private static void writeKRaftVersionRecord(
+ByteBuffer buffer,
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+KRaftVersionRecord kraftVersionRecord
+) {
+try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+TimestampType.CREATE_TIME, initialOffset, timestamp,
+RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
+false, true, leaderEpoch, buffer.capacity())
+) {
+builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord);
+}
+}
+
+public static MemoryRecords withVotersRecord(
+long initialOffset,
+long timestamp,
+int leaderEpoch,
+ByteBuffer buffer,
+VotersRecord votersRecord
+) {
+writeVotersRecord(buffer, initialOffset, timestamp, leaderEpoch, 
votersRecord);

Review Comment:
   We are discarding the `MemoryRecords` created by the builder and creating a 
new one.
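
   One hedged way to avoid the double construction, sketched against the 
signatures in the diff (`appendVotersMessage` is assumed by analogy with 
`appendKRaftVersionMessage`):

```java
// Sketch: return the MemoryRecords the builder already produced instead of
// flipping the buffer and re-reading it.
public static MemoryRecords withVotersRecord(
    long initialOffset,
    long timestamp,
    int leaderEpoch,
    ByteBuffer buffer,
    VotersRecord votersRecord
) {
    try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
        buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
        TimestampType.CREATE_TIME, initialOffset, timestamp,
        RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
        false, true, leaderEpoch, buffer.capacity())
    ) {
        builder.appendVotersMessage(timestamp, votersRecord);
        return builder.build(); // build() closes the batch and caches the records
    }
}
```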



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-29 Thread via GitHub


chia7712 merged PR #15645:
URL: https://github.com/apache/kafka/pull/15645


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16641) MM2 offset translation should interpolate between sparse OffsetSyncs

2024-04-29 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16641:
---

 Summary: MM2 offset translation should interpolate between sparse 
OffsetSyncs
 Key: KAFKA-16641
 URL: https://issues.apache.org/jira/browse/KAFKA-16641
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Greg Harris


Right now, the OffsetSyncStore keeps a sparse offset store, with exponential 
spacing between syncs. This can leave large gaps in translation, where offsets 
are translated much more conservatively than necessary.

The dominant way to use MirrorMaker2 is in a "single writer" fashion, where the 
target topic is only written to by a single MirrorMaker 2 instance. When a topic 
without gaps is replicated, contiguous blocks of offsets are preserved. For 
example:

Say that MM2 mirrors 100 records, and emits two syncs: 0:100 and 100:200. We 
can detect when the gap between the upstream and downstream offsets is the same 
using subtraction, and then assume that 50:150 is also a valid translation. If 
the source topic has gaps, or goes through a restart, we should expect a 
discontinuity in the offset syncs, like 0:100 and 100:250 or 0:100 and 100:150.

This may allow us to restore much of the offset translation precision that was 
lost for simple contiguous topics, without additional memory usage, but at the 
risk of mis-translating some pathological situations when the source topic has 
gaps. This might be enabled unconditionally, or gated behind a 
configuration.
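
A minimal sketch of the interpolation rule under the assumptions above; the 
OffsetSync pair type and method are hypothetical, not the actual 
OffsetSyncStore API:

{code}
public class OffsetInterpolation {
    static final class OffsetSync {
        final long upstream;
        final long downstream;
        OffsetSync(long upstream, long downstream) {
            this.upstream = upstream;
            this.downstream = downstream;
        }
    }

    // Interpolate between two sparse syncs only when both preserve the same
    // upstream->downstream gap, i.e. the replicated block is contiguous.
    static long translate(long upstreamOffset, OffsetSync floor, OffsetSync ceiling) {
        long floorGap = floor.downstream - floor.upstream;
        long ceilingGap = ceiling.downstream - ceiling.upstream;
        if (floorGap == ceilingGap) {
            // e.g. syncs 0:100 and 100:200 -> upstream 50 translates to 150
            return upstreamOffset + floorGap;
        }
        // Discontinuity (gaps in the source topic, or a restart): fall back to
        // the conservative floor translation used today.
        return floor.downstream;
    }

    public static void main(String[] args) {
        OffsetSync a = new OffsetSync(0, 100);
        OffsetSync b = new OffsetSync(100, 200);
        System.out.println(translate(50, a, b)); // 150
    }
}
{code}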



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-29 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1583466928


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.GroupType.CONSUMER;
+
+class ConsumerGroupExecutor {
+
+private ConsumerGroupExecutor() {
+}
+
+static AutoCloseable buildConsumerGroup(String brokerAddress,
+int numberOfConsumers,
+String groupId,
+String topic,
+String groupProtocol,
+Optional remoteAssignor,
+Map customConfigs,
+boolean syncCommit) {
+return buildConsumers(
+brokerAddress,
+numberOfConsumers,
+groupId,
+groupProtocol,
+topic,
+RangeAssignor.class.getName(),
+remoteAssignor,
+customConfigs,
+syncCommit
+);
+}
+
+static AutoCloseable buildClassicGroup(String brokerAddress,
+   int numberOfConsumers,
+   String groupId,
+   String topic,
+   String assignmentStrategy,
+   Map customConfigs,
+   boolean syncCommit) {
+return buildConsumers(
+brokerAddress,
+numberOfConsumers,
+groupId,
+GroupProtocol.CLASSIC.name,
+topic,
+assignmentStrategy,
+Optional.empty(),
+customConfigs,
+syncCommit
+);
+}
+
+private static AutoCloseable buildConsumers(
+String brokerAddress,
+int numberOfConsumers,
+String groupId,
+String groupProtocol,
+String topic,
+String assignmentStrategy,
+Optional remoteAssignor,
+Map customConfigs,
+boolean syncCommit
+) {
+List> allConfigs = IntStream.range(0, 
numberOfConsumers)
+.mapToObj(ignored ->
+composeConfigs(
+  

[jira] [Updated] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test

2024-04-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16576:
--
Priority: Minor  (was: Blocker)

> New consumer fails with assert in consumer_test.py’s test_consumer_failure 
> system test
> --
>
> Key: KAFKA-16576
> URL: https://issues.apache.org/jira/browse/KAFKA-16576
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: flaky-test, kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test intermittently fails with the following 
> error:
> {code}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   42.582 seconds
> AssertionError()
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 399, in test_consumer_failure
> assert partition_owner is not None
> AssertionError
> Notify
> {code}
> Affected tests:
>  * {{test_consumer_failure}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-29 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1583452334


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1169,8 +1129,7 @@ private Map 
beginningOrEndOffset(Collection 
offsetAndTimestampMap;
 offsetAndTimestampMap = applicationEventHandler.addAndGet(
-listOffsetsEvent,
-timer);
+listOffsetsEvent);

Review Comment:
   Yep, I made the change to use the deadline directly (vs. a `Timer`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-29 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1583451774


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java:
##
@@ -27,19 +30,34 @@
 public abstract class CompletableBackgroundEvent extends BackgroundEvent 
implements CompletableEvent {
 
 private final CompletableFuture future;
+private final long deadlineMs;
 
-protected CompletableBackgroundEvent(final Type type) {
+protected CompletableBackgroundEvent(final Type type, final Timer timer) {
 super(type);
 this.future = new CompletableFuture<>();
+Objects.requireNonNull(timer);
+
+long currentTimeMs = timer.currentTimeMs();
+long remainingMs = timer.remainingMs();
+
+if (currentTimeMs > Long.MAX_VALUE - remainingMs)
+this.deadlineMs = Long.MAX_VALUE;
+else
+this.deadlineMs = currentTimeMs + remainingMs;

Review Comment:
   Per the above, I added `CompletableEvent.calculateDeadlineMs()` to keep the 
code in a shared location.
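
   For reference, a hedged sketch of what the shared helper could look like, 
following the overflow guard in the diff above (the exact placement and 
signature on `CompletableEvent` are assumed):

```java
// Sketch: compute an absolute deadline from a Timer, saturating at
// Long.MAX_VALUE instead of wrapping on overflow.
static long calculateDeadlineMs(final Timer timer) {
    long currentTimeMs = timer.currentTimeMs();
    long remainingMs = timer.remainingMs();
    if (currentTimeMs > Long.MAX_VALUE - remainingMs)
        return Long.MAX_VALUE; // clamp: adding would overflow
    return currentTimeMs + remainingMs;
}
```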



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-29 Thread via GitHub


chia7712 commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1583452740


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -162,32 +162,39 @@ private void processClusterTest(ExtensionContext context, 
ClusterTest annot, Clu
 controllers = annot.controllers();
 }
 
-if (brokers <= 0 || controllers <= 0) {
-throw new IllegalArgumentException("Number of brokers/controllers 
must be greater than zero.");
+final int disksPerBroker;
+if (annot.disksPerBroker() == 0) {
+disksPerBroker = defaults.disksPerBroker();

Review Comment:
   We can leverage the ternary conditional operator to simplify this. For 
example:
   ```java
   Type type = annot.clusterType() == Type.DEFAULT ? 
defaults.clusterType() : annot.clusterType();
   Map serverProperties = new HashMap<>();
   for (ClusterConfigProperty property : defaults.serverProperties()) {
   serverProperties.put(property.key(), property.value());
   }
   for (ClusterConfigProperty property : annot.serverProperties()) {
   serverProperties.put(property.key(), property.value());
   }
   
   ClusterConfig config = ClusterConfig.builder()
   .setType(type)
   .setBrokers(annot.brokers() == 0 ? defaults.brokers() : 
annot.brokers())
   .setControllers(annot.controllers() == 0 ? 
defaults.controllers() : annot.controllers())
   .setDisksPerBroker(annot.disksPerBroker() == 0 ? 
defaults.disksPerBroker() : annot.disksPerBroker())
   .setAutoStart(annot.autoStart() == AutoStart.DEFAULT ? 
defaults.autoStart() : annot.autoStart() == AutoStart.YES)
   .setName(annot.name().isEmpty() ? null : annot.name())
   .setListenerName(annot.listener().isEmpty() ? null : annot.listener())
   .setServerProperties(serverProperties)
   .setSecurityProtocol(annot.securityProtocol())
   .setMetadataVersion(annot.metadataVersion())
   .build();
   
   type.invocationContexts(context.getRequiredTestMethod().getName(), 
config, testInvocations);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-29 Thread via GitHub


kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1583452722


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -946,8 +907,7 @@ public Map 
committed(final Set> 
listTopics(Duration timeout) {
 final AllTopicsMetadataEvent topicMetadataEvent = new 
AllTopicsMetadataEvent(timer);

Review Comment:
   Changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test

2024-04-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16576:
-

Assignee: (was: Kirk True)

> New consumer fails with assert in consumer_test.py’s test_consumer_failure 
> system test
> --
>
> Key: KAFKA-16576
> URL: https://issues.apache.org/jira/browse/KAFKA-16576
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Minor
>  Labels: flaky-test, kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test intermittently fails with the following 
> error:
> {code}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   42.582 seconds
> AssertionError()
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 399, in test_consumer_failure
> assert partition_owner is not None
> AssertionError
> Notify
> {code}
> Affected tests:
>  * {{test_consumer_failure}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583450532


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +982,61 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map updateSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicCount = new 
HashMap<>(this.subscribedTopicNames);
+if (oldMember != null) {
+oldMember.subscribedTopicNames().forEach(topicName ->
+subscribedTopicCount.compute(topicName, 
ConsumerGroup::decValue)
+);
+}
+
+if (newMember != null) {
+newMember.subscribedTopicNames().forEach(topicName ->
+subscribedTopicCount.compute(topicName, 
ConsumerGroup::incValue)
+);
+}
+
+return subscribedTopicCount;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * If all members are subscribed to the same set of topics, the type is 
homogeneous.
+ * Otherwise, it is heterogeneous.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ * @param numOfMembers  The total number of members in the 
group.
+ * @param subscriptionType  The current subscription type of the 
group.
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames,
+int numOfMembers,
+SubscriptionType subscriptionType
+) {
+if (subscribedTopicNames.isEmpty()) return subscriptionType;

Review Comment:
   discussed offline
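
   For readers following the thread, the rule in the javadoc above reduces to a 
per-topic count check; a minimal hedged sketch with a simplified signature:

```java
// Sketch: a group is homogeneous iff every subscribed topic is subscribed to
// by every member, i.e. each count in the map equals the group size.
static SubscriptionType subscriptionType(Map<String, Integer> subscribedTopicNames,
                                         int numOfMembers) {
    boolean homogeneous = subscribedTopicNames.values().stream()
        .allMatch(count -> count == numOfMembers);
    return homogeneous ? SubscriptionType.HOMOGENEOUS : SubscriptionType.HETEROGENEOUS;
}
```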



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-29 Thread via GitHub


kirktrue commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1583450102


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -98,12 +93,11 @@ public boolean canSendRequest(final long currentTimeMs) {
  * is a request in-flight.
  */
 public boolean requestInFlight() {
-return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs;
+return requestInFlight;

Review Comment:
   @philipnee—let us know if you're OK to leave the method name as is. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-29 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1583446833


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});
+
+if (minIsrRecords.isEmpty()) return;
+if (topicMap.size() == minIsrRecords.size()) {
+// If all the min isr config updates are on the topic level, we 
can trigger a simpler update just on the
+// updated topics.
+
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
+new ArrayList<>(topicMap.keySet()),
+topicName -> topicMap.get(topicName))
+);
+return;
+}
+
+// Because it may require multiple layer look up for the min ISR 
config value. Build a config data copy and apply
+// the config updates to it. Use this config copy for the min ISR look 
up.
+Map> configDataCopy = 
new HashMap<>(configData);
+SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new 
LogContext("dummy-config-update"));
+for (ConfigRecord record : minIsrRecords) {
+replayInternal(record, configDataCopy, localSnapshotRegistry);
+}

Review Comment:
   This is the main implementation challenge of this PR. Finding the effective 
min ISR value requires checking topic config -> dynamic broker config -> 
default broker config -> ... 
   Let's say the user updates the default broker config:
   1. All the topics could be affected.
   2. The effective min ISR values should be recalculated.
   3. We need to generate the partition change records along with the config 
change records, which means the ReplicationControlManager can't use the regular 
methods for the effective min ISR value. The value should be determined by the 
config records and the current configs.
   
   I found it easier to make a copy of the configs and apply the min ISR 
updates on the copy. Then let the ReplicationControlManager check all the 
partitions with the config copy.
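
   For illustration, a hedged sketch of that layered lookup; the maps are 
stand-ins for the controller's config data, not the actual 
ConfigurationControlManager structures:

```java
import java.util.Map;
import java.util.Optional;

public class EffectiveMinIsr {
    // Resolve the effective min.insync.replicas by checking the topic config
    // first, then the dynamic broker config, then the broker default.
    static int effectiveMinIsr(String topic,
                               Map<String, Integer> topicConfigs,
                               Optional<Integer> dynamicBrokerMinIsr,
                               int defaultBrokerMinIsr) {
        Integer topicLevel = topicConfigs.get(topic);
        if (topicLevel != null) return topicLevel;
        return dynamicBrokerMinIsr.orElse(defaultBrokerMinIsr);
    }
}
```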
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move metrics configs out of KafkaConfig [kafka]

2024-04-29 Thread via GitHub


chia7712 merged PR #15822:
URL: https://github.com/apache/kafka/pull/15822


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]

2024-04-29 Thread via GitHub


dajac commented on code in PR #15818:
URL: https://github.com/apache/kafka/pull/15818#discussion_r1583442656


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1560,13 +1561,15 @@ private List 
consumerGroupStaticMemberGroupLeave(
  * @param records   The list of records to be applied to the state.
  * @return The append future to be applied.
  */
-private CompletableFuture consumerGroupFenceMember(
+private  CoordinatorResult consumerGroupFenceMember(
 ConsumerGroup group,
 ConsumerGroupMember member,
-List records
+List records,
+T response
 ) {
 if (validateOnlineDowngrade(group, member.memberId())) {
-return convertToClassicGroup(group, member.memberId(), records);
+CompletableFuture appendFuture = 
convertToClassicGroup(group, member.memberId(), records);

Review Comment:
   `convertToClassicGroup` is only used by this method; would it make sense to 
move the `CoordinatorResult` construction into it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]

2024-04-29 Thread via GitHub


dajac commented on code in PR #15818:
URL: https://github.com/apache/kafka/pull/15818#discussion_r1583441095


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1499,33 +1499,28 @@ private 
CoordinatorResult consumerGr
 ) throws ApiException {
 ConsumerGroup group = consumerGroup(groupId);
 List records = new ArrayList<>();

Review Comment:
   Should we push this one down too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using

2024-04-29 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842110#comment-17842110
 ] 

Greg Harris commented on KAFKA-16640:
-

Okay, I understand now. SGTM!

> Replace TestUtils#resource by scala.util.Using
> --
>
> Key: KAFKA-16640
> URL: https://issues.apache.org/jira/browse/KAFKA-16640
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
>
> `scala.util.Using` is in both scala 2.13 and scala-collection-compat so we 
> don't need to have custom try-resource function.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using

2024-04-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842107#comment-17842107
 ] 

Chia-Ping Tsai commented on KAFKA-16640:


{quote}
does this depend on https://issues.apache.org/jira/browse/KAFKA-12895 ?
{quote}

no, we can complete this in 3.8 since we can use `Using` through 
`scala-collection-compat` in scala 2.12

> Replace TestUtils#resource by scala.util.Using
> --
>
> Key: KAFKA-16640
> URL: https://issues.apache.org/jira/browse/KAFKA-16640
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
>
> `scala.util.Using` is in both scala 2.13 and scala-collection-compat so we 
> don't need to have custom try-resource function.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]

2024-04-29 Thread via GitHub


chia7712 commented on code in PR #15773:
URL: https://github.com/apache/kafka/pull/15773#discussion_r1583431374


##
server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java:
##
@@ -86,7 +86,7 @@ public class ServerLogConfigs {
 
 public static final String LOG_DELETE_DELAY_MS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG);
 public static final long LOG_DELETE_DELAY_MS_DEFAULT = 6L;
-public static final String LOG_DELETE_DELAY_MS_DOC = "The amount of time 
to wait before deleting a file from the filesystem";
+public static final String LOG_DELETE_DELAY_MS_DOC = "The amount of time 
to wait before deleting a file from the filesystem. If the value is 0 and there 
is no file to delete, the system will wait 1 second, or the job may keep 
running.";

Review Comment:
   "1 second"? or "1 millisecond"?
   
   Also, we need to highlight "busy waiting" for users



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using

2024-04-29 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842104#comment-17842104
 ] 

Greg Harris commented on KAFKA-16640:
-

Hi [~chia7712] does this depend on 
https://issues.apache.org/jira/browse/KAFKA-12895 ? If so we should wait to 
implement this after the 3.8 release.

> Replace TestUtils#resource by scala.util.Using
> --
>
> Key: KAFKA-16640
> URL: https://issues.apache.org/jira/browse/KAFKA-16640
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
>
> `scala.util.Using` is in both scala 2.13 and scala-collection-compat so we 
> don't need to have custom try-resource function.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-29 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1583424779


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});

Review Comment:
   If `min.insync.replicas` is not set at the topic config level, the effective 
`min.insync.replicas` of a topic will change if the default broker config is 
updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-29 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1583423622


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -260,6 +279,7 @@ private ApiError incrementalAlterConfigResource(
 if (error.isFailure()) {
 return error;
 }
+maybeTriggerPartitionUpdateOnMinIsrChange(newRecords);

Review Comment:
   `legacyAlterConfigResource` is supported as well. Adding a UT to cover it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-29 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1583422586


##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -159,4 +160,14 @@ BrokersToIsrs.PartitionsOnReplicaIterator 
partitionsWithBrokerInElr(int brokerId
 }
 return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false);
 }
+
+BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithElr() {
+Map topicMap = new HashMap<>();
+for (Map map : elrMembers.values()) {
+if (map != null) {

Review Comment:
   Done, it can't be null



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-04-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842099#comment-17842099
 ] 

Chia-Ping Tsai commented on KAFKA-16639:


Checking the code again, it seems there are two possible bugs if we change the 
state to `LEAVING` before receiving the heartbeat response:

1. 
https://github.com/apache/kafka/commit/8c0488b887be4a9178563f1d72514010f83b8614 
ignores the response and leaves the member id empty. That prevents the member 
from leaving the group, as the server will reject the request (epoch=-1 and the 
member id is empty)

2. 
https://github.com/apache/kafka/blob/636e65aa6b3558a7ae239ce69579b62ab3377fcb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212
 could ignore the new request because another request is already in flight.

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is an in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-04-29 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reassigned KAFKA-16639:
--

Assignee: Philip Nee  (was: Chia-Ping Tsai)

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is an in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-29 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1583356709


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java:
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class ConnectorOffsetBackingStoreTest {
+
+// Serialized
+private static final byte[] OFFSET_KEY_SERIALIZED = 
"key-serialized".getBytes();
+private static final byte[] OFFSET_VALUE_SERIALIZED = 
"value-serialized".getBytes();
+
+private static final KafkaException PRODUCE_EXCEPTION = new 
KafkaException();
+
+private final ByteArraySerializer byteArraySerializer = new 
ByteArraySerializer();
+
+@Test
+public void 
testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() {
+MockProducer connectorStoreProducer = 
createMockProducer();
+MockProducer workerStoreProducer = 
createMockProducer();
+KafkaOffsetBackingStore connectorStore = createStore("topic1", 
connectorStoreProducer);
+KafkaOffsetBackingStore workerStore = createStore("topic2", 
workerStoreProducer);
+
+ConnectorOffsetBackingStore offsetBackingStore = 
ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+() -> LoggingContext.forConnector("source-connector"),
+workerStore,
+connectorStore,
+"offsets-topic",
+mock(TopicAdmin.class));
+
+AtomicBoolean callbackInvoked = new AtomicBoolean();
+AtomicReference callbackResult = new AtomicReference<>();
+AtomicReference callbackError = new AtomicReference<>();
+
+Future setFuture = 
offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY_SERIALIZED, null), 
(error, result) -> {
+callbackInvoked.set(true);
+callbackResult.set(result);
+callbackError.set(error);
+});

Review Comment:
   Nit: could it also be helpful to make sure we don't prematurely complete our 
callbacks?
   
   ```suggestion
   });
   
   assertFalse("Store callback should not be invoked before underlying 
producer callback", callbackInvoked.get());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583294148


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private CoordinatorResult consumerGr
 }
 }
 
+Map<String, Integer> subscribedTopicsMemberCount = new HashMap<>();

Review Comment:
   I initialized it in the method to update it, and we return the map. The map itself is private, so I can't initialize it directly unless I create another getter that returns the map and not just the subscribed topic names.
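   
   For illustration only, a rough sketch of the shape being discussed; the class and method names below are hypothetical, not the actual GroupMetadataManager/ConsumerGroup API:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   final class SubscriptionCounts {
       // topic name -> number of group members subscribed to it
       private final Map<String, Integer> subscribedTopicsMemberCount = new HashMap<>();
   
       void addMemberSubscription(Iterable<String> topics) {
           for (String topic : topics) {
               subscribedTopicsMemberCount.merge(topic, 1, Integer::sum);
           }
       }
   
       // The extra getter under discussion: exposes the counts, not just the topic names.
       Map<String, Integer> subscribedTopicsMemberCount() {
           return subscribedTopicsMemberCount;
       }
   }
   ```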



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r158025


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1350,6 +1351,11 @@ private CoordinatorResult consumerGr
 .withMembers(group.members())
 .withStaticMembers(group.staticMembers())
 .withSubscriptionMetadata(subscriptionMetadata)
+.withSubscriptionType(ConsumerGroup.subscriptionType(
+subscribedTopicsMemberCount,
+group.numMembers(),

Review Comment:
   > An alternative may be to compare the counts within the Map without considering the group size. This could work because we do not accept empty subscriptions from a member.
   
   This works as well.
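   
   For reference, a sketch of the check as discussed in this PR: with non-empty subscriptions, the group is homogeneous exactly when every subscribed topic is subscribed by every member. Treat the exact signature and enum below as illustrative of KAFKA-16587 rather than the final code.
   
   ```java
   import java.util.Map;
   
   enum SubscriptionType { HOMOGENEOUS, HETEROGENEOUS }
   
   final class SubscriptionTypeSketch {
       static SubscriptionType subscriptionType(Map<String, Integer> subscribedTopicsMemberCount,
                                                int numMembers) {
           // No subscriptions at all: trivially homogeneous.
           if (subscribedTopicsMemberCount.isEmpty()) {
               return SubscriptionType.HOMOGENEOUS;
           }
           // Any topic not subscribed by every member means the members' sets differ.
           for (int count : subscribedTopicsMemberCount.values()) {
               if (count != numMembers) {
                   return SubscriptionType.HETEROGENEOUS;
               }
           }
           return SubscriptionType.HOMOGENEOUS;
       }
   }
   ```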



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583332135


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1350,6 +1351,11 @@ private CoordinatorResult consumerGr
 .withMembers(group.members())
 .withStaticMembers(group.staticMembers())
 .withSubscriptionMetadata(subscriptionMetadata)
+.withSubscriptionType(ConsumerGroup.subscriptionType(
+subscribedTopicsMemberCount,
+group.numMembers(),

Review Comment:
   Yeah, I was actually thinking the same. I saw that the member map is updated in the getOrCreateMember step, so I figured the memberCount is updated as well, unless a member is removed; I'm not sure if that is updated elsewhere, or I might be overlooking something.
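   
   Purely as a hypothetical sketch (not actual Kafka code): if the counts are only incremented in the getOrCreateMember path, the removal path would need a matching decrement, roughly like this:
   
   ```java
   import java.util.Map;
   
   final class SubscriptionCountMaintenance {
       // Decrement on member removal so the topic -> subscriber-count map
       // stays in sync with the group; drop entries that reach zero.
       static void removeMemberSubscription(Map<String, Integer> counts, Iterable<String> topics) {
           for (String topic : topics) {
               counts.computeIfPresent(topic, (t, c) -> c == 1 ? null : c - 1);
           }
       }
   }
   ```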



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-04-29 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2083099739

   This relies on #15766.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


