Re: [PR] KAFKA-5863: Avoid NPE when RestClient calls expecting no-content receive content. [kafka]

2024-01-16 Thread via GitHub


gharris1727 commented on PR #13294:
URL: https://github.com/apache/kafka/pull/13294#issuecomment-1895192159

   > Wouldn't any unexpected response here simply be indicative of a bug in the 
Connect runtime since the REST client is only being used internally to forward 
requests within a Connect cluster?
   
   While a bug in the remote Connect worker could produce unexpected content, I 
think it's more likely that, when one of these calls gets unexpected content, 
whatever is on the other end of the connection isn't actually a Connect 
worker. For example, a stale DNS resolution could send the request to an 
unrelated service.
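One way to act on this, sketched here as an illustration rather than the code in the PR: for a call that expects no content, treat any non-empty body as an error and describe it without echoing the body, since the peer may not be a Connect worker at all. The class `NoContentCheck` and its message text are hypothetical.

```java
import java.util.Optional;

// Hypothetical helper, not the actual Connect RestClient code.
final class NoContentCheck {
    private NoContentCheck() {}

    /**
     * Returns an error description if a call that expects no content received a body,
     * or Optional.empty() if the response is as expected.
     */
    static Optional<String> validate(int status, byte[] body) {
        if (body == null || body.length == 0) {
            return Optional.empty(); // nothing to deserialize, nothing to report
        }
        // Report only the size and status: the body may come from a service that is
        // not a Connect worker (e.g. after a stale DNS resolution), so it is treated
        // as untrusted and kept out of the message.
        return Optional.of("Expected no content but received " + body.length
                + " bytes (HTTP " + status + "); the remote endpoint may not be a Connect worker");
    }
}
```

Returning the problem as a value keeps the decision about whether to fail or merely log with the caller.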


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-16 Thread via GitHub


dajac commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1454773095


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -935,8 +935,9 @@ private[group] class GroupCoordinator(
   producerId,
   producerEpoch,
   RecordBatch.NO_SEQUENCE,
-  requestLocal,
-  postVerificationCallback
+  // Wrap the callback to be handled on an arbitrary request handler thread
+  // when transaction verification is complete.
+  KafkaRequestHandler.wrapAsyncCallback(postVerificationCallback, requestLocal)

Review Comment:
   We still need this in the old coordinator.
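The diff comment describes wrapping the callback so that it runs on a request handler thread once verification completes. A minimal sketch of that idea, using a plain `ExecutorService` as a stand-in for the request handler pool; `AsyncCallbackWrapper` is hypothetical and does not reproduce `KafkaRequestHandler.wrapAsyncCallback`, whose real version also takes a `RequestLocal`.

```java
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

// Hypothetical analogue of the wrapping idea, not Kafka's real signature: the returned
// callback may be invoked on any thread (e.g. a network/verification thread), but the
// original callback always runs on a thread from handlerPool.
final class AsyncCallbackWrapper {
    private AsyncCallbackWrapper() {}

    static <T> Consumer<T> wrap(Consumer<T> callback, ExecutorService handlerPool) {
        // Re-dispatch instead of running inline on the completing thread.
        return result -> handlerPool.execute(() -> callback.accept(result));
    }
}
```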






Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-16 Thread via GitHub


dajac commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1454771868


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig,
    * @param producerId      the producer id for the producer writing to the transaction
    * @param producerEpoch   the epoch of the producer writing to the transaction
    * @param baseSequence    the base sequence of the first record in the batch we are trying to append
-   * @param requestLocal    container for the stateful instances scoped to this request -- this must correspond to the
-   *                        thread calling this method
    * @param callback        the method to execute once the verification is either completed or returns an error
    *
    * When the verification returns, the callback will be supplied the error if it exists or Errors.NONE.
    * If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned.
-   * This guard can not be used for verification and any appends that attenpt to use it will fail.
+   * This guard can not be used for verification and any appends that attempt to use it will fail.
    */
   def maybeStartTransactionVerificationForPartition(
     topicPartition: TopicPartition,
     transactionalId: String,
     producerId: Long,
     producerEpoch: Short,
     baseSequence: Int,
-    requestLocal: RequestLocal,
-    callback: (Errors, RequestLocal, VerificationGuard) => Unit
+    callback: Either[Errors, VerificationGuard] => Unit
   ): Unit = {
-    def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
-                            newRequestLocal: RequestLocal,
-                            verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = {
-      callback(
-        preAppendErrors.getOrElse(topicPartition, Errors.NONE),
-        newRequestLocal,
-        verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
+    def generalizedCallback(results: Map[TopicPartition, Either[Errors, VerificationGuard]]): Unit = {

Review Comment:
   Ack. I will take a look at https://github.com/apache/kafka/pull/15087.






Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-01-16 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1454760385


##
clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+public class SubscriptionPattern {

Review Comment:
   I think we should leave it on the consumer side, since its main purpose is to 
differentiate between using a java.util.regex.Pattern and using a regex that is 
compatible with RE2J. If we were just to use a string instead of 
SubscriptionPattern, users may be confused about why they need to use Pattern 
for the other subscribe methods.
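To illustrate the point about overloads, a sketch with hypothetical names (`RegexSubscription`, `SubscriberSketch`), not the classes in PR #15188: a dedicated type lets the compiler tell the two subscription styles apart, whereas a plain `String` parameter would sit beside the existing `Pattern` overload with no signal about which regex dialect applies.

```java
import java.util.Objects;
import java.util.regex.Pattern;

// Hypothetical stand-in for a SubscriptionPattern-like type: it only carries the regex
// string (meant to be RE2J-compatible and evaluated elsewhere); it does not compile it.
final class RegexSubscription {
    private final String regex;

    RegexSubscription(String regex) {
        this.regex = Objects.requireNonNull(regex, "regex");
    }

    String regex() {
        return regex;
    }
}

// Hypothetical subscriber: distinct parameter types keep the two overloads apart.
final class SubscriberSketch {
    private SubscriberSketch() {}

    static String subscribe(Pattern pattern) {
        return "java.util.regex:" + pattern.pattern(); // client-side java.util.regex syntax
    }

    static String subscribe(RegexSubscription subscription) {
        return "re2j:" + subscription.regex(); // string would be handed to an RE2J matcher (not shown)
    }
}
```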






Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-01-16 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1454761116


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##
@@ -86,7 +86,6 @@
 import static org.mockito.Mockito.when;
 
 public class OffsetsRequestManagerTest {
-

Review Comment:
   My bad







[jira] [Commented] (KAFKA-16022) AsyncKafkaConsumer sometimes complains “No current assignment for partition {}”

2024-01-16 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807585#comment-17807585
 ] 

Phuc Hong Tran commented on KAFKA-16022:


[~pnee], were you seeing this exception in the FetchRequestManagerTest or 
somewhere else?

> AsyncKafkaConsumer sometimes complains “No current assignment for partition 
> {}”
> ---
>
> Key: KAFKA-16022
> URL: https://issues.apache.org/jira/browse/KAFKA-16022
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> This seems to be a timing issue: before the member receives any 
> assignment from the coordinator, the fetcher tries to find the current 
> position, causing "No current assignment for partition {}". This adds a 
> small amount of noise to the log.
>  
>  
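If the cause is as described, one mitigation is to check assignment before looking up a position. A minimal sketch under that assumption; `AssignmentAwareFetcher` is hypothetical, models partitions as plain strings, and is not the consumer's actual fetch logic.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: positions are only looked up for partitions that are currently
// assigned, so a fetch attempted before the first assignment simply finds nothing to do.
final class AssignmentAwareFetcher {
    private final Map<String, Long> positionsByPartition = new HashMap<>();

    void assign(String partition, long position) {
        positionsByPartition.put(partition, position);
    }

    /** Returns the requested partitions that are assigned; unassigned ones are skipped silently. */
    List<String> fetchable(Collection<String> requested) {
        List<String> result = new ArrayList<>();
        for (String partition : requested) {
            if (positionsByPartition.containsKey(partition)) {
                result.add(partition);
            }
        }
        return result;
    }
}
```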



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]

2024-01-16 Thread via GitHub


cmccabe commented on PR #15197:
URL: https://github.com/apache/kafka/pull/15197#issuecomment-1895021752

   Thanks for the quick response on this one, @pprovenzano . LGTM once you 
address the one comment I made.





Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]

2024-01-16 Thread via GitHub


cmccabe commented on code in PR #15197:
URL: https://github.com/apache/kafka/pull/15197#discussion_r1454694471


##
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##
@@ -46,6 +46,18 @@ public final class FeaturesImage {
 ZkMigrationState.NONE
 );
 
+public static final FeaturesImage LATEST = new FeaturesImage(

Review Comment:
   Hmm... I'd rather not add `FeaturesImage.LATEST` here since it feels like 
something that is only for testing. Can we just add this to 
`ReplicaManagerTest.scala`, for now?
   
   LGTM once that's addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Removed unused CommittedOffsetsFile class. [kafka]

2024-01-16 Thread via GitHub


satishd closed pull request #15209: MINOR Removed unused CommittedOffsetsFile 
class.
URL: https://github.com/apache/kafka/pull/15209





[jira] [Commented] (KAFKA-15851) broker under replicated due to error java.nio.BufferOverflowException

2024-01-16 Thread Yu Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807561#comment-17807561
 ] 

Yu Wang commented on KAFKA-15851:
-

Got a similar stack trace in 2.5.1; it recovered after a restart.
{code:java}
java.nio.BufferOverflowException
	at java.nio.Buffer.nextPutIndex(Buffer.java:533)
	at java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:796)
	at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
	at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
	at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:507)
	at kafka.log.Log.$anonfun$roll$8(Log.scala:1900)
	at kafka.log.Log.$anonfun$roll$2(Log.scala:1900)
	at kafka.log.Log.roll(Log.scala:2322)
	at kafka.log.Log.maybeRoll(Log.scala:1849)
	at kafka.log.Log.$anonfun$append$2(Log.scala:1148)
	at kafka.log.Log.append(Log.scala:2322)
	at kafka.log.Log.appendAsFollower(Log.scala:1017)
	at kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1(Partition.scala:924)
	at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:917)
	at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:931)
	at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:167)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:332)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:320)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:319)
	at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
	at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:921)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:319)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:135)
	at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:134)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:117)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}
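The top frames show `DirectByteBuffer.putLong` failing inside `TimeIndex.maybeAppend`. The sketch below is a simplified fixed-size index, not Kafka's `TimeIndex`, and only illustrates the mechanism: a relative `put` on a `ByteBuffer` with too few remaining bytes throws `BufferOverflowException`, so appending to an index whose capacity is already used up fails this way. It does not explain why the real index was full.

```java
import java.nio.ByteBuffer;

// Simplified fixed-size index (not Kafka's TimeIndex): each entry in this sketch is an
// 8-byte timestamp followed by a 4-byte relative offset, 12 bytes in total.
final class FixedIndexSketch {
    static final int ENTRY_SIZE = 12;
    private final ByteBuffer buffer;

    FixedIndexSketch(int maxEntries) {
        buffer = ByteBuffer.allocate(maxEntries * ENTRY_SIZE); // heap buffer for simplicity
    }

    boolean isFull() {
        return buffer.remaining() < ENTRY_SIZE;
    }

    /** Appends with no capacity check; putLong throws BufferOverflowException once full. */
    void appendUnchecked(long timestamp, int relativeOffset) {
        buffer.putLong(timestamp);
        buffer.putInt(relativeOffset);
    }
}
```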

> broker under replicated due to error java.nio.BufferOverflowException
> -
>
> Key: KAFKA-15851
> URL: https://issues.apache.org/jira/browse/KAFKA-15851
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.2
> Environment: Kafka Version: 3.3.2
> Deployment mode: zookeeper
>Reporter: wangliucheng
>Priority: Major
> Attachments: p1.png
>
>
> In my Kafka cluster, I upgraded Kafka from 2.0 to 3.3.2. The 
> {*}first start failed{*} because the same partition directory existed under 
> two configured log directories. The error is as follows:
>  
> {code:java}
> [2023-11-16 10:04:09,952] ERROR (main kafka.Kafka$ 159) Exiting Kafka due to fatal exception during startup.
> java.lang.IllegalStateException: Duplicate log directories for skydas_sc_tdevirsec-12 are found in both /data01/kafka/log/skydas_sc_tdevirsec-12 and /data07/kafka/log/skydas_sc_tdevirsec-12. It is likely because log directory failure happened while broker was replacing current replica with future replica. Recover broker from this failure by manually deleting one of the two directories for this partition. It is recommended to delete the partition in the log directory that is known to have failed recently.
>         at kafka.log.LogManager.loadLog(LogManager.scala:305)
>         at kafka.log.LogManager.$anonfun$loadLogs$14(LogManager.scala:403)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2023-11-16 10:04:09,953] INFO (kafka-shutdown-hook kafka.server.KafkaServer 66) [KafkaServer id=1434] shutting down {code}
>  
>  
> *Second,* after removing /data07/kafka/log from log.dirs, starting Kafka also 
> reported an error:
>  
> {code:java}
> [2023-11-16 10:13:10,713] INFO (ReplicaFetcherThread-3-1008 kafka.log.UnifiedLog 66) [UnifiedLog partition=ty_udp_full-60, dir=/data04/kafka/log] Rolling new log segment (log_size = 755780551/1073741824}, offset_index_size = 2621440/2621440, time_index_size = 1747626/1747626, inactive_time_ms = 2970196/60480).
> [2023-11-16 10:13:10,714] ERROR (Replica

Re: [PR] KAFKA-16146: Checkpoint log-start-offset for remote log enabled topics [kafka]

2024-01-16 Thread via GitHub


kamalcph commented on PR #15201:
URL: https://github.com/apache/kafka/pull/15201#issuecomment-1894989811

   Test failures are unrelated





Re: [PR] MINOR Removed unused CommittedOffsetsFile class. [kafka]

2024-01-16 Thread via GitHub


kamalcph commented on PR #15209:
URL: https://github.com/apache/kafka/pull/15209#issuecomment-1894987437

   Should we also avoid creating the 
[remote_log_snapshot](https://sourcegraph.com/github.com/apache/kafka@e563aad4eec2e08c8db54e1afebe28c746130ba4/-/blob/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java?L75)
 file under each partition directory?





Re: [PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]

2024-01-16 Thread via GitHub


philipnee commented on code in PR #15210:
URL: https://github.com/apache/kafka/pull/15210#discussion_r1454575608


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -2169,10 +2169,8 @@ public void testCommittedAuthenticationFailure(GroupProtocol groupProtocol) {
         assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.

Review Comment:
   I'm not sure where the bug is. The test seems to pass locally.






Re: [PR] KAFKA-5863: Avoid NPE when RestClient calls expecting no-content receive content. [kafka]

2024-01-16 Thread via GitHub


yashmayya commented on PR #13294:
URL: https://github.com/apache/kafka/pull/13294#issuecomment-1894930823

   > because it's inherently untrusted content that shouldn't be put into the 
log by default
   
   I'm not sure I follow why this would be inherently untrusted content? 
Wouldn't any unexpected response here simply be indicative of a bug in the 
Connect runtime since the REST client is only being used internally to forward 
requests within a Connect cluster?





[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807543#comment-17807543
 ] 

Matthias J. Sax commented on KAFKA-16141:
-

Thanks. I am wondering if the right fix would be to change 
`KeyValueToTimestampedKeyValueByteStoreAdapter` to implement 
`WrappedStateStore`. In the end, it is a wrapper, but the restore code path 
does not recognize it as such, and thus cannot "unwrap" its inner store to 
pick the right converter? For comparison, 
`InMemoryTimestampedKeyValueStoreMarker` also implements both `WrappedStore` 
and `TimestampedBytesStore`?

We did consider it a bug fix to add `TimestampedBytesStore` to 
`KeyValueToTimestampedKeyValueByteStoreAdapter` because, in the end, it does 
expect the `` value byte format on `put()` and also returns this format 
on `get()`.

It's just that the restore code path is not interested in an upper layer, but 
only in the innermost wrapped store type?
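A simplified sketch of the question raised here, using hypothetical stand-in types rather than the Kafka Streams interfaces: if converter selection looks only at the innermost store, an adapter that is timestamped on its outer API but wraps a plain store gets the non-converting path, whereas checking the outer layer picks the timestamped converter. `Store`, `Wrapped`, `TimestampedBytes`, `Adapter`, and the converter labels are all made up for illustration.

```java
// Hypothetical, simplified stand-ins; not WrappedStateStore / TimestampedBytesStore.
interface Store {}
interface TimestampedBytes {}                    // marker: layer uses <timestamp,value> bytes
interface Wrapped extends Store { Store inner(); }

final class PlainStore implements Store {}
final class TimestampedStore implements Store, TimestampedBytes {}

// Adapter that is both a wrapper and timestamped at its outer API.
final class Adapter implements Wrapped, TimestampedBytes {
    private final Store inner;
    Adapter(Store inner) { this.inner = inner; }
    public Store inner() { return inner; }
}

final class ConverterChooser {
    private ConverterChooser() {}

    static Store innermost(Store s) {
        while (s instanceof Wrapped) {
            s = ((Wrapped) s).inner();           // peel off every wrapper layer
        }
        return s;
    }

    /** Chooses based on the innermost store only, ignoring markers on outer layers. */
    static String converterFor(Store s) {
        return innermost(s) instanceof TimestampedBytes ? "RAW_TO_TIMESTAMPED" : "IDENTITY";
    }

    /** Chooses based on the outermost layer, the behaviour that misfires for the adapter. */
    static String converterForOutermost(Store s) {
        return s instanceof TimestampedBytes ? "RAW_TO_TIMESTAMPED" : "IDENTITY";
    }
}
```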

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Almog Gavra
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  





[PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]

2024-01-16 Thread via GitHub


philipnee opened a new pull request, #15210:
URL: https://github.com/apache/kafka/pull/15210

   In this PR, I'm adding sensors to the `CommitRequestManager` to record the 
necessary metrics, i.e.:
   
   ```
   commit-latency-avg
   commit-latency-max
   commit-rate
   commit-total
   committed-time-ns-total
   ```
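For readers unfamiliar with these metric names, a hypothetical sketch of the bookkeeping behind them. It does not use Kafka's `Metrics`/`Sensor` classes, it computes the rate over a caller-supplied elapsed time rather than a sampled window, and it assumes `committed-time-ns-total` accumulates time spent in `committed()` calls.

```java
// Hypothetical, simplified bookkeeping for the listed metrics; not the PR's implementation.
final class CommitMetricsSketch {
    private long commitTotal;            // commit-total
    private double latencySumMs;         // feeds commit-latency-avg
    private double latencyMaxMs;         // commit-latency-max
    private long committedTimeNsTotal;   // committed-time-ns-total (assumed meaning)

    void recordCommit(double latencyMs) {
        commitTotal++;
        latencySumMs += latencyMs;
        latencyMaxMs = commitTotal == 1 ? latencyMs : Math.max(latencyMaxMs, latencyMs);
    }

    void recordCommittedCall(long elapsedNs) {
        committedTimeNsTotal += elapsedNs;
    }

    double commitLatencyAvg() { return commitTotal == 0 ? Double.NaN : latencySumMs / commitTotal; }
    double commitLatencyMax() { return commitTotal == 0 ? Double.NaN : latencyMaxMs; }
    long commitTotal() { return commitTotal; }
    // commit-rate, simplified: commits per second over the given elapsed time.
    double commitRate(double elapsedSeconds) { return commitTotal / elapsedSeconds; }
    long committedTimeNsTotal() { return committedTimeNsTotal; }
}
```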





[PR] MINOR Removed unused CommittedOffsetsFile class. [kafka]

2024-01-16 Thread via GitHub


satishd opened a new pull request, #15209:
URL: https://github.com/apache/kafka/pull/15209

   We will reintroduce it when it is required, i.e. when TBRLMM is enhanced to 
consume from a specific offset once snapshots are implemented.
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807520#comment-17807520
 ] 

Almog Gavra commented on KAFKA-16141:
-

OK, after doing some more digging, I don't think it's related to my change. 
Instead, I believe it was caused by [https://github.com/apache/kafka/pull/14570]; 
that PR changed
{code:java}
KeyValueToTimestampedKeyValueByteStoreAdapter implements ...{code}
to
{code:java}
KeyValueToTimestampedKeyValueByteStoreAdapter implements ..., TimestampedBytesStore {code}
This caused WrappedStateStore#isTimestamped to return true where it previously 
returned false. This, in turn, caused us to initialize the store with 
RecordConverters.RAW_TO_TIMESTAMED_INSTANCE as the converter (see 
StateManagerUtil#converterForStore). After a restore, the converter prepends 
the record timestamp when it shouldn't, because an additional timestamp is 
then prepended when the record goes through the adapter.
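A simplified byte-level model of the failure described above, not Kafka Streams code. It assumes restored records land in the adapter's plain inner store and that reads go through the adapter, which prepends an 8-byte timestamp; the method names and the `adapterTimestamp` parameter are hypothetical.

```java
import java.nio.ByteBuffer;

// Simplified model of the double-timestamp problem; not Kafka Streams code.
final class DoubleTimestampSketch {
    private DoubleTimestampSketch() {}

    /** Returns an 8-byte big-endian timestamp followed by value: the <timestamp,value> layout. */
    static byte[] prependTimestamp(long timestamp, byte[] value) {
        return ByteBuffer.allocate(Long.BYTES + value.length).putLong(timestamp).put(value).array();
    }

    /**
     * Restore writes straight into the plain inner store (assumption); reads then go
     * through the adapter, which converts the stored plain bytes to the timestamped layout.
     */
    static byte[] readAfterRestore(long recordTimestamp, byte[] rawValue,
                                   boolean restoreConverterPrepends, long adapterTimestamp) {
        byte[] stored = restoreConverterPrepends
                ? prependTimestamp(recordTimestamp, rawValue) // timestamped converter was chosen
                : rawValue;                                   // what the plain inner store expects
        return prependTimestamp(adapterTimestamp, stored);
    }
}
```

With the timestamped converter chosen, a read returns two timestamps ahead of the value, so the first 8 bytes of what the caller treats as the value are actually the record timestamp.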

Related: https://issues.apache.org/jira/browse/KAFKA-15629

cc [~mjsax] [~hanyuzheng] 






[jira] [Updated] (KAFKA-16131) Repeated UnsupportedVersionException logged when running Kafka 3.7.0-RC2 KRaft cluster with metadata version 3.6

2024-01-16 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano updated KAFKA-16131:
--
Affects Version/s: 3.7.0

> Repeated UnsupportedVersionException logged when running Kafka 3.7.0-RC2 
> KRaft cluster with metadata version 3.6
> 
>
> Key: KAFKA-16131
> URL: https://issues.apache.org/jira/browse/KAFKA-16131
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Assignee: Proven Provenzano
>Priority: Blocker
> Fix For: 3.7.0
>
>
> When running Kafka 3.7.0-RC2 as a KRaft cluster with the metadata version set 
> to 3.6-IV2, it throws repeated errors like this in the 
> controller logs:
> {quote}2024-01-13 16:58:01,197 INFO [QuorumController id=0] 
> assignReplicasToDirs: event failed with UnsupportedVersionException in 15 
> microseconds. (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]
> 2024-01-13 16:58:01,197 ERROR [ControllerApis nodeId=0] Unexpected error 
> handling request RequestHeader(apiKey=ASSIGN_REPLICAS_TO_DIRS, apiVersion=0, 
> clientId=1000, correlationId=14, headerVersion=2) – 
> AssignReplicasToDirsRequestData(brokerId=1000, brokerEpoch=5, 
> directories=[DirectoryData(id=w_uxN7pwQ6eXSMrOKceYIQ, 
> topics=[TopicData(topicId=bvAKLSwmR7iJoKv2yZgygQ, 
> partitions=[PartitionData(partitionIndex=2), 
> PartitionData(partitionIndex=1)]), TopicData(topicId=uNe7f5VrQgO0zST6yH1jDQ, 
> partitions=[PartitionData(partitionIndex=0)])])]) with context 
> RequestContext(header=RequestHeader(apiKey=ASSIGN_REPLICAS_TO_DIRS, 
> apiVersion=0, clientId=1000, correlationId=14, headerVersion=2), 
> connectionId='172.16.14.219:9090-172.16.14.217:53590-7', 
> clientAddress=/172.16.14.217, 
> principal=User:CN=my-cluster-kafka,O=io.strimzi, 
> listenerName=ListenerName(CONTROLPLANE-9090), securityProtocol=SSL, 
> clientInformation=ClientInformation(softwareName=apache-kafka-java, 
> softwareVersion=3.7.0), fromPrivilegedListener=false, 
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@71004ad2])
>  (kafka.server.ControllerApis) [quorum-controller-0-event-handler]
> java.util.concurrent.CompletionException: org.apache.kafka.common.errors.UnsupportedVersionException: Directory assignment is not supported yet.
> at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
> at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
> at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
> at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
> at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:880)
> at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871)
> at org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:148)
> at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:137)
> at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
> at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
> at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: Directory assignment is not supported yet.
> {quote}
>  
> With the metadata version set to 3.6-IV2, it makes sense that the request is 
> not supported. But in that case, the request should not be sent at all.
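A minimal sketch of the behavior asked for above: the sender checks for support before sending, instead of letting the controller reject the request. `DirectoryAssignmentGate` and its predicate are hypothetical names; the predicate stands in for whatever metadata-version check the actual fix uses, which this report does not specify.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical sender-side guard: assignments are only sent when the current
// metadata version reports support for directory assignment.
final class DirectoryAssignmentGate {
    private final BooleanSupplier directoryAssignmentSupported;
    private final List<String> sent = new ArrayList<>();

    DirectoryAssignmentGate(BooleanSupplier directoryAssignmentSupported) {
        this.directoryAssignmentSupported = directoryAssignmentSupported;
    }

    // Returns true if the assignment was sent, false if it was skipped.
    boolean maybeSend(String assignment) {
        if (!directoryAssignmentSupported.getAsBoolean()) {
            // e.g. metadata version 3.6-IV2: skip the request instead of
            // triggering UnsupportedVersionException on the controller
            return false;
        }
        sent.add(assignment); // stand-in for actually sending the request
        return true;
    }

    List<String> sent() {
        return sent;
    }
}
```

Using a `BooleanSupplier` rather than a fixed flag means the check is re-evaluated on every send. If the supplier reads the current metadata version, the guard would follow a metadata version upgrade at runtime.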



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]

2024-01-16 Thread via GitHub


kirktrue commented on PR #15186:
URL: https://github.com/apache/kafka/pull/15186#issuecomment-1894795635

   @ijuma / @stanislavkozlovski are either of you able to review? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]

2024-01-16 Thread via GitHub


mjsax commented on PR #15207:
URL: https://github.com/apache/kafka/pull/15207#issuecomment-1894793526

   This PR must be cherry-picked to the `3.7` branch.





Re: [PR] MINOR: code cleanup [kafka]

2024-01-16 Thread via GitHub


mjsax commented on PR #15208:
URL: https://github.com/apache/kafka/pull/15208#issuecomment-1894793322

   This PR must be cherry-picked to the `3.7` and `3.6` branches.





Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]

2024-01-16 Thread via GitHub


mjsax commented on code in PR #15199:
URL: https://github.com/apache/kafka/pull/15199#discussion_r1454318352


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -71,6 +71,7 @@
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 import static org.apache.kafka.common.config.ConfigDef.parseType;
+import static org.apache.kafka.streams.internals.UpgradeFromValues.UPGRADE_FROM_35;

Review Comment:
   After merging and doing the follow up PR to add `3.6`, I realized that using 
this import is not what we want to do.
   
   Call for review (follow up cleanup PR): 
https://github.com/apache/kafka/pull/15208






Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]

2024-01-16 Thread via GitHub


mjsax commented on PR #15199:
URL: https://github.com/apache/kafka/pull/15199#issuecomment-1894790719

   Follow-up PR to add `3.6`: https://github.com/apache/kafka/pull/15207





[PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]

2024-01-16 Thread via GitHub


mjsax opened a new pull request, #15207:
URL: https://github.com/apache/kafka/pull/15207

   Adds version `3.6` to the possible values of the `upgrade_from` config.





[PR] KAFKA-16104: Enable additional PlaintextConsumerTest tests for new consumer [kafka]

2024-01-16 Thread via GitHub


kirktrue opened a new pull request, #15206:
URL: https://github.com/apache/kafka/pull/15206

   We re-evaluated the integration tests that were disabled for the new consumer 
group protocol but _should_ be supported. For the evaluation, we ran the 
`PlaintextConsumerTest` suite ten times and recorded which tests passed and which 
failed.
   
   Based on that evaluation, the following tests can now be enabled:
   
   - `testAutoCommitOnClose`
   - `testAutoCommitOnCloseAfterWakeup`
   - `testAutoCommitOnRebalance`
   - `testExpandingTopicSubscriptions`
   - `testMultiConsumerSessionTimeoutOnClose`
   - `testMultiConsumerSessionTimeoutOnStopPolling`
   - `testShrinkingTopicSubscriptions`
   
   Three tests failed repeatedly, and a dedicated Jira was created to track and 
fix each one:
   
   - `testPerPartitionLagMetricsCleanUpWithSubscribe` (failure rate: 100%, KAFKA-16150)
   - `testPerPartitionLeadMetricsCleanUpWithSubscribe` (failure rate: 70%, KAFKA-16151)
   - `testStaticConsumerDetectsNewPartitionCreatedAfterRestart` (failure rate: 100%, KAFKA-16152)
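The triage described above (ten runs per test) reduces to simple arithmetic: 7 failures in 10 runs gives the 70% listed for `testPerPartitionLeadMetricsCleanUpWithSubscribe`. A minimal sketch, assuming a test is re-enabled only if it passed in every run; `FlakyTestTriage` is a hypothetical helper, not tooling from this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: given how many of N repeated runs each test failed,
// decide whether it can be re-enabled or needs a dedicated Jira.
final class FlakyTestTriage {
    static double failureRatePercent(int failures, int runs) {
        if (runs <= 0) {
            throw new IllegalArgumentException("runs must be positive");
        }
        return 100.0 * failures / runs;
    }

    // Assumed policy: a test is only re-enabled if it never failed.
    static Map<String, String> triage(Map<String, Integer> failuresByTest, int runs) {
        Map<String, String> decisions = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : failuresByTest.entrySet()) {
            int failures = e.getValue();
            decisions.put(e.getKey(), failures == 0
                ? "enable"
                : "file Jira (failure rate " + failureRatePercent(failures, runs) + "%)");
        }
        return decisions;
    }
}
```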
   
   See [KAFKA-16104](https://issues.apache.org/jira/browse/KAFKA-16104) for 
more details.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]

2024-01-16 Thread via GitHub


mjsax commented on PR #15199:
URL: https://github.com/apache/kafka/pull/15199#issuecomment-1894786932

   Merged to `trunk` and cherry-picked to `3.7` and `3.6` branches.





Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]

2024-01-16 Thread via GitHub


mjsax merged PR #15199:
URL: https://github.com/apache/kafka/pull/15199





Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]

2024-01-16 Thread via GitHub


mjsax commented on PR #15199:
URL: https://github.com/apache/kafka/pull/15199#issuecomment-1894782497

   I just checked the test failures on 
https://github.com/apache/kafka/pull/15151 (cf last comment) and it failed 
because `3.6` is missing.
   
   Can you also do a follow-up PR for the `3.6` branch that only adds `3.5` to 
`upgrade_from`? (Or use this PR, and do a follow-up for trunk/3.7 to add `3.6` 
instead?)





[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer

2024-01-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16104:
--
Description: 
It should be possible to enable:
 * testAutoCommitOnClose
 * testAutoCommitOnCloseAfterWakeup
 * testExpandingTopicSubscriptions
 * testShrinkingTopicSubscriptions
 * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed)
 * testMultiConsumerSessionTimeoutOnStopPolling
 * testAutoCommitOnRebalance
 * testPerPartitionLeadMetricsCleanUpWithSubscribe
 * testPerPartitionLagMetricsCleanUpWithSubscribe
 * testStaticConsumerDetectsNewPartitionCreatedAfterRestart

  was:
It should be possible to enable:
* testAutoCommitOnClose
* testAutoCommitOnCloseAfterWakeup
* testExpandingTopicSubscriptions
* testShrinkingTopicSubscriptions
* testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed)
* testAutoCommitOnRebalance
* testPerPartitionLeadMetricsCleanUpWithSubscribe
* testPerPartitionLagMetricsCleanUpWithSubscribe
* testStaticConsumerDetectsNewPartitionCreatedAfterRestart


> Enable additional PlaintextConsumerTest tests for new consumer
> --
>
> Key: KAFKA-16104
> URL: https://issues.apache.org/jira/browse/KAFKA-16104
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> It should be possible to enable:
>  * testAutoCommitOnClose
>  * testAutoCommitOnCloseAfterWakeup
>  * testExpandingTopicSubscriptions
>  * testShrinkingTopicSubscriptions
>  * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed)
>  * testMultiConsumerSessionTimeoutOnStopPolling
>  * testAutoCommitOnRebalance
>  * testPerPartitionLeadMetricsCleanUpWithSubscribe
>  * testPerPartitionLagMetricsCleanUpWithSubscribe
>  * testStaticConsumerDetectsNewPartitionCreatedAfterRestart





Re: [PR] KAFKA-14505; [7/7] Always materialize the most recent committed offset [kafka]

2024-01-16 Thread via GitHub


jolshan commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1454293498


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -712,13 +712,14 @@ public void run() {
 try {
 // Apply the records to the state machine.
 if (result.replayRecords()) {
-result.records().forEach(record ->
+for (int i = 0; i < result.records().size(); i++) {
 context.coordinator.replay(
+prevLastWrittenOffset + i,

Review Comment:
   is this the only way to get the offsets? 😅 
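The indexed loop in the hunk above derives each record's offset from its position in the batch. A standalone sketch of that pattern, assuming `prevLastWrittenOffset` is the log offset at which the first record of the batch is written, so record `i` lands at `prevLastWrittenOffset + i`. `IndexedReplay` and `Replayer` are hypothetical names, not the `CoordinatorRuntime` API.

```java
import java.util.List;

// Hypothetical sketch of the indexed replay loop: record i of a batch is
// assumed to be written at offset prevLastWrittenOffset + i.
final class IndexedReplay {
    interface Replayer<R> {
        void replay(long offset, R record);
    }

    static <R> void replayAll(long prevLastWrittenOffset, List<R> records, Replayer<R> replayer) {
        for (int i = 0; i < records.size(); i++) {
            // An index-based loop (rather than forEach) exposes the position
            // needed to compute the record's offset.
            replayer.replay(prevLastWrittenOffset + i, records.get(i));
        }
    }
}
```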






Re: [PR] KAFKA-14505; [7/7] Always materialize the most recent committed offset [kafka]

2024-01-16 Thread via GitHub


jolshan commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1454285529


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -979,15 +981,30 @@ public void replayEndTransactionMarker(
 pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
 topicOffsets.forEach((topicName, partitionOffsets) -> {
 partitionOffsets.forEach((partitionId, offsetAndMetadata) -> {
-log.debug("Committed transaction offset commit for producer id {} in group {} " +
-"with topic {}, partition {}, and offset {}.",
-producerId, groupId, topicName, partitionId, offsetAndMetadata);
-offsets.put(
+OffsetAndMetadata existingOffsetAndMetadata = offsets.get(
 groupId,
 topicName,
-partitionId,
-offsetAndMetadata
+partitionId
 );
+
+// We always keep the most recent committed offset when we have a mix of transactional and regular
+// offset commits. Without preserving information of the commit record offset, compaction of the
+// __consumer_offsets topic itself may result in the wrong offset commit being materialized.
+if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) {
+log.debug("Committed transactional offset commit {} for producer id {} in group {} " +
+"with topic {} and partition {}.",
+offsetAndMetadata, producerId, groupId, topicName, partitionId);
+offsets.put(

Review Comment:
   If the offset is not the latest offset, were we incorrectly saying the last 
transactional commit offset was the latest one?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -979,15 +981,30 @@ public void replayEndTransactionMarker(
 pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
 topicOffsets.forEach((topicName, partitionOffsets) -> {
 partitionOffsets.forEach((partitionId, offsetAndMetadata) -> {
-log.debug("Committed transaction offset commit for producer id {} in group {} " +
-"with topic {}, partition {}, and offset {}.",
-producerId, groupId, topicName, partitionId, offsetAndMetadata);
-offsets.put(
+OffsetAndMetadata existingOffsetAndMetadata = offsets.get(
 groupId,
 topicName,
-partitionId,
-offsetAndMetadata
+partitionId
 );
+
+// We always keep the most recent committed offset when we have a mix of transactional and regular
+// offset commits. Without preserving information of the commit record offset, compaction of the
+// __consumer_offsets topic itself may result in the wrong offset commit being materialized.
+if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) {
+log.debug("Committed transactional offset commit {} for producer id {} in group {} " +
+"with topic {} and partition {}.",
+offsetAndMetadata, producerId, groupId, topicName, partitionId);
+offsets.put(

Review Comment:
   If the offset is not the latest offset, were we incorrectly saying the 
transactional commit offset was the latest one?
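The rule in the code comment above (keep whichever commit has the later *record* offset in `__consumer_offsets`) can be modeled in isolation. A simplified sketch, assuming regular commits are replayed in log order and simply overwrite, while a transactional commit is only materialized when its end marker is replayed, possibly after a later regular commit. `LatestCommitTable` and `Commit` are hypothetical stand-ins; the field names mirror `committedOffset`/`recordOffset` from the diff, and a flat string key replaces the group/topic/partition nesting.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: when regular and transactional commits for the same
// partition are mixed, the commit whose record sits later in the log wins.
final class LatestCommitTable {
    static final class Commit {
        final long committedOffset; // consumer position being committed
        final long recordOffset;    // offset of the commit record itself

        Commit(long committedOffset, long recordOffset) {
            this.committedOffset = committedOffset;
            this.recordOffset = recordOffset;
        }
    }

    private final Map<String, Commit> byPartition = new HashMap<>();

    // Regular commits are replayed in log order, so they simply overwrite.
    void replayRegularCommit(String key, Commit commit) {
        byPartition.put(key, commit);
    }

    // Transactional commits are materialized when the end marker is replayed,
    // so an older commit record must not clobber a newer regular commit.
    void materializeTransactionalCommit(String key, Commit commit) {
        Commit existing = byPartition.get(key);
        if (existing == null || commit.recordOffset > existing.recordOffset) {
            byPartition.put(key, commit);
        }
    }

    Commit get(String key) {
        return byPartition.get(key);
    }
}
```

In the scenario this guards against, a transactional commit record is written at log offset 10 and a regular commit at 20, and the transaction marker arrives later. Without the check, materializing the transaction would roll the partition back to the older position.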






[jira] [Updated] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-01-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16008:
--
Issue Type: Bug  (was: Test)

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  
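The WARN line above describes the poll-interval rule: if the time between subsequent `poll()` calls exceeds `max.poll.interval.ms`, the poll timeout expires. A minimal sketch of that check, assuming an explicitly passed clock for testability and a strict "longer than" comparison as worded in the log. `PollIntervalTracker` is hypothetical and is not the `HeartbeatRequestManager` implementation.

```java
// Hypothetical sketch of the check described by the WARN log: track when poll()
// was last called and report expiry once the gap exceeds max.poll.interval.ms.
final class PollIntervalTracker {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollIntervalTracker(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Called on every poll(); resets the interval.
    void onPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    // "Longer than the configured max.poll.interval.ms" -> strictly greater.
    boolean pollTimeoutExpired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }
}
```

As the log message says, the two remedies are raising `max.poll.interval.ms` or lowering `max.poll.records` so each loop iteration finishes sooner.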





[jira] [Updated] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation

2024-01-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16009:
--
Issue Type: Bug  (was: Test)

> Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
> 
>
> Key: KAFKA-16009
> URL: https://issues.apache.org/jira/browse/KAFKA-16009
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
>   at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  





Re: [PR] KAFKA-14505; [7/7] Always materialize the most recent committed offset [kafka]

2024-01-16 Thread via GitHub


jolshan commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1454285529


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -979,15 +981,30 @@ public void replayEndTransactionMarker(
 pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
 topicOffsets.forEach((topicName, partitionOffsets) -> {
 partitionOffsets.forEach((partitionId, offsetAndMetadata) -> {
-log.debug("Committed transaction offset commit for producer id {} in group {} " +
-"with topic {}, partition {}, and offset {}.",
-producerId, groupId, topicName, partitionId, offsetAndMetadata);
-offsets.put(
+OffsetAndMetadata existingOffsetAndMetadata = offsets.get(
 groupId,
 topicName,
-partitionId,
-offsetAndMetadata
+partitionId
 );
+
+// We always keep the most recent committed offset when we have a mix of transactional and regular
+// offset commits. Without preserving information of the commit record offset, compaction of the
+// __consumer_offsets topic itself may result in the wrong offset commit being materialized.
+if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) {
+log.debug("Committed transactional offset commit {} for producer id {} in group {} " +
+"with topic {} and partition {}.",
+offsetAndMetadata, producerId, groupId, topicName, partitionId);
+offsets.put(

Review Comment:
   If the offset is not the latest offset, were we incorrectly saying the last 
committed offset was the latest one?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -979,15 +981,30 @@ public void replayEndTransactionMarker(
 pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
 topicOffsets.forEach((topicName, partitionOffsets) -> {
 partitionOffsets.forEach((partitionId, offsetAndMetadata) -> {
-log.debug("Committed transaction offset commit for producer id {} in group {} " +
-"with topic {}, partition {}, and offset {}.",
-producerId, groupId, topicName, partitionId, offsetAndMetadata);
-offsets.put(
+OffsetAndMetadata existingOffsetAndMetadata = offsets.get(
 groupId,
 topicName,
-partitionId,
-offsetAndMetadata
+partitionId
 );
+
+// We always keep the most recent committed offset when we have a mix of transactional and regular
+// offset commits. Without preserving information of the commit record offset, compaction of the
+// __consumer_offsets topic itself may result in the wrong offset commit being materialized.
+if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) {
+log.debug("Committed transactional offset commit {} for producer id {} in group {} " +
+"with topic {} and partition {}.",
+offsetAndMetadata, producerId, groupId, topicName, partitionId);
+offsets.put(

Review Comment:
   If the offset is not the latest offset, were we incorrectly saying the last 
transactionally committed offset was the latest one?






Re: [PR] KAFKA-14505; [7/7] Always materialize the most recent committed offset [kafka]

2024-01-16 Thread via GitHub


jolshan commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1454282345


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -918,7 +920,7 @@ public void replay(
 groupId,
 topic,
 partition,
-OffsetAndMetadata.fromRecord(value)
+OffsetAndMetadata.fromRecord(offset, value)

Review Comment:
   it is interesting that the offset passed in here is new (probably not the 
offset that OffsetAndMetadata is referring to)






Re: [PR] KAFKA-14505; [7/7] Always materialize the most recent committed offset [kafka]

2024-01-16 Thread via GitHub


jolshan commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1454280167


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java:
##
@@ -92,30 +117,34 @@ public boolean equals(Object o) {
 
 OffsetAndMetadata that = (OffsetAndMetadata) o;
 
-if (offset != that.offset) return false;
+if (committedOffset != that.committedOffset) return false;
 if (commitTimestampMs != that.commitTimestampMs) return false;
-if (!leaderEpoch.equals(that.leaderEpoch)) return false;
-if (!metadata.equals(that.metadata)) return false;
-return expireTimestampMs.equals(that.expireTimestampMs);
+if (recordOffset != that.recordOffset) return false;
+if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false;

Review Comment:
   Any reason why we changed from `.equals` to `Objects.equals`? I guess it is 
safer if one of them can be null.
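For reference, the difference raised above: `a.equals(b)` throws `NullPointerException` when `a` is null, while `java.util.Objects.equals(a, b)` returns true when both are null and false when only one is. A minimal illustration; `NullSafeEquals` is a hypothetical name, and whether `leaderEpoch` can actually be null in this PR is not stated here.

```java
import java.util.Objects;

// Contrasts instance equals() with the null-safe Objects.equals().
final class NullSafeEquals {
    static boolean instanceEquals(Object a, Object b) {
        return a.equals(b); // throws NullPointerException when a is null
    }

    static boolean nullSafeEquals(Object a, Object b) {
        return Objects.equals(a, b); // true if both null, false if only one is
    }
}
```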






Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-16 Thread via GitHub


jolshan commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1454273994


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -935,8 +935,9 @@ private[group] class GroupCoordinator(
   producerId,
   producerEpoch,
   RecordBatch.NO_SEQUENCE,
-  requestLocal,
-  postVerificationCallback
+  // Wrap the callback to be handled on an arbitrary request handler thread
+  // when transaction verification is complete.
+  KafkaRequestHandler.wrapAsyncCallback(postVerificationCallback, requestLocal)

Review Comment:
   Why are we calling this here? I thought we wanted to avoid this wrap here 
and only do it for produce requests.
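Per the code comment in the hunk, wrapping the callback means it runs on a request handler thread once verification completes, instead of on whichever thread completed the verification. A generic sketch of that re-dispatch idea using a plain `ExecutorService` as a stand-in for the handler pool. `CallbackRedispatch` is hypothetical and is not `KafkaRequestHandler.wrapAsyncCallback`, which (per the diff) also takes `requestLocal`.

```java
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

// Hypothetical sketch: the returned callback does not run the original
// callback inline; it submits it to a pool standing in for handler threads.
final class CallbackRedispatch {
    static <T> Consumer<T> wrapOnto(ExecutorService handlerPool, Consumer<T> callback) {
        return result -> handlerPool.execute(() -> callback.accept(result));
    }
}
```

The trade-off behind the question above: re-dispatching costs an extra queue hop. If a caller already runs on a suitable thread, skipping the wrap avoids that cost.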






Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-16 Thread via GitHub


jolshan commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1454268198


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig,
    * @param producerId  the producer id for the producer writing to the transaction
    * @param producerEpoch   the epoch of the producer writing to the transaction
    * @param baseSequence the base sequence of the first record in the batch we are trying to append
-   * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
-   * thread calling this method
    * @param callback the method to execute once the verification is either completed or returns an error
    *
    * When the verification returns, the callback will be supplied the error if it exists or Errors.NONE.
    * If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned.
-   * This guard can not be used for verification and any appends that attenpt to use it will fail.
+   * This guard can not be used for verification and any appends that attempt to use it will fail.
*/
   def maybeStartTransactionVerificationForPartition(
 topicPartition: TopicPartition,
 transactionalId: String,
 producerId: Long,
 producerEpoch: Short,
 baseSequence: Int,
-requestLocal: RequestLocal,
-callback: (Errors, RequestLocal, VerificationGuard) => Unit
+callback: Either[Errors, VerificationGuard] => Unit
   ): Unit = {
-def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
-newRequestLocal: RequestLocal,
-verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = {
-  callback(
-preAppendErrors.getOrElse(topicPartition, Errors.NONE),
-newRequestLocal,
-verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
+def generalizedCallback(results: Map[TopicPartition, Either[Errors, VerificationGuard]]): Unit = {

Review Comment:
   https://github.com/apache/kafka/pull/15087






Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-16 Thread via GitHub


jolshan commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1454267538


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig,
    * @param producerId  the producer id for the producer writing to the transaction
    * @param producerEpoch   the epoch of the producer writing to the transaction
    * @param baseSequence the base sequence of the first record in the batch we are trying to append
-   * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
-   * thread calling this method
    * @param callback the method to execute once the verification is either completed or returns an error
    *
    * When the verification returns, the callback will be supplied the error if it exists or Errors.NONE.
    * If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned.
-   * This guard can not be used for verification and any appends that attenpt to use it will fail.
+   * This guard can not be used for verification and any appends that attempt to use it will fail.
*/
   def maybeStartTransactionVerificationForPartition(
 topicPartition: TopicPartition,
 transactionalId: String,
 producerId: Long,
 producerEpoch: Short,
 baseSequence: Int,
-requestLocal: RequestLocal,
-callback: (Errors, RequestLocal, VerificationGuard) => Unit
+callback: Either[Errors, VerificationGuard] => Unit
   ): Unit = {
-def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
-newRequestLocal: RequestLocal,
-verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = {
-  callback(
-preAppendErrors.getOrElse(topicPartition, Errors.NONE),
-newRequestLocal,
-verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
+def generalizedCallback(results: Map[TopicPartition, Either[Errors, VerificationGuard]]): Unit = {

Review Comment:
   Please take a look at my refactor PR. I have already done some of this.
   I'd prefer not to overhaul it again (as I did after the previous group 
coordinator change)
   Hopefully it makes this work easier too.






Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]

2024-01-16 Thread via GitHub


mjsax commented on PR #15199:
URL: https://github.com/apache/kafka/pull/15199#issuecomment-1894743876

   Is this fix sufficient? Don't we also need to add `3.6` as an 
"upgrade_from" version?





Re: [PR] KAFKA-16042: Add byte-rate metrics for topic and partition [kafka]

2024-01-16 Thread via GitHub


ex172000 commented on PR #15085:
URL: https://github.com/apache/kafka/pull/15085#issuecomment-1894726845

   FYI: We are making a similar effort here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-977%3A+Partition-Level+Throughput+Metrics





Re: [PR] KAFKA-16126: Kcontroller dynamic configurations may fail to apply at startup [kafka]

2024-01-16 Thread via GitHub


cmccabe commented on PR #15192:
URL: https://github.com/apache/kafka/pull/15192#issuecomment-1894711882

   committed, thanks!





Re: [PR] KAFKA-16126: Kcontroller dynamic configurations may fail to apply at startup [kafka]

2024-01-16 Thread via GitHub


cmccabe closed pull request #15192: KAFKA-16126: Kcontroller dynamic 
configurations may fail to apply at startup
URL: https://github.com/apache/kafka/pull/15192





Re: [PR] Kafka 16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]

2024-01-16 Thread via GitHub


pprovenzano commented on PR #15197:
URL: https://github.com/apache/kafka/pull/15197#issuecomment-1894699271

   Test failures are unrelated to the PR.





Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-16 Thread via GitHub


dongnuo123 commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1454202465


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -524,37 +495,39 @@ public CompletableFuture 
listGroups(
 );
 }
 
-final CompletableFuture future = new 
CompletableFuture<>();
-final List results = new 
ArrayList<>();
 final Set existingPartitionSet = runtime.partitions();
-final AtomicInteger cnt = new 
AtomicInteger(existingPartitionSet.size());
 
 if (existingPartitionSet.isEmpty()) {
 return CompletableFuture.completedFuture(new 
ListGroupsResponseData());
 }
 
+final 
List>> futures =
+new ArrayList<>();
+
 for (TopicPartition tp : existingPartitionSet) {
-runtime.scheduleReadOperation(
+futures.add(runtime.scheduleReadOperation(
 "list-groups",
 tp,
 (coordinator, lastCommittedOffset) -> 
coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
-).handle((groups, exception) -> {
-if (exception == null) {
-synchronized (results) {
-results.addAll(groups);
-}
+).exceptionally(exception -> {
+exception = Errors.maybeUnwrapException(exception);
+if (exception instanceof NotCoordinatorException) {
+return Collections.emptyList();
 } else {
-if (!(exception instanceof NotCoordinatorException)) {
-future.complete(new 
ListGroupsResponseData().setErrorCode(Errors.forException(exception).code()));
-}
-}
-if (cnt.decrementAndGet() == 0) {
-future.complete(new 
ListGroupsResponseData().setGroups(results));
+throw new CompletionException(exception);
 }
-return null;
-});
+}));
 }
-return future;
+
+return FutureUtils
+.combineFutures(futures, ArrayList::new, List::addAll)
+.thenApply(groups -> new 
ListGroupsResponseData().setGroups(groups))
+.exceptionally(exception -> handleOperationException(
+"ListGroups",
+request,
+exception,
+(error, __) -> new 
ListGroupsResponseData().setErrorCode(error.code())
+));

Review Comment:
   Yeah, it makes sense
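The refactor replaces the shared counter and `synchronized` result list with one future per partition followed by a single combine step. A minimal sketch of that pattern with plain `CompletableFuture`, assuming a `combineFutures(futures, init, add)` shape modeled on the call in the diff; it is not Kafka's `FutureUtils` implementation, and `NotCoordinator` is a hypothetical stand-in for `NotCoordinatorException`.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in for Kafka's NotCoordinatorException.
final class NotCoordinator extends RuntimeException {
    NotCoordinator() {
        super("not coordinator");
    }
}

final class FutureCombiner {
    // Waits for all futures, then folds their results into one container.
    // If any future fails, the returned future fails with that error.
    static <T> CompletableFuture<T> combineFutures(
            List<CompletableFuture<T>> futures,
            Supplier<T> init,
            BiConsumer<T, T> add) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> {
                T result = init.get();
                for (CompletableFuture<T> future : futures) {
                    add.accept(result, future.join()); // already complete, so join does not block
                }
                return result;
            });
    }

    // Per-partition handling as in the diff: a NotCoordinator failure becomes
    // an empty list; any other failure is rethrown so the combine step fails.
    static <E> CompletableFuture<List<E>> emptyOnNotCoordinator(CompletableFuture<List<E>> future) {
        return future.exceptionally(ex -> {
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
            if (cause instanceof NotCoordinator) {
                return Collections.emptyList();
            }
            throw new CompletionException(cause);
        });
    }
}
```

Folding results only after `allOf` completes removes the need for a counter or a lock, and a failure from any partition surfaces once, where a single handler (like `handleOperationException` in the diff) can translate it into an error code.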






[PR] MINOR: populate TopicName in ConsumerGroupDescribe [kafka]

2024-01-16 Thread via GitHub


dongnuo123 opened a new pull request, #15205:
URL: https://github.com/apache/kafka/pull/15205

   The patch populates the topic name of 
`ConsumerGroupDescribeResponseData.TopicPartitions` based on the corresponding 
topic id in `ConsumerGroupDescribe`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16152) Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart

2024-01-16 Thread Kirk True (Jira)
Kirk True created KAFKA-16152:
-

 Summary: Fix 
PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart
 Key: KAFKA-16152
 URL: https://issues.apache.org/jira/browse/KAFKA-16152
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, unit tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe

2024-01-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16151:
--
Summary: Fix 
PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe  (was: 
Fix PlaintextConsumerTest.testPerPartitionLedMetricsCleanUpWithSubscribe)

> Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
> -
>
> Key: KAFKA-16151
> URL: https://issues.apache.org/jira/browse/KAFKA-16151
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>






[jira] [Created] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe

2024-01-16 Thread Kirk True (Jira)
Kirk True created KAFKA-16150:
-

 Summary: Fix 
PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
 Key: KAFKA-16150
 URL: https://issues.apache.org/jira/browse/KAFKA-16150
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, unit tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0








[jira] [Created] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLedMetricsCleanUpWithSubscribe

2024-01-16 Thread Kirk True (Jira)
Kirk True created KAFKA-16151:
-

 Summary: Fix 
PlaintextConsumerTest.testPerPartitionLedMetricsCleanUpWithSubscribe
 Key: KAFKA-16151
 URL: https://issues.apache.org/jira/browse/KAFKA-16151
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, unit tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0








[jira] [Created] (KAFKA-16149) Aggressively expire unused client connections

2024-01-16 Thread Kirk True (Jira)
Kirk True created KAFKA-16149:
-

 Summary: Aggressively expire unused client connections
 Key: KAFKA-16149
 URL: https://issues.apache.org/jira/browse/KAFKA-16149
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer, producer 
Reporter: Kirk True
Assignee: Kirk True


The goal is to minimize the number of connections from the client to the 
brokers.

On the Java client, there are potentially two types of network connections to 
brokers:
 # Connections for metadata requests
 # Connections for fetch, produce, etc. requests

The idea is to apply a much shorter idle time to client connections that have 
_only_ served metadata (type 1 above) so that they become candidates for 
expiration more quickly.

Alternatively (or additionally), a change to the way metadata requests are 
routed could be made to reduce the number of connections.
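The first proposal amounts to choosing an idle limit per connection based on what the connection has served. A minimal sketch, assuming a hypothetical `IdleConnectionTracker` keyed by node id: `defaultIdleMs` plays the role of the client's existing `connections.max.idle.ms`, while `metadataOnlyIdleMs` is an invented parameter for the shorter metadata-only limit. It is not how the Java client currently tracks connections.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tracker, not part of the Kafka client.
final class IdleConnectionTracker {
    private static final class State {
        long lastUsedMs;
        boolean servedNonMetadata;
    }

    private final long defaultIdleMs;      // analogous to connections.max.idle.ms
    private final long metadataOnlyIdleMs; // hypothetical, much shorter limit
    private final Map<String, State> connections = new HashMap<>();

    IdleConnectionTracker(long defaultIdleMs, long metadataOnlyIdleMs) {
        this.defaultIdleMs = defaultIdleMs;
        this.metadataOnlyIdleMs = metadataOnlyIdleMs;
    }

    // Records a request sent on the connection to `nodeId` at `nowMs`.
    void recordRequest(String nodeId, boolean isMetadataRequest, long nowMs) {
        State state = connections.computeIfAbsent(nodeId, id -> new State());
        state.lastUsedMs = nowMs;
        if (!isMetadataRequest) {
            state.servedNonMetadata = true;
        }
    }

    // Removes and returns the connections that have been idle longer than their limit.
    List<String> expireIdle(long nowMs) {
        List<String> expired = new ArrayList<>();
        connections.entrySet().removeIf(entry -> {
            State state = entry.getValue();
            long limitMs = state.servedNonMetadata ? defaultIdleMs : metadataOnlyIdleMs;
            if (nowMs - state.lastUsedMs > limitMs) {
                expired.add(entry.getKey());
                return true;
            }
            return false;
        });
        return expired;
    }
}
```

A connection is downgraded to the short limit only while it has served nothing but metadata; once it carries a fetch or produce request, it keeps the regular idle limit.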

 





Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-16 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1454185664


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -141,17 +144,33 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 }
   }
 
+  /**
+   * Return topic partition metadata for the given topic, listener and index range. Also, return a boolean value to
+   * indicate whether there are more partitions with index equal to or larger than the upper index.
+   *
+   * @param image         The metadata image
+   * @param topicName     The name of the topic.
+   * @param listenerName  The listener name.
+   * @param startIndex    The smallest index of the partitions to be included in the result.
+   * @param upperIndex    The upper limit of the index of the partitions to be included in the result.
+   *                      Note that the upper index can be larger than the largest partition index in
+   *                      this topic.
+   * @return              A collection of topic partition metadata and whether there are more partitions.
+   */
   private def getPartitionMetadataForDescribeTopicResponse(
 image: MetadataImage,
 topicName: String,
-listenerName: ListenerName
-  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+listenerName: ListenerName,
+startIndex: Int,
+upperIndex: Int
+  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Boolean) = {
 Option(image.topics().getTopic(topicName)) match {
-  case None => None
+  case None => (None, false)
   case Some(topic) => {
-val partitions = Some(topic.partitions().entrySet().asScala.map { 
entry =>
-  val partitionId = entry.getKey
-  val partition = entry.getValue
+val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
+val endIndex = upperIndex.min(topic.partitions().size())
+for (partitionId <- startIndex until endIndex) {
+  val partition = topic.partitions().get(partitionId)

Review Comment:
   Actually, that is not possible: the partition index starts at 0 and 
increments by 1.
   In what case would the partition not exist?
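The paging contract in the doc comment can be sketched independently of `KRaftMetadataCache`. A minimal Java sketch, assuming partition ids are dense (0 to n-1), as the review comment states; `PartitionPager` and `PartitionPage` are hypothetical names, and `hasMore` follows the doc comment's definition (some partition index is equal to or larger than `upperIndex`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// One page of partition ids plus a flag for whether more partitions remain.
final class PartitionPage {
    final List<Integer> partitionIds;
    final boolean hasMore;

    PartitionPage(List<Integer> partitionIds, boolean hasMore) {
        this.partitionIds = partitionIds;
        this.hasMore = hasMore;
    }
}

final class PartitionPager {
    // `partitions` maps partition id to metadata (a String stands in for the real type).
    static PartitionPage page(Map<Integer, String> partitions, int startIndex, int upperIndex) {
        int partitionCount = partitions.size();
        int endIndex = Math.min(upperIndex, partitionCount);
        List<Integer> ids = new ArrayList<>();
        for (int id = startIndex; id < endIndex; id++) {
            // With dense ids this lookup always succeeds; the check only guards
            // against the "missing partition" case debated in the review.
            if (partitions.containsKey(id)) {
                ids.add(id);
            }
        }
        // Assuming dense ids, some index >= upperIndex exists iff count > upperIndex.
        return new PartitionPage(ids, partitionCount > upperIndex);
    }
}
```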






[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription

2024-01-16 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807487#comment-17807487
 ] 

Phuc Hong Tran commented on KAFKA-15538:


[~lianetm] thanks for the comments. Just to clarify though, isn't the section 
that you sent 
[here|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L537]
 supposed to be used for the SubscriptionPattern, not the 
java.util.regex.Pattern, since the list of topics to be sent that match the regex 
is already set 
[here|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L531]?

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a java `Pattern` will still be supported for a while but 
> will eventually be removed.
>  * When subscribing with a SubscriptionPattern, the client should just send 
> the regex to the broker, and it will be resolved on the server side.
>  * When subscribing with a Pattern, the regex should be resolved on the 
> client side.
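For the `Pattern` case, client-side resolution means filtering the topic names the consumer already knows from metadata. A minimal sketch, assuming the candidate topics are supplied as a plain collection; `ClientSideRegex.resolve` is a hypothetical helper, and full-name matching via `Matcher.matches()` (rather than `find()`, which would also accept partial matches) is an assumption. Server-side RE2/J resolution for `SubscriptionPattern` is not shown.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

final class ClientSideRegex {
    // Returns the known topics whose whole name matches the pattern, sorted.
    static Set<String> resolve(Pattern pattern, Collection<String> knownTopics) {
        Set<String> matched = new TreeSet<>();
        for (String topic : knownTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```

The resulting set is what the client would send to the broker as the explicit list of subscribed topic names.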





Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-01-16 Thread via GitHub


Phuc-Hong-Tran commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1894635327

   Thanks @lianetm 





[jira] [Updated] (KAFKA-16083) Exclude throttle time when expiring inflight requests on a connection

2024-01-16 Thread Adithya Chandra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adithya Chandra updated KAFKA-16083:

Affects Version/s: (was: 3.7.0)

> Exclude throttle time when expiring inflight requests on a connection
> -
>
> Key: KAFKA-16083
> URL: https://issues.apache.org/jira/browse/KAFKA-16083
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Adithya Chandra
>Priority: Critical
> Fix For: 3.8.0
>
>
> When expiring inflight requests, the network client does not take throttle 
> time into account. If a connection has multiple inflight requests (default of 
> 5) and each request is throttled, then some of the requests can be incorrectly 
> marked as expired. Subsequently the connection is closed and the client 
> establishes a new connection to the broker. This behavior leads to 
> unnecessary connections to the broker, causes connection storms, and 
> increases latencies.
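The fix can be pictured as discounting throttle time when deciding whether an in-flight request has timed out. A minimal sketch, assuming the throttle duration for a request is known as a single `throttleTimeMs` value; `InFlightExpiry.isExpired` is a hypothetical helper, not the `NetworkClient` change that resolved this issue. The default of 5 in-flight requests corresponds to the client setting `max.in.flight.requests.per.connection`.

```java
final class InFlightExpiry {
    // Hypothetical check: time the broker asked the client to wait (throttle)
    // is subtracted before comparing against the request timeout.
    static boolean isExpired(long nowMs, long sendTimeMs, long throttleTimeMs, long requestTimeoutMs) {
        long elapsedMs = nowMs - sendTimeMs;
        long effectiveMs = elapsedMs - Math.max(0L, throttleTimeMs);
        return effectiveMs > requestTimeoutMs;
    }
}
```

With a 30 s timeout, a request sent 35 s ago is expired if it was never throttled, but not if 10 s of that wait was throttle time.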





[jira] [Updated] (KAFKA-16083) Exclude throttle time when expiring inflight requests on a connection

2024-01-16 Thread Adithya Chandra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adithya Chandra updated KAFKA-16083:

Affects Version/s: 3.7.0

> Exclude throttle time when expiring inflight requests on a connection
> -
>
> Key: KAFKA-16083
> URL: https://issues.apache.org/jira/browse/KAFKA-16083
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Adithya Chandra
>Priority: Critical
> Fix For: 3.8.0
>
>
> When expiring inflight requests, the network client does not take throttle 
> time into account. If a connection has multiple inflight requests (default of 
> 5) and each request is throttled, then some of the requests can be incorrectly 
> marked as expired. Subsequently the connection is closed and the client 
> establishes a new connection to the broker. This behavior leads to 
> unnecessary connections to the broker, causes connection storms, and 
> increases latencies.





[jira] [Resolved] (KAFKA-16083) Exclude throttle time when expiring inflight requests on a connection

2024-01-16 Thread Adithya Chandra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adithya Chandra resolved KAFKA-16083.
-
Fix Version/s: 3.8.0
 Reviewer: Stanislav Kozlovski
   Resolution: Fixed

> Exclude throttle time when expiring inflight requests on a connection
> -
>
> Key: KAFKA-16083
> URL: https://issues.apache.org/jira/browse/KAFKA-16083
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Adithya Chandra
>Priority: Critical
> Fix For: 3.8.0
>
>
> When expiring inflight requests, the network client does not take throttle 
> time into account. If a connection has multiple inflight requests (default of 
> 5) and each request is throttled, then some of the requests can be incorrectly 
> marked as expired. Subsequently the connection is closed and the client 
> establishes a new connection to the broker. This behavior leads to 
> unnecessary connections to the broker, causes connection storms, and 
> increases latencies.





Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-01-16 Thread via GitHub


lianetm commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1894569927

   This is the task to closely follow 
https://issues.apache.org/jira/browse/KAFKA-14517, where the broker will 
support the new regex.





[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807434#comment-17807434
 ] 

Almog Gavra commented on KAFKA-16141:
-

I confirmed it’s most likely related to my change. Changing it from:
final KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentKeyValueStore(persistentMemoryStoreName);
to
final KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentTimestampedKeyValueStore(persistentMemoryStoreName);
makes the test pass, so it’s almost certainly the same bug as in https://issues.apache.org/jira/browse/KAFKA-16046 

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Almog Gavra
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  





[jira] [Created] (KAFKA-16148) Implement GroupMetadataManager#onUnloaded

2024-01-16 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-16148:


 Summary: Implement GroupMetadataManager#onUnloaded
 Key: KAFKA-16148
 URL: https://issues.apache.org/jira/browse/KAFKA-16148
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim


 * Complete all awaiting futures with NOT_COORDINATOR (for classic groups).
 * Transition all groups to DEAD.
 * Cancel all timers related to the unloaded group metadata manager.
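The three steps can be sketched as a single pass over in-memory state. This is a minimal sketch with hypothetical types (`GroupMetadataManagerSketch`, `ClassicGroupSketch`, `NotCoordinatorError`), using `CompletableFuture` both for pending responses and as a stand-in timer handle; the real manager's fields and timer API are not described in the ticket.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

enum GroupState { STABLE, DEAD }

// Hypothetical stand-in for a NOT_COORDINATOR error.
final class NotCoordinatorError extends RuntimeException {
    NotCoordinatorError() {
        super("NOT_COORDINATOR");
    }
}

final class ClassicGroupSketch {
    GroupState state = GroupState.STABLE;
    // Futures of members waiting on a join/sync response.
    final List<CompletableFuture<Void>> awaitingFutures = new ArrayList<>();
}

final class GroupMetadataManagerSketch {
    final Map<String, ClassicGroupSketch> groups = new HashMap<>();
    // Timer handles keyed by timer id; cancelling the future stands in for cancelling a timer.
    final Map<String, CompletableFuture<Void>> timers = new HashMap<>();

    void onUnloaded() {
        for (ClassicGroupSketch group : groups.values()) {
            // Fail every pending response with NOT_COORDINATOR.
            for (CompletableFuture<Void> future : group.awaitingFutures) {
                future.completeExceptionally(new NotCoordinatorError());
            }
            group.awaitingFutures.clear();
            // Transition the group to DEAD.
            group.state = GroupState.DEAD;
        }
        // Cancel all timers owned by this manager.
        for (CompletableFuture<Void> timer : timers.values()) {
            timer.cancel(false);
        }
        timers.clear();
    }
}
```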





Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-01-16 Thread via GitHub


Phuc-Hong-Tran commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1894475663

   @lianetm, thanks for the comments. I will make sure to address those points 
in my next PR.
   
   Regarding your point about passing the regex to the HeartbeatRequestManager, I 
originally included that in my code change, but then I came across this PR 
https://github.com/apache/kafka/pull/14956 and decided that we need to wait for 
the broker to implement the new regex logic first.





Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-01-16 Thread via GitHub


lianetm commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1453968041


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##
@@ -86,7 +86,6 @@
 import static org.mockito.Mockito.when;
 
 public class OffsetsRequestManagerTest {
-

Review Comment:
   Nit: I find it's better to avoid changes in unrelated files, even minor ones.






Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-01-16 Thread via GitHub


lianetm commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1453966464


##
clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+public class SubscriptionPattern {

Review Comment:
   This class would need some doc explaining what it represents. Also, I'm not sure 
this is the right place for it (given that this whole intention with the new 
regex is driven by the broker, which has not implemented it yet). So at this point it is 
not clear to me whether we would prefer to define this on the broker side to be used 
there. Could be. As suggested in the Jira, maybe we should wait for the broker 
implementation of the new regex, and then align on this class? 
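As a starting point for the missing documentation, here is a minimal sketch of what the class could describe, based on the behavior stated in KAFKA-15538: the client forwards the regex, and the broker resolves it with RE2/J. The field, accessor, and equality semantics are assumptions for illustration, not the PR's or the KIP's definition. The class is package-private only so the sketch stays self-contained.

```java
import java.util.Objects;

/**
 * Hypothetical documented sketch of SubscriptionPattern.
 *
 * A regular expression in RE2/J syntax used to subscribe to topics. Unlike a
 * java.util.regex.Pattern, it is not compiled or evaluated by the client: the
 * consumer sends the raw expression to the broker, which resolves it to topic
 * names. Only supported by the new consumer group protocol.
 */
final class SubscriptionPattern {
    private final String pattern;

    SubscriptionPattern(String pattern) {
        this.pattern = Objects.requireNonNull(pattern, "pattern must not be null");
    }

    /** The raw regular expression, exactly as it will be sent to the broker. */
    String pattern() {
        return pattern;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SubscriptionPattern && pattern.equals(((SubscriptionPattern) o).pattern);
    }

    @Override
    public int hashCode() {
        return pattern.hashCode();
    }

    @Override
    public String toString() {
        return "SubscriptionPattern(" + pattern + ")";
    }
}
```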






Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-01-16 Thread via GitHub


lianetm commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1453958313


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -84,6 +85,9 @@ private enum SubscriptionType {
 /* the pattern user has requested */
 private Pattern subscribedPattern;
 
+/* we should rename this to something more specific */

Review Comment:
   Agree that it's confusing, but I can't think of a better name. I would 
suggest, though, that we add a proper comment stating that this represents the 
RE2J regex (vs the java regex represented by `Pattern subscribedPattern`).






Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-01-16 Thread via GitHub


lianetm commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1453953690


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -494,6 +495,16 @@ public void subscribe(Pattern pattern) {
 subscribeInternal(pattern, Optional.empty());
 }
 
+@Override
+public void subscribe(SubscriptionPattern pattern, 
ConsumerRebalanceListener callback) {
+

Review Comment:
   Not stated in the KIP, I believe, but I would say that we should at least log 
a warning indicating that this is not supported with the legacy protocol (similar 
to what the KIP states about logging a warning for `enforceRebalance`, which is 
not supported with the new protocol, and which we already do 
[here](https://github.com/apache/kafka/blob/055ff2b831193f5935f9efc2f7809f853f63de5f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1187))
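A minimal sketch of the suggested behavior, assuming the legacy consumer should warn and otherwise ignore the call. `java.util.logging` stands in for the SLF4J logger Kafka uses, and `LegacyConsumerSketch` and the minimal `SubscriptionPattern` stand-in are hypothetical, not the PR's classes.

```java
import java.util.logging.Logger;

// Minimal stand-in for the SubscriptionPattern in the PR.
final class SubscriptionPattern {
    final String pattern;

    SubscriptionPattern(String pattern) {
        this.pattern = pattern;
    }
}

final class LegacyConsumerSketch {
    // java.util.logging stands in for the SLF4J logger Kafka uses.
    private static final Logger LOG = Logger.getLogger(LegacyConsumerSketch.class.getName());

    void subscribe(SubscriptionPattern pattern) {
        // The legacy protocol has no broker-side regex resolution, so warn and do nothing.
        LOG.warning("subscribe(SubscriptionPattern) is not supported with the legacy group protocol; ignoring pattern "
            + pattern.pattern);
    }
}
```

Warning instead of throwing keeps the call harmless for applications that switch protocols, which matches how the comment describes the handling of `enforceRebalance`.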



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -494,6 +495,16 @@ public void subscribe(Pattern pattern) {
 subscribeInternal(pattern, Optional.empty());
 }
 
+@Override
+public void subscribe(SubscriptionPattern pattern, 
ConsumerRebalanceListener callback) {
+
+}
+
+@Override
+public void subscribe(SubscriptionPattern pattern) {
+

Review Comment:
   same as above comment






Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-01-16 Thread via GitHub


lianetm commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1453945617


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -753,6 +753,10 @@ public void subscribe(Pattern pattern, 
ConsumerRebalanceListener listener) {
 public void subscribe(Pattern pattern) {
 delegate.subscribe(pattern);
 }
+@Override
+public void subscribe(SubscriptionPattern pattern, 
ConsumerRebalanceListener callback) {}

Review Comment:
   I expect we should be calling `delegate.subscribe(SubscriptionPattern..)` 
here, as is done for `subscribe(Pattern..)`; otherwise the actual implementation 
in the AsyncKafkaConsumer won't be called.
   (This KafkaConsumer is the user-facing API, which ends up calling the Legacy 
or Async consumer via the delegate.)
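The forwarding being asked for is the usual facade-over-delegate pattern. A minimal sketch, assuming a hypothetical `ConsumerDelegate` interface in place of the real consumer delegate type, with a recording delegate so the forwarding can be observed; only the `SubscriptionPattern` overload is shown.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the SubscriptionPattern in the PR.
final class SubscriptionPattern {
    private final String pattern;

    SubscriptionPattern(String pattern) {
        this.pattern = pattern;
    }

    String pattern() {
        return pattern;
    }
}

// Hypothetical delegate interface; in Kafka the delegate is the Legacy or Async consumer.
interface ConsumerDelegate {
    void subscribe(SubscriptionPattern pattern);
}

// Records calls so the forwarding can be observed.
final class RecordingDelegate implements ConsumerDelegate {
    final List<String> subscribed = new ArrayList<>();

    @Override
    public void subscribe(SubscriptionPattern pattern) {
        subscribed.add(pattern.pattern());
    }
}

// User-facing facade: the overload forwards to the delegate instead of having
// an empty body, so the delegate's implementation actually runs.
final class ConsumerFacade {
    private final ConsumerDelegate delegate;

    ConsumerFacade(ConsumerDelegate delegate) {
        this.delegate = delegate;
    }

    void subscribe(SubscriptionPattern pattern) {
        delegate.subscribe(pattern);
    }
}
```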



##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -753,6 +753,10 @@ public void subscribe(Pattern pattern, 
ConsumerRebalanceListener listener) {
 public void subscribe(Pattern pattern) {
 delegate.subscribe(pattern);
 }
+@Override
+public void subscribe(SubscriptionPattern pattern, 
ConsumerRebalanceListener callback) {}
+@Override
+public void subscribe(SubscriptionPattern pattern) {}

Review Comment:
   Same as above comment






[jira] [Comment Edited] (KAFKA-15538) Client support for java regex based subscription

2024-01-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807323#comment-17807323
 ] 

Lianet Magrans edited comment on KAFKA-15538 at 1/16/24 7:45 PM:
-

Hey [~phuctran], this ticket is not fully implemented yet. Some bits of it 
already exist at the consumer level, which [~kirktrue] worked on just as an 
initial approach, but they need to be reviewed to make sure nothing else needs 
to be wired up to the HeartbeatRequestManager, so that the list of topics 
matching the regex is sent to the broker (see 
[here|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L537]).
 We should also re-enable all the pattern-subscription-related integration 
tests that are currently disabled in the PlainTextAsyncConsumer (e.g. 
[testPatternSubscription|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L356C7-L356C30]).
 I will re-open the issue.

Regarding documentation, for the legacy consumer all we have is the Javadoc 
(AFAIK). For the new consumer, we do have a one-pager on the AK wiki 
[here|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design].
 Hope it helps!


was (Author: JIRAUSER300183):
Hey [~phuctran], this ticket is not fully implemented yet. Some bits of it 
already exist at the consumer level, which [~kirktrue] worked on just as an 
initial approach, but they need to be reviewed and wired up to the 
HeartbeatRequestManager, so that the list of topics matching the regex is sent 
to the broker (see 
[here|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L537]).
 We should also re-enable all the pattern-subscription-related integration 
tests that are currently disabled in the PlainTextAsyncConsumer (e.g. 
[testPatternSubscription|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L356C7-L356C30]).
 I will re-open the issue.

Regarding documentation, for the legacy consumer all we have is the Javadoc 
(AFAIK). For the new consumer, we do have a one-pager on the AK wiki 
[here|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design].
 Hope it helps!

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> When using subscribe with a Java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses 
> [Google RE2/J|https://github.com/google/re2j] for regular expressions and 
> introduces new methods in the consumer API to subscribe using a 
> `SubscriptionPattern`. Subscribing with a Java `Pattern` will still be 
> supported for a while but will eventually be removed.
>  * When subscribe with a SubscriptionPattern is used, the client should just 
> send the regex to the broker, where it will be resolved on the server side.
>  * When subscribe with a Pattern is used, the regex should be resolved on 
> the client side.
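A minimal sketch of the client-side resolution for subscribe with a Java `Pattern`, assuming the client matches the regex against the topic names it already knows from cluster metadata (modeled here as a plain list) and that a topic qualifies only if its whole name matches. `ClientSideRegexResolver` is a hypothetical name, not the consumer's actual code.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical helper: resolves a java.util.regex.Pattern into the explicit
// topic list the client would send to the broker.
final class ClientSideRegexResolver {
    static Set<String> resolve(Pattern pattern, List<String> topicsInMetadata) {
        Set<String> matched = new TreeSet<>(); // sorted only for stable output
        for (String topic : topicsInMetadata) {
            // matches() requires the whole topic name to match, not a substring
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```

With `Pattern.compile("orders-.*")` and the topics `orders-eu`, `orders-us`, `payments` and `old-orders-x`, only the first two are selected, because `old-orders-x` matches only as a substring.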



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-16 Thread via GitHub


philipnee commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1453879095


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() {
 assertPoll(1, commitRequestManger);
 }
 
+@Test

Review Comment:
   Similar to the comment above: we autocommit on close, so should the 
interceptor be triggered?






[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807411#comment-17807411
 ] 

Matthias J. Sax commented on KAFKA-16141:
-

Assigned to [~agavra] and marked as blocker. Might be a regression introduced 
via KIP-954.

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Almog Gavra
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from ubuntu@worker26")
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run
>     data = self.run_test()
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test
>     return self.test_context.function(self.test)
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py", line 79, in test_standby_tasks_rebalance
>     self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py", line 96, in wait_for_verification
>     err_msg="Did expect to read '%s' from %s" % (message, processor.node.account))
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", line 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  





[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-01-16 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807412#comment-17807412
 ] 

Phuc Hong Tran commented on KAFKA-15561:


[~lianetm], PTAL if you have time. Thanks

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The new consumer should support subscribing with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the SubscribedTopicRegex field 
> of the heartbeat (HB) request, delegating the resolution to the server.
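For the SubscriptionPattern case the client does no matching at all; it only passes the regex through. The sketch below assumes a simplified, hypothetical view of the heartbeat's subscription fields: only `SubscribedTopicRegex` is named in the ticket, while the topic-names field and the `HeartbeatSubscriptionFields` class are assumptions for illustration. It shows that exactly one of the two fields is populated per subscription type.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified subscription part of a heartbeat request.
final class HeartbeatSubscriptionFields {
    final List<String> subscribedTopicNames; // explicit topics, or a Pattern already resolved on the client
    final String subscribedTopicRegex;       // SubscriptionPattern: resolved by the broker

    private HeartbeatSubscriptionFields(List<String> names, String regex) {
        this.subscribedTopicNames = names;
        this.subscribedTopicRegex = regex;
    }

    // SubscriptionPattern case: send the raw regex untouched, no client-side matching.
    static HeartbeatSubscriptionFields forBrokerSideRegex(String regex) {
        return new HeartbeatSubscriptionFields(null, regex);
    }

    // Topic-list case: the client sends concrete names and no regex.
    static HeartbeatSubscriptionFields forTopicNames(List<String> topics) {
        return new HeartbeatSubscriptionFields(Collections.unmodifiableList(topics), null);
    }
}
```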





[jira] [Updated] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16141:

Priority: Blocker  (was: Major)

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Almog Gavra
>Priority: Blocker
>





Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-16 Thread via GitHub


OmniaGM commented on PR #15158:
URL: https://github.com/apache/kafka/pull/15158#issuecomment-1894343924

   > @OmniaGM It looks like there are a [few build failures](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15158/6/pipeline/10):
   > 
   > ```
   > [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15158/core/src/main/scala/kafka/server/DynamicConfig.scala:26:58: Unused import
   > [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15158/core/src/main/scala/kafka/server/KafkaConfig.scala:47:50: Unused import
   > ```
   
   fixed this





Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-16 Thread via GitHub


OmniaGM commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1453877298


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public class TransactionLogConfig {
+// Log-level config default values
+public static final int DEFAULT_NUM_PARTITIONS = 50;
+public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
+public static final short DEFAULT_REPLICATION_FACTOR = 3;
+public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
+public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+}

Review Comment:
   Done, and did the same to other files I created as well






Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-16 Thread via GitHub


philipnee commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1453877053


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -844,6 +849,54 @@ public void testWakeupCommitted() {
 assertNull(consumer.wakeupTrigger().getPendingTask());
 }
 
+@Test

Review Comment:
   What's the interceptor behavior on close? If we have in-flight commits 
before closing the consumer, should the interceptors be invoked? Can we add 
tests around that?






[jira] [Updated] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER

2024-01-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15878:
--
Fix Version/s: 3.8.0

> KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
> 
>
> Key: KAFKA-15878
> URL: https://issues.apache.org/jira/browse/KAFKA-15878
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Anuj Sharma
>Priority: Major
>  Labels: oauth
> Fix For: 3.8.0
>
>
> h1. Overview
>  * This issue pertains to the 
> [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
>  mechanism of Kafka authentication.
>  * Kafka clients can use the 
> [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
>  mechanism by overriding the 
> [custom callback handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod].
>  * 
> [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575],
>  available from v3.1, further extends the mechanism with a production-grade 
> implementation.
>  * Kafka's 
> [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
>  mechanism currently {*}rejects non-JWT (i.e. opaque) tokens{*}, because it 
> accepts a more restrictive set of characters than 
> [RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] allows.
>  * This JIRA can be considered an extension of 
> [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575]
>  to support opaque tokens in addition to JWT tokens.
>  
> In summary, the following character set (the RFC's {{b64token}} syntax) 
> should be supported:
> {code:java}
> 1*( ALPHA / DIGIT /
>"-" / "." / "_" / "~" / "+" / "/" ) *"="
> {code}
>  
>  
>  
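The RFC 6750 `b64token` grammar quoted above maps directly onto a Java regular expression: one or more characters from the allowed set, followed by optional `=` padding. This is a minimal illustration of the character set only, not Kafka's actual token validator; `B64TokenCheck` is a hypothetical name.

```java
import java.util.regex.Pattern;

// Hypothetical check for the RFC 6750 section 2.1 syntax:
//   b64token = 1*( ALPHA / DIGIT / "-" / "." / "_" / "~" / "+" / "/" ) *"="
final class B64TokenCheck {
    // One or more allowed characters, then zero or more '=' padding characters at the end.
    private static final Pattern B64TOKEN = Pattern.compile("[A-Za-z0-9\\-._~+/]+=*");

    static boolean isValid(String token) {
        return token != null && B64TOKEN.matcher(token).matches();
    }
}
```

Under this grammar an opaque token such as `abc.DEF-123_~+/==` is accepted just like a JWT-shaped string, while padding anywhere but the end, whitespace, or an empty token is rejected.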





[jira] [Assigned] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-16141:
---

Assignee: Almog Gavra

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Almog Gavra
>Priority: Major
>





[jira] [Updated] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER

2024-01-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15878:
--
Labels: oauth  (was: )

> KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
> 
>
> Key: KAFKA-15878
> URL: https://issues.apache.org/jira/browse/KAFKA-15878
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Anuj Sharma
>Priority: Major
>  Labels: oauth
>





Re: [PR] KAFKA-15878: KIP-768 - Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER [kafka]

2024-01-16 Thread via GitHub


kirktrue commented on PR #14818:
URL: https://github.com/apache/kafka/pull/14818#issuecomment-1894339862

   @jcme—I wrote and implemented KIP-768, so I'll take a look at this.
   
   Also, are you able to assign the Jira to yourself? Thanks!





Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-16 Thread via GitHub


philipnee commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1453871780


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Utility class that helps the application thread invoke user-registered {@link OffsetCommitCallback}s and
+ * {@link org.apache.kafka.clients.consumer.ConsumerInterceptor}s. This is
+ * achieved by having the background thread register an {@link OffsetCommitCallbackTask} with the invoker upon
+ * future completion, and having the application thread execute the callbacks when the user polls, commits, or
+ * closes the consumer.
+ */
+public class OffsetCommitCallbackInvoker {
+    private final ConsumerInterceptors interceptors;
+
+    OffsetCommitCallbackInvoker(ConsumerInterceptors interceptors) {
+        this.interceptors = interceptors;
+    }
+
+    // Thread-safe queue to store user-defined callbacks and interceptors to be executed
+    private final BlockingQueue callbackQueue = new LinkedBlockingQueue<>();
+
+    public void submitCommitInterceptors(final Map offsets) {
+        if (!interceptors.isEmpty()) {
+            callbackQueue.add(new OffsetCommitCallbackTask(
+                (innerOffsets, exception) -> interceptors.onCommit(innerOffsets),
+                offsets,
+                null
+            ));
+        }
+    }
+
+    public void submitUserCallback(final OffsetCommitCallback callback,
+                                   final Map offsets,
+                                   final Exception exception) {
+        callbackQueue.add(new OffsetCommitCallbackTask(callback, offsets, exception));
+    }
+
+    /**
+     * @return true if an offset commit was fenced.
+     */
+    public boolean executeCallbacks() {

Review Comment:
   see comment above.
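The hand-off described in the Javadoc above can be sketched with a `LinkedBlockingQueue`: the background thread only enqueues work, and the application thread drains and runs it, so user callbacks never execute on the background thread. This is a simplified, hypothetical model (`CallbackHandoff`, with committed offsets reduced to a `String`), not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical model of the invoker's queue hand-off between two threads.
final class CallbackHandoff {
    // Thread-safe, so the background thread can add while the application thread drains.
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    // Called from the background thread when a commit completes: only enqueues, never runs user code.
    void submit(Consumer<String> callback, String committedOffsets) {
        queue.add(() -> callback.accept(committedOffsets));
    }

    // Called from the application thread (e.g. during poll/commit/close); returns how many tasks ran.
    int runPending() {
        List<Runnable> drained = new ArrayList<>();
        queue.drainTo(drained);
        for (Runnable task : drained) {
            task.run();
        }
        return drained.size();
    }
}
```

Draining into a local list first means callbacks submitted while the batch is running are picked up on the next call rather than extending the current one.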






Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-16 Thread via GitHub


philipnee commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1453870149


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -93,9 +94,11 @@ public CommitRequestManager(
 final SubscriptionState subscriptions,
 final ConsumerConfig config,
 final CoordinatorRequestManager coordinatorRequestManager,
+final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
 final String groupId,
 final Optional groupInstanceId) {
-this(time, logContext, subscriptions, config, coordinatorRequestManager, groupId,
+this(time, logContext, subscriptions, config, coordinatorRequestManager,

Review Comment:
   I wonder if it would look cleaner to put each argument on its own line, 
now that the call spans 3 lines with different widths.






Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-16 Thread via GitHub


philipnee commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1453868310


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -93,9 +94,11 @@ public CommitRequestManager(
 final SubscriptionState subscriptions,
 final ConsumerConfig config,
 final CoordinatorRequestManager coordinatorRequestManager,
+final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,

Review Comment:
   Looking back at the implementation, I now think it is rather unnecessary to 
have these `final`s on the parameters. Do you think we should clean them up in 
the future?






Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-16 Thread via GitHub


philipnee commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1453865322


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1902,65 +1912,14 @@ private void maybeThrowFencedInstanceException() {
 }
 
 private void maybeInvokeCommitCallbacks() {
-if (callbacks() > 0) {
-invoker.executeCallbacks();
+if (offsetCommitCallbackInvoker.executeCallbacks()) {

Review Comment:
   The naming seems a bit misleading to me: if executeCallbacks() returns 
true, it almost reads as if the callbacks were executed successfully. I wonder 
if we could restructure the code like:
   
   ```
   invoker.executeCallbacks();
   boolean isFenced = invoker.hasFencedException();
   ```
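A hypothetical sketch of the suggested restructuring: `executeCallbacks()` returns nothing, and fencing is exposed through a separately named `hasFencedException()`, so a boolean result cannot be misread as "callbacks succeeded". `FencedException` stands in for Kafka's `FencedInstanceIdException`, and the task shape (a callback plus the error its commit completed with) is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for org.apache.kafka.common.errors.FencedInstanceIdException.
class FencedException extends RuntimeException {
    FencedException(String message) {
        super(message);
    }
}

final class InvokerSketch {
    // A queued callback together with the error its commit completed with (null on success).
    private static final class Task {
        final Consumer<Exception> callback;
        final Exception error;

        Task(Consumer<Exception> callback, Exception error) {
            this.callback = callback;
            this.error = error;
        }
    }

    private final Queue<Task> pending = new ArrayDeque<>();
    private boolean fencedSeen = false;

    void submit(Consumer<Exception> callback, Exception error) {
        pending.add(new Task(callback, error));
    }

    // Runs every queued callback; deliberately returns nothing.
    void executeCallbacks() {
        Task task;
        while ((task = pending.poll()) != null) {
            if (task.error instanceof FencedException) {
                fencedSeen = true; // remember fencing, but still run this and later callbacks
            }
            task.callback.accept(task.error);
        }
    }

    // Explicitly named query for the condition the old boolean return value encoded.
    boolean hasFencedException() {
        return fencedSeen;
    }
}
```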






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-16 Thread via GitHub


mumrah commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1453861304


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
 }
   }
 
+  private def getPartitionMetadataForDescribeTopicResponse(
+    image: MetadataImage,
+    topicName: String,
+    listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+    Option(image.topics().getTopic(topicName)) match {
+      case None => None
+      case Some(topic) => {
+        val partitions = Some(topic.partitions().entrySet().asScala.map { entry =>
+          val partitionId = entry.getKey
+          val partition = entry.getValue
+          val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas,
+            listenerName, false)
+          val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName,
+            false)
+          val offlineReplicas = getOfflineReplicas(image, partition, listenerName)
+          val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName)
+          maybeLeader match {
+            case None =>
+              val error = if (!image.cluster().brokers.containsKey(partition.leader)) {

Review Comment:
   This won't cause an error for the whole request, right? It will just 
populate the partition-level `ErrorCode`.
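To illustrate the distinction the question is about, here is a simplified, hypothetical sketch in which a leader that is not reachable produces an error code for that partition only, while the other partitions in the same response are still described normally. The enum and its values are stand-ins for Kafka's `Errors`, and the split between LEADER_NOT_AVAILABLE (leader not registered) and LISTENER_NOT_FOUND (leader registered but without the listener) is an assumption, since the diff above is cut off before the branches.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified error codes; the real code uses Kafka's Errors enum.
enum PartitionError { NONE, LEADER_NOT_AVAILABLE, LISTENER_NOT_FOUND }

final class PerPartitionErrors {
    // leaders: partition id -> leader broker id
    // registeredBrokers: brokers present in the cluster image
    // brokersWithListener: registered brokers that expose the requested listener
    static Map<Integer, PartitionError> describe(Map<Integer, Integer> leaders,
                                                 Set<Integer> registeredBrokers,
                                                 Set<Integer> brokersWithListener) {
        Map<Integer, PartitionError> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, Integer> e : leaders.entrySet()) {
            int leader = e.getValue();
            PartitionError error;
            if (brokersWithListener.contains(leader)) {
                error = PartitionError.NONE;
            } else if (!registeredBrokers.contains(leader)) {
                error = PartitionError.LEADER_NOT_AVAILABLE; // leader unknown to the cluster image
            } else {
                error = PartitionError.LISTENER_NOT_FOUND;   // leader known, listener missing
            }
            // Recorded for this partition only; the loop continues with the others.
            result.put(e.getKey(), error);
        }
        return result;
    }
}
```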






Re: [PR] KAFKA-15853: Move ClientQuotaManagerConfig outside of core [kafka]

2024-01-16 Thread via GitHub


OmniaGM commented on code in PR #15159:
URL: https://github.com/apache/kafka/pull/15159#discussion_r1453859341


##
checkstyle/import-control-core.xml:
##
@@ -82,6 +82,7 @@
 
 
 
+

Review Comment:
   Removed it.



##
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##
@@ -32,6 +32,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
 import org.apache.kafka.server.config.ConfigEntityName
 import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
+import org.apache.kafka.server.config.ClientQuotaManagerConfig

Review Comment:
   done






Re: [PR] KAFKA-15853: Move ClientQuotaManagerConfig outside of core [kafka]

2024-01-16 Thread via GitHub


OmniaGM commented on code in PR #15159:
URL: https://github.com/apache/kafka/pull/15159#discussion_r1453859778


##
server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+public class ClientQuotaManagerConfig {
+    // Always have 10 whole windows + 1 current window
+    public static final int DEFAULT_NUM_QUOTA_SAMPLES = 11;
+    public static final int DEFAULT_QUOTA_WINDOW_SIZE_SECONDS = 1;
+
+    public final int numQuotaSamples;
+    public final int quotaWindowSizeSeconds;
+
+    /**
+     * Configuration settings for quota management
+     *
+     * @param numQuotaSamples The number of samples to retain in memory
+     * @param quotaWindowSizeSeconds  The time span of each sample
+     */
+    public ClientQuotaManagerConfig(int numQuotaSamples, int quotaWindowSizeSeconds) {
+        this.numQuotaSamples = numQuotaSamples;
+        this.quotaWindowSizeSeconds = quotaWindowSizeSeconds;
+    }
+
+    public ClientQuotaManagerConfig() {
+        this(DEFAULT_NUM_QUOTA_SAMPLES, DEFAULT_QUOTA_WINDOW_SIZE_SECONDS);
+    }
+
+    public ClientQuotaManagerConfig(int numQuotaSamples) {
+        this(numQuotaSamples, DEFAULT_QUOTA_WINDOW_SIZE_SECONDS);
+    }
+}

Review Comment:
   Added a line
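As a worked example of the defaults in the class above: with 11 samples of 1 second each, the retained samples span roughly 11 seconds, i.e. the 10 whole windows plus the 1 current window that the source comment describes. The sketch below redeclares a trimmed copy of the class so it compiles on its own; `QuotaWindowExample`, `retainedSpanSeconds` and `completeWindows` are hypothetical helpers, not part of the PR.

```java
public class QuotaWindowExample {
    // Trimmed copy of ClientQuotaManagerConfig from the diff, kept package-private here.
    static class ClientQuotaManagerConfig {
        static final int DEFAULT_NUM_QUOTA_SAMPLES = 11;
        static final int DEFAULT_QUOTA_WINDOW_SIZE_SECONDS = 1;

        final int numQuotaSamples;
        final int quotaWindowSizeSeconds;

        ClientQuotaManagerConfig(int numQuotaSamples, int quotaWindowSizeSeconds) {
            this.numQuotaSamples = numQuotaSamples;
            this.quotaWindowSizeSeconds = quotaWindowSizeSeconds;
        }

        ClientQuotaManagerConfig() {
            this(DEFAULT_NUM_QUOTA_SAMPLES, DEFAULT_QUOTA_WINDOW_SIZE_SECONDS);
        }

        ClientQuotaManagerConfig(int numQuotaSamples) {
            this(numQuotaSamples, DEFAULT_QUOTA_WINDOW_SIZE_SECONDS);
        }
    }

    // Hypothetical helper: total time covered by all retained samples.
    static int retainedSpanSeconds(ClientQuotaManagerConfig config) {
        return config.numQuotaSamples * config.quotaWindowSizeSeconds;
    }

    // Hypothetical helper: whole windows, excluding the one still being filled.
    static int completeWindows(ClientQuotaManagerConfig config) {
        return config.numQuotaSamples - 1;
    }
}
```

For instance, `new ClientQuotaManagerConfig()` gives an 11-second span with 10 whole windows, while `new ClientQuotaManagerConfig(5, 2)` covers 10 seconds.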






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-16 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1453854211


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider {
 
     private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class);
 
+    public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+    public static final String ALLOWED_PATHS_DOC = "A comma separated list of paths that this config provider is " +
+            "allowed to access. If not set, all paths are allowed.";
+    private AllowedPaths allowedPaths = null;

Review Comment:
   @mimaison WDYT?
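To make the `allowed.paths` semantics in the diff concrete, here is a minimal Java sketch, assuming the value is a comma-separated list of directories and that a requested path is allowed when, after normalization, it lies under one of them. `AllowedPathsSketch` and `isAllowed` are hypothetical and not the PR's `AllowedPaths` class. Normalizing before the containment check is what stops `..` segments from escaping an allowed directory.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public class AllowedPathsSketch {
    // null means "allowed.paths was not set": every path is permitted.
    private final List<Path> allowedRoots;

    public AllowedPathsSketch(String allowedPathsConfig) {
        if (allowedPathsConfig == null || allowedPathsConfig.isBlank()) {
            allowedRoots = null;
        } else {
            allowedRoots = Arrays.stream(allowedPathsConfig.split(","))
                    .map(String::trim)
                    .filter(s -> !s.isEmpty())
                    .map(s -> Paths.get(s).toAbsolutePath().normalize())
                    .toList();
        }
    }

    /** Returns true if the path is inside one of the allowed roots, or if no roots are configured. */
    public boolean isAllowed(String path) {
        if (allowedRoots == null) {
            return true;
        }
        // normalize() collapses "a/../b", so "/allowed/../etc/passwd" becomes "/etc/passwd".
        Path candidate = Paths.get(path).toAbsolutePath().normalize();
        // Path.startsWith compares whole name elements, so "/allowed-not" does not match "/allowed".
        return allowedRoots.stream().anyMatch(candidate::startsWith);
    }
}
```

Note that this sketch does not resolve symbolic links; a stricter check would also compare real paths.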






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-16 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1453851791


##
clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java:
##
@@ -44,8 +44,15 @@ public class DirectoryConfigProvider implements ConfigProvider {
 
     private static final Logger log = LoggerFactory.getLogger(DirectoryConfigProvider.class);
 
+    public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+    public static final String ALLOWED_PATHS_DOC = "A comma separated list of paths that this config provider is " +
+            "allowed to access. If not set, all paths are allowed.";
+    private AllowedPaths allowedPaths = new AllowedPaths(null);

Review Comment:
   The ConfigProvider javadoc requires this class to be thread-safe.
   
   ```suggestion
   private volatile AllowedPaths allowedPaths = new AllowedPaths(null);
   ```
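A short sketch of why `volatile` matters here, assuming `configure()` and the lookups may run on different threads: the new reference is written once in `configure()` and each lookup does a single volatile read, so readers see either the old object or the fully constructed new one, without locking. `VolatileProviderSketch` and its deliberately simplified string-prefix check (no path normalization) are hypothetical stand-ins, not the real `DirectoryConfigProvider`.

```java
import java.util.Map;

public class VolatileProviderSketch {
    // Immutable holder: its only field is final, so a safely published instance is fully visible.
    static final class AllowedPrefix {
        final String prefix; // null means every path is allowed

        AllowedPrefix(String prefix) {
            this.prefix = prefix;
        }

        boolean permits(String path) {
            return prefix == null || path.startsWith(prefix);
        }
    }

    // volatile makes the write in configure() visible to later reads on other threads.
    private volatile AllowedPrefix allowed = new AllowedPrefix(null);

    public void configure(Map<String, ?> configs) {
        Object value = configs.get("allowed.paths");
        allowed = new AllowedPrefix(value == null ? null : value.toString());
    }

    public boolean canRead(String path) {
        // Read the field once so the whole check uses one consistent snapshot.
        AllowedPrefix current = allowed;
        return current.permits(path);
    }
}
```

Swapping an immutable object through one volatile field keeps the reader path lock-free; a `synchronized` getter would also work but adds contention on every lookup.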






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-16 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1453850678


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider {
 
     private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class);
 
+    public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+    public static final String ALLOWED_PATHS_DOC = "A comma separated list of paths that this config provider is " +
+            "allowed to access. If not set, all paths are allowed.";
+    private AllowedPaths allowedPaths = null;

Review Comment:
   > Could there be users who don't call configure first necessarily? I'm 
worried that throwing IllegalStateException could cause backward compatibility 
issue.
   
   Throwing IllegalStateException would be a backwards-incompatible change, but 
I don't think skipping configure() was ever supported in the first place. Not 
calling configure() on these particular implementations because it previously 
happened to be a no-op couples too closely to the internal implementation of 
these classes.
   
   It is a value judgement, and we have to decide which matters more. For the 
record, I think an attack that exploits the interface by preventing calls to 
configure() is unlikely; I was just thinking about defense in depth.
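For comparison, the fail-fast alternative under discussion could look like this minimal sketch, assuming the field starts as `null` and any lookup before `configure()` throws `IllegalStateException` instead of silently allowing every path. `FailFastProviderSketch` is hypothetical and only illustrates the trade-off; it is not the code in the PR.

```java
import java.util.Map;

public class FailFastProviderSketch {
    // Immutable snapshot of the configuration; allowedPrefix == null means "allowed.paths" was unset.
    record Settings(String allowedPrefix) {
        boolean permits(String path) {
            return allowedPrefix == null || path.startsWith(allowedPrefix);
        }
    }

    // Stays null until configure() runs, so a skipped configure() is detectable.
    private volatile Settings settings = null;

    public void configure(Map<String, ?> configs) {
        Object value = configs.get("allowed.paths");
        settings = new Settings(value == null ? null : value.toString());
    }

    public boolean canRead(String path) {
        Settings current = settings; // single volatile read
        if (current == null) {
            // Defense in depth: fail loudly instead of silently allowing every path.
            throw new IllegalStateException("configure() must be called before reading paths");
        }
        return current.permits(path);
    }
}
```

Callers that relied on configure() being a no-op would now see an exception on their first lookup, which is exactly the compatibility cost weighed above.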






[jira] [Commented] (KAFKA-16147) Partition is assigned to two members at the same time

2024-01-16 Thread Emanuele Sabellico (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807403#comment-17807403
 ] 

Emanuele Sabellico commented on KAFKA-16147:


This is the sequence of events, where SUBSCRIPTION_1 is T1 and SUBSCRIPTION_2 is T1,T2:
{code:java}
  /*
   * Define playbook
   */
  const struct {
    int timestamp_ms;
    int consumer;
    const vector *topics;
  } playbook[] = {/* timestamp_ms, consumer_number, subscribe-to-topics */
                  {0, 0, &SUBSCRIPTION_1}, /* Cmd 0 */
                  {4000, 1, &SUBSCRIPTION_1},  {4000, 1, &SUBSCRIPTION_1},
                  {4000, 1, &SUBSCRIPTION_1},  {4000, 2, &SUBSCRIPTION_1},
                  {6000, 3, &SUBSCRIPTION_1}, /* Cmd 5 */
                  {6000, 4, &SUBSCRIPTION_1},  {6000, 5, &SUBSCRIPTION_1},
                  {6000, 6, &SUBSCRIPTION_1},  {6000, 7, &SUBSCRIPTION_2},
                  {6000, 1, &SUBSCRIPTION_1}, /* Cmd 10 */
                  {6000, 1, &SUBSCRIPTION_2},  {6000, 1, &SUBSCRIPTION_1},
                  {6000, 2, &SUBSCRIPTION_2},  {7000, 2, &SUBSCRIPTION_1},
                  {7000, 1, &SUBSCRIPTION_2}, /* Cmd 15 */
                  {8000, 0, &SUBSCRIPTION_2},  {8000, 1, &SUBSCRIPTION_1},
                  {8000, 0, &SUBSCRIPTION_1},  {13000, 2, &SUBSCRIPTION_1},
                  {13000, 1, &SUBSCRIPTION_2}, /* Cmd 20 */
                  {13000, 5, &SUBSCRIPTION_2}, {14000, 6, &SUBSCRIPTION_2},
                  {15000, 7, &SUBSCRIPTION_1}, {15000, 1, &SUBSCRIPTION_1},
                  {15000, 5, &SUBSCRIPTION_1}, /* Cmd 25 */
                  {15000, 6, &SUBSCRIPTION_1}, {INT_MAX, 0, 0}};{code}
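The property this playbook ends up violating ("Partition is assigned to two members at the same time") can be stated as a simple invariant. Below is a minimal Java sketch, assuming a snapshot of each member's currently owned partitions as strings such as `T2-0`; `DoubleAssignmentCheck` is hypothetical and is not librdkafka's (C++) test checker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DoubleAssignmentCheck {
    /**
     * Returns one message per partition that appears in more than one member's assignment.
     *
     * @param assignments member id -> partitions it currently owns, e.g. "T2-0"
     */
    static List<String> findDoubleAssignments(Map<String, List<String>> assignments) {
        Map<String, String> owner = new HashMap<>();
        List<String> violations = new ArrayList<>();
        // Iterate members in sorted order so the reported "first owner" is deterministic.
        for (Map.Entry<String, List<String>> e : new TreeMap<>(assignments).entrySet()) {
            for (String partition : e.getValue()) {
                String previous = owner.putIfAbsent(partition, e.getKey());
                if (previous != null) {
                    violations.add(e.getKey() + " is assigned " + partition
                            + " which is already assigned to " + previous);
                }
            }
        }
        return violations;
    }
}
```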
 

 

> Partition is assigned to two members at the same time
> -
>
> Key: KAFKA-16147
> URL: https://issues.apache.org/jira/browse/KAFKA-16147
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Emanuele Sabellico
>Priority: Major
> Attachments: broker1.log, broker2.log, broker3.log, librdkafka.log, 
> server.properties, server1.properties, server2.properties
>
>
> While running [test 0113 of 
> librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384],
>  subtest _u_multiple_subscription_changes_ received this error, saying 
> that a partition is assigned to two members at the same time.
> {code:java}
> Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] 
> which is already assigned to consumer C_5#consumer-8 {code}
> I've reconstructed this sequence:
> C_5 SUBSCRIBES TO T1
> {noformat}
> %7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id 
> "(null)", current assignment "", subscribe topics 
> "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat}
> C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12
> {noformat}
> [2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw 
> transitioned from CurrentAssignment(memberEpoch=6, previousMemberEpoch=0, 
> targetMemberEpoch=6, state=assigning, assignedPartitions={}, 
> partitionsPendingRevocation={}, 
> partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to 
> CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=14, state=stable, 
> assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
> partitionsPendingRevocation={}, partitionsPendingAssignment={}). 
> (org.apache.kafka.coordinator.group.GroupMetadataManager){noformat}
>  
> C_5 RECEIVES TARGET ASSIGNMENT
> {noformat}
> %7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat response received target assignment 
> "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat}
>  
> C_5 ACKS TARGET ASSIGNMENT
> {noformat}
> %7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
> "NULL", current assignment 
> "rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", 
> subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
> %7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat response received target assignment 
> "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> (null)(IKXGrFR1Rv+

Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]

2024-01-16 Thread via GitHub


philipnee commented on code in PR #15186:
URL: https://github.com/apache/kafka/pull/15186#discussion_r1453840738


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -376,25 +376,21 @@ protected Map prepareCloseFetchSessi
 final Cluster cluster = metadata.fetch();
 Map fetchable = new HashMap<>();
 
-try {
-sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
-// set the session handler to notify close. This will set the next metadata request to send close message.
-sessionHandler.notifyClose();
+sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {

Review Comment:
   Thanks for the explanation, Kirk. It seems there are cases where we would 
want to clear the cache, such as a topology change, but this is probably an 
unnoticeable optimization: I guess the handler lookup never grows large enough 
to become a problem.
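The cache-clearing idea mentioned above can be sketched as pruning a map of handlers keyed by node id whenever a metadata update yields the set of node ids still in the cluster. `SessionHandlerCache`, `handlerFor` and `retainOnly` are hypothetical names; this is not `AbstractFetch`'s actual code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.IntFunction;

public class SessionHandlerCache<H> {
    private final Map<Integer, H> handlersByNode = new HashMap<>();
    private final IntFunction<H> factory;

    public SessionHandlerCache(IntFunction<H> factory) {
        this.factory = factory;
    }

    /** Returns the handler for a node, creating it lazily on first use. */
    public H handlerFor(int nodeId) {
        return handlersByNode.computeIfAbsent(nodeId, factory::apply);
    }

    /** Drops handlers for nodes no longer in the cluster; returns how many were removed. */
    public int retainOnly(Set<Integer> liveNodeIds) {
        int before = handlersByNode.size();
        handlersByNode.keySet().retainAll(liveNodeIds);
        return before - handlersByNode.size();
    }

    public int size() {
        return handlersByNode.size();
    }
}
```

Since the map holds at most one entry per broker the client has fetched from, the saving from pruning is likely small, which matches the conclusion in the comment.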






[jira] [Updated] (KAFKA-16147) Partition is assigned to two members at the same time

2024-01-16 Thread Emanuele Sabellico (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emanuele Sabellico updated KAFKA-16147:
---
Description: 
While running [test 0113 of 
librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384],
 subtest _u_multiple_subscription_changes_ received this error, saying that 
a partition is assigned to two members at the same time.
{code:java}
Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] 
which is already assigned to consumer C_5#consumer-8 {code}
I've reconstructed this sequence:

C_5 SUBSCRIBES TO T1
{noformat}
%7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
"rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id 
"(null)", current assignment "", subscribe topics 
"rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat}
C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12
{noformat}
[2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 topic=__consumer_offsets 
partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member 
RaTCu6RXQH-FiSl95iZzdw transitioned from CurrentAssignment(memberEpoch=6, 
previousMemberEpoch=0, targetMemberEpoch=6, state=assigning, 
assignedPartitions={}, partitionsPendingRevocation={}, 
partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to 
CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=14, 
state=stable, assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
partitionsPendingRevocation={}, partitionsPendingAssignment={}). 
(org.apache.kafka.coordinator.group.GroupMetadataManager){noformat}
 

C_5 RECEIVES TARGET ASSIGNMENT
{noformat}
%7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat response received target assignment 
"(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
(null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat}
 

C_5 ACKS TARGET ASSIGNMENT
{noformat}
%7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
"rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
"NULL", current assignment 
"rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], 
rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], 
rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", subscribe 
topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
%7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat response received target assignment 
"(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
(null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat}
 

C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are pending 
{noformat}
%7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
"rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
"NULL", current assignment "NULL", subscribe topics 
"rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], 
rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
[2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 topic=__consumer_offsets 
partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member 
RaTCu6RXQH-FiSl95iZzdw updated its subscribed topics to: 
[rdkafkatest_rnd550f20623daba04c_0113u_2, 
rdkafkatest_rnd5a91902462d61c2e_0113u_1]. 
(org.apache.kafka.coordinator.group.GroupMetadataManager)
[2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 topic=__consumer_offsets 
partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member 
RaTCu6RXQH-FiSl95iZzdw transitioned from CurrentAssignment(memberEpoch=14, 
previousMemberEpoch=6, targetMemberEpoch=14, state=stable, 
assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
partitionsPendingRevocation={}, partitionsPendingAssignment={}) to 
CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=16, 
state=revoking, assignedPartitions={}, 
partitionsPendingRevocation={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
partitionsPendingAssignment={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 8, 9]}). 
(org.apache.kafka.coordinator.group.GroupMetadataManager)
%7|1705403452.618|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat response received target assignment ""{noformat}
C_5 FINISHES REVOCATION
{noformat}
%7|1705403452.618|CGRPJOINSTATE|C_5#consumer-8| [thrd:main]: Group 
"rdkafkatest_rnd53b4eb0c2de343_0113u" changed join state wait-assign-call -> 
steady (state up){noformat}
C_5 ACKS REVOCATION, RECEIVES T2-P0,T2-P1,T2-P2

 
{noformat}
%7|1705403452.618|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat of member id "RaTCu6RXQH-FiSl95iZ

[jira] [Commented] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException

2024-01-16 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807397#comment-17807397
 ] 

Kamal Chandraprakash commented on KAFKA-16105:
--

[~anatolypopov] 

Could you write an integration test to simulate the error scenario? You can 
refer to some of the existing 
[tests|https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java].
 Thanks!

> Reassignment of tiered topics is failing due to RemoteStorageException
> --
>
> Key: KAFKA-16105
> URL: https://issues.apache.org/jira/browse/KAFKA-16105
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Reporter: Anatolii Popov
>Priority: Critical
>
> When partition reassignment happens for a tiered topic, in most cases it 
> gets stuck with RemoteStorageExceptions on follower nodes saying that it 
> cannot construct the remote log auxiliary state:
>  
> {code:java}
> [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, fetcherId=2] Error building remote log auxiliary state for test-24 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: test-24, currentLeaderEpoch: 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the previous remote log segment metadata was not found
>     at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259)
>     at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106)
>     at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
>     at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>     at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
>     at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
>     at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
>     at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
>     at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
>     at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>     at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
>  {code}
>  
> Scenario:
> A cluster of 3 nodes with a single topic with 30 partitions. All partitions 
> have tiered segments.
> Add 3 more nodes to the cluster and run a reassignment to move all the 
> data to the new nodes.
> Behavior:
> For most of the partitions, reassignment proceeds smoothly.
> For some of the partitions, when a new node starts to get assignments, it 
> reads the __remote_log_metadata topic and tries to initialize the metadata 
> cache on records with COPY

[jira] [Comment Edited] (KAFKA-16095) Update list group state type filter to include the states for the new consumer group type

2024-01-16 Thread Ritika Reddy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807394#comment-17807394
 ] 

Ritika Reddy edited comment on KAFKA-16095 at 1/16/24 6:29 PM:
---

Yep, that should be merged this week, but we could still work on the state 
filter changes in parallel.


was (Author: JIRAUSER300287):
Yep, that should be merged this week.

> Update list group state type filter to include the states for the new 
> consumer group type
> -
>
> Key: KAFKA-16095
> URL: https://issues.apache.org/jira/browse/KAFKA-16095
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ritika Reddy
>Assignee: Lan Ding
>Priority: Minor
>
> # While using *--list --state*, the currently accepted values correspond to 
> the classic group type states. We need to include support for the new group 
> type states.
>  ## Consumer Group: Should list the state of the group. Accepted Values: 
>  ### {_}UNKNOWN{_}("unknown"),
>  ### {_}EMPTY{_}("empty"),
>  ### *{_}ASSIGNING{_}("assigning"),*
>  ### *{_}RECONCILING{_}("reconciling"),*
>  ### {_}STABLE{_}("stable"),
>  ### {_}DEAD{_}("dead");
>  # 
>  ## Classic Group: Should list the state of the group. Accepted Values: 
>  ### {_}UNKNOWN{_}("Unknown"),
>  ### {_}EMPTY{_}("Empty"),
>  ### *{_}PREPARING_REBALANCE{_}("PreparingRebalance"),*
>  ### *{_}COMPLETING_REBALANCE{_}("CompletingRebalance"),*
>  ### {_}STABLE{_}("Stable"),
>  ### {_}DEAD{_}("Dead");



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16147) Partition is assigned to two members at the same time

2024-01-16 Thread Emanuele Sabellico (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emanuele Sabellico updated KAFKA-16147:
---
Attachment: librdkafka.log

> Partition is assigned to two members at the same time
> -
>
> Key: KAFKA-16147
> URL: https://issues.apache.org/jira/browse/KAFKA-16147
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Emanuele Sabellico
>Priority: Major
> Attachments: broker1.log, broker2.log, broker3.log, librdkafka.log, 
> server.properties, server1.properties, server2.properties
>
>

[jira] [Updated] (KAFKA-16147) Partition is assigned to two members at the same time

2024-01-16 Thread Emanuele Sabellico (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emanuele Sabellico updated KAFKA-16147:
---
Attachment: (was: double-assignment.log)

> Partition is assigned to two members at the same time
> -
>
> Key: KAFKA-16147
> URL: https://issues.apache.org/jira/browse/KAFKA-16147
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Emanuele Sabellico
>Priority: Major
> Attachments: broker1.log, broker2.log, broker3.log, 
> server.properties, server1.properties, server2.properties
>
>
> [2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw updated 
> its subscribed topics to: [rdkafkatest_rnd550f20623daba04c_0113u_2, 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1]. 
> (org.apache.kafka.coordinator.group.GroupMetadataManager)
> [2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw 
> transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=14, state=stable, 
> assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
> partitionsPendingRevocation={}, partitionsPendingAssignment={}) to 
> CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=16, state=revoking, assignedPartitions={}, 
> partitionsPendingRevocation={IKXGrFR1Rv-Qes7Ummas6A=[7, 8,

[jira] [Commented] (KAFKA-16095) Update list group state type filter to include the states for the new consumer group type

2024-01-16 Thread Ritika Reddy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807394#comment-17807394
 ] 

Ritika Reddy commented on KAFKA-16095:
--

Yep, that should be merged this week.

> Update list group state type filter to include the states for the new 
> consumer group type
> -
>
> Key: KAFKA-16095
> URL: https://issues.apache.org/jira/browse/KAFKA-16095
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ritika Reddy
>Assignee: Lan Ding
>Priority: Minor
>
> # When using *--list --state*, the currently accepted values correspond to the 
> classic group type states. We need to add support for the new consumer group 
> type states.
>  ## Consumer Group: should list the state of the group. Accepted values: 
>  ### {_}UNKNOWN{_}("unknown"),
>  ### {_}EMPTY{_}("empty"),
>  ### *{_}ASSIGNING{_}("assigning"),*
>  ### *{_}RECONCILING{_}("reconciling"),*
>  ### {_}STABLE{_}("stable"),
>  ### {_}DEAD{_}("dead")
>  ## Classic Group: should list the state of the group. Accepted values: 
>  ### {_}UNKNOWN{_}("Unknown"),
>  ### {_}EMPTY{_}("Empty"),
>  ### *{_}PREPARING_REBALANCE{_}("PreparingRebalance"),*
>  ### *{_}COMPLETING_REBALANCE{_}("CompletingRebalance"),*
>  ### {_}STABLE{_}("Stable"),
>  ### {_}DEAD{_}("Dead")
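
A minimal sketch of how a state filter could accept the values listed above for both group types. The `GroupStateFilter` class, its `parse`/`parseFilter` methods, the comma-separated input format and the case-insensitive matching are all assumptions made for illustration. This is not how the `kafka-consumer-groups` tool actually parses `--state`.

```java
import java.util.EnumSet;
import java.util.Set;

public class GroupStateFilter {

    // Hypothetical union of the classic and consumer group states listed in the issue.
    enum GroupState {
        UNKNOWN("Unknown"),
        EMPTY("Empty"),
        PREPARING_REBALANCE("PreparingRebalance"),   // classic groups only
        COMPLETING_REBALANCE("CompletingRebalance"), // classic groups only
        ASSIGNING("Assigning"),                      // consumer groups only
        RECONCILING("Reconciling"),                  // consumer groups only
        STABLE("Stable"),
        DEAD("Dead");

        final String displayName;

        GroupState(String displayName) {
            this.displayName = displayName;
        }

        // Case-insensitive match, so "empty" (consumer) and "Empty" (classic) both resolve to EMPTY.
        static GroupState parse(String value) {
            String wanted = value.trim();
            for (GroupState state : values()) {
                if (state.displayName.equalsIgnoreCase(wanted)) {
                    return state;
                }
            }
            throw new IllegalArgumentException("Unknown group state: " + value);
        }
    }

    // Parses a comma-separated filter value such as "stable,reconciling" (assumed format).
    static Set<GroupState> parseFilter(String arg) {
        Set<GroupState> states = EnumSet.noneOf(GroupState.class);
        for (String part : arg.split(",")) {
            if (!part.isBlank()) {
                states.add(GroupState.parse(part));
            }
        }
        return states;
    }

    public static void main(String[] args) {
        // Prints the matched states in enum declaration order.
        System.out.println(parseFilter("stable,Reconciling,PreparingRebalance"));
    }
}
```

Matching case-insensitively is a deliberate choice in this sketch. It lets a single filter value such as `stable` select both a consumer group that reports "stable" and a classic group that reports "Stable".
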



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16147) Partition is assigned to two members at the same time

2024-01-16 Thread Emanuele Sabellico (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emanuele Sabellico updated KAFKA-16147:
---
Attachment: server.properties
server1.properties
server2.properties

> Partition is assigned to two members at the same time
> -
>
> Key: KAFKA-16147
> URL: https://issues.apache.org/jira/browse/KAFKA-16147
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Emanuele Sabellico
>Priority: Major
> Attachments: broker1.log, broker2.log, broker3.log, 
> double-assignment.log, server.properties, server1.properties, 
> server2.properties
>
>
> While running [test 0113 of 
> librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384],
>  subtest _u_multiple_subscription_changes_ received this error, saying 
> that a partition was assigned to two members at the same time.
> {code:java}
> Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] 
> which is already assigned to consumer C_5#consumer-8 {code}
> I've reconstructed this sequence:
> C_5 SUBSCRIBES TO T1
> {noformat}
> %7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id 
> "(null)", current assignment "", subscribe topics 
> "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat}
> C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12
> {noformat}
> [2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw 
> transitioned from CurrentAssignment(memberEpoch=6, previousMemberEpoch=0, 
> targetMemberEpoch=6, state=assigning, assignedPartitions={}, 
> partitionsPendingRevocation={}, 
> partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to 
> CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=14, state=stable, 
> assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
> partitionsPendingRevocation={}, partitionsPendingAssignment={}). 
> (org.apache.kafka.coordinator.group.GroupMetadataManager){noformat}
>  
> C_5 RECEIVES TARGET ASSIGNMENT
> {noformat}
> %7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat response received target assignment 
> "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat}
>  
> C_5 ACKS TARGET ASSIGNMENT
> {noformat}
> %7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
> "NULL", current assignment 
> "rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", 
> subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
> %7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat response received target assignment 
> "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat}
>  
> C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are 
> pending 
> {noformat}
> %7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
> "NULL", current assignment "NULL", subscribe topics 
> "rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
> [2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw updated 
> its subscribed topics to: [rdkafkatest_rnd550f20623daba04c_0113u_2, 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1]. 
> (org.apache.kafka.coordinator.group.GroupMetadataManager)
> [2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw 
> transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=14, state=stable, 
> assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
> partitionsPendingRevocation={}, partitionsPendingAssignment={}) to 
> CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=16, state=revoking, ass

[jira] [Updated] (KAFKA-16147) Partition is assigned to two members at the same time

2024-01-16 Thread Emanuele Sabellico (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emanuele Sabellico updated KAFKA-16147:
---
Description: 
While running [test 0113 of 
librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384],
 subtest _u_multiple_subscription_changes_ received this error, saying that 
a partition was assigned to two members at the same time.
{code:java}
Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] 
which is already assigned to consumer C_5#consumer-8 {code}
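
The error above reports a violation of a simple invariant: at any moment, each topic partition should be owned by at most one member of the group. Below is a minimal sketch of such a check, in the spirit of the verification the librdkafka test performs. The `DoubleAssignmentCheck` class and `findDoubleAssignments` method are hypothetical. Partitions are modeled as plain "topic [partition]" strings for brevity, and the sample data is illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DoubleAssignmentCheck {

    // Returns one message per partition that shows up in more than one member's owned set.
    // The member reported is whichever one the map yields later, so pass an ordered map
    // (e.g. LinkedHashMap) if the output must be deterministic.
    static List<String> findDoubleAssignments(Map<String, List<String>> ownedByMember) {
        Map<String, String> owner = new HashMap<>();
        List<String> errors = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : ownedByMember.entrySet()) {
            for (String partition : entry.getValue()) {
                String previous = owner.putIfAbsent(partition, entry.getKey());
                if (previous != null && !previous.equals(entry.getKey())) {
                    errors.add(entry.getKey() + " is assigned " + partition
                            + " which is already assigned to consumer " + previous);
                }
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        // Hypothetical snapshot loosely mirroring the error message above.
        Map<String, List<String>> owned = new LinkedHashMap<>();
        owned.put("C_5", List.of("T2 [0]", "T2 [1]", "T2 [2]"));
        owned.put("C_6", List.of("T2 [0]", "T2 [3]"));
        findDoubleAssignments(owned).forEach(System.out::println);
    }
}
```
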
I've reconstructed this sequence:

C_5 SUBSCRIBES TO T1
{noformat}
%7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
"rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id 
"(null)", current assignment "", subscribe topics 
"rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat}
C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12
{noformat}
[2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 topic=__consumer_offsets 
partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member 
RaTCu6RXQH-FiSl95iZzdw transitioned from CurrentAssignment(memberEpoch=6, 
previousMemberEpoch=0, targetMemberEpoch=6, state=assigning, 
assignedPartitions={}, partitionsPendingRevocation={}, 
partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to 
CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=14, 
state=stable, assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
partitionsPendingRevocation={}, partitionsPendingAssignment={}). 
(org.apache.kafka.coordinator.group.GroupMetadataManager){noformat}
 

C_5 RECEIVES TARGET ASSIGNMENT
{noformat}
%7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat response received target assignment 
"(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
(null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat}
 

C_5 ACKS TARGET ASSIGNMENT
{noformat}
%7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
"rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
"NULL", current assignment 
"rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], 
rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], 
rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", subscribe 
topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
%7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat response received target assignment 
"(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
(null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat}
 

C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are pending 
{noformat}
%7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
"rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
"NULL", current assignment "NULL", subscribe topics 
"rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], 
rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
[2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 topic=__consumer_offsets 
partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member 
RaTCu6RXQH-FiSl95iZzdw updated its subscribed topics to: 
[rdkafkatest_rnd550f20623daba04c_0113u_2, 
rdkafkatest_rnd5a91902462d61c2e_0113u_1]. 
(org.apache.kafka.coordinator.group.GroupMetadataManager)
[2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 topic=__consumer_offsets 
partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member 
RaTCu6RXQH-FiSl95iZzdw transitioned from CurrentAssignment(memberEpoch=14, 
previousMemberEpoch=6, targetMemberEpoch=14, state=stable, 
assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
partitionsPendingRevocation={}, partitionsPendingAssignment={}) to 
CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=16, 
state=revoking, assignedPartitions={}, 
partitionsPendingRevocation={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
partitionsPendingAssignment={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 8, 9]}). 
(org.apache.kafka.coordinator.group.GroupMetadataManager)
%7|1705403452.618|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat response received target assignment ""{noformat}
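
In the transition above, C_5 moves to state=revoking. Its T1 partitions go into partitionsPendingRevocation and its new T2 partitions into partitionsPendingAssignment. The reported error is consistent with a partition being handed to one member before another member had released it. The sketch below illustrates the invariant a cooperative protocol relies on: a partition in a member's target assignment is only given to that member once no other member owns it or still has it pending revocation. It uses a simplified model, not the GroupMetadataManager implementation. The `CooperativeReconcile` class, `MemberState` record and `reconcile` method are hypothetical, and partitions are plain "T2-P0"-style strings.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class CooperativeReconcile {

    // Simplified member state; field names echo the CurrentAssignment fields in the log.
    record MemberState(Set<String> assigned, Set<String> pendingRevocation) {}

    // Splits memberId's target partitions into those it may own immediately and those
    // that must stay pending because another member still owns them or has not yet
    // acknowledged their revocation.
    static Map<String, Set<String>> reconcile(String memberId,
                                              Set<String> target,
                                              Map<String, MemberState> group) {
        Set<String> heldByOthers = new HashSet<>();
        for (Map.Entry<String, MemberState> entry : group.entrySet()) {
            if (entry.getKey().equals(memberId)) {
                continue;
            }
            heldByOthers.addAll(entry.getValue().assigned());
            heldByOthers.addAll(entry.getValue().pendingRevocation());
        }
        Set<String> assignNow = new TreeSet<>();
        Set<String> pendingAssignment = new TreeSet<>();
        for (String partition : target) {
            if (heldByOthers.contains(partition)) {
                pendingAssignment.add(partition);
            } else {
                assignNow.add(partition);
            }
        }
        return Map.of("assignNow", assignNow, "pendingAssignment", pendingAssignment);
    }

    public static void main(String[] args) {
        // Hypothetical snapshot: C_6 still owns T2-P0 and has T2-P1 pending revocation.
        Map<String, MemberState> group = Map.of(
                "C_5", new MemberState(Set.of(), Set.of("T1-P7", "T1-P8", "T1-P12")),
                "C_6", new MemberState(Set.of("T2-P0"), Set.of("T2-P1")));
        Map<String, Set<String>> result =
                reconcile("C_5", Set.of("T2-P0", "T2-P1", "T2-P2"), group);
        System.out.println("assign now: " + result.get("assignNow"));
        System.out.println("still pending: " + result.get("pendingAssignment"));
    }
}
```

Under this model, C_5 would only receive T2-P2 immediately. T2-P0 and T2-P1 stay pending until C_6 no longer holds them, which is exactly the window in which a double assignment must not occur.
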
C_5 FINISHES REVOCATION
{noformat}
%7|1705403452.618|CGRPJOINSTATE|C_5#consumer-8| [thrd:main]: Group 
"rdkafkatest_rnd53b4eb0c2de343_0113u" changed join state wait-assign-call -> 
steady (state up){noformat}
C_5 ACKS REVOCATION, RECEIVES T2-P0,T2-P1,T2-P2

 
{noformat}
%7|1705403452.618|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat of member id "RaTCu6RXQH-FiSl95iZ
