[jira] [Updated] (KAFKA-16222) KRaft Migration: Incorrect default user-principal quota after migration

2024-03-18 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16222:
--
Fix Version/s: 3.6.2
   3.8.0
   3.7.1

> KRaft Migration: Incorrect default user-principal quota after migration
> ---
>
> Key: KAFKA-16222
> URL: https://issues.apache.org/jira/browse/KAFKA-16222
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, migration
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Dominik
>Assignee: PoAn Yang
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> We observed that our default user quota seems not to be migrated correctly.
> Before Migration:
> bin/kafka-configs.sh --describe --all --entity-type users
> Quota configs for the *default user-principal* are 
> consumer_byte_rate=100.0, producer_byte_rate=100.0
> Quota configs for user-principal {color:#172b4d}'myuser{*}@{*}prod'{color} 
> are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8
> After Migration:
> bin/kafka-configs.sh --describe --all --entity-type users
> Quota configs for *user-principal ''* are consumer_byte_rate=100.0, 
> producer_byte_rate=100.0
> Quota configs for user-principal {color:#172b4d}'myuser{*}%40{*}prod'{color} 
> are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8
>  
> Additional finding: Our names contains a "@" which also lead to incorrect 
> after migration state.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16073) Kafka Tiered Storage: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-03-18 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16073:
--
Fix Version/s: 3.7.1

> Kafka Tiered Storage: Consumer Fetch Error Due to Delayed localLogStartOffset 
> Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{{}localLogStartOffset{}}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < 
> offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. 
> Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against 
> the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, 
> it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue 
> arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]

2024-03-18 Thread via GitHub


omkreddy commented on PR #15481:
URL: https://github.com/apache/kafka/pull/15481#issuecomment-2005776638

   I am including this to 3.6.2 release plan


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-13361) Support fine-grained compression options

2024-03-18 Thread Cheng-Kai, Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828150#comment-17828150
 ] 

Cheng-Kai, Zhang edited comment on KAFKA-13361 at 3/19/24 4:05 AM:
---

Hi [~dongjin]  [~mimaison]

I am interested to this issue, and it seems to be manageable for a newbie like 
me.  Do you think I could help on this one?
My plan is to follow current structure [~mimaison]  currently working on to add 
those config. There is a draft PR in a very early stage available 
[here|https://github.com/apache/kafka/pull/1].


was (Author: JIRAUSER304479):
Hi [~dongjin]  [~mimaison]

I am interested to this issue, and it seems to be manageable for a newbie like 
me.  Do you think I could help on this one?
My plan is to follow current structure [~mimaison]  currently working on to add 
those config. There is a draft PR in a very early stage available here.

> Support fine-grained compression options
> 
>
> Key: KAFKA-13361
> URL: https://issues.apache.org/jira/browse/KAFKA-13361
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>  Labels: needs-kip
>
> Adds the following options into the Producer, Broker, and Topic 
> configurations:
>  * compression.gzip.buffer: the buffer size that feeds raw input into the 
> Deflator or is fed by the uncompressed output from the Deflator. (available: 
> [512, ), default: 8192(=8kb).)
>  * compression.snappy.block: the block size that snappy uses. (available: 
> [1024, ), default: 32768(=32kb).)
>  * compression.lz4.block: the block size that lz4 uses. (available: [4, 7], 
> (means 64kb, 256kb, 1mb, 4mb respectively), default: 4.)
>  * compression.zstd.window: enables long mode; the log of the window size 
> that zstd uses to memorize the compressing data. (available: [10, 22], 
> default: 0 (disables long mode.))
> All of the above are different but somewhat in common from the point of 
> compression process in that it impacts the memorize size during the process.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16384) KRaft controller number recommendation

2024-03-18 Thread Peter (Jira)
Peter created KAFKA-16384:
-

 Summary: KRaft controller number recommendation
 Key: KAFKA-16384
 URL: https://issues.apache.org/jira/browse/KAFKA-16384
 Project: Kafka
  Issue Type: Bug
  Components: controller, kraft
Reporter: Peter


There seems to be some conflicting information about how many controllers 
should be used for a KRaft cluster. The first section listed mentions 3 or 5 
controllers may be used, but the second section mentions no more than 3 should 
be used at the moment.

https://kafka.apache.org/documentation/#kraft_voter

> A Kafka admin will typically select 3 or 5 servers for this role, depending 
> on factors like cost and the number of concurrent failures your system should 
> withstand without availability impact. A majority of the controllers must be 
> alive in order to maintain availability. With 3 controllers, the cluster can 
> tolerate 1 controller failure; with 5 controllers, the cluster can tolerate 2 
> controller failures.

https://kafka.apache.org/documentation/#kraft_deployment

> For redundancy, a Kafka cluster should use 3 controllers. More than 3 
> controllers is not recommended in critical environments. In the rare case of 
> a partial network failure it is possible for the cluster metadata quorum to 
> become unavailable. This limitation will be addressed in a future release of 
> Kafka.

 

Is 3 still the recommended number and is there more information on what the 
network issues are that could cause issues when using 5 controllers?

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-18 Thread via GitHub


gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1529639335


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String 
inputTopic,
 
 @Override
 public void run() {
-// Init transactions call should always happen first in order to clear 
zombie transactions from previous generation.
-producer.initTransactions();
-
-final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-@Override
-public void onPartitionsRevoked(Collection 
partitions) {
-printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-}
-
-@Override
-public void onPartitionsAssigned(Collection 
partitions) {
-printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-messageRemaining.set(messagesRemaining(consumer));
-}
-});
-
-int messageProcessed = 0;
-while (messageRemaining.get() > 0) {
-try {
-ConsumerRecords records = 
consumer.poll(Duration.ofMillis(200));
-if (records.count() > 0) {
-// Begin a new transaction session.
-producer.beginTransaction();
-for (ConsumerRecord record : records) {
-// Process the record and send to downstream.
-ProducerRecord customizedRecord = 
transform(record);
-producer.send(customizedRecord);
+int processedRecords = 0;
+long remainingRecords = Long.MAX_VALUE;
+// it is recommended to have a relatively short txn timeout in order 
to clear pending offsets faster
+int transactionTimeoutMs = 10_000;
+// consumer must be in read_committed mode, which means it won't be 
able to read uncommitted data
+boolean readCommitted = true;
+try (KafkaProducer producer = new 
Producer("processor-producer", bootstrapServers, outputTopic,
+true, transactionalId, true, -1, transactionTimeoutMs, 
null).createKafkaProducer();
+ KafkaConsumer consumer = new 
Consumer("processor-consumer", bootstrapServers, inputTopic,
+ "processor-group", Optional.of(groupInstanceId), 
readCommitted, -1, null).createKafkaConsumer()) {
+// called first and once to fence zombies and abort any pending 
transaction
+producer.initTransactions();
+
+consumer.subscribe(singleton(inputTopic), this);
+
+Utils.printOut("Processing new records");
+while (!closed && remainingRecords > 0) {
+try {
+ConsumerRecords records = 
consumer.poll(ofMillis(200));
+if (!records.isEmpty()) {
+// begin a new transaction session
+producer.beginTransaction();
+
+for (ConsumerRecord record : records) 
{
+// process the record and send downstream
+ProducerRecord newRecord =
+new ProducerRecord<>(outputTopic, 
record.key(), record.value() + "-ok");
+producer.send(newRecord);
+}
+
+// checkpoint the progress by sending offsets to group 
coordinator broker
+// note that this API is only available for broker >= 
2.5
+
producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), 
consumer.groupMetadata());
+
+// commit the transaction including offsets
+producer.commitTransaction();
+processedRecords += records.count();
 }
+} catch (AuthorizationException | UnsupportedVersionException 
| ProducerFencedException
+ | FencedInstanceIdException | 
OutOfOrderSequenceException | SerializationException e) {
+// we can't recover from these exceptions
+Utils.printErr(e.getMessage());
+shutdown();
+} catch (OffsetOutOfRangeException | 
NoOffsetForPartitionException e) {
+// invalid or no offset found without auto.reset.policy
+Utils.printOut("Invalid or no offset found, using latest");
+consumer.seekToEnd(emptyList());
+consumer.commitSync();
+} catch (KafkaException e) {
+// abort the transaction and try to continue
+Utils.printOut("Aborting transaction: %s", e);
+

Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-18 Thread via GitHub


gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1529639335


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String 
inputTopic,
 
 @Override
 public void run() {
-// Init transactions call should always happen first in order to clear 
zombie transactions from previous generation.
-producer.initTransactions();
-
-final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-@Override
-public void onPartitionsRevoked(Collection 
partitions) {
-printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-}
-
-@Override
-public void onPartitionsAssigned(Collection 
partitions) {
-printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-messageRemaining.set(messagesRemaining(consumer));
-}
-});
-
-int messageProcessed = 0;
-while (messageRemaining.get() > 0) {
-try {
-ConsumerRecords records = 
consumer.poll(Duration.ofMillis(200));
-if (records.count() > 0) {
-// Begin a new transaction session.
-producer.beginTransaction();
-for (ConsumerRecord record : records) {
-// Process the record and send to downstream.
-ProducerRecord customizedRecord = 
transform(record);
-producer.send(customizedRecord);
+int processedRecords = 0;
+long remainingRecords = Long.MAX_VALUE;
+// it is recommended to have a relatively short txn timeout in order 
to clear pending offsets faster
+int transactionTimeoutMs = 10_000;
+// consumer must be in read_committed mode, which means it won't be 
able to read uncommitted data
+boolean readCommitted = true;
+try (KafkaProducer producer = new 
Producer("processor-producer", bootstrapServers, outputTopic,
+true, transactionalId, true, -1, transactionTimeoutMs, 
null).createKafkaProducer();
+ KafkaConsumer consumer = new 
Consumer("processor-consumer", bootstrapServers, inputTopic,
+ "processor-group", Optional.of(groupInstanceId), 
readCommitted, -1, null).createKafkaConsumer()) {
+// called first and once to fence zombies and abort any pending 
transaction
+producer.initTransactions();
+
+consumer.subscribe(singleton(inputTopic), this);
+
+Utils.printOut("Processing new records");
+while (!closed && remainingRecords > 0) {
+try {
+ConsumerRecords records = 
consumer.poll(ofMillis(200));
+if (!records.isEmpty()) {
+// begin a new transaction session
+producer.beginTransaction();
+
+for (ConsumerRecord record : records) 
{
+// process the record and send downstream
+ProducerRecord newRecord =
+new ProducerRecord<>(outputTopic, 
record.key(), record.value() + "-ok");
+producer.send(newRecord);
+}
+
+// checkpoint the progress by sending offsets to group 
coordinator broker
+// note that this API is only available for broker >= 
2.5
+
producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), 
consumer.groupMetadata());
+
+// commit the transaction including offsets
+producer.commitTransaction();
+processedRecords += records.count();
 }
+} catch (AuthorizationException | UnsupportedVersionException 
| ProducerFencedException
+ | FencedInstanceIdException | 
OutOfOrderSequenceException | SerializationException e) {
+// we can't recover from these exceptions
+Utils.printErr(e.getMessage());
+shutdown();
+} catch (OffsetOutOfRangeException | 
NoOffsetForPartitionException e) {
+// invalid or no offset found without auto.reset.policy
+Utils.printOut("Invalid or no offset found, using latest");
+consumer.seekToEnd(emptyList());
+consumer.commitSync();
+} catch (KafkaException e) {
+// abort the transaction and try to continue
+Utils.printOut("Aborting transaction: %s", e);
+

Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]

2024-03-18 Thread via GitHub


showuon commented on PR #15481:
URL: https://github.com/apache/kafka/pull/15481#issuecomment-2005672241

   Re-triggering the CI build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13361) Support fine-grained compression options

2024-03-18 Thread Cheng-Kai, Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828150#comment-17828150
 ] 

Cheng-Kai, Zhang commented on KAFKA-13361:
--

Hi [~dongjin]  [~mimaison]

I am interested to this issue, and it seems to be manageable for a newbie like 
me.  Do you think I could help on this one?
My plan is to follow current structure [~mimaison]  currently working on to add 
those config. There is a draft PR in a very early stage available here.

> Support fine-grained compression options
> 
>
> Key: KAFKA-13361
> URL: https://issues.apache.org/jira/browse/KAFKA-13361
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>  Labels: needs-kip
>
> Adds the following options into the Producer, Broker, and Topic 
> configurations:
>  * compression.gzip.buffer: the buffer size that feeds raw input into the 
> Deflator or is fed by the uncompressed output from the Deflator. (available: 
> [512, ), default: 8192(=8kb).)
>  * compression.snappy.block: the block size that snappy uses. (available: 
> [1024, ), default: 32768(=32kb).)
>  * compression.lz4.block: the block size that lz4 uses. (available: [4, 7], 
> (means 64kb, 256kb, 1mb, 4mb respectively), default: 4.)
>  * compression.zstd.window: enables long mode; the log of the window size 
> that zstd uses to memorize the compressing data. (available: [10, 22], 
> default: 0 (disables long mode.))
> All of the above are different but somewhat in common from the point of 
> compression process in that it impacts the memorize size during the process.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-18 Thread via GitHub


jolshan merged PR #15524:
URL: https://github.com/apache/kafka/pull/15524


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-18 Thread via GitHub


jolshan commented on PR #15524:
URL: https://github.com/apache/kafka/pull/15524#issuecomment-2005610376

   I am seeing it fail more often on your branch (for a few runs), but after 
merging with trunk, it seemed better. I will go ahead and merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-18 Thread via GitHub


jolshan commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1529489216


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1904,16 +2000,7 @@ public void testReconciliationProcess() {
 new ConsumerGroupHeartbeatResponseData()
 .setMemberId(memberId1)
 .setMemberEpoch(11)
-.setHeartbeatIntervalMs(5000)

Review Comment:
   so we are no longer sending a reassignment because we were incorrectly doing 
so before? (ie they weren't full requests and didn't need new assignments)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-18 Thread via GitHub


jolshan commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1529486638


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1190,10 +1190,11 @@ private 
CoordinatorResult consumerGr
 .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs);
 
 // The assignment is only provided in the following cases:
-// 1. The member reported its owned partitions;
-// 2. The member just joined or rejoined to group (epoch equals to 
zero);
-// 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
+// 1. The member sent a full request. It does so when joining or 
rejoining the group; or
+//on any errors (e.g. timeout).
+// 2. The member's assignment has been updated.
+boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 
&& subscribedTopicNames != null && ownedTopicPartitions != null);

Review Comment:
   I think I'm missing how 
   ```(rebalanceTimeoutMs != -1 && subscribedTopicNames != null && 
ownedTopicPartitions != null)```
   maps to 
   ```//  It does so when joining or rejoining the group; or on any errors 
(e.g. timeout).```
   
   Are we instead saying that this particular case -- ie setting all these 3 
fields indicates a "full" request? I think we should update the comment to make 
this a bit clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-18 Thread via GitHub


jeffkbkim commented on PR #15533:
URL: https://github.com/apache/kafka/pull/15533#issuecomment-2005338241

   > This patch changes the logic to check ownedTopicPartitions, 
subscribedTopicNames and rebalanceTimeoutMs as they are the only three non 
optional fields.
   
   They are the only three optional fields, right?
   
   Also, my understanding is that there are cases where we don't set 
subscribedTopicNames and rebalanceTimeoutMs but do set ownedTopicPartitions. Do 
you have examples of this case?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14359) Idempotent Producer continues to retry on OutOfOrderSequence error when first batch fails

2024-03-18 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-14359:
--

Assignee: Justine Olshan

> Idempotent Producer continues to retry on OutOfOrderSequence error when first 
> batch fails
> -
>
> Key: KAFKA-14359
> URL: https://issues.apache.org/jira/browse/KAFKA-14359
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> When the idempotent producer does not have any state it can fall into a state 
> where the producer keeps retrying an out of order sequence. Consider the 
> following scenario where an idempotent producer has retries and delivery 
> timeout are int max (a configuration used in streams).
> 1. A producer send out several batches (up to 5) with the first one starting 
> at sequence 0.
> 2. The first batch with sequence 0 fails due to a transient error (ie, 
> NOT_LEADER_OR_FOLLOWER or a timeout error)
> 3. The second batch, say with sequence 200 comes in. Since there is no 
> previous state to invalidate it, it gets written to the log
> 4. The original batch is retried and will get an out of order sequence number
> 5. Current java client will continue to retry this batch, but it will never 
> resolve. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-03-18 Thread via GitHub


CalvinConfluent commented on PR #14706:
URL: https://github.com/apache/kafka/pull/14706#issuecomment-2005099068

   The test failures are irrelevant. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16383: use public constructor [kafka]

2024-03-18 Thread via GitHub


chia7712 commented on code in PR #15556:
URL: https://github.com/apache/kafka/pull/15556#discussion_r1529285548


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java:
##
@@ -460,7 +460,7 @@ public static abstract class AbstractTestConverter extends 
TestConverter {
 }
 
 public static class TestConverterWithPrivateConstructor extends 
TestConverter {
-private TestConverterWithPrivateConstructor() {
+public TestConverterWithPrivateConstructor() {

Review Comment:
   I don't dig in it yet, but it seems the private constructor is what it tries 
to test ( according to the naming "TestConverterWithPrivateConstructor")
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-18 Thread via GitHub


jolshan commented on PR #15524:
URL: https://github.com/apache/kafka/pull/15524#issuecomment-2004976461

   I ran "until failure" on trunk and it took about 30 runs to fail.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-03-18 Thread via GitHub


dongnuo123 commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1529263591


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationConfig.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupProtocolMigrationConfig {

Review Comment:
   Yeah I agree. It does feel a bit weird.. Let me change it to migration 
policy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-18 Thread via GitHub


artemlivshits commented on PR #15524:
URL: https://github.com/apache/kafka/pull/15524#issuecomment-2004953080

   LogDirFailureTest passed locally as well (probably just a flake) and it 
doesn't seem to use any of the code that I touched, so unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15756: [2/3] Migrate existing integration tests to run old protocol in new coordinator [kafka]

2024-03-18 Thread via GitHub


kirktrue commented on code in PR #14675:
URL: https://github.com/apache/kafka/pull/14675#discussion_r1529229638


##
core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala:
##
@@ -89,6 +96,8 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
 val producer = createProducer()
 producerSend(producer, numRecords)
 
+
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
"3")
+

Review Comment:
   Is this override still needed for the tests to pass?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close

2024-03-18 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828089#comment-17828089
 ] 

Kirk True commented on KAFKA-16217:
---

[~calvinliu]—I reassigned this to you as you've already started tackling it. 
Thanks!

> Transactional producer stuck in IllegalStateException during close
> --
>
> Key: KAFKA-16217
> URL: https://issues.apache.org/jira/browse/KAFKA-16217
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Calvin Liu
>Assignee: Calvin Liu
>Priority: Major
>  Labels: transactions
> Fix For: 3.8.0, 3.7.1, 3.6.3
>
>
> The producer is stuck during the close. It keeps retrying to abort the 
> transaction but it never succeeds. 
> {code:java}
> [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | 
> producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> org.apache.kafka.clients.producer.internals.Sender run - [Producer 
> clientId=producer-transaction-ben
> ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, 
> transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> Error in kafka producer I/O thread while aborting transaction:
> java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
> because the previous call to `commitTransaction` timed out and must be retried
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) 
> {code}
> With the additional log, I found the root cause. If the producer is in a bad 
> transaction state(in my case, the TransactionManager.pendingTransition was 
> set to commitTransaction and did not get cleaned), then the producer calls 
> close and tries to abort the existing transaction, the producer will get 
> stuck in the transaction abortion. It is related to the fix 
> [https://github.com/apache/kafka/pull/13591].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close

2024-03-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16217:
-

Assignee: Calvin Liu  (was: Kirk True)

> Transactional producer stuck in IllegalStateException during close
> --
>
> Key: KAFKA-16217
> URL: https://issues.apache.org/jira/browse/KAFKA-16217
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Calvin Liu
>Assignee: Calvin Liu
>Priority: Major
>  Labels: transactions
> Fix For: 3.8.0, 3.7.1, 3.6.3
>
>
> The producer is stuck during the close. It keeps retrying to abort the 
> transaction but it never succeeds. 
> {code:java}
> [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | 
> producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> org.apache.kafka.clients.producer.internals.Sender run - [Producer 
> clientId=producer-transaction-ben
> ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, 
> transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> Error in kafka producer I/O thread while aborting transaction:
> java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
> because the previous call to `commitTransaction` timed out and must be retried
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) 
> {code}
> With the additional log, I found the root cause. If the producer is in a bad 
> transaction state(in my case, the TransactionManager.pendingTransition was 
> set to commitTransaction and did not get cleaned), then the producer calls 
> close and tries to abort the existing transaction, the producer will get 
> stuck in the transaction abortion. It is related to the fix 
> [https://github.com/apache/kafka/pull/13591].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]

2024-03-18 Thread via GitHub


kirktrue commented on PR #15541:
URL: https://github.com/apache/kafka/pull/15541#issuecomment-2004890051

   @CalvinConfluent—thanks for the PR!
   
   This PR doesn't have any unit tests to verify the new behavior. Would it be 
possible to migrate the test case from your _other PR_ (#15336) to _this PR_?
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-18 Thread via GitHub


jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1529191724


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", 
offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to 
the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", 
tp, offset);
-
context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, 
offset);
-} else {
-log.debug("Ignored high watermark updated for {} 
to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the 
front of the
+// queue only if the previous value was -1L. If it was not, it 
means that
+// there is already an event waiting to process the last value.
+processor.enqueueFirst(new 
CoordinatorInternalEvent("HighWatermarkUpdate", tp, () -> {

Review Comment:
   this should be just `enqueueFirst(...)`, not `processor.enqueueFirst(...)` 
right?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", 
offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to 
the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", 
tp, offset);
-
context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, 
offset);
-} else {
-log.debug("Ignored high watermark updated for {} 
to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the 
front of the
+// queue only if the previous value was -1L. If it was not, it 
means that
+// there is already an event waiting to process the last value.

Review Comment:
   Let's say there are two HWM advancements, to offset `h1` and `h2` 
respectively. (`h1 < h2`)
   
   The first HWM advancement to `h1` will set lastHighWatermark to `NO_OFFSET` 
and enqueueFirst() HWM update event.
   
   Before the first event runs, let's say the HWM advances to `h2`. this will 
see that lastHighWatermark is `NO_OFFSET` and will skip enqueueFirst().
   
   Doesn't this mean that all write events waiting for committed offset `h1 < 
committed_offset <= h2` cannot complete until the HWM advances again?
   
   I wonder if we can:
   * keep track of highest HWM updated 
   * only enqueueFirst if the offset to update is greater than highest HWM 
recorded
   
   Would this work?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", 
offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);

[PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-18 Thread via GitHub


wernerdv opened a new pull request, #15558:
URL: https://github.com/apache/kafka/pull/15558

   Log the list of topics for which an authorization error has been received 
when try to describe configs, along with the cluster alias.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-18 Thread via GitHub


jsancio merged PR #15478:
URL: https://github.com/apache/kafka/pull/15478


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16345: Optionally URL-encode clientID and clientSecret in authorization header [kafka]

2024-03-18 Thread via GitHub


kirktrue commented on code in PR #15475:
URL: https://github.com/apache/kafka/pull/15475#discussion_r1529074359


##
clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java:
##
@@ -192,6 +192,12 @@ public class SaslConfigs {
 + " be inspected for the standard OAuth \"iss\" claim and if this 
value is set, the broker will match it exactly against what is in the JWT's 
\"iss\" claim. If there is no"
 + " match, the broker will reject the JWT and authentication will 
fail.";
 
+public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = 
"sasl.oauthbearer.header.urlencode.enable";

Review Comment:
   It was added here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1025%3A+Optionally+URL-encode+clientID+and+clientSecret+in+authorization+header



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-18 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1526788841


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -551,9 +580,11 @@ class BrokerLifecycleManager(
   }
 
   private def scheduleNextCommunication(intervalNs: Long): Unit = {
-trace(s"Scheduling next communication at 
${MILLISECONDS.convert(intervalNs, NANOSECONDS)} " +
+val nanos = if (nextSchedulingShouldBeImmediate) 0 else intervalNs

Review Comment:
   nanos =>  adjustedIntervalNs?



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest {
   Collections.emptyMap(), OptionalLong.empty())
 poll(ctx, manager, registration)
 
+def nextHeartbeatDirs(): Set[String] =
+  poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData(
+.data().offlineLogDirs().asScala.map(_.toString).toSet
+assertEquals(Set.empty, nextHeartbeatDirs())
 
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
 
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), 
nextHeartbeatDirs())

Review Comment:
   It seems this could still be flaky. 
   
   `poll(ctx, manager, registration)
   `
   A CommunicationEvent c1 with a delay of 0 is scheduled in manager.eventQueue.
   
   `assertEquals(Set.empty, nextHeartbeatDirs())
   `
   c1 is processed in `ctx.mockChannelManager` and a 
BrokerHeartbeatResponseEvent b1 is added to `manager.eventQueue`. b1 is 
processed at `manager.eventQueue` and a CommunicationEvent c2 with a delay of 
100 is scheduled in `manager.eventQueue`.
   
   `manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
   `
   An OfflineDirEvent o1 is appended to `manager.eventQueue`, but not yet 
processed.
   
   `assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
   `
   nextHeartbeatDirs() causes the time to advance quickly and passes 100. c2 is 
processed at `manager.eventQueue` and a HeartBeat request h1 is added to 
`ctx.mockChannelManager`.
   
   o1 is processed at `manager.eventQueue` and a CommunicationEvent c3 is 
appended to `manager.eventQueue`.  c3 is processed at `manager.eventQueue` sets 
`nextSchedulingShouldBeImmediate` to true.
   
   h1 is processed by `ctx.mockChannelManager` and adds 
BrokerHeartbeatResponseEvent b2 to `manager.eventQueue`. `manager.eventQueue` 
processes b2 and schedules a CommunicationEvent c4 with a delay of 0 in 
`manager.eventQueue`. c4 is processed at `manager.eventQueue` and a HeartBeat 
request h2 (doesn't pick up ej8Q9_d2Ri6FXNiTxKFiow) is added to 
`ctx.mockChannelManager`.
   
   
`manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))`
   `assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), 
nextHeartbeatDirs())`
   Now the above assertion will fail since `nextHeartbeatDirs()` will pick up 
c4 which doesn't include ej8Q9_d2Ri6FXNiTxKFiow.
   
   
   
   



##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -513,4 +514,36 @@ public void close() throws InterruptedException {
 eventHandlerThread.join();
 log.info("closed event queue.");
 }
+
+/**
+ * Returns the deferred event that the queue is waiting for, idling until
+ * its deadline comes, if there is any.
+ * If the queue has immediate work to do, this returns empty.
+ * This is useful for unit tests, where to make progress, we need to
+ * speed the clock up until the next scheduled event is ready to run.
+ */
+public Optional scheduledAfterIdling() {

Review Comment:
   scheduledAfterIdling => firstDeferredIfIdling ?



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-val context = new RegistrationTestContext(configProperties)
-val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+val ctx = new RegistrationTestContext(configProperties)
+val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 val controllerNode = new Node(3000, "localhost", 8021)
-context.controllerNodeProvider.node.set(controllerNode)
-manager.start(() => context.highestMetadataOffset.get(),
-  context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+

[jira] [Commented] (KAFKA-15282) Implement client support for KIP-848 client-side assignors

2024-03-18 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828075#comment-17828075
 ] 

Kirk True commented on KAFKA-15282:
---

[~zxcoccer]—you're certainly welcome to look at this. I would note that there's 
a little uncertainty if we need this functionality or not. That's one of the 
reasons we haven't tackled it yet.

> Implement client support for KIP-848 client-side assignors
> --
>
> Key: KAFKA-15282
> URL: https://issues.apache.org/jira/browse/KAFKA-15282
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> The client-side assignor provides the logic for the partition assignments 
> instead of on the server. Client-side assignment is the main approach used by 
> the “old protocol” for divvying up partitions. While the “new protocol” 
> favors server-side assignors, the client-side assignor will continue to be 
> used for backward compatibility, including KSQL, Connect, etc.
> Note: I _*think*_ that the client-side assignor logic and the reconciliation 
> logic can remain separate from each other. We should strive to keep the two 
> pieces unencumbered, unless it’s unavoidable.
> This task includes:
>  * Validate the client’s configuration for assignor selection
>  * Integrate with the new {{PartitionAssignor}} interface to invoke the logic 
> from the user-provided assignor implementation
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupPrepareAssignment}} RPC call using the information from the 
> {{PartitionAssignor}} above
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupInstallAssignment}} RPC call, again using the information 
> calculated by the {{PartitionAssignor}}
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito [kafka]

2024-03-18 Thread via GitHub


clolov commented on code in PR #15254:
URL: https://github.com/apache/kafka/pull/15254#discussion_r1529018575


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -190,7 +181,7 @@ public class TaskManagerTest {
 
 @org.mockito.Mock
 private InternalTopologyBuilder topologyBuilder;
-@Mock(type = MockType.DEFAULT)
+@org.mockito.Mock

Review Comment:
   Hopefully done on both accounts  !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-18 Thread via GitHub


jolshan commented on PR #15524:
URL: https://github.com/apache/kafka/pull/15524#issuecomment-2004586672

   @artemlivshits it may be worth checking if the test is failing on trunk as 
well. If so we can renew the JIRA to fix it  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use [kafka]

2024-03-18 Thread via GitHub


edoardocomar commented on PR #15530:
URL: https://github.com/apache/kafka/pull/15530#issuecomment-2004581473

   fix cherry picked to 3.6 and 3.7 branches


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use

2024-03-18 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828067#comment-17828067
 ] 

Edoardo Comar commented on KAFKA-16369:
---

fix cherry picked to 3.6 and 3.7

> Broker may not shut down when SocketServer fails to bind as Address already 
> in use
> --
>
> Key: KAFKA-16369
> URL: https://issues.apache.org/jira/browse/KAFKA-16369
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1, 3.8.0
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
> Attachments: kraft-server.log, server.log
>
>
> When in Zookeeper mode, if a port the broker should listen to is already bound
> the KafkaException: Socket server failed to bind to localhost:9092: Address 
> already in use.
> is thrown but the Broker continues to startup .
> It correctly shuts down when in KRaft mode.
> Easy to reproduce when in Zookeper mode with server.config set to listen to 
> localhost only
> {color:#00}listeners={color}{color:#a31515}PLAINTEXT://localhost:9092{color}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-18 Thread via GitHub


jolshan commented on PR #15524:
URL: https://github.com/apache/kafka/pull/15524#issuecomment-2004580064

   > LogDirFailureTest > testIOExceptionDuringLogRoll(String) has been failing 
for a while -- there are some issues I tried to tackle with respect to it, but 
we ran into some issues.  See https://github.com/apache/kafka/pull/15354


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]

2024-03-18 Thread via GitHub


dongnuo123 commented on code in PR #15546:
URL: https://github.com/apache/kafka/pull/15546#discussion_r1529010875


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9298,6 +9298,120 @@ public void 
testOnConsumerGroupStateTransitionOnLoading() {
 verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
 }
 
+@Test
+public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() {
+String classicGroupId = "classic-group-id";
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+assertThrows(GroupIdNotFoundException.class, () ->
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(classicGroupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setServerAssignor("range")
+.setRebalanceTimeoutMs(5000)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setTopicPartitions(Collections.emptyList(;
+}
+
+@Test
+public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+String classicGroupId = "classic-group-id";
+String memberId = Uuid.randomUuid().toString();
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.build();
+ClassicGroup classicGroup = new ClassicGroup(
+new LogContext(),
+classicGroupId,
+EMPTY,
+context.time,
+context.metrics
+);
+context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+CoordinatorResult result = 
context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(classicGroupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setServerAssignor("range")
+.setRebalanceTimeoutMs(5000)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setTopicPartitions(Collections.emptyList()));
+
+assertEquals(0, result.response().errorCode());
+
assertEquals(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), 
result.records().get(0));
+assertEquals(Group.GroupType.CONSUMER,
+
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(classicGroupId, 
false).type());
+}
+
+@Test
+public void testClassicGroupJoinWithNonEmptyConsumerGroup() throws 
Exception {
+String consumerGroupId = "consumer-group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(consumerGroupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+assertEquals(Errors.GROUP_ID_NOT_FOUND.code(), 
joinResult.joinFuture.get().errorCode());
+}
+
+@Test
+public void testClassicGroupJoinWithEmptyConsumerGroup() throws Exception {
+String consumerGroupId = "consumer-group-id";
+

Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-18 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1529005304


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3040,6 +3048,7 @@ class ReplicaManagerTest {
   transactionalId = transactionalId,
   entriesPerPartition = entriesToAppend,
   responseCallback = appendCallback,
+  apiVersionErrorMapper = genericError

Review Comment:
   Is there a reason some of these are generic and some are default? I wonder 
if we can configure the test to use the "highest" enum



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-18 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1528996611


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3523,6 +3524,128 @@ public void 
testForegroundInvalidStateTransitionIsRecoverable() {
 assertFalse(transactionManager.hasOngoingTransaction());
 }
 
+@Test
+public void testAbortableTransactionExceptionInInitProducerId() {
+TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions();
+prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
+runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+prepareInitPidResponse(Errors.ABORTABLE_TRANSACTION, false, 
producerId, RecordBatch.NO_PRODUCER_EPOCH);
+runUntil(transactionManager::hasError);
+assertTrue(initPidResult.isCompleted());
+assertFalse(initPidResult.isSuccessful());
+assertThrows(AbortableTransactionException.class, 
initPidResult::await);
+assertAbortableError(AbortableTransactionException.class);
+}
+
+@Test
+public void testAbortableTransactionExceptionInAddPartitions() {
+final TopicPartition tp = new TopicPartition("foo", 0);
+
+doInitTransactions();
+
+transactionManager.beginTransaction();
+transactionManager.maybeAddPartition(tp);
+
+prepareAddPartitionsToTxn(tp, Errors.ABORTABLE_TRANSACTION);
+runUntil(transactionManager::hasError);
+assertTrue(transactionManager.lastError() instanceof 
AbortableTransactionException);
+
+assertAbortableError(AbortableTransactionException.class);
+}
+
+@Test
+public void testAbortableTransactionExceptionInFindCoordinator() {
+doInitTransactions();
+
+transactionManager.beginTransaction();
+TransactionalRequestResult sendOffsetsResult = 
transactionManager.sendOffsetsToTransaction(
+singletonMap(new TopicPartition("foo", 0), new 
OffsetAndMetadata(39L)), new ConsumerGroupMetadata(consumerGroupId));
+
+prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, 
producerId, epoch);
+runUntil(() -> !transactionManager.hasPartitionsToAdd());
+
+prepareFindCoordinatorResponse(Errors.ABORTABLE_TRANSACTION, false, 
CoordinatorType.GROUP, consumerGroupId);
+runUntil(transactionManager::hasError);
+assertTrue(transactionManager.lastError() instanceof 
AbortableTransactionException);
+
+runUntil(sendOffsetsResult::isCompleted);
+assertFalse(sendOffsetsResult.isSuccessful());
+assertTrue(sendOffsetsResult.error() instanceof 
AbortableTransactionException);
+
+assertAbortableError(AbortableTransactionException.class);
+}
+
+@Test
+public void testAbortableTransactionExceptionInEndTxn() throws 
InterruptedException {
+doInitTransactions();
+
+transactionManager.beginTransaction();
+transactionManager.maybeAddPartition(tp0);
+TransactionalRequestResult commitResult = 
transactionManager.beginCommit();
+
+Future responseFuture = appendToAccumulator(tp0);
+
+assertFalse(responseFuture.isDone());
+prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+prepareProduceResponse(Errors.NONE, producerId, epoch);
+prepareEndTxnResponse(Errors.ABORTABLE_TRANSACTION, 
TransactionResult.COMMIT, producerId, epoch);
+
+runUntil(commitResult::isCompleted);
+runUntil(responseFuture::isDone);
+
+assertThrows(KafkaException.class, commitResult::await);
+assertFalse(commitResult.isSuccessful());
+assertTrue(commitResult.isAcked());
+
+assertAbortableError(AbortableTransactionException.class);
+// make sure the exception was thrown directly from the follow-up 
calls.

Review Comment:
   nit: was this comment meant to be before further checks? I noticed this test 
was a little different.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-18 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1528996611


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3523,6 +3524,128 @@ public void 
testForegroundInvalidStateTransitionIsRecoverable() {
 assertFalse(transactionManager.hasOngoingTransaction());
 }
 
+@Test
+public void testAbortableTransactionExceptionInInitProducerId() {
+TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions();
+prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
+runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+prepareInitPidResponse(Errors.ABORTABLE_TRANSACTION, false, 
producerId, RecordBatch.NO_PRODUCER_EPOCH);
+runUntil(transactionManager::hasError);
+assertTrue(initPidResult.isCompleted());
+assertFalse(initPidResult.isSuccessful());
+assertThrows(AbortableTransactionException.class, 
initPidResult::await);
+assertAbortableError(AbortableTransactionException.class);
+}
+
+@Test
+public void testAbortableTransactionExceptionInAddPartitions() {
+final TopicPartition tp = new TopicPartition("foo", 0);
+
+doInitTransactions();
+
+transactionManager.beginTransaction();
+transactionManager.maybeAddPartition(tp);
+
+prepareAddPartitionsToTxn(tp, Errors.ABORTABLE_TRANSACTION);
+runUntil(transactionManager::hasError);
+assertTrue(transactionManager.lastError() instanceof 
AbortableTransactionException);
+
+assertAbortableError(AbortableTransactionException.class);
+}
+
+@Test
+public void testAbortableTransactionExceptionInFindCoordinator() {
+doInitTransactions();
+
+transactionManager.beginTransaction();
+TransactionalRequestResult sendOffsetsResult = 
transactionManager.sendOffsetsToTransaction(
+singletonMap(new TopicPartition("foo", 0), new 
OffsetAndMetadata(39L)), new ConsumerGroupMetadata(consumerGroupId));
+
+prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, 
producerId, epoch);
+runUntil(() -> !transactionManager.hasPartitionsToAdd());
+
+prepareFindCoordinatorResponse(Errors.ABORTABLE_TRANSACTION, false, 
CoordinatorType.GROUP, consumerGroupId);
+runUntil(transactionManager::hasError);
+assertTrue(transactionManager.lastError() instanceof 
AbortableTransactionException);
+
+runUntil(sendOffsetsResult::isCompleted);
+assertFalse(sendOffsetsResult.isSuccessful());
+assertTrue(sendOffsetsResult.error() instanceof 
AbortableTransactionException);
+
+assertAbortableError(AbortableTransactionException.class);
+}
+
+@Test
+public void testAbortableTransactionExceptionInEndTxn() throws 
InterruptedException {
+doInitTransactions();
+
+transactionManager.beginTransaction();
+transactionManager.maybeAddPartition(tp0);
+TransactionalRequestResult commitResult = 
transactionManager.beginCommit();
+
+Future responseFuture = appendToAccumulator(tp0);
+
+assertFalse(responseFuture.isDone());
+prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+prepareProduceResponse(Errors.NONE, producerId, epoch);
+prepareEndTxnResponse(Errors.ABORTABLE_TRANSACTION, 
TransactionResult.COMMIT, producerId, epoch);
+
+runUntil(commitResult::isCompleted);
+runUntil(responseFuture::isDone);
+
+assertThrows(KafkaException.class, commitResult::await);
+assertFalse(commitResult.isSuccessful());
+assertTrue(commitResult.isAcked());
+
+assertAbortableError(AbortableTransactionException.class);
+// make sure the exception was thrown directly from the follow-up 
calls.

Review Comment:
   nit: was this comment meant to be before further checks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-18 Thread via GitHub


jolshan commented on PR #15486:
URL: https://github.com/apache/kafka/pull/15486#issuecomment-2004546341

   Hey @sjhajharia thanks for the updates. I think `ApiVersionErrorMapper` is 
also a bit tricky since the addPartitions change will not be related to errors 
at all. That's why I was thinking of something a bit more generic. "Operation" 
was probably the right direction rather than "errors". Sorry this part is a bit 
nuanced. Maybe something like "supportedOperation" and the comment could be 
something like the supported operation based on the client's request API 
version?
   
   And yes, I think we should bump all the API versions to support this new 
error. I can update the KIP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-18 Thread via GitHub


wcarlson5 commented on PR #15414:
URL: https://github.com/apache/kafka/pull/15414#issuecomment-2004539886

   @mjsax I added some testing


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-03-18 Thread via GitHub


fvaleri commented on code in PR #15273:
URL: https://github.com/apache/kafka/pull/15273#discussion_r1471706520


##
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##
@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) {
 }
 }
 
+private static Set parseProcessRoles(List processRoles, 
Map voterConnections, int nodeId) {

Review Comment:
   KafkaConfig:1537, I kept the original code here. I don't remember the exact 
reason. I have to move ProcessRole from server to server-common to avoid 
circular dependency. Let me know if this is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16297) Race condition while promoting future replica can lead to partition unavailability.

2024-03-18 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-16297:

Description: 
KIP-858 proposed that when a directory failure occurs after changing the 
assignment of a replica that's moved between two directories in the same 
broker, but before the future replica promotion completes, the broker should 
reassign the replica to inform the controller of its correct status. But this 
hasn't yet been implemented, and without it this failure may lead to indefinite 
partition unavailability.

Example scenario:
 # A broker which leads partition P receives a request to alter the replica 
from directory A to directory B.
 # The broker creates a future replica in directory B and starts a replica 
fetcher.
 # Once the future replica first catches up, the broker queues a reassignment 
to inform the controller of the directory change.
 # The next time the replica catches up, the broker briefly blocks appends and 
promotes the replica. However, before the promotion is attempted, directory A 
fails.
 # The controller was informed that P in now in directory B before it received 
the notification that directory A has failed, so it does not elect a new 
leader, and as long as the broker is online, partition A remains unavailable.

 

 

  was:
KIP-858 proposed that when a directory failure occurs after changing the 
assignment of a replica that's moved between two directories in the same 
broker, but before the future replica promotion completes, the broker should 
reassign the replica to inform the controller of its correct status. But this 
hasn't yet been implemented, and without it this failure may lead to indefinite 
partition unavailability.

Example scenario:
 # A broker which leads partition P receives a request to alter the replica 
from directory A to directory B.
 # The broker creates a future replica in directory B and starts a replica 
fetcher.
 # Once the future replica first catches up, the broker queues a reassignment 
to inform the controller of the directory change.
 # The next time the replica catches up, the broker briefly blocks appends and 
promotes the replica. However, before the promotion is attempted, directory A 
fails.
 # The controller was informed that P in now in directory B before it received 
the notification that directory A has failed, so it does not elect a new 
leader, and as long as the broker is online, partition A remains unavailable.

As per KIP-858, the broker should detect this scenario and queue a reassignment 
of P into directory ID {{{}DirectoryId.LOST{}}}.

 


> Race condition while promoting future replica can lead to partition 
> unavailability.
> ---
>
> Key: KAFKA-16297
> URL: https://issues.apache.org/jira/browse/KAFKA-16297
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>
> KIP-858 proposed that when a directory failure occurs after changing the 
> assignment of a replica that's moved between two directories in the same 
> broker, but before the future replica promotion completes, the broker should 
> reassign the replica to inform the controller of its correct status. But this 
> hasn't yet been implemented, and without it this failure may lead to 
> indefinite partition unavailability.
> Example scenario:
>  # A broker which leads partition P receives a request to alter the replica 
> from directory A to directory B.
>  # The broker creates a future replica in directory B and starts a replica 
> fetcher.
>  # Once the future replica first catches up, the broker queues a reassignment 
> to inform the controller of the directory change.
>  # The next time the replica catches up, the broker briefly blocks appends 
> and promotes the replica. However, before the promotion is attempted, 
> directory A fails.
>  # The controller was informed that P in now in directory B before it 
> received the notification that directory A has failed, so it does not elect a 
> new leader, and as long as the broker is online, partition A remains 
> unavailable.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16363) Storage crashes if dir is unavailable

2024-03-18 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-16363:
---

Assignee: (was: Igor Soarez)

> Storage crashes if dir is unavailable
> -
>
> Key: KAFKA-16363
> URL: https://issues.apache.org/jira/browse/KAFKA-16363
> Project: Kafka
>  Issue Type: Sub-task
>  Components: tools
>Affects Versions: 3.7.0
>Reporter: Igor Soarez
>Priority: Major
>
> The storage tool crashes if one of the configured log directories is 
> unavailable. 
>  
> {code:java}
> sh-4.4# ./bin/kafka-storage.sh format --ignore-formatted -t $KAFKA_CLUSTER_ID 
> -c server.properties
> [2024-03-11 17:51:05,391] ERROR Error while reading meta.properties file 
> /data/d2/meta.properties 
> (org.apache.kafka.metadata.properties.MetaPropertiesEnsemble)
> java.nio.file.AccessDeniedException: /data/d2/meta.properties
>         at 
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:90)
>         at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106)
>         at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
>         at 
> java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:218)
>         at java.base/java.nio.file.Files.newByteChannel(Files.java:380)
>         at java.base/java.nio.file.Files.newByteChannel(Files.java:432)
>         at 
> java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422)
>         at java.base/java.nio.file.Files.newInputStream(Files.java:160)
>         at 
> org.apache.kafka.metadata.properties.PropertiesUtils.readPropertiesFile(PropertiesUtils.java:77)
>         at 
> org.apache.kafka.metadata.properties.MetaPropertiesEnsemble$Loader.load(MetaPropertiesEnsemble.java:135)
>         at kafka.tools.StorageTool$.formatCommand(StorageTool.scala:431)
>         at kafka.tools.StorageTool$.main(StorageTool.scala:95)
>         at kafka.tools.StorageTool.main(StorageTool.scala)
> metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
> dirs={/data/d1: MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, 
> nodeId=101, directoryId=zm7fSw3zso9aR0AtuzsI_A), /data/metadata: 
> MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, nodeId=101, 
> directoryId=eRO8vOP7ddbpx_W2ZazjLw), /data/d2: ERROR})
> I/O error trying to read log directory /data/d2.
>  {code}
> When configured with multiple directories, Kafka tolerates some of them (but 
> not all) being inaccessible, so this tool should be able to handle the same 
> scenarios without crashing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-18 Thread via GitHub


fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1528843407


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String 
inputTopic,
 
 @Override
 public void run() {
-// Init transactions call should always happen first in order to clear 
zombie transactions from previous generation.
-producer.initTransactions();
-
-final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-@Override
-public void onPartitionsRevoked(Collection 
partitions) {
-printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-}
-
-@Override
-public void onPartitionsAssigned(Collection 
partitions) {
-printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-messageRemaining.set(messagesRemaining(consumer));
-}
-});
-
-int messageProcessed = 0;
-while (messageRemaining.get() > 0) {
-try {
-ConsumerRecords records = 
consumer.poll(Duration.ofMillis(200));
-if (records.count() > 0) {
-// Begin a new transaction session.
-producer.beginTransaction();
-for (ConsumerRecord record : records) {
-// Process the record and send to downstream.
-ProducerRecord customizedRecord = 
transform(record);
-producer.send(customizedRecord);
+int processedRecords = 0;
+long remainingRecords = Long.MAX_VALUE;
+// it is recommended to have a relatively short txn timeout in order 
to clear pending offsets faster
+int transactionTimeoutMs = 10_000;
+// consumer must be in read_committed mode, which means it won't be 
able to read uncommitted data
+boolean readCommitted = true;
+try (KafkaProducer producer = new 
Producer("processor-producer", bootstrapServers, outputTopic,
+true, transactionalId, true, -1, transactionTimeoutMs, 
null).createKafkaProducer();
+ KafkaConsumer consumer = new 
Consumer("processor-consumer", bootstrapServers, inputTopic,
+ "processor-group", Optional.of(groupInstanceId), 
readCommitted, -1, null).createKafkaConsumer()) {
+// called first and once to fence zombies and abort any pending 
transaction
+producer.initTransactions();
+
+consumer.subscribe(singleton(inputTopic), this);
+
+Utils.printOut("Processing new records");
+while (!closed && remainingRecords > 0) {
+try {
+ConsumerRecords records = 
consumer.poll(ofMillis(200));
+if (!records.isEmpty()) {
+// begin a new transaction session
+producer.beginTransaction();
+
+for (ConsumerRecord record : records) 
{
+// process the record and send downstream
+ProducerRecord newRecord =
+new ProducerRecord<>(outputTopic, 
record.key(), record.value() + "-ok");
+producer.send(newRecord);
+}
+
+// checkpoint the progress by sending offsets to group 
coordinator broker
+// note that this API is only available for broker >= 
2.5
+
producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), 
consumer.groupMetadata());
+
+// commit the transaction including offsets
+producer.commitTransaction();
+processedRecords += records.count();
 }
+} catch (AuthorizationException | UnsupportedVersionException 
| ProducerFencedException
+ | FencedInstanceIdException | 
OutOfOrderSequenceException | SerializationException e) {
+// we can't recover from these exceptions
+Utils.printErr(e.getMessage());
+shutdown();
+} catch (OffsetOutOfRangeException | 
NoOffsetForPartitionException e) {
+// invalid or no offset found without auto.reset.policy
+Utils.printOut("Invalid or no offset found, using latest");
+consumer.seekToEnd(emptyList());
+consumer.commitSync();
+} catch (KafkaException e) {
+// abort the transaction and try to continue
+Utils.printOut("Aborting transaction: %s", e);
+

Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-18 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1528822628


##
streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+
+@Timeout(600)
+@Tag("integration")
+public class GlobalStateReprocessTest {
+private static final int NUM_BROKERS = 1;
+private static final Properties BROKER_CONFIG;
+
+static {
+BROKER_CONFIG = new Properties();
+BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 
1);
+BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
+}
+
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+
+@BeforeAll
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterAll
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+
+private final MockTime mockTime = CLUSTER.time;
+private final String globalStore = "globalStore";
+private StreamsBuilder builder;
+private Properties streamsConfiguration;
+private KafkaStreams kafkaStreams;
+private String globalStoreTopic;
+
+
+@BeforeEach
+public void before(final TestInfo testInfo) throws Exception {
+builder = new StreamsBuilder();
+
+createTopics();
+streamsConfiguration = new Properties();
+final String safeTestName = safeUniqueTestName(testInfo);
+streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
+streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
100L);
+
+final KeyValueStoreBuilder storeBuilder = new 
KeyValueStoreBuilder<>(
+

Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-18 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1528810589


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -236,6 +252,91 @@ private List topicPartitionsForStore(final 
StateStore store) {
 }
 return topicPartitions;
 }
+@SuppressWarnings("unchecked")
+private void reprocessState(final List topicPartitions,
+final Map highWatermarks,
+final InternalTopologyBuilder.ReprocessFactory 
reprocessFactory,
+final String storeName) {
+final Processor source = reprocessFactory.processorSupplier().get();
+source.init(globalProcessorContext);
+
+for (final TopicPartition topicPartition : topicPartitions) {
+long currentDeadline = NO_DEADLINE;
+
+globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
+final Long checkpoint = checkpointFileCache.get(topicPartition);
+if (checkpoint != null) {
+globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
+} else {
+
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+offset = getGlobalConsumerOffset(topicPartition);
+}
+final Long highWatermark = highWatermarks.get(topicPartition);
+stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
+
+long restoreCount = 0L;
+
+while (offset < highWatermark) {
+// we add `request.timeout.ms` to `poll.ms` because `poll.ms` 
might be too short
+// to give a fetch request a fair chance to actually complete 
and we don't want to
+// start `task.timeout.ms` too early
+//
+// TODO with https://issues.apache.org/jira/browse/KAFKA-10315 
we can just call
+//  `poll(pollMS)` without adding the request timeout and 
do a more precise
+//  timeout handling
+final ConsumerRecords records = 
globalConsumer.poll(pollMsPlusRequestTimeout);
+if (records.isEmpty()) {
+currentDeadline = 
maybeUpdateDeadlineOrThrow(currentDeadline);
+} else {
+currentDeadline = NO_DEADLINE;
+}
+
+for (final ConsumerRecord record : 
records.records(topicPartition)) {
+final ProcessorRecordContext recordContext =
+new ProcessorRecordContext(
+record.timestamp(),
+record.offset(),
+record.partition(),
+record.topic(),
+record.headers());
+globalProcessorContext.setRecordContext(recordContext);
+
+try {
+if (record.key() != null) {
+source.process(new Record<>(
+
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
+
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.value()),
+record.timestamp(),
+record.headers()));
+restoreCount++;
+}
+} catch (final Exception deserializationException) {
+handleDeserializationFailure(

Review Comment:
   That was my first thought too. Maybe we could refactor it a bit more, but a 
`RecordDeserializer` wants a `SourceNode`. It seemed like that would be 
changing more surfaces than necessary. But we can



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-18 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1528807717


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java:
##
@@ -68,40 +68,49 @@ ConsumerRecord deserialize(final 
ProcessorContext processo
 Optional.empty()
 );
 } catch (final Exception deserializationException) {
-final 
DeserializationExceptionHandler.DeserializationHandlerResponse response;
-try {
-response = deserializationExceptionHandler.handle(
-(InternalProcessorContext) processorContext,
-rawRecord,
-deserializationException);
-} catch (final Exception fatalUserException) {
-log.error(
-"Deserialization error callback failed after 
deserialization error for record {}",
-rawRecord,
-deserializationException);
-throw new StreamsException("Fatal user code error in 
deserialization error callback", fatalUserException);
-}
+handleDeserializationFailure(deserializationExceptionHandler, 
processorContext, deserializationException, rawRecord, log, 
droppedRecordsSensor);
+return null;

Review Comment:
   yeah we can add a note



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-18 Thread via GitHub


lianetm commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1528798734


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1650,6 +1650,102 @@ public void 
testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferen
 .setTopicPartitions(Collections.emptyList(;
 }
 
+@Test
+public void testConsumerGroupHeartbeatFullResponse() {
+String groupId = "fooup";
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+// Create a context with one consumer group containing two members.

Review Comment:
   group containing 1 member here right? 2 partitions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-18 Thread via GitHub


lianetm commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1528793006


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1190,10 +1190,11 @@ private 
CoordinatorResult consumerGr
 .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs);
 
 // The assignment is only provided in the following cases:
-// 1. The member reported its owned partitions;
-// 2. The member just joined or rejoined to group (epoch equals to 
zero);
-// 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
+// 1. The member sent a full request. It does so when joining or 
rejoining the group; or
+//on any errors (e.g. timeout).
+// 2. The member's assignment has been updated.
+boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 
&& subscribedTopicNames != null && ownedTopicPartitions != null);

Review Comment:
   nice, perfectly aligned with the client side. Just for the record, along 
with the `rebalanceTimeout`, `topics` and `assignment`, the client will also 
include the server assignor in any full request, but only if it's configured, 
so agree on not including it here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-18 Thread via GitHub


gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1528650747


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String 
inputTopic,
 
 @Override
 public void run() {
-// Init transactions call should always happen first in order to clear 
zombie transactions from previous generation.
-producer.initTransactions();
-
-final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-@Override
-public void onPartitionsRevoked(Collection 
partitions) {
-printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-}
-
-@Override
-public void onPartitionsAssigned(Collection 
partitions) {
-printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-messageRemaining.set(messagesRemaining(consumer));
-}
-});
-
-int messageProcessed = 0;
-while (messageRemaining.get() > 0) {
-try {
-ConsumerRecords records = 
consumer.poll(Duration.ofMillis(200));
-if (records.count() > 0) {
-// Begin a new transaction session.
-producer.beginTransaction();
-for (ConsumerRecord record : records) {
-// Process the record and send to downstream.
-ProducerRecord customizedRecord = 
transform(record);
-producer.send(customizedRecord);
+int processedRecords = 0;
+long remainingRecords = Long.MAX_VALUE;
+// it is recommended to have a relatively short txn timeout in order 
to clear pending offsets faster
+int transactionTimeoutMs = 10_000;
+// consumer must be in read_committed mode, which means it won't be 
able to read uncommitted data
+boolean readCommitted = true;
+try (KafkaProducer producer = new 
Producer("processor-producer", bootstrapServers, outputTopic,
+true, transactionalId, true, -1, transactionTimeoutMs, 
null).createKafkaProducer();
+ KafkaConsumer consumer = new 
Consumer("processor-consumer", bootstrapServers, inputTopic,
+ "processor-group", Optional.of(groupInstanceId), 
readCommitted, -1, null).createKafkaConsumer()) {
+// called first and once to fence zombies and abort any pending 
transaction
+producer.initTransactions();
+
+consumer.subscribe(singleton(inputTopic), this);
+
+Utils.printOut("Processing new records");
+while (!closed && remainingRecords > 0) {
+try {
+ConsumerRecords records = 
consumer.poll(ofMillis(200));
+if (!records.isEmpty()) {
+// begin a new transaction session
+producer.beginTransaction();
+
+for (ConsumerRecord record : records) 
{
+// process the record and send downstream
+ProducerRecord newRecord =
+new ProducerRecord<>(outputTopic, 
record.key(), record.value() + "-ok");
+producer.send(newRecord);
+}
+
+// checkpoint the progress by sending offsets to group 
coordinator broker
+// note that this API is only available for broker >= 
2.5
+
producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), 
consumer.groupMetadata());
+
+// commit the transaction including offsets
+producer.commitTransaction();
+processedRecords += records.count();
 }
+} catch (AuthorizationException | UnsupportedVersionException 
| ProducerFencedException
+ | FencedInstanceIdException | 
OutOfOrderSequenceException | SerializationException e) {
+// we can't recover from these exceptions
+Utils.printErr(e.getMessage());
+shutdown();
+} catch (OffsetOutOfRangeException | 
NoOffsetForPartitionException e) {
+// invalid or no offset found without auto.reset.policy
+Utils.printOut("Invalid or no offset found, using latest");
+consumer.seekToEnd(emptyList());
+consumer.commitSync();
+} catch (KafkaException e) {
+// abort the transaction and try to continue
+Utils.printOut("Aborting transaction: %s", e);
+

[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined

2024-03-18 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16375:
---
Labels: client-transitions-issues kip-848-client-support  (was: 
kip-848-client-support)

> Fix logic for discarding reconciliation if member rejoined
> --
>
> Key: KAFKA-16375
> URL: https://issues.apache.org/jira/browse/KAFKA-16375
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> The current implementation of the new consumer discards the result of a 
> reconciliation if the member rejoined, based on a comparison of the member 
> epoch at the start and end of the reconciliation. If the epochs changed the 
> reconciliation is discarded. This is not right because the member epoch could 
> be incremented without an assignment change. This should be fixed to ensure 
> that the reconciliation is discarded if the member rejoined, probably based 
> on a flag that truly reflects that it went through a transition to joining. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-18 Thread via GitHub


gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1528650747


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String 
inputTopic,
 
 @Override
 public void run() {
-// Init transactions call should always happen first in order to clear 
zombie transactions from previous generation.
-producer.initTransactions();
-
-final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-@Override
-public void onPartitionsRevoked(Collection 
partitions) {
-printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-}
-
-@Override
-public void onPartitionsAssigned(Collection 
partitions) {
-printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-messageRemaining.set(messagesRemaining(consumer));
-}
-});
-
-int messageProcessed = 0;
-while (messageRemaining.get() > 0) {
-try {
-ConsumerRecords records = 
consumer.poll(Duration.ofMillis(200));
-if (records.count() > 0) {
-// Begin a new transaction session.
-producer.beginTransaction();
-for (ConsumerRecord record : records) {
-// Process the record and send to downstream.
-ProducerRecord customizedRecord = 
transform(record);
-producer.send(customizedRecord);
+int processedRecords = 0;
+long remainingRecords = Long.MAX_VALUE;
+// it is recommended to have a relatively short txn timeout in order 
to clear pending offsets faster
+int transactionTimeoutMs = 10_000;
+// consumer must be in read_committed mode, which means it won't be 
able to read uncommitted data
+boolean readCommitted = true;
+try (KafkaProducer producer = new 
Producer("processor-producer", bootstrapServers, outputTopic,
+true, transactionalId, true, -1, transactionTimeoutMs, 
null).createKafkaProducer();
+ KafkaConsumer consumer = new 
Consumer("processor-consumer", bootstrapServers, inputTopic,
+ "processor-group", Optional.of(groupInstanceId), 
readCommitted, -1, null).createKafkaConsumer()) {
+// called first and once to fence zombies and abort any pending 
transaction
+producer.initTransactions();
+
+consumer.subscribe(singleton(inputTopic), this);
+
+Utils.printOut("Processing new records");
+while (!closed && remainingRecords > 0) {
+try {
+ConsumerRecords records = 
consumer.poll(ofMillis(200));
+if (!records.isEmpty()) {
+// begin a new transaction session
+producer.beginTransaction();
+
+for (ConsumerRecord record : records) 
{
+// process the record and send downstream
+ProducerRecord newRecord =
+new ProducerRecord<>(outputTopic, 
record.key(), record.value() + "-ok");
+producer.send(newRecord);
+}
+
+// checkpoint the progress by sending offsets to group 
coordinator broker
+// note that this API is only available for broker >= 
2.5
+
producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), 
consumer.groupMetadata());
+
+// commit the transaction including offsets
+producer.commitTransaction();
+processedRecords += records.count();
 }
+} catch (AuthorizationException | UnsupportedVersionException 
| ProducerFencedException
+ | FencedInstanceIdException | 
OutOfOrderSequenceException | SerializationException e) {
+// we can't recover from these exceptions
+Utils.printErr(e.getMessage());
+shutdown();
+} catch (OffsetOutOfRangeException | 
NoOffsetForPartitionException e) {
+// invalid or no offset found without auto.reset.policy
+Utils.printOut("Invalid or no offset found, using latest");
+consumer.seekToEnd(emptyList());
+consumer.commitSync();
+} catch (KafkaException e) {
+// abort the transaction and try to continue
+Utils.printOut("Aborting transaction: %s", e);
+

Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-18 Thread via GitHub


gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1528650747


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String 
inputTopic,
 
 @Override
 public void run() {
-// Init transactions call should always happen first in order to clear 
zombie transactions from previous generation.
-producer.initTransactions();
-
-final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-@Override
-public void onPartitionsRevoked(Collection 
partitions) {
-printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-}
-
-@Override
-public void onPartitionsAssigned(Collection 
partitions) {
-printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-messageRemaining.set(messagesRemaining(consumer));
-}
-});
-
-int messageProcessed = 0;
-while (messageRemaining.get() > 0) {
-try {
-ConsumerRecords records = 
consumer.poll(Duration.ofMillis(200));
-if (records.count() > 0) {
-// Begin a new transaction session.
-producer.beginTransaction();
-for (ConsumerRecord record : records) {
-// Process the record and send to downstream.
-ProducerRecord customizedRecord = 
transform(record);
-producer.send(customizedRecord);
+int processedRecords = 0;
+long remainingRecords = Long.MAX_VALUE;
+// it is recommended to have a relatively short txn timeout in order 
to clear pending offsets faster
+int transactionTimeoutMs = 10_000;
+// consumer must be in read_committed mode, which means it won't be 
able to read uncommitted data
+boolean readCommitted = true;
+try (KafkaProducer producer = new 
Producer("processor-producer", bootstrapServers, outputTopic,
+true, transactionalId, true, -1, transactionTimeoutMs, 
null).createKafkaProducer();
+ KafkaConsumer consumer = new 
Consumer("processor-consumer", bootstrapServers, inputTopic,
+ "processor-group", Optional.of(groupInstanceId), 
readCommitted, -1, null).createKafkaConsumer()) {
+// called first and once to fence zombies and abort any pending 
transaction
+producer.initTransactions();
+
+consumer.subscribe(singleton(inputTopic), this);
+
+Utils.printOut("Processing new records");
+while (!closed && remainingRecords > 0) {
+try {
+ConsumerRecords records = 
consumer.poll(ofMillis(200));
+if (!records.isEmpty()) {
+// begin a new transaction session
+producer.beginTransaction();
+
+for (ConsumerRecord record : records) 
{
+// process the record and send downstream
+ProducerRecord newRecord =
+new ProducerRecord<>(outputTopic, 
record.key(), record.value() + "-ok");
+producer.send(newRecord);
+}
+
+// checkpoint the progress by sending offsets to group 
coordinator broker
+// note that this API is only available for broker >= 
2.5
+
producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), 
consumer.groupMetadata());
+
+// commit the transaction including offsets
+producer.commitTransaction();
+processedRecords += records.count();
 }
+} catch (AuthorizationException | UnsupportedVersionException 
| ProducerFencedException
+ | FencedInstanceIdException | 
OutOfOrderSequenceException | SerializationException e) {
+// we can't recover from these exceptions
+Utils.printErr(e.getMessage());
+shutdown();
+} catch (OffsetOutOfRangeException | 
NoOffsetForPartitionException e) {
+// invalid or no offset found without auto.reset.policy
+Utils.printOut("Invalid or no offset found, using latest");
+consumer.seekToEnd(emptyList());
+consumer.commitSync();
+} catch (KafkaException e) {
+// abort the transaction and try to continue
+Utils.printOut("Aborting transaction: %s", e);
+

[jira] [Assigned] (KAFKA-15517) Improve MirrorMaker logging in case of authorization errors

2024-03-18 Thread Dmitry Werner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Werner reassigned KAFKA-15517:
-

Assignee: Dmitry Werner

> Improve MirrorMaker logging in case of authorization errors
> ---
>
> Key: KAFKA-15517
> URL: https://issues.apache.org/jira/browse/KAFKA-15517
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Mickael Maison
>Assignee: Dmitry Werner
>Priority: Major
>
> In case MirrorMaker is missing DESCRIBE_CONFIGS on the source cluster, all 
> youget in the logs are lines like:
> {noformat}
> 2023-09-27 11:56:54,989 ERROR 
> [my-cluster-source->my-cluster-target.MirrorSourceConnector|worker] Scheduler 
> for MirrorSourceConnector caught exception in scheduled task: refreshing 
> topics (org.apache.kafka.connect.mirror.Scheduler) [Scheduler for 
> MirrorSourceConnector-refreshing topics]
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicAuthorizationException: Topic 
> authorization failed.
> {noformat}
> It would be good to report the exact call that failed and include the cluster 
> as well to make it easy to figure out which permissions are missing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls

2024-03-18 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827990#comment-17827990
 ] 

Lianet Magrans commented on KAFKA-15551:


Also, given how tight the deadline is to get the fix/PR in, I would suggest we 
focus on the new consumer only. If we find things that could be improved in 
this sense in the old one, we could file a separate Jira for it and tackle it 
afterwards. 

> Evaluate conditions for short circuiting consumer API calls
> ---
>
> Key: KAFKA-15551
> URL: https://issues.apache.org/jira/browse/KAFKA-15551
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
>
> For conditions like:
>  * Committing empty offset
>  * Fetching offsets for empty partitions
>  * Getting empty topic partition position
> Should be short circuit possibly at the API level.
> As a bonus, we should double-check whether the existing {{KafkaConsumer}} 
> implementation suffers from this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls

2024-03-18 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827989#comment-17827989
 ] 

Lianet Magrans commented on KAFKA-15551:


Hey [~zxcoccer], thanks for jumping in! This one should definitely be a simple 
one, as I know that we do handle it already for some API calls (ex. validate 
positions early return if no positions to validate 
[here|https://github.com/apache/kafka/blob/5c929874b88b3b96f650de0f733d93d42ac535a4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java#L227]).
 Still, we wanted to verify that we are doing similarly for all the API calls, 
to ensure that we're not doing unneeded processing/requests.
We want to have this for 3.8, so the deadline is really tight, but if you have 
availability it would be a great help, feel free to re-assign it to you and 
ping me anytime if you have questions. Thanks!  

> Evaluate conditions for short circuiting consumer API calls
> ---
>
> Key: KAFKA-15551
> URL: https://issues.apache.org/jira/browse/KAFKA-15551
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
>
> For conditions like:
>  * Committing empty offset
>  * Fetching offsets for empty partitions
>  * Getting empty topic partition position
> Should be short circuit possibly at the API level.
> As a bonus, we should double-check whether the existing {{KafkaConsumer}} 
> implementation suffers from this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito [kafka]

2024-03-18 Thread via GitHub


cadonna commented on code in PR #15254:
URL: https://github.com/apache/kafka/pull/15254#discussion_r1528601954


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -190,7 +181,7 @@ public class TaskManagerTest {
 
 @org.mockito.Mock
 private InternalTopologyBuilder topologyBuilder;
-@Mock(type = MockType.DEFAULT)
+@org.mockito.Mock

Review Comment:
   @clolov If you resolve Ismaels comment, could you also please remove the 
`Mockito` prefix for other method calls like `verify()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use

2024-03-18 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827976#comment-17827976
 ] 

Edoardo Comar commented on KAFKA-16369:
---

fix merged in trunk thanks [~showuon]

> Broker may not shut down when SocketServer fails to bind as Address already 
> in use
> --
>
> Key: KAFKA-16369
> URL: https://issues.apache.org/jira/browse/KAFKA-16369
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1, 3.8.0
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
> Attachments: kraft-server.log, server.log
>
>
> When in Zookeeper mode, if a port the broker should listen to is already bound
> the KafkaException: Socket server failed to bind to localhost:9092: Address 
> already in use.
> is thrown but the Broker continues to startup .
> It correctly shuts down when in KRaft mode.
> Easy to reproduce when in Zookeper mode with server.config set to listen to 
> localhost only
> {color:#00}listeners={color}{color:#a31515}PLAINTEXT://localhost:9092{color}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use

2024-03-18 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-16369:
--
Fix Version/s: 3.8.0

> Broker may not shut down when SocketServer fails to bind as Address already 
> in use
> --
>
> Key: KAFKA-16369
> URL: https://issues.apache.org/jira/browse/KAFKA-16369
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1, 3.8.0
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: kraft-server.log, server.log
>
>
> When in Zookeeper mode, if a port the broker should listen to is already bound
> the KafkaException: Socket server failed to bind to localhost:9092: Address 
> already in use.
> is thrown but the Broker continues to startup .
> It correctly shuts down when in KRaft mode.
> Easy to reproduce when in Zookeper mode with server.config set to listen to 
> localhost only
> {color:#00}listeners={color}{color:#a31515}PLAINTEXT://localhost:9092{color}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use

2024-03-18 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-16369:
--
Fix Version/s: 3.6.2
   3.7.1

> Broker may not shut down when SocketServer fails to bind as Address already 
> in use
> --
>
> Key: KAFKA-16369
> URL: https://issues.apache.org/jira/browse/KAFKA-16369
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1, 3.8.0
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
> Attachments: kraft-server.log, server.log
>
>
> When in Zookeeper mode, if a port the broker should listen to is already bound
> the KafkaException: Socket server failed to bind to localhost:9092: Address 
> already in use.
> is thrown but the Broker continues to startup .
> It correctly shuts down when in KRaft mode.
> Easy to reproduce when in Zookeper mode with server.config set to listen to 
> localhost only
> {color:#00}listeners={color}{color:#a31515}PLAINTEXT://localhost:9092{color}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] use public constructor [kafka]

2024-03-18 Thread via GitHub


johnnychhsu opened a new pull request, #15556:
URL: https://github.com/apache/kafka/pull/15556

   ## Context
   This test failed in several PR, and from one of the failed build, there was 
an error log
   ```
   [2024-03-12 13:58:12,744] ERROR Failed to discover HeaderConverter in 
classpath: Unable to instantiate TestConverterWithPrivateConstructor: Plugin 
class default constructor must be public 
(org.apache.kafka.connect.runtime.isolation.ReflectionScanner:138)
   java.lang.IllegalAccessException: class 
org.apache.kafka.connect.runtime.isolation.ReflectionScanner cannot access a 
member of class 
org.apache.kafka.connect.integration.ConnectorValidationIntegrationTest$TestConverterWithPrivateConstructor
 with modifiers "private"
   ```
   in [this 
build](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15463/4/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplicateFromLatest__/).
 
   Jira ticket: 
[KAFKA-16383](https://issues.apache.org/jira/browse/KAFKA-16383).
   
   I guess this could be the potential cause.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito [kafka]

2024-03-18 Thread via GitHub


ijuma commented on code in PR #15254:
URL: https://github.com/apache/kafka/pull/15254#discussion_r1528552125


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -190,7 +181,7 @@ public class TaskManagerTest {
 
 @org.mockito.Mock
 private InternalTopologyBuilder topologyBuilder;
-@Mock(type = MockType.DEFAULT)
+@org.mockito.Mock

Review Comment:
   Can we now import this and avoid the fully qualified name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16383) fix flaky test IdentityReplicationIntegrationTest.testReplicateFromLatest()

2024-03-18 Thread Johnny Hsu (Jira)
Johnny Hsu created KAFKA-16383:
--

 Summary: fix flaky test 
IdentityReplicationIntegrationTest.testReplicateFromLatest()
 Key: KAFKA-16383
 URL: https://issues.apache.org/jira/browse/KAFKA-16383
 Project: Kafka
  Issue Type: Bug
Reporter: Johnny Hsu
Assignee: Johnny Hsu


Build link: 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15463/4/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplicateFromLatest__/]

 

This test failed in build in several PR, which is flaky



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [WIP] Splitting consumer tests [kafka]

2024-03-18 Thread via GitHub


lianetm closed pull request #15535: [WIP] Splitting consumer tests
URL: https://github.com/apache/kafka/pull/15535


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15951) MissingSourceTopicException should include topic names

2024-03-18 Thread sanghyeok An (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827968#comment-17827968
 ] 

sanghyeok An commented on KAFKA-15951:
--

Hi, [~mjsax] !

May i take this issue and work for it? 

I take a look at code already, and i think i can handle it.

> MissingSourceTopicException should include topic names
> --
>
> Key: KAFKA-15951
> URL: https://issues.apache.org/jira/browse/KAFKA-15951
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> As the title say – we don't include topic names in all cases, what make it 
> hard for users to identify the root cause more clearly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KIP-780: Support fine-grained compression options [kafka]

2024-03-18 Thread via GitHub


KevinZTW opened a new pull request, #1:
URL: https://github.com/apache/kafka/pull/1

   KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-780%3A+Support+fine-grained+compression+options#KIP780:Supportfinegrainedcompressionoptions-Producer
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16318 : add javafoc for kafka metric [kafka]

2024-03-18 Thread via GitHub


johnnychhsu commented on code in PR #15483:
URL: https://github.com/apache/kafka/pull/15483#discussion_r1528503006


##
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java:
##
@@ -78,6 +111,10 @@ public Measurable measurable() {
 }
 }
 
+/**
+ * Set the metric config.

Review Comment:
   I understand that for the config setter, users should not use it because 
it's aimed for server-side caller. However, why are users not allowed to fetch 
the config?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-18 Thread via GitHub


johnnychhsu commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1528497039


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -79,9 +79,34 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+  def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch()
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch(topic=topicNameWithCustomConfigs)
+// In LogAppendTime's case, if the timestamps are the same, we choose the 
offset of the first record

Review Comment:
   @showuon just updated, thanks for the review!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]

2024-03-18 Thread via GitHub


Vaibhav-Nazare commented on PR #13817:
URL: https://github.com/apache/kafka/pull/13817#issuecomment-2003833677

   Hi @mimaison additional jenkinsfile for ppc64le has been added , also as per 
my understanding some changes would be required at 
https://ci-builds.apache.org/job/Kafka/job/kafka/ to have a new job configured 
for ppc64le


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-18 Thread via GitHub


johnnychhsu commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1528477660


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -79,9 +79,34 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+  def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch()
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch(topic=topicNameWithCustomConfigs)
+// In LogAppendTime's case, if the timestamps are the same, we choose the 
offset of the first record

Review Comment:
   totally agree, let me check this, thanks for the suggestion! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription

2024-03-18 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827950#comment-17827950
 ] 

Phuc Hong Tran commented on KAFKA-15538:


[~lianetm] [~kirktrue], I have a question. What component supposed to update 
the subscription after metadata got fetched?

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, newbie, regex
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscribePattern`. The 
> subscribe using a java `Pattern` will be still supported for a while but 
> eventually removed.
>  * When the subscribe with SubscriptionPattern is used, the client should 
> just send the regex to the broker and it will be resolved on the server side.
>  * In the case of the subscribe with Pattern, the regex should be resolved on 
> the client side.
> As part of this task, we should re-enable all integration tests defined in 
> the PlainTextAsyncConsumer that relate to subscription with pattern and that 
> are currently disabled for the new consumer + new protocol



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito [kafka]

2024-03-18 Thread via GitHub


clolov commented on code in PR #15254:
URL: https://github.com/apache/kafka/pull/15254#discussion_r1528403432


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -785,7 +776,6 @@ public void shouldNotReturnStateUpdaterTasksInOwnedTasks() {
 final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
 final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
 
-when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask));

Review Comment:
   This was reported as unnecessary stubbing by Mockito



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2629,7 +2601,6 @@ public void 
shouldNotCommitNonCorruptedRestoringActiveTasksAndNotCommitRunningSt
 when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, 
corruptedTask)));
 when(tasks.task(taskId02)).thenReturn(corruptedTask);
 final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-when(stateUpdater.getTasks()).thenReturn(mkSet(activeRestoringTask, 
standbyTask));

Review Comment:
   This was reported as unnecessary stubbing by Mockito



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito [kafka]

2024-03-18 Thread via GitHub


clolov commented on PR #15254:
URL: https://github.com/apache/kafka/pull/15254#issuecomment-2003726516

   Heya @cadonna! This should be rebased, tests ought to be passing and all 
comments are addressed . Let me know if there is something else you would 
suggest improving!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Expose earliest local timestamp via the GetOffsetShell [kafka]

2024-03-18 Thread via GitHub


clolov commented on PR #14788:
URL: https://github.com/apache/kafka/pull/14788#issuecomment-2003676807

   Closing this in favour of https://issues.apache.org/jira/browse/KAFKA-15857


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Expose earliest local timestamp via the GetOffsetShell [kafka]

2024-03-18 Thread via GitHub


clolov closed pull request #14788: MINOR: Expose earliest local timestamp via 
the GetOffsetShell
URL: https://github.com/apache/kafka/pull/14788


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16322) Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1

2024-03-18 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827933#comment-17827933
 ] 

Chia-Ping Tsai commented on KAFKA-16322:


[~omkreddy] thanks for backporting this.

> Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1
> --
>
> Key: KAFKA-16322
> URL: https://issues.apache.org/jira/browse/KAFKA-16322
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Johnny Hsu
>Priority: Major
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> https://devhub.checkmarx.com/cve-details/CVE-2023-50572/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Revert to Gradle 8.5 [kafka]

2024-03-18 Thread via GitHub


jlprat commented on PR #15553:
URL: https://github.com/apache/kafka/pull/15553#issuecomment-2003653879

   I managed to reproduce the issue with Gradle 8.6 once, but now that I'm 
trying to reproduce it again, it seems that incremental compilation works as 
expected (I tried it on `trunk` and modified `kafka.admin.AclCommandTest.scala` 
as well).
   I think we should 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Update dependencies [kafka]

2024-03-18 Thread via GitHub


jlprat commented on code in PR #15404:
URL: https://github.com/apache/kafka/pull/15404#discussion_r1528355399


##
gradle/dependencies.gradle:
##
@@ -100,10 +100,10 @@ versions += [
   commonsCli: "1.4",
   commonsValidator: "1.7",
   dropwizardMetrics: "4.1.12.1",
-  gradle: "8.5",
+  gradle: "8.6",

Review Comment:
   I see some inconsistencies with this report.
   I managed to reproduce it only once. But when I tried it again, incremental 
compilation worked as expected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Update upgrade docs to refer 3.6.2 version [kafka]

2024-03-18 Thread via GitHub


omkreddy opened a new pull request, #15554:
URL: https://github.com/apache/kafka/pull/15554

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-18 Thread via GitHub


lucasbru merged PR #15511:
URL: https://github.com/apache/kafka/pull/15511


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-18 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1527973416


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords(
 try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
+boolean outerJoinLeftWindowOpen = false;
+boolean outerJoinRightWindowOpen = false;
 while (it.hasNext()) {
-boolean outerJoinLeftBreak = false;
-boolean outerJoinRightBreak = false;
 final KeyValue, 
LeftOrRightValue> next = it.next();
 final TimestampedKeyAndJoinSide 
timestampedKeyAndJoinSide = next.key;
-final LeftOrRightValue value = next.value;
-final K key = timestampedKeyAndJoinSide.getKey();
 final long timestamp = 
timestampedKeyAndJoinSide.getTimestamp();
 sharedTimeTracker.minTime = timestamp;
 
-// Skip next records if window has not closed
+// Skip next records if window has not closed yet
+// We rely on the  
ordering of KeyValueIterator
 final long outerJoinLookBackTimeMs = 
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
 if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
 if (timestampedKeyAndJoinSide.isLeftSide()) {
-outerJoinLeftBreak = true; // there are no more 
candidates to emit on left-outerJoin-side
-} else {
-outerJoinRightBreak = true; // there are no more 
candidates to emit on right-outerJoin-side
-}
-if (outerJoinLeftBreak && outerJoinRightBreak) {
-break; // there are no more candidates to emit on 
left-outerJoin-side and
-// right-outerJoin-side
+outerJoinLeftWindowOpen = true; // there are no 
more candidates to emit on left-outerJoin-side
 } else {
-continue; // there are possibly candidates left on 
the other outerJoin-side
+outerJoinRightWindowOpen = true; // there are no 
more candidates to emit on right-outerJoin-side
 }
 }
 
-final VOut nullJoinedValue;
-if (isLeftSide) {
-nullJoinedValue = joiner.apply(key,
-value.getLeftValue(),
-value.getRightValue());
-} else {
-nullJoinedValue = joiner.apply(key,
-(V1) value.getRightValue(),
-(V2) value.getLeftValue());
+if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
+// if windows are open for both joinSides we can break 
since there are no more candidates to emit
+break;
+}  else if (windowOpenForJoinSide(outerJoinLeftWindowOpen, 
outerJoinRightWindowOpen, timestampedKeyAndJoinSide)) {
+// else if  window is open only for this joinSide we 
continue with the next outer record
+continue;
 }
 
-context().forward(
-
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
-);
-
-if (prevKey != null && 
!prevKey.equals(timestampedKeyAndJoinSide)) {
-// blind-delete the previous key from the outer window 
store now it is emitted;
-// we do this because this delete would remove the 
whole list of values of the same key,
-// and hence if we delete eagerly and then fail, we 
would miss emitting join results of the later
-// values in the list.
-// we do not use delete() calls since it would incur 
extra get()
-store.put(prevKey, null);
+final K key = timestampedKeyAndJoinSide.getKey();
+final LeftOrRightValue leftOrRightValue = 
next.value;
+final VOut nullJoinedValue = getNullJoinedValue(key, 
leftOrRightValue);
+if (nullJoinedValue != null) {

Review Comment:
   Don't know.
   I guess null-checks are the default in my system ;-).
   Removed the null check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to 

Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-18 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1527968309


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -727,7 +801,7 @@ public void testWindowing() {
 }
 
 @Test
-public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
+public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {

Review Comment:
   Ok.
   sorry, can revert this ofcourse.



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -727,7 +801,7 @@ public void testWindowing() {
 }
 
 @Test
-public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
+public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {

Review Comment:
   Ok.
   sorry, can revert this of course.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-18 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1528271251


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -511,14 +511,88 @@ public void testGracePeriod() {
 // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
 // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
-inputTopic2.pipeInput(0, "dummy", 211);
+inputTopic2.pipeInput(0, "dummy", 112);
 processor.checkAndClearProcessResult(
 new KeyValueTimestamp<>(1, "null+a1", 0L),
 new KeyValueTimestamp<>(0, "A0+null", 0L)
 );
 }
 }
 
+@Test
+public void testEmitAllNonJoinedResultsForAsymmetricWindow() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.outerJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)),
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+// push one item to the primary stream; this should not produce 
any items because there are no joins
+// and window has not ended
+// w1 = {}
+// w2 = {}
+// --> w1 = { 0:A0 (ts: 29) }
+// --> w2 = {}
+inputTopic1.pipeInput(0, "A0", 29L);
+processor.checkAndClearProcessResult();
+
+// push another item to the primary stream; this should not 
produce any items because there are no joins
+// and window has not ended
+// w1 = { 0:A0 (ts: 29) }
+// w2 = {}
+// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+// --> w2 = {}
+inputTopic1.pipeInput(1, "A1", 30L);
+processor.checkAndClearProcessResult();
+
+// push one item to the other stream; this should not produce any 
items because there are no joins
+// and window has not ended
+// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) }
+// w2 = {}
+// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+// --> w2 = { 2:a2 (ts: 31) }
+inputTopic2.pipeInput(2, "a2", 31L);
+processor.checkAndClearProcessResult();
+
+// push another item to the other stream; this should produce no 
joined-items because there are no joins 

Review Comment:
   Good idea, added a step with right hand side record on ts=37



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-18 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1528270350


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -511,14 +511,88 @@ public void testGracePeriod() {
 // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
 // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
-inputTopic2.pipeInput(0, "dummy", 211);
+inputTopic2.pipeInput(0, "dummy", 112);
 processor.checkAndClearProcessResult(
 new KeyValueTimestamp<>(1, "null+a1", 0L),
 new KeyValueTimestamp<>(0, "A0+null", 0L)
 );
 }
 }
 
+@Test
+public void testEmitAllNonJoinedResultsForAsymmetricWindow() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.outerJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)),
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+// push one item to the primary stream; this should not produce 
any items because there are no joins
+// and window has not ended
+// w1 = {}
+// w2 = {}
+// --> w1 = { 0:A0 (ts: 29) }
+// --> w2 = {}
+inputTopic1.pipeInput(0, "A0", 29L);
+processor.checkAndClearProcessResult();
+
+// push another item to the primary stream; this should not 
produce any items because there are no joins
+// and window has not ended
+// w1 = { 0:A0 (ts: 29) }
+// w2 = {}
+// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+// --> w2 = {}
+inputTopic1.pipeInput(1, "A1", 30L);
+processor.checkAndClearProcessResult();
+
+// push one item to the other stream; this should not produce any 
items because there are no joins
+// and window has not ended
+// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) }
+// w2 = {}
+// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+// --> w2 = { 2:a2 (ts: 31) }
+inputTopic2.pipeInput(2, "a2", 31L);
+processor.checkAndClearProcessResult();
+

Review Comment:
   Good idea, added a step with right hand side record on ts=36



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config

2024-03-18 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-16272:
-

Assignee: Sagar Rao

> Update connect_distributed_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16272
> URL: https://issues.apache.org/jira/browse/KAFKA-16272
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Sagar Rao
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{connect_distributed_test.py}} 
> to support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use [kafka]

2024-03-18 Thread via GitHub


edoardocomar commented on PR #15530:
URL: https://github.com/apache/kafka/pull/15530#issuecomment-2003461536

   thanks @showuon !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use [kafka]

2024-03-18 Thread via GitHub


edoardocomar merged PR #15530:
URL: https://github.com/apache/kafka/pull/15530


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use [kafka]

2024-03-18 Thread via GitHub


edoardocomar commented on code in PR #15530:
URL: https://github.com/apache/kafka/pull/15530#discussion_r1528223058


##
core/src/test/scala/unit/kafka/server/KafkaServerTest.scala:
##
@@ -42,6 +43,24 @@ class KafkaServerTest extends QuorumTestHarness {
 TestUtils.shutdownServers(Seq(server1, server2))
   }
 
+  @Test
+  def testListenerPortAlreadyInUse(): Unit = {
+val serverSocket = new ServerSocket(0, 0, InetAddress.getLoopbackAddress)
+
+var kafkaServer : Option[KafkaServer] = None
+try {
+  TestUtils.waitUntilTrue(() => serverSocket.isBound, "Server socket 
failed to bind.")
+  // start a server with listener on the port already bound
+  assertThrows(classOf[RuntimeException],
+() => kafkaServer = 
Option(createServerWithListenerOnPort(serverSocket.getLocalPort)),
+"Exepected RuntimeException because of KafkaServer startup failure due 
do address already in use"

Review Comment:
   fixed and rephrased, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-18 Thread via GitHub


showuon commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1528195441


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -79,9 +79,34 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+  def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch()
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch(topic=topicNameWithCustomConfigs)
+// In LogAppendTime's case, if the timestamps are the same, we choose the 
offset of the first record

Review Comment:
   These codes to verify the logAppendTime has duplicated quite a lot in this 
test suite. Could we extract the into another method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16073: [Tiered] Update localLogStartOffset before deleting segments in memory t… [kafka]

2024-03-18 Thread via GitHub


omkreddy commented on PR #15141:
URL: https://github.com/apache/kafka/pull/15141#issuecomment-2003349984

   If we want this to be 3.6.2 release, we need to merge the PR in next couple 
of days. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16322) Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1

2024-03-18 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16322:
--
Fix Version/s: 3.7.1

> Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1
> --
>
> Key: KAFKA-16322
> URL: https://issues.apache.org/jira/browse/KAFKA-16322
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Johnny Hsu
>Priority: Major
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> https://devhub.checkmarx.com/cve-details/CVE-2023-50572/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16322) Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1

2024-03-18 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16322:
--
Fix Version/s: 3.6.2

> Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1
> --
>
> Key: KAFKA-16322
> URL: https://issues.apache.org/jira/browse/KAFKA-16322
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Johnny Hsu
>Priority: Major
> Fix For: 3.6.2, 3.8.0
>
>
> https://devhub.checkmarx.com/cve-details/CVE-2023-50572/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16210) Upgrade jose4j to 0.9.4

2024-03-18 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16210:
--
Fix Version/s: 3.6.2

> Upgrade jose4j to 0.9.4
> ---
>
> Key: KAFKA-16210
> URL: https://issues.apache.org/jira/browse/KAFKA-16210
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Major
> Fix For: 3.7.0, 3.6.2, 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-18 Thread Stanislav Spiridonov (Jira)
Stanislav Spiridonov created KAFKA-16382:


 Summary: Kafka Streams drop NULL values after reset
 Key: KAFKA-16382
 URL: https://issues.apache.org/jira/browse/KAFKA-16382
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.6.1
Reporter: Stanislav Spiridonov


Kafka Streams (KTable) drops null values after full reset.

See 
[https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
 for sample topology

Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
 # Start example - 1st round
 # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
 # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
 # Stop application 
 # Run kafka-streams-application-reset 
{code:java}
call bin/windows/kafka-streams-application-reset --application-id 
nullproblem-example^
 --input-topics "NULL-IN,NULL-IN-AUX"^
 --bootstrap-server "localhost:9092"
{code}

 # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
running yet)
 # Start example - 2nd round
 # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
 # Expected output "A1:anull, A1:ab, A1:"

The issue is NOT reproduced if application just restarted (skip step 5). 

The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Revert to Gradle 8.5 [kafka]

2024-03-18 Thread via GitHub


jlprat commented on PR #15553:
URL: https://github.com/apache/kafka/pull/15553#issuecomment-2003177508

   I accidentally committed the `core/data` files from some failed test run.
   @dajac feel free to review now. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Revert to Gradle 8.5 [kafka]

2024-03-18 Thread via GitHub


jlprat commented on PR #15553:
URL: https://github.com/apache/kafka/pull/15553#issuecomment-2003170494

   Oops, you are right @dajac I don't know what happened. I'll check


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Revert to Gradle 8.5 [kafka]

2024-03-18 Thread via GitHub


dajac commented on PR #15553:
URL: https://github.com/apache/kafka/pull/15553#issuecomment-2003168800

   @jlprat Thanks for the PR. It looks like the PR contains unwanted files.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Update dependencies [kafka]

2024-03-18 Thread via GitHub


jlprat commented on code in PR #15404:
URL: https://github.com/apache/kafka/pull/15404#discussion_r1528029938


##
gradle/dependencies.gradle:
##
@@ -100,10 +100,10 @@ versions += [
   commonsCli: "1.4",
   commonsValidator: "1.7",
   dropwizardMetrics: "4.1.12.1",
-  gradle: "8.5",
+  gradle: "8.6",

Review Comment:
   https://github.com/apache/kafka/pull/15553 PR reverting to the old version.
   
   @pasharik have you reported the bug (if it is indeed a bug in Gradle) to the 
upstream project?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Revert to Gradle 8.5 [kafka]

2024-03-18 Thread via GitHub


jlprat opened a new pull request, #15553:
URL: https://github.com/apache/kafka/pull/15553

   Rollback to Gradle 8.5.
   When upgrading to Gradle 8.6, Scala incremental compilation seems to be 
broken. See https://github.com/apache/kafka/pull/15404#discussion_r1526739190
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >