[jira] [Updated] (KAFKA-16222) KRaft Migration: Incorrect default user-principal quota after migration
[ https://issues.apache.org/jira/browse/KAFKA-16222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-16222: -- Fix Version/s: 3.6.2 3.8.0 3.7.1 > KRaft Migration: Incorrect default user-principal quota after migration > --- > > Key: KAFKA-16222 > URL: https://issues.apache.org/jira/browse/KAFKA-16222 > Project: Kafka > Issue Type: Bug > Components: kraft, migration >Affects Versions: 3.7.0, 3.6.1 >Reporter: Dominik >Assignee: PoAn Yang >Priority: Blocker > Fix For: 3.6.2, 3.8.0, 3.7.1 > > > We observed that our default user quota does not seem to be migrated correctly. > Before migration: > bin/kafka-configs.sh --describe --all --entity-type users > Quota configs for the *default user-principal* are > consumer_byte_rate=100.0, producer_byte_rate=100.0 > Quota configs for user-principal 'myuser@prod' > are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8 > After migration: > bin/kafka-configs.sh --describe --all --entity-type users > Quota configs for *user-principal ''* are consumer_byte_rate=100.0, > producer_byte_rate=100.0 > Quota configs for user-principal 'myuser%40prod' > are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8 > > Additional finding: our principal names contain an "@", which also leads to an incorrect state after migration (the "@" shows up as "%40"). > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
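"%40" is the URL-encoded form of "@". As an illustration only, here is a minimal sketch of the name mapping a migration has to get right. It assumes that ZooKeeper mode stores user-principal quota entities URL-encoded, with "<default>" as the default-entity sentinel, and that KRaft represents the default entity with a null entity name. The class and method names (`QuotaEntityNames`, `sanitize`, `toKRaftEntityName`) are hypothetical and are not the code of the actual fix (requires Java 10+ for the `Charset` overloads).

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating a ZooKeeper-to-KRaft quota entity name mapping.
public class QuotaEntityNames {
    // Assumed ZooKeeper-mode sentinel for the default user entity.
    static final String ZK_DEFAULT_ENTITY = "<default>";

    // Approximates ZooKeeper-mode sanitization: URL-encode the principal,
    // then percent-encode '*' and spaces (URLEncoder leaves them as '*' and '+').
    static String sanitize(String principal) {
        return URLEncoder.encode(principal, StandardCharsets.UTF_8)
                .replace("*", "%2A")
                .replace("+", "%20");
    }

    // Maps a ZooKeeper entity name to a KRaft entity name:
    // the default entity becomes null, everything else is URL-decoded.
    static String toKRaftEntityName(String zkEntityName) {
        if (ZK_DEFAULT_ENTITY.equals(zkEntityName)) {
            return null;
        }
        return URLDecoder.decode(zkEntityName, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String stored = sanitize("myuser@prod");
        System.out.println(stored);                          // myuser%40prod
        System.out.println(toKRaftEntityName(stored));       // myuser@prod
        System.out.println(toKRaftEntityName("<default>"));  // null
    }
}
```

Getting either step wrong (the sentinel mapping or the decode) is consistent with the two symptoms reported above: an entity named '' and a name containing "%40".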
[jira] [Updated] (KAFKA-16073) Kafka Tiered Storage: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-16073: -- Fix Version/s: 3.7.1 > Kafka Tiered Storage: Consumer Fetch Error Due to Delayed localLogStartOffset > Update During Segment Deletion > > > Key: KAFKA-16073 > URL: https://issues.apache.org/jira/browse/KAFKA-16073 > Project: Kafka > Issue Type: Bug > Components: core, Tiered-Storage >Affects Versions: 3.6.1 >Reporter: hzh0425 >Assignee: hzh0425 >Priority: Major > Labels: KIP-405, kip-405, tiered-storage > Fix For: 3.6.2, 3.8.0, 3.7.1 > > > The identified bug in Apache Kafka's tiered storage feature involves a > delayed update of {{localLogStartOffset}} in the > {{UnifiedLog.deleteSegments}} method, which affects consumer fetch operations. > When segments are deleted from the log's in-memory state, > {{localLogStartOffset}} is not updated immediately. Concurrently, > {{ReplicaManager.handleOffsetOutOfRangeError}} checks whether a consumer's fetch > offset is less than {{localLogStartOffset}}. If it is greater, Kafka > returns an {{OffsetOutOfRangeException}} to the consumer; when {{localLogStartOffset}} is stale, that exception is returned in error. > In a specific concurrent scenario, imagine sequential offsets {{offset1 < > offset2 < offset3}}. A client requests data at {{offset2}}. A background deletion process has removed segments from memory but has not yet > updated {{localLogStartOffset}} from {{offset1}} to {{offset3}}. > Consequently, when the fetch offset ({{offset2}}) is evaluated against > the stale {{offset1}} in {{ReplicaManager.handleOffsetOutOfRangeError}}, > it incorrectly triggers an {{OffsetOutOfRangeException}}. This issue > arises from the out-of-sync update of {{localLogStartOffset}}, leading to > incorrect handling of consumer fetch requests and potential data access > errors.
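A simplified, single-threaded model of the check described above, using made-up offsets (offset1=100, offset2=150, offset3=200). It is not the `ReplicaManager` code; `handleOutOfRange` is a hypothetical stand-in for the `fetchOffset < localLogStartOffset` comparison, and the assumption that lower offsets are served from remote storage is labeled in the comments.

```java
// Simplified model of the fetch-offset check described in KAFKA-16073.
public class StaleLocalLogStartOffset {
    enum Outcome { READ_FROM_REMOTE, OFFSET_OUT_OF_RANGE }

    // Hypothetical stand-in: offsets below the local log start offset are
    // assumed to be served from remote (tiered) storage.
    static Outcome handleOutOfRange(long fetchOffset, long localLogStartOffset) {
        return fetchOffset < localLogStartOffset
                ? Outcome.READ_FROM_REMOTE
                : Outcome.OFFSET_OUT_OF_RANGE;
    }

    public static void main(String[] args) {
        long offset1 = 100, offset2 = 150, offset3 = 200;
        // Segments below offset3 are already gone locally, but the field still holds offset1.
        System.out.println(handleOutOfRange(offset2, offset1)); // OFFSET_OUT_OF_RANGE (stale value)
        // Once localLogStartOffset is advanced to offset3, the same fetch is served remotely.
        System.out.println(handleOutOfRange(offset2, offset3)); // READ_FROM_REMOTE
    }
}
```

In this model the wrong answer can only occur in the window between removing the segments and advancing `localLogStartOffset`.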
Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]
omkreddy commented on PR #15481: URL: https://github.com/apache/kafka/pull/15481#issuecomment-2005776638 I am including this in the 3.6.2 release plan. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-13361) Support fine-grained compression options
[ https://issues.apache.org/jira/browse/KAFKA-13361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828150#comment-17828150 ] Cheng-Kai, Zhang edited comment on KAFKA-13361 at 3/19/24 4:05 AM: --- Hi [~dongjin] [~mimaison] I am interested in this issue, and it seems manageable for a newbie like me. Do you think I could help with this one? My plan is to follow the structure [~mimaison] is currently working on to add those configs. There is a draft PR in a very early stage available [here|https://github.com/apache/kafka/pull/1]. was (Author: JIRAUSER304479): Hi [~dongjin] [~mimaison] I am interested in this issue, and it seems manageable for a newbie like me. Do you think I could help with this one? My plan is to follow the structure [~mimaison] is currently working on to add those configs. There is a draft PR in a very early stage available here. > Support fine-grained compression options > > > Key: KAFKA-13361 > URL: https://issues.apache.org/jira/browse/KAFKA-13361 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Major > Labels: needs-kip > > Adds the following options to the Producer, Broker, and Topic > configurations: > * compression.gzip.buffer: the buffer size that feeds raw input into the > Deflater or is fed by the uncompressed output from the Deflater. (available: > [512, ), default: 8192 (=8kb).) > * compression.snappy.block: the block size that snappy uses. (available: > [1024, ), default: 32768 (=32kb).) > * compression.lz4.block: the block size that lz4 uses. (available: [4, 7] > (meaning 64kb, 256kb, 1mb, 4mb respectively), default: 4.) > * compression.zstd.window: enables long mode; the base-2 log of the window size > that zstd uses to keep track of the data being compressed. (available: [10, 22], > default: 0 (disables long mode).) > All of the above differ, but they have one thing in common: each affects > the memory size used during compression.
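The lz4 block values [4, 7] map to 64kb, 256kb, 1mb and 4mb, i.e. 2^(2n+8) bytes, and the zstd value is the base-2 log of the window size. Below is a minimal sketch of validating and decoding the proposed settings. The option names come from the proposal above; they are not existing Kafka configs (the issue still needs a KIP), and the helper methods are hypothetical.

```java
// Hypothetical validators for the compression options proposed in KAFKA-13361.
public class ProposedCompressionOptions {

    // compression.lz4.block: 4..7 map to 64kb, 256kb, 1mb, 4mb, i.e. 2^(2n+8) bytes.
    static int lz4BlockSizeBytes(int blockId) {
        if (blockId < 4 || blockId > 7) {
            throw new IllegalArgumentException("compression.lz4.block must be in [4, 7]: " + blockId);
        }
        return 1 << (2 * blockId + 8);
    }

    // compression.zstd.window: 0 disables long mode; otherwise 10..22 is log2 of the window size.
    static long zstdWindowBytes(int windowLog) {
        if (windowLog == 0) {
            return 0L; // long mode disabled
        }
        if (windowLog < 10 || windowLog > 22) {
            throw new IllegalArgumentException("compression.zstd.window must be 0 or in [10, 22]: " + windowLog);
        }
        return 1L << windowLog;
    }

    // compression.gzip.buffer (>= 512) and compression.snappy.block (>= 1024) only have lower bounds.
    static int atLeast(String name, int value, int min) {
        if (value < min) {
            throw new IllegalArgumentException(name + " must be >= " + min + ": " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        for (int id = 4; id <= 7; id++) {
            System.out.println("lz4 block " + id + " -> " + lz4BlockSizeBytes(id) / 1024 + " KiB");
        }
        System.out.println("zstd window 22 -> " + zstdWindowBytes(22) + " bytes");
        System.out.println("gzip buffer default -> " + atLeast("compression.gzip.buffer", 8192, 512));
    }
}
```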
[jira] [Created] (KAFKA-16384) KRaft controller number recommendation
Peter created KAFKA-16384: - Summary: KRaft controller number recommendation Key: KAFKA-16384 URL: https://issues.apache.org/jira/browse/KAFKA-16384 Project: Kafka Issue Type: Bug Components: controller, kraft Reporter: Peter There seems to be some conflicting information about how many controllers should be used for a KRaft cluster. The first section below says 3 or 5 controllers may be used, but the second says no more than 3 should be used at the moment. https://kafka.apache.org/documentation/#kraft_voter > A Kafka admin will typically select 3 or 5 servers for this role, depending > on factors like cost and the number of concurrent failures your system should > withstand without availability impact. A majority of the controllers must be > alive in order to maintain availability. With 3 controllers, the cluster can > tolerate 1 controller failure; with 5 controllers, the cluster can tolerate 2 > controller failures. https://kafka.apache.org/documentation/#kraft_deployment > For redundancy, a Kafka cluster should use 3 controllers. More than 3 > controllers is not recommended in critical environments. In the rare case of > a partial network failure it is possible for the cluster metadata quorum to > become unavailable. This limitation will be addressed in a future release of > Kafka. Is 3 still the recommended number, and is there more information on which network issues could cause problems when using 5 controllers?
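The fault-tolerance figures quoted from the documentation follow from majority quorums: n controllers need floor(n/2) + 1 of them alive, so they tolerate n - (floor(n/2) + 1) failures. A small sketch of that arithmetic (the class name is illustrative); it says nothing about the partial-network-failure limitation mentioned in the second section.

```java
// Majority-quorum arithmetic for a controller quorum of size n.
public class QuorumMath {
    // Smallest number of live controllers that still forms a majority.
    static int majority(int controllers) {
        return controllers / 2 + 1;
    }

    // Number of controllers that can fail while a majority remains.
    static int toleratedFailures(int controllers) {
        return controllers - majority(controllers);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.printf("controllers=%d majority=%d tolerated failures=%d%n",
                    n, majority(n), toleratedFailures(n));
        }
    }
}
```

Even sizes add no tolerance over the next smaller odd size (4 controllers tolerate 1 failure, like 3), which is why odd sizes such as 3 or 5 are the usual choices.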
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1529639335

##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord record : records) {
+                            // process the record and send downstream
+                            ProducerRecord newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+
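The new loop relies on `sendOffsetsToTransaction` and `commitTransaction` making the output records and the consumed offsets visible together, and neither of them visible after an abort. The sketch below is a JDK-only model of that property, not Kafka client code: `TxnModel` and its methods are hypothetical, and the "committed" collections stand in for what a `read_committed` consumer and the group coordinator would see.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a transactional read-process-write step (not the Kafka client API).
public class TxnModel {
    final List<String> committedOutput = new ArrayList<>();       // what read_committed readers see
    final Map<Integer, Long> committedOffsets = new HashMap<>();  // group offsets per partition
    private final List<String> pendingOutput = new ArrayList<>();
    private final Map<Integer, Long> pendingOffsets = new HashMap<>();

    void send(String value) {
        pendingOutput.add(value);
    }

    void sendOffsets(int partition, long nextOffset) {
        pendingOffsets.put(partition, nextOffset);
    }

    // Output records and offsets become visible together.
    void commit() {
        committedOutput.addAll(pendingOutput);
        committedOffsets.putAll(pendingOffsets);
        clearPending();
    }

    // Neither output nor offsets survive an abort, so the input is reprocessed later.
    void abort() {
        clearPending();
    }

    private void clearPending() {
        pendingOutput.clear();
        pendingOffsets.clear();
    }

    public static void main(String[] args) {
        TxnModel txn = new TxnModel();
        // Batch 1: input offsets 0..1 on partition 0, transformed like the example ("-ok").
        txn.send("a-ok");
        txn.send("b-ok");
        txn.sendOffsets(0, 2L);
        txn.commit();
        // Batch 2 fails part-way and is aborted.
        txn.send("c-ok");
        txn.sendOffsets(0, 3L);
        txn.abort();
        System.out.println(txn.committedOutput + " " + txn.committedOffsets); // [a-ok, b-ok] {0=2}
    }
}
```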
Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]
showuon commented on PR #15481: URL: https://github.com/apache/kafka/pull/15481#issuecomment-2005672241 Re-triggering the CI build.
[jira] [Commented] (KAFKA-13361) Support fine-grained compression options
[ https://issues.apache.org/jira/browse/KAFKA-13361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828150#comment-17828150 ] Cheng-Kai, Zhang commented on KAFKA-13361: -- Hi [~dongjin] [~mimaison] I am interested in this issue, and it seems manageable for a newbie like me. Do you think I could help with this one? My plan is to follow the structure [~mimaison] is currently working on to add those configs. There is a draft PR in a very early stage available here. > Support fine-grained compression options > > > Key: KAFKA-13361 > URL: https://issues.apache.org/jira/browse/KAFKA-13361 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Major > Labels: needs-kip > > Adds the following options to the Producer, Broker, and Topic > configurations: > * compression.gzip.buffer: the buffer size that feeds raw input into the > Deflater or is fed by the uncompressed output from the Deflater. (available: > [512, ), default: 8192 (=8kb).) > * compression.snappy.block: the block size that snappy uses. (available: > [1024, ), default: 32768 (=32kb).) > * compression.lz4.block: the block size that lz4 uses. (available: [4, 7] > (meaning 64kb, 256kb, 1mb, 4mb respectively), default: 4.) > * compression.zstd.window: enables long mode; the base-2 log of the window size > that zstd uses to keep track of the data being compressed. (available: [10, 22], > default: 0 (disables long mode).) > All of the above differ, but they have one thing in common: each affects > the memory size used during compression.
Re: [PR] KAFKA-16352: Txn may get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan merged PR #15524: URL: https://github.com/apache/kafka/pull/15524
Re: [PR] KAFKA-16352: Txn may get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan commented on PR #15524: URL: https://github.com/apache/kafka/pull/15524#issuecomment-2005610376 I am seeing it fail more often on your branch (for a few runs), but after merging with trunk, it seemed better. I will go ahead and merge.
Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]
jolshan commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1529489216

##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1904,16 +2000,7 @@ public void testReconciliationProcess() {
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
                 .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000)

Review Comment:
   So are we no longer sending a reassignment because we were incorrectly doing so before? (i.e., they weren't full requests and didn't need new assignments)
Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]
jolshan commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1529486638

##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1190,10 +1190,11 @@ private CoordinatorResult consumerGr
             .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs);
         // The assignment is only provided in the following cases:
-        // 1. The member reported its owned partitions;
-        // 2. The member just joined or rejoined to group (epoch equals to zero);
-        // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
+        // 1. The member sent a full request. It does so when joining or rejoining the group; or
+        //    on any errors (e.g. timeout).
+        // 2. The member's assignment has been updated.
+        boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 && subscribedTopicNames != null && ownedTopicPartitions != null);

Review Comment:
   I think I'm missing how `(rebalanceTimeoutMs != -1 && subscribedTopicNames != null && ownedTopicPartitions != null)` maps to `// It does so when joining or rejoining the group; or on any errors (e.g. timeout).` Are we instead saying that this particular case (i.e., setting all three of these fields) indicates a "full" request? I think we should update the comment to make this a bit clearer.
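The condition under review can be read as a small predicate. The sketch below restates it exactly as written in the diff so the cases discussed in this thread can be checked directly. The method names are hypothetical, `List<String>` stands in for the request's actual field types, and the rebalance timeout value is made up.

```java
import java.util.List;

// Restates the isFullRequest condition from the diff (simplified types, hypothetical names).
public class HeartbeatRequestKind {
    static boolean isFullRequest(int memberEpoch, int rebalanceTimeoutMs,
                                 List<String> subscribedTopicNames, List<String> ownedTopicPartitions) {
        return memberEpoch == 0
                || (rebalanceTimeoutMs != -1 && subscribedTopicNames != null && ownedTopicPartitions != null);
    }

    // Per the updated code comment: the assignment is sent for a full request
    // or when the member's assignment has changed.
    static boolean shouldSendAssignment(boolean fullRequest, boolean assignmentChanged) {
        return fullRequest || assignmentChanged;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("foo");
        List<String> owned = List.of();
        System.out.println(isFullRequest(0, -1, null, null));        // join/rejoin: true
        System.out.println(isFullRequest(11, 45000, topics, owned)); // all three fields set: true
        System.out.println(isFullRequest(11, -1, null, owned));      // only owned partitions: false
    }
}
```

The third case is the one raised in the following comment: under the new condition, reporting only ownedTopicPartitions no longer causes the assignment to be included unless it has changed.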
Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]
jeffkbkim commented on PR #15533:
URL: https://github.com/apache/kafka/pull/15533#issuecomment-2005338241

   > This patch changes the logic to check ownedTopicPartitions, subscribedTopicNames and rebalanceTimeoutMs as they are the only three non optional fields.

   They are the only three optional fields, right? Also, my understanding is that there are cases where we don't set subscribedTopicNames and rebalanceTimeoutMs but do set ownedTopicPartitions. Do you have examples of this case?
[jira] [Assigned] (KAFKA-14359) Idempotent Producer continues to retry on OutOfOrderSequence error when first batch fails
[ https://issues.apache.org/jira/browse/KAFKA-14359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-14359: -- Assignee: Justine Olshan > Idempotent Producer continues to retry on OutOfOrderSequence error when first > batch fails > - > > Key: KAFKA-14359 > URL: https://issues.apache.org/jira/browse/KAFKA-14359 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > When the idempotent producer does not have any state, it can fall into a state > where the producer keeps retrying an out-of-order sequence. Consider the > following scenario, where an idempotent producer has retries and delivery > timeout set to Integer.MAX_VALUE (a configuration used in Streams). > 1. A producer sends out several batches (up to 5), with the first one starting > at sequence 0. > 2. The first batch, with sequence 0, fails due to a transient error (i.e., > NOT_LEADER_OR_FOLLOWER or a timeout error). > 3. The second batch, say with sequence 200, comes in. Since there is no > previous state to invalidate it, it gets written to the log. > 4. The original batch is retried and gets an out-of-order sequence error. > 5. The current Java client will continue to retry this batch, but the retries will never succeed.
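The five steps can be replayed against a simplified model of broker-side sequence validation. The model assumes only the behaviour described in the report: with no producer state, any batch is accepted; afterwards a batch must start at the last sequence + 1. `SequenceModel`, `append` and the batch size of 200 records are illustrative assumptions, not broker code.

```java
// Simplified model of per-partition sequence validation as described in KAFKA-14359.
public class SequenceModel {
    enum Result { APPENDED, OUT_OF_ORDER_SEQUENCE }

    private Integer lastSequence = null; // null = no producer state on the broker

    Result append(int firstSequence, int lastSequenceInBatch) {
        if (lastSequence != null && firstSequence != lastSequence + 1) {
            return Result.OUT_OF_ORDER_SEQUENCE;
        }
        // No state yet, or the expected next sequence: accept and remember the batch.
        lastSequence = lastSequenceInBatch;
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        SequenceModel partition = new SequenceModel();
        // Step 2: batch 0..199 fails transiently and never reaches the log.
        // Step 3: batch 200..399 arrives with no prior state and is appended.
        System.out.println(partition.append(200, 399));
        // Steps 4-5: every retry of batch 0..199 is rejected, so unlimited retries never finish.
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("retry " + attempt + ": " + partition.append(0, 199));
        }
    }
}
```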
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on PR #14706: URL: https://github.com/apache/kafka/pull/14706#issuecomment-2005099068 The test failures are unrelated.
Re: [PR] KAFKA-16383: use public constructor [kafka]
chia7712 commented on code in PR #15556:
URL: https://github.com/apache/kafka/pull/15556#discussion_r1529285548

##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java:
##
@@ -460,7 +460,7 @@ public static abstract class AbstractTestConverter extends TestConverter {
     }
     public static class TestConverterWithPrivateConstructor extends TestConverter {
-        private TestConverterWithPrivateConstructor() {
+        public TestConverterWithPrivateConstructor() {

Review Comment:
   I haven't dug into it yet, but it seems the private constructor is exactly what this test is meant to exercise (judging by the name "TestConverterWithPrivateConstructor").
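For context on why the constructor's visibility matters: reflective lookup through `Class.getConstructor()` only finds public constructors, so a class whose no-arg constructor is private cannot be instantiated that way. A minimal JDK-only sketch; whether Connect's plugin validation uses exactly this lookup is an assumption here, and the two converter classes are hypothetical stand-ins.

```java
import java.lang.reflect.Constructor;

// Demonstrates that getConstructor() ignores private constructors.
public class ConstructorVisibility {
    public static class PublicCtorConverter {
        public PublicCtorConverter() { }
    }

    public static class PrivateCtorConverter {
        private PrivateCtorConverter() { }
    }

    // Returns true if the class can be instantiated through a public no-arg constructor.
    static boolean instantiable(Class<?> cls) {
        try {
            Constructor<?> ctor = cls.getConstructor(); // public constructors only
            ctor.newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            return false; // e.g. NoSuchMethodException for the private constructor
        }
    }

    public static void main(String[] args) {
        System.out.println(instantiable(PublicCtorConverter.class));  // true
        System.out.println(instantiable(PrivateCtorConverter.class)); // false
    }
}
```

Making the constructor public, as the diff does, would therefore remove the condition the test name suggests it is checking.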
Re: [PR] KAFKA-16352: Txn may get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan commented on PR #15524: URL: https://github.com/apache/kafka/pull/15524#issuecomment-2004976461 I ran "until failure" on trunk and it took about 30 runs to fail.
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dongnuo123 commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1529263591

##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationConfig.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupProtocolMigrationConfig {

Review Comment:
   Yeah, I agree. It does feel a bit weird. Let me change it to a migration policy.
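The imports in the new file (`Arrays`, `Locale`, `Map`, `Function`, `Collectors`) suggest the common enum-with-lookup-map pattern for parsing a config string. Below is a sketch of that pattern under the "migration policy" naming agreed on above. The enum name and its four constants are assumptions for illustration, not the values in the PR.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical policy enum parsed case-insensitively from a config value.
public enum MigrationPolicy {
    DISABLED("disabled"),
    UPGRADE("upgrade"),
    DOWNGRADE("downgrade"),
    BIDIRECTIONAL("bidirectional");

    private final String configValue;

    MigrationPolicy(String configValue) {
        this.configValue = configValue;
    }

    // Lookup table from lower-case config value to enum constant
    // (static fields are initialized after the constants, so values() is safe here).
    private static final Map<String, MigrationPolicy> VALUE_TO_POLICY = Arrays.stream(values())
            .collect(Collectors.toMap(policy -> policy.configValue, Function.identity()));

    // Parses a config value, ignoring case; throws for unknown values.
    public static MigrationPolicy parse(String value) {
        MigrationPolicy policy = VALUE_TO_POLICY.get(value.toLowerCase(Locale.ROOT));
        if (policy == null) {
            throw new IllegalArgumentException("Unknown migration policy: " + value
                    + ", expected one of " + VALUE_TO_POLICY.keySet());
        }
        return policy;
    }

    @Override
    public String toString() {
        return configValue;
    }
}
```

Using `Locale.ROOT` keeps the case-insensitive match independent of the JVM's default locale.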
Re: [PR] KAFKA-16352: Txn may get stuck in PrepareCommit or PrepareAbort s… [kafka]
artemlivshits commented on PR #15524: URL: https://github.com/apache/kafka/pull/15524#issuecomment-2004953080 LogDirFailureTest passed locally as well (probably just a flake), and it doesn't seem to use any of the code that I touched, so the failure is unrelated.
Re: [PR] KAFKA-15756: [2/3] Migrate existing integration tests to run old protocol in new coordinator [kafka]
kirktrue commented on code in PR #14675:
URL: https://github.com/apache/kafka/pull/14675#discussion_r1529229638

##
core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala:
##
@@ -89,6 +96,8 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     val producer = createProducer()
     producerSend(producer, numRecords)
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "3")
+

Review Comment:
   Is this override still needed for the tests to pass?
[jira] [Commented] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
[ https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828089#comment-17828089 ] Kirk True commented on KAFKA-16217: --- [~calvinliu]—I reassigned this to you as you've already started tackling it. Thanks! > Transactional producer stuck in IllegalStateException during close > -- > > Key: KAFKA-16217 > URL: https://issues.apache.org/jira/browse/KAFKA-16217 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.7.0, 3.6.1 >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > Labels: transactions > Fix For: 3.8.0, 3.7.1, 3.6.3 > > > The producer is stuck during the close. It keeps retrying to abort the > transaction but it never succeeds. > {code:java} > [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | > producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > org.apache.kafka.clients.producer.internals.Sender run - [Producer > clientId=producer-transaction-ben > ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, > transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] > Error in kafka producer I/O thread while aborting transaction: > java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` > because the previous call to `commitTransaction` timed out and must be retried > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) > at java.base/java.lang.Thread.run(Thread.java:1583) > at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) > {code} > With the additional log, I found the root cause. 
If the producer is in a bad > transaction state (in my case, TransactionManager.pendingTransition was > set to commitTransaction and was never cleared) and the producer then calls > close and tries to abort the existing transaction, the producer gets > stuck aborting the transaction. It is related to the fix > [https://github.com/apache/kafka/pull/13591]. >
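A toy model of the stuck state described above. It assumes only what the log and the root-cause note say: a leftover pending `commitTransaction` makes every `abortTransaction` attempt fail with `IllegalStateException`, so a close path that keeps retrying the abort never makes progress. `TxnStateModel` and its members are hypothetical, and the bounded loop exists only so the demo terminates.

```java
// Toy model of the KAFKA-16217 state: a stale pending commit blocks every abort.
public class TxnStateModel {
    // Operation whose earlier call timed out and was never cleaned up (null if none).
    private String pendingTransition;

    TxnStateModel(String pendingTransition) {
        this.pendingTransition = pendingTransition;
    }

    // Mirrors the error in the log: a different operation cannot start while one is pending.
    void beginAbort() {
        if (pendingTransition != null && !pendingTransition.equals("abortTransaction")) {
            throw new IllegalStateException("Cannot attempt operation `abortTransaction` because the previous call to `"
                    + pendingTransition + "` timed out and must be retried");
        }
        pendingTransition = null; // the abort proceeds
    }

    // Returns how many abort attempts failed before success or before giving up.
    static int closeWithRetries(TxnStateModel txn, int maxAttempts) {
        int failures = 0;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                txn.beginAbort();
                return failures;
            } catch (IllegalStateException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println(closeWithRetries(new TxnStateModel("commitTransaction"), 5)); // 5: never succeeds
        System.out.println(closeWithRetries(new TxnStateModel(null), 5));                // 0: aborts at once
    }
}
```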
[jira] [Assigned] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close
[ https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16217: - Assignee: Calvin Liu (was: Kirk True)
Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]
kirktrue commented on PR #15541: URL: https://github.com/apache/kafka/pull/15541#issuecomment-2004890051 @CalvinConfluent—thanks for the PR! This PR doesn't have any unit tests to verify the new behavior. Would it be possible to migrate the test case from your _other PR_ (#15336) to _this PR_? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]
jeffkbkim commented on code in PR #15534: URL: https://github.com/apache/kafka/pull/15534#discussion_r1529191724 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated( long offset ) { log.debug("High watermark of {} incremented to {}.", tp, offset); -scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> { -CoordinatorContext context = coordinators.get(tp); -if (context != null) { -context.lock.lock(); -try { -if (context.state == CoordinatorState.ACTIVE) { -// The updated high watermark can be applied to the coordinator only if the coordinator -// exists and is in the active state. -log.debug("Updating high watermark of {} to {}.", tp, offset); - context.coordinator.updateLastCommittedOffset(offset); -context.deferredEventQueue.completeUpTo(offset); -coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset); -} else { -log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.", -tp, offset); +if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) { +// An event to apply the new high watermark is pushed to the front of the +// queue only if the previous value was -1L. If it was not, it means that +// there is already an event waiting to process the last value. +processor.enqueueFirst(new CoordinatorInternalEvent("HighWatermarkUpdate", tp, () -> { Review Comment: this should be just `enqueueFirst(...)`, not `processor.enqueueFirst(...)` right? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated( long offset ) { log.debug("High watermark of {} incremented to {}.", tp, offset); -scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> { -CoordinatorContext context = coordinators.get(tp); -if (context != null) { -context.lock.lock(); -try { -if (context.state == CoordinatorState.ACTIVE) { -// The updated high watermark can be applied to the coordinator only if the coordinator -// exists and is in the active state. -log.debug("Updating high watermark of {} to {}.", tp, offset); - context.coordinator.updateLastCommittedOffset(offset); -context.deferredEventQueue.completeUpTo(offset); -coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset); -} else { -log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.", -tp, offset); +if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) { +// An event to apply the new high watermark is pushed to the front of the +// queue only if the previous value was -1L. If it was not, it means that +// there is already an event waiting to process the last value. Review Comment: Let's say there are two HWM advancements, to offset `h1` and `h2` respectively. (`h1 < h2`) The first HWM advancement to `h1` will set lastHighWatermark to `NO_OFFSET` and enqueueFirst() HWM update event. Before the first event runs, let's say the HWM advances to `h2`. this will see that lastHighWatermark is `NO_OFFSET` and will skip enqueueFirst(). Doesn't this mean that all write events waiting for committed offset `h1 < committed_offset <= h2` cannot complete until the HWM advances again? I wonder if we can: * keep track of highest HWM updated * only enqueueFirst if the offset to update is greater than highest HWM recorded Would this work? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated( long offset ) { log.debug("High watermark of {} incremented to {}.", tp, offset); -scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> { -CoordinatorContext context = coordinators.get(tp);
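The coalescing pattern in this change (`lastHighWatermark.getAndSet(offset) == NO_OFFSET` plus `enqueueFirst`) can be sketched in isolation. This is a minimal, hypothetical single-threaded model, not the CoordinatorRuntime code: `HwmCoalescer`, `enqueueRegular`, and `runNext` are invented names, and it assumes the queued event reads and clears the latest value with `getAndSet(NO_OFFSET)` when it runs. Under that assumption, advances to `h1` and then `h2` before the event runs produce one queued event that applies `h2`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of coalescing high-watermark updates into one
// high-priority event. Single-threaded; not the actual CoordinatorRuntime code.
class HwmCoalescer {
    static final long NO_OFFSET = -1L;

    private final AtomicLong lastHighWatermark = new AtomicLong(NO_OFFSET);
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private long appliedHighWatermark = NO_OFFSET;

    // Called whenever the high watermark advances.
    void onHighWatermarkUpdated(long offset) {
        // Enqueue only if no update event is already waiting; put it at the
        // front so it runs before other pending work.
        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
            queue.addFirst(() -> {
                // Read and clear the newest value at execution time, so advances
                // that happened after enqueuing are applied by this same event.
                long latest = lastHighWatermark.getAndSet(NO_OFFSET);
                if (latest != NO_OFFSET) {
                    appliedHighWatermark = latest;
                }
            });
        }
    }

    void enqueueRegular(Runnable event) {
        queue.addLast(event);
    }

    // Runs the next queued event; returns false if the queue was empty.
    boolean runNext() {
        Runnable next = queue.pollFirst();
        if (next == null) {
            return false;
        }
        next.run();
        return true;
    }

    int queueSize() {
        return queue.size();
    }

    long appliedHighWatermark() {
        return appliedHighWatermark;
    }
}
```

Whether the real event reads the latest value this way is exactly what the review question hinges on; the sketch only shows the behaviour if it does.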
[PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]
wernerdv opened a new pull request, #15558: URL: https://github.com/apache/kafka/pull/15558 Log the list of topics for which an authorization error was received when trying to describe configs, along with the cluster alias. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
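A minimal sketch of the kind of aggregated log line this PR describes: one message per cluster alias listing every topic whose describe-configs call failed with an authorization error. `AuthorizationFailure` is a stand-in exception so the example runs without Kafka, and `AuthzLogging.describeConfigsAuthzMessage` and the message wording are hypothetical, not MirrorMaker's code.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Stand-in for an authorization failure so the sketch runs without Kafka.
class AuthorizationFailure extends RuntimeException {
    AuthorizationFailure(String message) {
        super(message);
    }
}

class AuthzLogging {
    // `failures` maps topic name -> the exception its describe-configs call failed with.
    // Returns one message naming the cluster alias and every topic that failed with
    // an authorization error, or empty if there were none.
    static Optional<String> describeConfigsAuthzMessage(String clusterAlias, Map<String, Throwable> failures) {
        List<String> topics = failures.entrySet().stream()
            .filter(e -> e.getValue() instanceof AuthorizationFailure)
            .map(Map.Entry::getKey)
            .sorted()
            .collect(Collectors.toList());
        if (topics.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of("Not authorized to describe configs for topics " + topics + " on cluster " + clusterAlias);
    }
}
```

Sorting the topic names keeps the log line stable across runs, which makes it easier to grep and to assert on in tests.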
Re: [PR] MINOR; Log reason for deleting a kraft snapshot [kafka]
jsancio merged PR #15478: URL: https://github.com/apache/kafka/pull/15478
Re: [PR] KAFKA-16345: Optionally URL-encode clientID and clientSecret in authorization header [kafka]
kirktrue commented on code in PR #15475: URL: https://github.com/apache/kafka/pull/15475#discussion_r1529074359 ## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ## @@ -192,6 +192,12 @@ public class SaslConfigs { + " be inspected for the standard OAuth \"iss\" claim and if this value is set, the broker will match it exactly against what is in the JWT's \"iss\" claim. If there is no" + " match, the broker will reject the JWT and authentication will fail."; +public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = "sasl.oauthbearer.header.urlencode.enable"; Review Comment: It was added here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1025%3A+Optionally+URL-encode+clientID+and+clientSecret+in+authorization+header
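For context, the behaviour toggled by `sasl.oauthbearer.header.urlencode.enable` can be sketched as building an HTTP Basic `Authorization` header with or without URL-encoding the credentials first. This is a hedged illustration, not Kafka's implementation: `BasicAuthHeader.build` is a hypothetical helper, and it assumes the form-URL-encoding described in RFC 6749, section 2.3.1, applied to the client ID and secret before they are joined with `:` and Base64-encoded.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicAuthHeader {
    // Builds an HTTP Basic Authorization header value for an OAuth client.
    // When urlEncode is true, the client ID and secret are form-URL-encoded first;
    // otherwise they are used verbatim.
    static String build(String clientId, String clientSecret, boolean urlEncode) {
        String id = urlEncode ? URLEncoder.encode(clientId, StandardCharsets.UTF_8) : clientId;
        String secret = urlEncode ? URLEncoder.encode(clientSecret, StandardCharsets.UTF_8) : clientSecret;
        String credentials = id + ":" + secret;
        return "Basic " + Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }
}
```

Encoding matters when a secret contains characters such as `:` or `@`; without it, a `:` inside the client ID would be indistinguishable from the ID/secret separator.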
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1526788841 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -551,9 +580,11 @@ class BrokerLifecycleManager( } private def scheduleNextCommunication(intervalNs: Long): Unit = { -trace(s"Scheduling next communication at ${MILLISECONDS.convert(intervalNs, NANOSECONDS)} " + +val nanos = if (nextSchedulingShouldBeImmediate) 0 else intervalNs Review Comment: nanos => adjustedIntervalNs? ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest { Collections.emptyMap(), OptionalLong.empty()) poll(ctx, manager, registration) +def nextHeartbeatDirs(): Set[String] = + poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))) +.data().offlineLogDirs().asScala.map(_.toString).toSet +assertEquals(Set.empty, nextHeartbeatDirs()) manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA")) +assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs()) manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow")) +assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), nextHeartbeatDirs()) Review Comment: It seems this could still be flaky. `poll(ctx, manager, registration)` A CommunicationEvent c1 with a delay of 0 is scheduled in manager.eventQueue. `assertEquals(Set.empty, nextHeartbeatDirs())` c1 is processed in `ctx.mockChannelManager` and a BrokerHeartbeatResponseEvent b1 is added to `manager.eventQueue`. b1 is processed at `manager.eventQueue` and a CommunicationEvent c2 with a delay of 100 is scheduled in `manager.eventQueue`. `manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))` An OfflineDirEvent o1 is appended to `manager.eventQueue`, but not yet processed.
`assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())` nextHeartbeatDirs() makes the mock time advance quickly past 100. c2 is processed at `manager.eventQueue` and a HeartBeat request h1 is added to `ctx.mockChannelManager`. o1 is processed at `manager.eventQueue` and a CommunicationEvent c3 is appended to `manager.eventQueue`. c3 is processed at `manager.eventQueue` and sets `nextSchedulingShouldBeImmediate` to true. h1 is processed by `ctx.mockChannelManager` and adds BrokerHeartbeatResponseEvent b2 to `manager.eventQueue`. `manager.eventQueue` processes b2 and schedules a CommunicationEvent c4 with a delay of 0 in `manager.eventQueue`. c4 is processed at `manager.eventQueue` and a HeartBeat request h2 (which doesn't pick up ej8Q9_d2Ri6FXNiTxKFiow) is added to `ctx.mockChannelManager`. `manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))` `assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), nextHeartbeatDirs())` Now the above assertion will fail, since `nextHeartbeatDirs()` will pick up c4, which doesn't include ej8Q9_d2Ri6FXNiTxKFiow. ## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ## @@ -513,4 +514,36 @@ public void close() throws InterruptedException { eventHandlerThread.join(); log.info("closed event queue."); } + +/** + * Returns the deferred event that the queue is waiting for, idling until + * its deadline comes, if there is any. + * If the queue has immediate work to do, this returns empty. + * This is useful for unit tests, where to make progress, we need to + * speed the clock up until the next scheduled event is ready to run. + */ +public Optional scheduledAfterIdling() { Review Comment: scheduledAfterIdling => firstDeferredIfIdling?
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest { @Test def testKraftJBODMetadataVersionUpdateEvent(): Unit = { -val context = new RegistrationTestContext(configProperties) -val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ"))) +val ctx = new RegistrationTestContext(configProperties) +val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ"))) val controllerNode = new Node(3000, "localhost", 8021) -context.controllerNodeProvider.node.set(controllerNode) -manager.start(() => context.highestMetadataOffset.get(), - context.mockChannelManager, context.clusterId, context.advertisedListeners, +
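The test helper discussed above (`scheduledAfterIdling`, with `firstDeferredIfIdling` suggested as the name) lets a test fast-forward a mock clock to the next deferred event only when nothing is ready to run. The following is a minimal, hypothetical single-threaded sketch of that idea; `ToyEventQueue`, `scheduleDeferred`, `runUntilIdle`, and `advanceToNextDeferred` are invented names, not the KafkaEventQueue API, and deadlines are plain `long` nanoseconds.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Optional;
import java.util.PriorityQueue;

// Hypothetical single-threaded event queue with a mock clock, for illustration only.
class ToyEventQueue {
    private static final class Deferred {
        final long deadlineNs;
        final Runnable task;

        Deferred(long deadlineNs, Runnable task) {
            this.deadlineNs = deadlineNs;
            this.task = task;
        }
    }

    private final ArrayDeque<Runnable> ready = new ArrayDeque<>();
    private final PriorityQueue<Deferred> deferred =
        new PriorityQueue<>(Comparator.comparingLong((Deferred d) -> d.deadlineNs));
    private long nowNs = 0;

    void append(Runnable task) {
        ready.addLast(task);
    }

    void scheduleDeferred(long delayNs, Runnable task) {
        deferred.add(new Deferred(nowNs + delayNs, task));
    }

    // Deadline of the earliest deferred event, but only if there is no immediate
    // work; empty if work is ready or nothing is deferred.
    Optional<Long> firstDeferredIfIdling() {
        if (!ready.isEmpty() || deferred.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(deferred.peek().deadlineNs);
    }

    // Runs ready work, moving deferred events into the ready queue once they are due.
    void runUntilIdle() {
        while (true) {
            while (!deferred.isEmpty() && deferred.peek().deadlineNs <= nowNs) {
                ready.addLast(deferred.poll().task);
            }
            Runnable next = ready.pollFirst();
            if (next == null) {
                return;
            }
            next.run();
        }
    }

    // Test helper: jump the mock clock straight to the next deferred deadline.
    boolean advanceToNextDeferred() {
        Optional<Long> next = firstDeferredIfIdling();
        if (next.isEmpty()) {
            return false;
        }
        nowNs = Math.max(nowNs, next.get());
        runUntilIdle();
        return true;
    }

    long nowNs() {
        return nowNs;
    }
}
```

Advancing only when the queue is idle is what keeps such a test deterministic: the clock never jumps past work that was already runnable, which is the kind of interleaving the flakiness analysis above walks through.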
[jira] [Commented] (KAFKA-15282) Implement client support for KIP-848 client-side assignors
[ https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828075#comment-17828075 ] Kirk True commented on KAFKA-15282: --- [~zxcoccer]—you're certainly welcome to look at this. I would note that there's a little uncertainty if we need this functionality or not. That's one of the reasons we haven't tackled it yet. > Implement client support for KIP-848 client-side assignors > -- > > Key: KAFKA-15282 > URL: https://issues.apache.org/jira/browse/KAFKA-15282 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: kip-848-client-support > Fix For: 4.0.0 > > > The client-side assignor provides the logic for the partition assignments > instead of on the server. Client-side assignment is the main approach used by > the “old protocol” for divvying up partitions. While the “new protocol” > favors server-side assignors, the client-side assignor will continue to be > used for backward compatibility, including KSQL, Connect, etc. > Note: I _*think*_ that the client-side assignor logic and the reconciliation > logic can remain separate from each other. We should strive to keep the two > pieces unencumbered, unless it’s unavoidable. > This task includes: > * Validate the client’s configuration for assignor selection > * Integrate with the new {{PartitionAssignor}} interface to invoke the logic > from the user-provided assignor implementation > * Implement the necessary logic around the request/response from the > {{ConsumerGroupPrepareAssignment}} RPC call using the information from the > {{PartitionAssignor}} above > * Implement the necessary logic around the request/response from the > {{ConsumerGroupInstallAssignment}} RPC call, again using the information > calculated by the {{PartitionAssignor}} > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. 
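To make the client-side assignor role concrete, here is a minimal sketch of the kind of logic a user-provided assignor encapsulates: given the group members and the partitions to distribute, compute who owns what. `SimpleAssignor` and `RoundRobinAssignor` are hypothetical and much simpler than the `PartitionAssignor` interface this task refers to; partitions are plain strings and all members are assumed to share one subscription.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified assignor contract (not the KIP-848 PartitionAssignor API).
interface SimpleAssignor {
    String name();

    Map<String, List<String>> assign(List<String> memberIds, List<String> partitions);
}

// Distributes partitions across members in round-robin order.
class RoundRobinAssignor implements SimpleAssignor {
    @Override
    public String name() {
        return "roundrobin";
    }

    @Override
    public Map<String, List<String>> assign(List<String> memberIds, List<String> partitions) {
        // Sort members so every caller computes the same result for the same input.
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members);
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return assignment;
        }
        for (int i = 0; i < partitions.size(); i++) {
            String owner = members.get(i % members.size());
            assignment.get(owner).add(partitions.get(i));
        }
        return assignment;
    }
}
```

In the task's terms, the client would select an assignor by name from its configuration, run it on the data obtained through `ConsumerGroupPrepareAssignment`, and send the result back via `ConsumerGroupInstallAssignment`.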
Re: [PR] KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito [kafka]
clolov commented on code in PR #15254: URL: https://github.com/apache/kafka/pull/15254#discussion_r1529018575 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -190,7 +181,7 @@ public class TaskManagerTest { @org.mockito.Mock private InternalTopologyBuilder topologyBuilder; -@Mock(type = MockType.DEFAULT) +@org.mockito.Mock Review Comment: Hopefully done on both accounts!
Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan commented on PR #15524: URL: https://github.com/apache/kafka/pull/15524#issuecomment-2004586672 @artemlivshits it may be worth checking whether the test is failing on trunk as well. If so, we can renew the JIRA to fix it.
Re: [PR] KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use [kafka]
edoardocomar commented on PR #15530: URL: https://github.com/apache/kafka/pull/15530#issuecomment-2004581473 Fix cherry-picked to the 3.6 and 3.7 branches.
[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828067#comment-17828067 ] Edoardo Comar commented on KAFKA-16369: --- Fix cherry-picked to 3.6 and 3.7. > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.6.2, 3.8.0, 3.7.1 > > Attachments: kraft-server.log, server.log > > > When in ZooKeeper mode, if a port the broker should listen on is already bound, > the KafkaException "Socket server failed to bind to localhost:9092: Address > already in use." > is thrown, but the broker continues to start up. > It correctly shuts down when in KRaft mode. > This is easy to reproduce in ZooKeeper mode with server.config set to listen only on > localhost: > listeners=PLAINTEXT://localhost:9092
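The underlying condition can be reproduced with plain JDK sockets: a second bind to an address and port that another socket is already listening on raises `java.net.BindException`. This hedged sketch pairs that with a hypothetical `ToyBroker.startup` that treats the bind failure as fatal and releases its resources instead of continuing; `ToyBroker` is illustrative only and is not Kafka's `SocketServer` or startup code.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical broker startup that fails fast when its listener cannot bind.
// Illustrative only; not Kafka's SocketServer or server startup logic.
class ToyBroker {
    private ServerSocket listener;
    private volatile boolean running;

    // Returns true if startup completed; on a bind failure it shuts down and returns false.
    boolean startup(InetSocketAddress address) {
        try {
            listener = new ServerSocket();
            listener.bind(address);
        } catch (IOException e) {
            // java.net.BindException ("Address already in use") is an IOException.
            // Treat it as fatal: release resources instead of continuing startup.
            shutdown();
            return false;
        }
        running = true;
        return true;
    }

    void shutdown() {
        running = false;
        if (listener != null) {
            try {
                listener.close();
            } catch (IOException ignored) {
                // Best-effort cleanup.
            }
        }
    }

    boolean isRunning() {
        return running;
    }
}
```

The test binds an ephemeral loopback port first, so it reproduces "Address already in use" without touching a real broker on 9092.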
Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan commented on PR #15524: URL: https://github.com/apache/kafka/pull/15524#issuecomment-2004580064 > LogDirFailureTest > testIOExceptionDuringLogRoll(String) has been failing for a while; I tried to tackle some of the problems with it, but we ran into some issues. See https://github.com/apache/kafka/pull/15354
Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]
dongnuo123 commented on code in PR #15546: URL: https://github.com/apache/kafka/pull/15546#discussion_r1529010875 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -9298,6 +9298,120 @@ public void testOnConsumerGroupStateTransitionOnLoading() { verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); } +@Test +public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() { +String classicGroupId = "classic-group-id"; +String memberId = Uuid.randomUuid().toString(); +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.build(); +ClassicGroup classicGroup = new ClassicGroup( +new LogContext(), +classicGroupId, +EMPTY, +context.time, +context.metrics +); +context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); +assertThrows(GroupIdNotFoundException.class, () -> +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(classicGroupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setServerAssignor("range") +.setRebalanceTimeoutMs(5000) +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.setTopicPartitions(Collections.emptyList()))); +} + +@Test +public void testConsumerGroupHeartbeatWithEmptyClassicGroup() { +String classicGroupId = "classic-group-id"; +String memberId = Uuid.randomUuid().toString(); +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); +GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.build(); +ClassicGroup classicGroup = new ClassicGroup( +new LogContext(), +classicGroupId, +EMPTY, +context.time, +context.metrics +); +context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + +CoordinatorResult result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(classicGroupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setServerAssignor("range") +.setRebalanceTimeoutMs(5000) +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.setTopicPartitions(Collections.emptyList())); + +assertEquals(0, result.response().errorCode()); + assertEquals(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), result.records().get(0)); +assertEquals(Group.GroupType.CONSUMER, + context.groupMetadataManager.getOrMaybeCreateConsumerGroup(classicGroupId, false).type()); +} + +@Test +public void testClassicGroupJoinWithNonEmptyConsumerGroup() throws Exception { +String consumerGroupId = "consumer-group-id"; +String memberId = Uuid.randomUuid().toString(); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setState(MemberState.STABLE) +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.build())) +.build(); + +JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() +.withGroupId(consumerGroupId) +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); + +GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); +assertEquals(Errors.GROUP_ID_NOT_FOUND.code(), joinResult.joinFuture.get().errorCode()); +} + +@Test +public void testClassicGroupJoinWithEmptyConsumerGroup() throws Exception { +String 
consumerGroupId = "consumer-group-id"; +
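Taken together, these tests encode a simple rule for offline migration: a request using the other protocol may take over a group ID only if the existing group is empty; otherwise it gets `GROUP_ID_NOT_FOUND`. The sketch below restates that rule as a hypothetical decision function (`GroupKind`, `Outcome`, and `MigrationRule.onRequest` are invented names, not GroupMetadataManager APIs). The empty-consumer-group case is truncated above, so treating it symmetrically here is an assumption.

```java
// Hypothetical model of the offline protocol-migration rule exercised by the tests.
enum GroupKind { CLASSIC, CONSUMER }

enum Outcome { CREATE_OR_USE, CONVERT_EMPTY_GROUP, GROUP_ID_NOT_FOUND }

class MigrationRule {
    // existingKind: type of the group already stored under the group ID (null if none).
    // requestedKind: protocol of the incoming request
    //   (ConsumerGroupHeartbeat => CONSUMER, JoinGroup => CLASSIC).
    static Outcome onRequest(GroupKind existingKind, boolean existingIsEmpty, GroupKind requestedKind) {
        if (existingKind == null || existingKind == requestedKind) {
            return Outcome.CREATE_OR_USE;
        }
        // Offline migration: only an empty group may be replaced by the other type
        // (in the heartbeat test this shows up as a group metadata tombstone record).
        return existingIsEmpty ? Outcome.CONVERT_EMPTY_GROUP : Outcome.GROUP_ID_NOT_FOUND;
    }
}
```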
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1529005304 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -3040,6 +3048,7 @@ class ReplicaManagerTest { transactionalId = transactionalId, entriesPerPartition = entriesToAppend, responseCallback = appendCallback, + apiVersionErrorMapper = genericError Review Comment: Is there a reason some of these are generic and some are default? I wonder if we can configure the test to use the "highest" enum.
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1528996611 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ## @@ -3523,6 +3524,128 @@ public void testForegroundInvalidStateTransitionIsRecoverable() { assertFalse(transactionManager.hasOngoingTransaction()); } +@Test +public void testAbortableTransactionExceptionInInitProducerId() { +TransactionalRequestResult initPidResult = transactionManager.initializeTransactions(); +prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); +runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null); +assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + +prepareInitPidResponse(Errors.ABORTABLE_TRANSACTION, false, producerId, RecordBatch.NO_PRODUCER_EPOCH); +runUntil(transactionManager::hasError); +assertTrue(initPidResult.isCompleted()); +assertFalse(initPidResult.isSuccessful()); +assertThrows(AbortableTransactionException.class, initPidResult::await); +assertAbortableError(AbortableTransactionException.class); +} + +@Test +public void testAbortableTransactionExceptionInAddPartitions() { +final TopicPartition tp = new TopicPartition("foo", 0); + +doInitTransactions(); + +transactionManager.beginTransaction(); +transactionManager.maybeAddPartition(tp); + +prepareAddPartitionsToTxn(tp, Errors.ABORTABLE_TRANSACTION); +runUntil(transactionManager::hasError); +assertTrue(transactionManager.lastError() instanceof AbortableTransactionException); + +assertAbortableError(AbortableTransactionException.class); +} + +@Test +public void testAbortableTransactionExceptionInFindCoordinator() { +doInitTransactions(); + +transactionManager.beginTransaction(); +TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction( +singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), new 
ConsumerGroupMetadata(consumerGroupId)); + +prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch); +runUntil(() -> !transactionManager.hasPartitionsToAdd()); + +prepareFindCoordinatorResponse(Errors.ABORTABLE_TRANSACTION, false, CoordinatorType.GROUP, consumerGroupId); +runUntil(transactionManager::hasError); +assertTrue(transactionManager.lastError() instanceof AbortableTransactionException); + +runUntil(sendOffsetsResult::isCompleted); +assertFalse(sendOffsetsResult.isSuccessful()); +assertTrue(sendOffsetsResult.error() instanceof AbortableTransactionException); + +assertAbortableError(AbortableTransactionException.class); +} + +@Test +public void testAbortableTransactionExceptionInEndTxn() throws InterruptedException { +doInitTransactions(); + +transactionManager.beginTransaction(); +transactionManager.maybeAddPartition(tp0); +TransactionalRequestResult commitResult = transactionManager.beginCommit(); + +Future responseFuture = appendToAccumulator(tp0); + +assertFalse(responseFuture.isDone()); +prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId); +prepareProduceResponse(Errors.NONE, producerId, epoch); +prepareEndTxnResponse(Errors.ABORTABLE_TRANSACTION, TransactionResult.COMMIT, producerId, epoch); + +runUntil(commitResult::isCompleted); +runUntil(responseFuture::isDone); + +assertThrows(KafkaException.class, commitResult::await); +assertFalse(commitResult.isSuccessful()); +assertTrue(commitResult.isAcked()); + +assertAbortableError(AbortableTransactionException.class); +// make sure the exception was thrown directly from the follow-up calls. Review Comment: nit: was this comment meant to be before further checks? I noticed this test was a little different.
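These tests check that an `ABORTABLE_TRANSACTION` error surfaces as an abortable, rather than fatal, error. On the application side, the usual reaction to an abortable error is to abort the current transaction and retry it. The following self-contained sketch shows that control flow; `AbortableTxnError`, `TxnClient`, and `TxnRunner.runInTransaction` are stand-ins invented here so the example runs without Kafka, not producer APIs.

```java
import java.util.function.Consumer;

// Stand-in for an abortable transaction error (hypothetical).
class AbortableTxnError extends RuntimeException {
    AbortableTxnError(String message) {
        super(message);
    }
}

// Hypothetical minimal transactional client interface.
interface TxnClient {
    void beginTransaction();

    void commitTransaction();

    void abortTransaction();
}

class TxnRunner {
    // Runs `work` inside a transaction, aborting and retrying when an abortable
    // error occurs. Any other exception propagates and is treated as fatal.
    // Returns the number of attempts used.
    static int runInTransaction(TxnClient client, Consumer<TxnClient> work, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            client.beginTransaction();
            try {
                work.accept(client);
                client.commitTransaction();
                return attempt;
            } catch (AbortableTxnError e) {
                // The transaction can be aborted and retried; the client stays usable.
                client.abortTransaction();
            }
        }
        throw new IllegalStateException("Transaction failed after " + maxAttempts + " attempts");
    }
}
```

Keeping fatal errors out of the `catch` is the point of a distinct abortable error type: the application can retry safely without masking errors that require closing the producer.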
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1528996611 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ## @@ -3523,6 +3524,128 @@ public void testForegroundInvalidStateTransitionIsRecoverable() { +// make sure the exception was thrown directly from the follow-up calls. Review Comment: nit: was this comment meant to be before further checks?
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on PR #15486: URL: https://github.com/apache/kafka/pull/15486#issuecomment-2004546341 Hey @sjhajharia, thanks for the updates. I think `ApiVersionErrorMapper` is also a bit tricky, since the addPartitions change will not be related to errors at all. That's why I was thinking of something a bit more generic: "Operation" was probably the right direction, rather than "errors". Sorry, this part is a bit nuanced. Maybe something like "supportedOperation", with a comment along the lines of "the supported operation based on the client's request API version"? And yes, I think we should bump all the API versions to support this new error. I can update the KIP.
Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]
wcarlson5 commented on PR #15414: URL: https://github.com/apache/kafka/pull/15414#issuecomment-2004539886 @mjsax I added some testing
Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]
fvaleri commented on code in PR #15273: URL: https://github.com/apache/kafka/pull/15273#discussion_r1471706520 ## raft/src/main/java/org/apache/kafka/raft/RaftConfig.java: ## @@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) { } } +private static Set parseProcessRoles(List processRoles, Map voterConnections, int nodeId) { Review Comment: I kept the original code from KafkaConfig:1537 here; I don't remember the exact reason behind it. I had to move ProcessRole from server to server-common to avoid a circular dependency. Let me know if this is fine.
[jira] [Updated] (KAFKA-16297) Race condition while promoting future replica can lead to partition unavailability.
[ https://issues.apache.org/jira/browse/KAFKA-16297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-16297: Description: KIP-858 proposed that when a directory failure occurs after changing the assignment of a replica that's moved between two directories in the same broker, but before the future replica promotion completes, the broker should reassign the replica to inform the controller of its correct status. But this hasn't yet been implemented, and without it this failure may lead to indefinite partition unavailability. Example scenario: # A broker which leads partition P receives a request to alter the replica from directory A to directory B. # The broker creates a future replica in directory B and starts a replica fetcher. # Once the future replica first catches up, the broker queues a reassignment to inform the controller of the directory change. # The next time the replica catches up, the broker briefly blocks appends and promotes the replica. However, before the promotion is attempted, directory A fails. # The controller was informed that P is now in directory B before it received the notification that directory A has failed, so it does not elect a new leader, and as long as the broker is online, partition P remains unavailable. was: KIP-858 proposed that when a directory failure occurs after changing the assignment of a replica that's moved between two directories in the same broker, but before the future replica promotion completes, the broker should reassign the replica to inform the controller of its correct status. But this hasn't yet been implemented, and without it this failure may lead to indefinite partition unavailability. Example scenario: # A broker which leads partition P receives a request to alter the replica from directory A to directory B. # The broker creates a future replica in directory B and starts a replica fetcher.
# Once the future replica first catches up, the broker queues a reassignment to inform the controller of the directory change. # The next time the replica catches up, the broker briefly blocks appends and promotes the replica. However, before the promotion is attempted, directory A fails. # The controller was informed that P is now in directory B before it received the notification that directory A has failed, so it does not elect a new leader, and as long as the broker is online, partition P remains unavailable. As per KIP-858, the broker should detect this scenario and queue a reassignment of P into directory ID {{DirectoryId.LOST}}. > Race condition while promoting future replica can lead to partition > unavailability. > --- > > Key: KAFKA-16297 > URL: https://issues.apache.org/jira/browse/KAFKA-16297 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > > KIP-858 proposed that when a directory failure occurs after changing the > assignment of a replica that's moved between two directories in the same > broker, but before the future replica promotion completes, the broker should > reassign the replica to inform the controller of its correct status. But this > hasn't yet been implemented, and without it this failure may lead to > indefinite partition unavailability. > Example scenario: > # A broker which leads partition P receives a request to alter the replica > from directory A to directory B. > # The broker creates a future replica in directory B and starts a replica > fetcher. > # Once the future replica first catches up, the broker queues a reassignment > to inform the controller of the directory change. > # The next time the replica catches up, the broker briefly blocks appends > and promotes the replica. However, before the promotion is attempted, > directory A fails.
> # The controller was informed that P is now in directory B before it > received the notification that directory A has failed, so it does not elect a > new leader, and as long as the broker is online, partition P remains > unavailable.
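The race in the scenario above, and the KIP-858 remedy of reassigning P to `DirectoryId.LOST`, can be modeled in a few lines. This is a hypothetical, simplified sketch rather than broker code: the class, the maps, and the plain-string directory names are illustrative assumptions, whereas real brokers track directory UUIDs and report assignments to the controller over an RPC.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the KAFKA-16297 scenario. None of these
// names are Kafka classes; "LOST" stands in for DirectoryId.LOST.
final class FuturePromotionSketch {
    static final String LOST = "LOST";

    /** Directory the controller currently believes each partition lives in. */
    final Map<String, String> reportedDir = new HashMap<>();
    /** Directory holding the partition's current (not future) replica. */
    final Map<String, String> currentDir = new HashMap<>();
    /** Reassignments queued for the controller, recorded as "partition->directory". */
    final List<String> queued = new ArrayList<>();

    void reassign(String partition, String dir) {
        reportedDir.put(partition, dir);
        queued.add(partition + "->" + dir);
    }

    /** Step 3: the future replica caught up, so the broker reports the new directory early. */
    void futureReplicaCaughtUp(String partition, String newDir) {
        reassign(partition, newDir);
    }

    /** Step 4: promotion completed, so the current replica really is in the new directory. */
    void promote(String partition) {
        currentDir.put(partition, reportedDir.get(partition));
    }

    /**
     * A directory failed. If a partition's current replica lived there but the
     * controller already believes it is elsewhere, the controller will not react
     * to the failure, so queue a reassignment to LOST, as KIP-858 proposes.
     */
    void directoryFailed(String failedDir) {
        for (Map.Entry<String, String> e : currentDir.entrySet()) {
            String partition = e.getKey();
            if (failedDir.equals(e.getValue()) && !failedDir.equals(reportedDir.get(partition))) {
                reassign(partition, LOST);
            }
        }
    }
}
```

Usage: put P in directory A, call `futureReplicaCaughtUp("P", "B")`, then `directoryFailed("A")` before `promote("P")`; the sketch queues `P->LOST`. If the controller still believes P is in A, no extra reassignment is queued, because the ordinary directory-failure notification already covers it.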
[jira] [Assigned] (KAFKA-16363) Storage crashes if dir is unavailable
[ https://issues.apache.org/jira/browse/KAFKA-16363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reassigned KAFKA-16363: --- Assignee: (was: Igor Soarez) > Storage crashes if dir is unavailable > - > > Key: KAFKA-16363 > URL: https://issues.apache.org/jira/browse/KAFKA-16363 > Project: Kafka > Issue Type: Sub-task > Components: tools >Affects Versions: 3.7.0 >Reporter: Igor Soarez >Priority: Major > > The storage tool crashes if one of the configured log directories is > unavailable. > > {code:java} > sh-4.4# ./bin/kafka-storage.sh format --ignore-formatted -t $KAFKA_CLUSTER_ID > -c server.properties > [2024-03-11 17:51:05,391] ERROR Error while reading meta.properties file > /data/d2/meta.properties > (org.apache.kafka.metadata.properties.MetaPropertiesEnsemble) > java.nio.file.AccessDeniedException: /data/d2/meta.properties > at > java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:90) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) > at > java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:218) > at java.base/java.nio.file.Files.newByteChannel(Files.java:380) > at java.base/java.nio.file.Files.newByteChannel(Files.java:432) > at > java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422) > at java.base/java.nio.file.Files.newInputStream(Files.java:160) > at > org.apache.kafka.metadata.properties.PropertiesUtils.readPropertiesFile(PropertiesUtils.java:77) > at > org.apache.kafka.metadata.properties.MetaPropertiesEnsemble$Loader.load(MetaPropertiesEnsemble.java:135) > at kafka.tools.StorageTool$.formatCommand(StorageTool.scala:431) > at kafka.tools.StorageTool$.main(StorageTool.scala:95) > at kafka.tools.StorageTool.main(StorageTool.scala) > 
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, > dirs={/data/d1: MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, > nodeId=101, directoryId=zm7fSw3zso9aR0AtuzsI_A), /data/metadata: > MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, nodeId=101, > directoryId=eRO8vOP7ddbpx_W2ZazjLw), /data/d2: ERROR}) > I/O error trying to read log directory /data/d2. > {code} > When configured with multiple directories, Kafka tolerates some of them (but > not all) being inaccessible, so this tool should be able to handle the same > scenarios without crashing.
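The tolerance rule described in the issue (some, but not all, directories may be inaccessible) could look roughly like this. It is a hypothetical sketch, not `StorageTool` or `MetaPropertiesEnsemble` code: `TolerantMetaLoader`, `PropertiesReader`, and treating a missing `meta.properties` as an unformatted directory are all assumptions made for illustration. The reader is injectable so an `AccessDeniedException` like the one in the stack trace can be simulated.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: read meta.properties from every log directory, tolerate
// unreadable ones, and fail only when none of them can be read.
final class TolerantMetaLoader {
    @FunctionalInterface
    interface PropertiesReader {
        Properties read(Path file) throws IOException;
    }

    /** Default reader: load a Java properties file from disk. */
    static final PropertiesReader FILE_READER = file -> {
        try (InputStream in = Files.newInputStream(file)) {
            Properties props = new Properties();
            props.load(in);
            return props;
        }
    };

    /** Returns loaded properties per directory; null marks a directory that could not be read. */
    static Map<Path, Properties> loadAll(List<Path> logDirs, PropertiesReader reader) {
        Map<Path, Properties> result = new LinkedHashMap<>();
        int failures = 0;
        for (Path dir : logDirs) {
            try {
                result.put(dir, reader.read(dir.resolve("meta.properties")));
            } catch (NoSuchFileException e) {
                result.put(dir, new Properties()); // assumed: not formatted yet, nothing to read
            } catch (IOException e) {
                // e.g. AccessDeniedException: report it and keep going instead of crashing
                System.err.println("I/O error trying to read log directory " + dir + ": " + e);
                result.put(dir, null);
                failures++;
            }
        }
        if (!logDirs.isEmpty() && failures == logDirs.size()) {
            throw new IllegalStateException("None of the configured log directories is readable");
        }
        return result;
    }
}
```

Calling `loadAll(dirs, TolerantMetaLoader.FILE_READER)` reads from disk; a caller such as a format command could then skip the `null` entries rather than aborting.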
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
fvaleri commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1528843407 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +
Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]
wcarlson5 commented on code in PR #15414: URL: https://github.com/apache/kafka/pull/15414#discussion_r1528822628 ## streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java: ## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static 
org.hamcrest.Matchers.containsString; + +@Timeout(600) +@Tag("integration") +public class GlobalStateReprocessTest { +private static final int NUM_BROKERS = 1; +private static final Properties BROKER_CONFIG; + +static { +BROKER_CONFIG = new Properties(); +BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1); +BROKER_CONFIG.put("transaction.state.log.min.isr", 1); +} + +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG); + +@BeforeAll +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterAll +public static void closeCluster() { +CLUSTER.stop(); +} + + +private final MockTime mockTime = CLUSTER.time; +private final String globalStore = "globalStore"; +private StreamsBuilder builder; +private Properties streamsConfiguration; +private KafkaStreams kafkaStreams; +private String globalStoreTopic; + + +@BeforeEach +public void before(final TestInfo testInfo) throws Exception { +builder = new StreamsBuilder(); + +createTopics(); +streamsConfiguration = new Properties(); +final String safeTestName = safeUniqueTestName(testInfo); +streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); +streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); +streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); +streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); +streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + +final KeyValueStoreBuilder storeBuilder = new KeyValueStoreBuilder<>( +
Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]
wcarlson5 commented on code in PR #15414: URL: https://github.com/apache/kafka/pull/15414#discussion_r1528810589 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java: ## @@ -236,6 +252,91 @@ private List topicPartitionsForStore(final StateStore store) { } return topicPartitions; } +@SuppressWarnings("unchecked") +private void reprocessState(final List topicPartitions, +final Map highWatermarks, +final InternalTopologyBuilder.ReprocessFactory reprocessFactory, +final String storeName) { +final Processor source = reprocessFactory.processorSupplier().get(); +source.init(globalProcessorContext); + +for (final TopicPartition topicPartition : topicPartitions) { +long currentDeadline = NO_DEADLINE; + +globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; +final Long checkpoint = checkpointFileCache.get(topicPartition); +if (checkpoint != null) { +globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; +} else { + globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +offset = getGlobalConsumerOffset(topicPartition); +} +final Long highWatermark = highWatermarks.get(topicPartition); +stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); + +long restoreCount = 0L; + +while (offset < highWatermark) { +// we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short +// to give a fetch request a fair chance to actually complete and we don't want to +// start `task.timeout.ms` too early +// +// TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call +// `poll(pollMS)` without adding the request timeout and do a more precise +// timeout handling +final ConsumerRecords records = globalConsumer.poll(pollMsPlusRequestTimeout); +if (records.isEmpty()) { +currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline); +} else { +currentDeadline = NO_DEADLINE; +} + +for (final ConsumerRecord record : 
records.records(topicPartition)) { +final ProcessorRecordContext recordContext = +new ProcessorRecordContext( +record.timestamp(), +record.offset(), +record.partition(), +record.topic(), +record.headers()); +globalProcessorContext.setRecordContext(recordContext); + +try { +if (record.key() != null) { +source.process(new Record<>( + reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()), + reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()), +record.timestamp(), +record.headers())); +restoreCount++; +} +} catch (final Exception deserializationException) { +handleDeserializationFailure( Review Comment: That was my first thought too. Maybe we could refactor it a bit more, but a `RecordDeserializer` wants a `SourceNode`. It seemed like that would be changing more surfaces than necessary. But we can
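Stripped of consumer polling, deadlines, and deserialization, the control flow of `reprocessState()` in the diff reduces to the loop below. This is a hypothetical in-memory model for illustration only: `ReprocessSketch`, `Rec`, and passing the processor as a `BiConsumer` are assumptions, and the log is assumed to be sorted by offset.

```java
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical in-memory model of the reprocessing loop: start at the
// checkpoint (or the beginning), hand each keyed record to a processor until
// the high watermark is reached, and count the restored records.
final class ReprocessSketch {
    static final class Rec {
        final long offset;
        final String key;
        final String value;

        Rec(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    static long reprocess(List<Rec> log, Long checkpoint, long highWatermark,
                          BiConsumer<String, String> processor) {
        long start = checkpoint != null ? checkpoint : 0L; // seek(checkpoint) vs. seekToBeginning()
        long restoreCount = 0L;
        for (Rec r : log) {
            if (r.offset < start) {
                continue; // already reflected in the checkpoint
            }
            if (r.offset >= highWatermark) {
                break; // caught up: stop at the high watermark
            }
            if (r.key != null) { // null-key records are skipped, as in the diff
                processor.accept(r.key, r.value);
                restoreCount++;
            }
        }
        return restoreCount;
    }
}
```

The key difference from a plain restore is that every record goes through the user's processor (here, any `BiConsumer`, such as `store::put`) instead of being written to the store byte-for-byte.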
Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]
wcarlson5 commented on code in PR #15414: URL: https://github.com/apache/kafka/pull/15414#discussion_r1528807717 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java: ## @@ -68,40 +68,49 @@ ConsumerRecord deserialize(final ProcessorContext processo Optional.empty() ); } catch (final Exception deserializationException) { -final DeserializationExceptionHandler.DeserializationHandlerResponse response; -try { -response = deserializationExceptionHandler.handle( -(InternalProcessorContext) processorContext, -rawRecord, -deserializationException); -} catch (final Exception fatalUserException) { -log.error( -"Deserialization error callback failed after deserialization error for record {}", -rawRecord, -deserializationException); -throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException); -} +handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor); +return null; Review Comment: yeah we can add a note
Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]
lianetm commented on code in PR #15533: URL: https://github.com/apache/kafka/pull/15533#discussion_r1528798734 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -1650,6 +1650,102 @@ public void testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferen .setTopicPartitions(Collections.emptyList(; } +@Test +public void testConsumerGroupHeartbeatFullResponse() { +String groupId = "fooup"; +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +// Create a context with one consumer group containing two members. Review Comment: group containing 1 member here, right? (with 2 partitions)
Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]
lianetm commented on code in PR #15533: URL: https://github.com/apache/kafka/pull/15533#discussion_r1528793006 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1190,10 +1190,11 @@ private CoordinatorResult consumerGr .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs); // The assignment is only provided in the following cases: -// 1. The member reported its owned partitions; -// 2. The member just joined or rejoined to group (epoch equals to zero); -// 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { +// 1. The member sent a full request. It does so when joining or rejoining the group; or +//on any errors (e.g. timeout). +// 2. The member's assignment has been updated. +boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 && subscribedTopicNames != null && ownedTopicPartitions != null); Review Comment: nice, perfectly aligned with the client side. Just for the record, along with the `rebalanceTimeout`, `topics` and `assignment`, the client will also include the server assignor in any full request, but only if it's configured, so I agree with not including it here.
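Pulled out into a standalone helper, the `isFullRequest` condition from the diff behaves as follows. This is a hypothetical sketch: the parameter names follow the diff, but the class, the `shouldSendAssignment` helper, and the use of `List<?>` in place of the real request-data types are illustrative assumptions.

```java
import java.util.List;

// Hypothetical standalone helper mirroring the condition quoted in the diff.
final class HeartbeatRequestSketch {
    static boolean isFullRequest(int memberEpoch, int rebalanceTimeoutMs,
                                 List<?> subscribedTopicNames, List<?> ownedTopicPartitions) {
        // Epoch 0 means the member is joining or rejoining, which always counts as a full request.
        // Otherwise a full request must carry the rebalance timeout, the topics and the owned partitions.
        return memberEpoch == 0
            || (rebalanceTimeoutMs != -1 && subscribedTopicNames != null && ownedTopicPartitions != null);
    }

    /** The assignment is returned for a full request or when the member's assignment changed. */
    static boolean shouldSendAssignment(boolean fullRequest, boolean assignmentChanged) {
        return fullRequest || assignmentChanged;
    }
}
```

Note that an empty owned-partitions list still counts as "present"; only `null` (the field omitted from an incremental heartbeat) makes the request partial.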
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
gaoran10 commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1528650747 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +
[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
[ https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16375: --- Labels: client-transitions-issues kip-848-client-support (was: kip-848-client-support) > Fix logic for discarding reconciliation if member rejoined > -- > > Key: KAFKA-16375 > URL: https://issues.apache.org/jira/browse/KAFKA-16375 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > The current implementation of the new consumer discards the result of a > reconciliation if the member rejoined, based on a comparison of the member > epoch at the start and end of the reconciliation. If the epochs changed, the > reconciliation is discarded. This is not right because the member epoch could > be incremented without an assignment change. This should be fixed to ensure > that the reconciliation is discarded if the member rejoined, probably based > on a flag that truly reflects that it went through a transition to joining.
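The difference between the current epoch comparison and the flag-based check the issue suggests can be sketched as follows. This is a hypothetical illustration, not the consumer's actual membership-management code: the class, the `Snapshot` type, and counting join transitions (rather than using a boolean flag) are assumptions.

```java
// Hypothetical sketch: remember how many times the member transitioned to
// joining, and discard a reconciliation only if that count changed while the
// reconciliation was in flight.
final class ReconciliationGuardSketch {
    private int memberEpoch;
    private long joinTransitions;

    /** State captured when a reconciliation starts. */
    static final class Snapshot {
        final int epoch;
        final long joins;

        Snapshot(int epoch, long joins) {
            this.epoch = epoch;
            this.joins = joins;
        }
    }

    /** The coordinator may bump the epoch without any assignment change or rejoin. */
    void onNewEpoch(int epoch) {
        memberEpoch = epoch;
    }

    /** The member went through a transition to joining. */
    void transitionToJoining() {
        joinTransitions++;
    }

    Snapshot startReconciliation() {
        return new Snapshot(memberEpoch, joinTransitions);
    }

    /** Epoch comparison described in the issue: also fires on a plain epoch bump. */
    boolean discardByEpoch(Snapshot s) {
        return memberEpoch != s.epoch;
    }

    /** Flag-style check: fires only if the member actually rejoined. */
    boolean discardByRejoin(Snapshot s) {
        return joinTransitions != s.joins;
    }
}
```

A counter is used instead of a single boolean so that a rejoin is still detected even if the flag would otherwise have been reset by a later transition before the reconciliation completes.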
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
gaoran10 commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1528650747 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +
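The diff calls `getOffsetsToCommit(consumer)` but does not show its body. The sketch below illustrates what such a helper usually has to produce: for each partition, the offset of the next record to consume, which is the last processed offset + 1. `ConsumedRecord` and `OffsetsToCommit` are hypothetical names. To keep the sketch stdlib-only, plain `"topic-partition"` string keys stand in for the `Map<TopicPartition, OffsetAndMetadata>` that `sendOffsetsToTransaction` actually takes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a consumed record; in the real client this information
// comes from ConsumerRecord#topic(), #partition() and #offset().
class ConsumedRecord {
    final String topic;
    final int partition;
    final long offset;

    ConsumedRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

class OffsetsToCommit {
    // Returns, per "topic-partition" key, the offset to commit: the offset of the
    // next record to consume, i.e. the highest processed offset + 1.
    static Map<String, Long> compute(List<ConsumedRecord> batch) {
        Map<String, Long> next = new HashMap<>();
        for (ConsumedRecord r : batch) {
            String key = r.topic + "-" + r.partition;
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }
}
```

Committing "last offset + 1" means a restarted processor resumes right after the last record whose output was committed in the same transaction.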
[jira] [Assigned] (KAFKA-15517) Improve MirrorMaker logging in case of authorization errors
[ https://issues.apache.org/jira/browse/KAFKA-15517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Werner reassigned KAFKA-15517: - Assignee: Dmitry Werner > Improve MirrorMaker logging in case of authorization errors > --- > > Key: KAFKA-15517 > URL: https://issues.apache.org/jira/browse/KAFKA-15517 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Mickael Maison >Assignee: Dmitry Werner >Priority: Major > > If MirrorMaker is missing DESCRIBE_CONFIGS on the source cluster, all > you get in the logs are lines like: > {noformat} > 2023-09-27 11:56:54,989 ERROR > [my-cluster-source->my-cluster-target.MirrorSourceConnector|worker] Scheduler > for MirrorSourceConnector caught exception in scheduled task: refreshing > topics (org.apache.kafka.connect.mirror.Scheduler) [Scheduler for > MirrorSourceConnector-refreshing topics] > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TopicAuthorizationException: Topic > authorization failed. > {noformat} > It would be good to report the exact call that failed and include the cluster > as well to make it easy to figure out which permissions are missing.
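One way to get the context the issue asks for is to wrap each admin-style future. On failure, the wrapper names both the call and the cluster. The sketch below uses a hypothetical helper (`ContextualCalls.await`) and is not MirrorMaker's code. The call name and cluster alias are plain strings supplied by the caller, and the labels in the example are chosen only for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class ContextualCalls {
    // Waits for the future. On failure, it rethrows with the call name and cluster alias
    // so the log shows which call failed against which cluster (hypothetical helper).
    static <T> T await(Future<T> future, String call, String clusterAlias) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new IllegalStateException(
                call + " failed on cluster '" + clusterAlias + "': " + cause, cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(
                call + " was interrupted on cluster '" + clusterAlias + "'", e);
        }
    }
}
```

For example, a failed `describeConfigs` future against `my-cluster-source` would surface as `describeConfigs failed on cluster 'my-cluster-source': ...` instead of a bare `ExecutionException`.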
[jira] [Commented] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls
[ https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827990#comment-17827990 ] Lianet Magrans commented on KAFKA-15551: Also, given how tight the deadline is to get the fix/PR in, I would suggest we focus on the new consumer only. If we find things that could be improved in this sense in the old one, we could file a separate Jira for it and tackle it afterwards. > Evaluate conditions for short circuiting consumer API calls > --- > > Key: KAFKA-15551 > URL: https://issues.apache.org/jira/browse/KAFKA-15551 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > For conditions like: > * Committing empty offsets > * Fetching offsets for empty partitions > * Getting empty topic partition position > These should possibly be short-circuited at the API level. > As a bonus, we should double-check whether the existing {{KafkaConsumer}} > implementation suffers from this.
[jira] [Commented] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls
[ https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827989#comment-17827989 ] Lianet Magrans commented on KAFKA-15551: Hey [~zxcoccer], thanks for jumping in! This one should definitely be a simple one, as I know that we already handle it for some API calls (e.g. validating positions returns early if there are no positions to validate [here|https://github.com/apache/kafka/blob/5c929874b88b3b96f650de0f733d93d42ac535a4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java#L227]). Still, we wanted to verify that we do the same for all the API calls, to ensure that we're not doing unneeded processing/requests. We want to have this for 3.8, so the deadline is really tight, but if you have availability it would be a great help. Feel free to re-assign it to yourself and ping me anytime if you have questions. Thanks! > Evaluate conditions for short circuiting consumer API calls > --- > > Key: KAFKA-15551 > URL: https://issues.apache.org/jira/browse/KAFKA-15551 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > For conditions like: > * Committing empty offsets > * Fetching offsets for empty partitions > * Getting empty topic partition position > These should possibly be short-circuited at the API level. > As a bonus, we should double-check whether the existing {{KafkaConsumer}} > implementation suffers from this.
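The short-circuit discussed here means returning early when there is nothing to do, so no request is built or sent. Below is a minimal sketch. It assumes a hypothetical `ShortCircuitingClient` whose `commitAsync` takes offsets keyed by topic-partition strings. The real consumer API and its request managers differ.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical facade illustrating an early return for empty input.
class ShortCircuitingClient {
    // Counts requests that would have been sent to the group coordinator.
    int requestsSent;

    CompletableFuture<Void> commitAsync(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            // Nothing to commit: complete immediately without building a request.
            return CompletableFuture.completedFuture(null);
        }
        requestsSent++; // stands in for enqueuing a real commit request
        return CompletableFuture.completedFuture(null);
    }
}
```

The same guard would apply to the other conditions listed in the issue, such as fetching committed offsets for an empty partition set.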
Re: [PR] KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito [kafka]
cadonna commented on code in PR #15254: URL: https://github.com/apache/kafka/pull/15254#discussion_r1528601954 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -190,7 +181,7 @@ public class TaskManagerTest { @org.mockito.Mock private InternalTopologyBuilder topologyBuilder; -@Mock(type = MockType.DEFAULT) +@org.mockito.Mock Review Comment: @clolov If you resolve Ismael's comment, could you also please remove the `Mockito` prefix for other method calls like `verify()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827976#comment-17827976 ] Edoardo Comar commented on KAFKA-16369: --- Fix merged in trunk, thanks [~showuon]. > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.6.2, 3.8.0, 3.7.1 > > Attachments: kraft-server.log, server.log > > > When in ZooKeeper mode, if a port the broker should listen to is already bound, > the KafkaException: Socket server failed to bind to localhost:9092: Address > already in use. > is thrown, but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce when in ZooKeeper mode with server.config set to listen to > localhost only: > listeners=PLAINTEXT://localhost:9092
[jira] [Updated] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-16369: -- Fix Version/s: 3.8.0 > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.8.0 > > Attachments: kraft-server.log, server.log > > > When in ZooKeeper mode, if a port the broker should listen to is already bound, > the KafkaException: Socket server failed to bind to localhost:9092: Address > already in use. > is thrown, but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce when in ZooKeeper mode with server.config set to listen to > localhost only: > listeners=PLAINTEXT://localhost:9092
[jira] [Updated] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-16369: -- Fix Version/s: 3.6.2 3.7.1 > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.6.2, 3.8.0, 3.7.1 > > Attachments: kraft-server.log, server.log > > > When in ZooKeeper mode, if a port the broker should listen to is already bound, > the KafkaException: Socket server failed to bind to localhost:9092: Address > already in use. > is thrown, but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce when in ZooKeeper mode with server.config set to listen to > localhost only: > listeners=PLAINTEXT://localhost:9092
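The expected behaviour is that a bind failure aborts startup instead of being swallowed. Below is a sketch that uses only `java.net.ServerSocket`. The hypothetical `FailFastListener.bindOrFail` rethrows any bind error, such as `BindException: Address already in use`, as an unchecked exception, so the caller cannot silently continue. This is not Kafka's `SocketServer` code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

class FailFastListener {
    // Binds a listener on the loopback address. Any bind failure (for example
    // java.net.BindException: Address already in use) is rethrown, so the caller's
    // startup sequence stops instead of continuing without the listener.
    static ServerSocket bindOrFail(int port) {
        ServerSocket socket = null;
        try {
            socket = new ServerSocket();
            socket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
            return socket;
        } catch (IOException e) {
            closeQuietly(socket);
            throw new UncheckedIOException("Failed to bind to port " + port, e);
        }
    }

    static void closeQuietly(ServerSocket socket) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException ignored) {
            // best effort on the failure path
        }
    }
}
```

Binding a second listener to a port that is already in use throws immediately, which is the signal a broker would need to shut down.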
[PR] use public constructor [kafka]
johnnychhsu opened a new pull request, #15556: URL: https://github.com/apache/kafka/pull/15556 ## Context This test failed in several PRs, and one of the failed builds logged the following error ``` [2024-03-12 13:58:12,744] ERROR Failed to discover HeaderConverter in classpath: Unable to instantiate TestConverterWithPrivateConstructor: Plugin class default constructor must be public (org.apache.kafka.connect.runtime.isolation.ReflectionScanner:138) java.lang.IllegalAccessException: class org.apache.kafka.connect.runtime.isolation.ReflectionScanner cannot access a member of class org.apache.kafka.connect.integration.ConnectorValidationIntegrationTest$TestConverterWithPrivateConstructor with modifiers "private" ``` (see [this build](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15463/4/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplicateFromLatest__/)). Jira ticket: [KAFKA-16383](https://issues.apache.org/jira/browse/KAFKA-16383). I suspect this may be the cause. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
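The log line above says that a plugin's default constructor must be public. The sketch below reproduces that kind of check with plain reflection. `PluginInstantiation` and its nested plugin classes are hypothetical, and Kafka's `ReflectionScanner` may perform the check differently.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

class PluginInstantiation {
    // Example plugin classes (hypothetical), mirroring the failing test converter.
    static class PublicCtorPlugin {
        public PublicCtorPlugin() { }
    }

    static class PrivateCtorPlugin {
        private PrivateCtorPlugin() { }
    }

    // Rejects classes whose no-arg constructor is not public before trying to call it.
    static <T> T instantiate(Class<T> type) {
        final Constructor<T> ctor;
        try {
            ctor = type.getDeclaredConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(type.getSimpleName() + " has no default constructor", e);
        }
        if (!Modifier.isPublic(ctor.getModifiers())) {
            throw new IllegalArgumentException(
                "Unable to instantiate " + type.getSimpleName() + ": default constructor must be public");
        }
        try {
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to instantiate " + type.getSimpleName(), e);
        }
    }
}
```

If a class like `PrivateCtorPlugin` is on the classpath that a scanner walks, the scan reports an error for it even though the test under run never uses it.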
Re: [PR] KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito [kafka]
ijuma commented on code in PR #15254: URL: https://github.com/apache/kafka/pull/15254#discussion_r1528552125 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -190,7 +181,7 @@ public class TaskManagerTest { @org.mockito.Mock private InternalTopologyBuilder topologyBuilder; -@Mock(type = MockType.DEFAULT) +@org.mockito.Mock Review Comment: Can we now import this and avoid the fully qualified name?
[jira] [Created] (KAFKA-16383) fix flaky test IdentityReplicationIntegrationTest.testReplicateFromLatest()
Johnny Hsu created KAFKA-16383: -- Summary: fix flaky test IdentityReplicationIntegrationTest.testReplicateFromLatest() Key: KAFKA-16383 URL: https://issues.apache.org/jira/browse/KAFKA-16383 Project: Kafka Issue Type: Bug Reporter: Johnny Hsu Assignee: Johnny Hsu Build link: [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15463/4/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplicateFromLatest__/] This test has failed in builds of several PRs, so it appears to be flaky.
Re: [PR] [WIP] Splitting consumer tests [kafka]
lianetm closed pull request #15535: [WIP] Splitting consumer tests URL: https://github.com/apache/kafka/pull/15535
[jira] [Commented] (KAFKA-15951) MissingSourceTopicException should include topic names
[ https://issues.apache.org/jira/browse/KAFKA-15951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827968#comment-17827968 ] sanghyeok An commented on KAFKA-15951: -- Hi [~mjsax]! May I take this issue and work on it? I have already looked at the code, and I think I can handle it. > MissingSourceTopicException should include topic names > -- > > Key: KAFKA-15951 > URL: https://issues.apache.org/jira/browse/KAFKA-15951 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > As the title says, we don't include topic names in all cases, which makes it > hard for users to identify the root cause.
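Including the topic names could look like this: compute the missing topics as a sorted set and put them in the message. `MissingTopics.describe` is a hypothetical helper and is not the Kafka Streams code that raises `MissingSourceTopicException`.

```java
import java.util.Set;
import java.util.TreeSet;

class MissingTopics {
    // Builds an error message that names every missing source topic, sorted for stable output.
    static String describe(Set<String> requiredTopics, Set<String> existingTopics) {
        Set<String> missing = new TreeSet<>(requiredTopics);
        missing.removeAll(existingTopics);
        return missing.isEmpty()
            ? "All source topics exist"
            : "Missing source topics: " + missing;
    }
}
```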
[PR] KIP-780: Support fine-grained compression options [kafka]
KevinZTW opened a new pull request, #1: URL: https://github.com/apache/kafka/pull/1 KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-780%3A+Support+fine-grained+compression+options#KIP780:Supportfinegrainedcompressionoptions-Producer *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16318 : add javadoc for kafka metric [kafka]
johnnychhsu commented on code in PR #15483: URL: https://github.com/apache/kafka/pull/15483#discussion_r1528503006 ## clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java: ## @@ -78,6 +111,10 @@ public Measurable measurable() { } } +/** + * Set the metric config. Review Comment: I understand that users should not use the config setter because it is intended for server-side callers. However, why are users not allowed to fetch the config?
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
johnnychhsu commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1528497039 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -79,9 +79,34 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testThreeRecordsInSeparateBatch(quorum: String): Unit = { + def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = { +produceMessagesInOneBatch() +verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") +createTopicWithConfig(topicNameWithCustomConfigs, props) +produceMessagesInOneBatch(topic=topicNameWithCustomConfigs) +// In LogAppendTime's case, if the timestamps are the same, we choose the offset of the first record Review Comment: @showuon just updated, thanks for the review!
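The test comment states the tie-breaking rule for `LogAppendTime`: when timestamps are equal, the offset of the first record is chosen. Below is a minimal sketch of that rule, using parallel arrays of offsets and timestamps. `MaxTimestampOffset` is hypothetical and is not the `LogValidator` code.

```java
class MaxTimestampOffset {
    // Returns the offset of the record with the largest timestamp. On ties, the first
    // (lowest-offset) record wins, because only a strictly larger timestamp replaces it.
    static long offsetOfMaxTimestamp(long[] offsets, long[] timestamps) {
        if (offsets.length == 0 || offsets.length != timestamps.length) {
            throw new IllegalArgumentException("need matching, non-empty arrays");
        }
        long bestOffset = offsets[0];
        long bestTimestamp = timestamps[0];
        for (int i = 1; i < offsets.length; i++) {
            if (timestamps[i] > bestTimestamp) {
                bestTimestamp = timestamps[i];
                bestOffset = offsets[i];
            }
        }
        return bestOffset;
    }
}
```

With `LogAppendTime`, every record in a batch gets the same broker timestamp, so this rule yields the batch's first offset.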
Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]
Vaibhav-Nazare commented on PR #13817: URL: https://github.com/apache/kafka/pull/13817#issuecomment-2003833677 Hi @mimaison, an additional Jenkinsfile for ppc64le has been added. Also, as I understand it, some changes would be required at https://ci-builds.apache.org/job/Kafka/job/kafka/ to configure a new job for ppc64le.
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
johnnychhsu commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1528477660 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -79,9 +79,34 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testThreeRecordsInSeparateBatch(quorum: String): Unit = { + def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = { +produceMessagesInOneBatch() +verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") +createTopicWithConfig(topicNameWithCustomConfigs, props) +produceMessagesInOneBatch(topic=topicNameWithCustomConfigs) +// In LogAppendTime's case, if the timestamps are the same, we choose the offset of the first record Review Comment: Totally agree, let me check this, thanks for the suggestion!
[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827950#comment-17827950 ] Phuc Hong Tran commented on KAFKA-15538: [~lianetm] [~kirktrue], I have a question. Which component is supposed to update the subscription after the metadata is fetched? > Client support for java regex based subscription > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Blocker > Labels: kip-848-client-support, newbie, regex > Fix For: 3.8.0 > > > When using subscribe with a Java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscriptionPattern`. Subscribing with a Java `Pattern` will still be supported for a while but > will eventually be removed. > * When subscribe with SubscriptionPattern is used, the client should > just send the regex to the broker, and it will be resolved on the server side. > * When subscribe with Pattern is used, the regex should be resolved on > the client side. > As part of this task, we should re-enable all integration tests defined in > the PlainTextAsyncConsumer that relate to subscription with pattern and that > are currently disabled for the new consumer + new protocol.
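For the client-side case, resolving a `java.util.regex.Pattern` means filtering the topic names known from metadata. The result is the list of names the client would send to the broker. The sketch below assumes that the whole topic name must match (`Matcher#matches`). `PatternSubscription.resolve` is a hypothetical helper, and the sketch assumes it would be re-run whenever fresh metadata arrives.

```java
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class PatternSubscription {
    // Returns, sorted, the topics whose full name matches the pattern
    // (Matcher#matches, not a substring find).
    static List<String> resolve(Pattern pattern, Collection<String> topicsFromMetadata) {
        return topicsFromMetadata.stream()
            .filter(topic -> pattern.matcher(topic).matches())
            .sorted()
            .collect(Collectors.toList());
    }
}
```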
Re: [PR] KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito [kafka]
clolov commented on code in PR #15254: URL: https://github.com/apache/kafka/pull/15254#discussion_r1528403432 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -785,7 +776,6 @@ public void shouldNotReturnStateUpdaterTasksInOwnedTasks() { final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); -when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask)); Review Comment: This was reported as unnecessary stubbing by Mockito ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2629,7 +2601,6 @@ public void shouldNotCommitNonCorruptedRestoringActiveTasksAndNotCommitRunningSt when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, corruptedTask))); when(tasks.task(taskId02)).thenReturn(corruptedTask); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); -when(stateUpdater.getTasks()).thenReturn(mkSet(activeRestoringTask, standbyTask)); Review Comment: This was reported as unnecessary stubbing by Mockito
Re: [PR] KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito [kafka]
clolov commented on PR #15254: URL: https://github.com/apache/kafka/pull/15254#issuecomment-2003726516 Heya @cadonna! This should be rebased, tests ought to be passing, and all comments are addressed. Let me know if there is something else you would suggest improving!
Re: [PR] MINOR: Expose earliest local timestamp via the GetOffsetShell [kafka]
clolov commented on PR #14788: URL: https://github.com/apache/kafka/pull/14788#issuecomment-2003676807 Closing this in favour of https://issues.apache.org/jira/browse/KAFKA-15857
Re: [PR] MINOR: Expose earliest local timestamp via the GetOffsetShell [kafka]
clolov closed pull request #14788: MINOR: Expose earliest local timestamp via the GetOffsetShell URL: https://github.com/apache/kafka/pull/14788
[jira] [Commented] (KAFKA-16322) Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1
[ https://issues.apache.org/jira/browse/KAFKA-16322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827933#comment-17827933 ] Chia-Ping Tsai commented on KAFKA-16322: [~omkreddy] thanks for backporting this. > Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1 > -- > > Key: KAFKA-16322 > URL: https://issues.apache.org/jira/browse/KAFKA-16322 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Johnny Hsu >Priority: Major > Fix For: 3.6.2, 3.8.0, 3.7.1 > > > https://devhub.checkmarx.com/cve-details/CVE-2023-50572/
Re: [PR] MINOR: Revert to Gradle 8.5 [kafka]
jlprat commented on PR #15553: URL: https://github.com/apache/kafka/pull/15553#issuecomment-2003653879 I managed to reproduce the issue with Gradle 8.6 once, but now that I'm trying to reproduce it again, it seems that incremental compilation works as expected (I tried it on `trunk` and modified `kafka.admin.AclCommandTest.scala` as well). I think we should
Re: [PR] MINOR: Update dependencies [kafka]
jlprat commented on code in PR #15404: URL: https://github.com/apache/kafka/pull/15404#discussion_r1528355399 ## gradle/dependencies.gradle: ## @@ -100,10 +100,10 @@ versions += [ commonsCli: "1.4", commonsValidator: "1.7", dropwizardMetrics: "4.1.12.1", - gradle: "8.5", + gradle: "8.6", Review Comment: I see some inconsistencies with this report. I managed to reproduce it only once. But when I tried it again, incremental compilation worked as expected.
[PR] MINOR: Update upgrade docs to refer 3.6.2 version [kafka]
omkreddy opened a new pull request, #15554: URL: https://github.com/apache/kafka/pull/15554 (no comment)
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru merged PR #15511: URL: https://github.com/apache/kafka/pull/15511
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1527973416 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; +boolean outerJoinLeftWindowOpen = false; +boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { -boolean outerJoinLeftBreak = false; -boolean outerJoinRightBreak = false; final KeyValue, LeftOrRightValue> next = it.next(); final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = next.key; -final LeftOrRightValue value = next.value; -final K key = timestampedKeyAndJoinSide.getKey(); final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestamp; -// Skip next records if window has not closed +// Skip next records if window has not closed yet +// We rely on the ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { -outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side -} else { -outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side -} -if (outerJoinLeftBreak && outerJoinRightBreak) { -break; // there are no more candidates to emit on left-outerJoin-side and -// right-outerJoin-side +outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side } else { -continue; // there are possibly candidates left on the other outerJoin-side +outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side } } -final VOut nullJoinedValue; -if (isLeftSide) { -nullJoinedValue = joiner.apply(key, 
-value.getLeftValue(), -value.getRightValue()); -} else { -nullJoinedValue = joiner.apply(key, -(V1) value.getRightValue(), -(V2) value.getLeftValue()); +if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { +// if windows are open for both joinSides we can break since there are no more candidates to emit +break; +} else if (windowOpenForJoinSide(outerJoinLeftWindowOpen, outerJoinRightWindowOpen, timestampedKeyAndJoinSide)) { +// else if window is open only for this joinSide we continue with the next outer record +continue; } -context().forward( - record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) -); - -if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) { -// blind-delete the previous key from the outer window store now it is emitted; -// we do this because this delete would remove the whole list of values of the same key, -// and hence if we delete eagerly and then fail, we would miss emitting join results of the later -// values in the list. -// we do not use delete() calls since it would incur extra get() -store.put(prevKey, null); +final K key = timestampedKeyAndJoinSide.getKey(); +final LeftOrRightValue leftOrRightValue = next.value; +final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue); +if (nullJoinedValue != null) { Review Comment: Don't know. I guess null-checks are the default in my system ;-). Removed the null check.
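The effect of hoisting the two flags out of the loop can be sketched in isolation. This is a simplified model, not the Kafka Streams code: OuterRecord, emitNonJoined and the per-side look-back parameters are hypothetical names, the side check stands in for what windowOpenForJoinSide is assumed to do, and records are assumed to arrive sorted by timestamp, as the diff's "We rely on the ordering of KeyValueIterator" comment suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the emission loop in the diff above; not Kafka Streams code.
class OuterJoinEmitSketch {

    // One non-joined record waiting in the outer store (illustrative type).
    static final class OuterRecord {
        final boolean leftSide;
        final long timestamp;
        final String value;

        OuterRecord(boolean leftSide, long timestamp, String value) {
            this.leftSide = leftSide;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Returns the values that would be emitted as null-padded outer-join results.
    // Assumes `sorted` is ordered by timestamp, like the store iterator in the diff.
    static List<String> emitNonJoined(List<OuterRecord> sorted, long streamTime,
                                      long leftLookBackMs, long rightLookBackMs, long graceMs) {
        List<String> emitted = new ArrayList<>();
        // Declared outside the loop so they accumulate across records. Inside the loop
        // (the old code) only one of them could be set per iteration, so the break was unreachable.
        boolean leftWindowOpen = false;
        boolean rightWindowOpen = false;
        for (OuterRecord r : sorted) {
            long lookBackMs = r.leftSide ? leftLookBackMs : rightLookBackMs;
            if (r.timestamp + lookBackMs + graceMs >= streamTime) {
                // This record's window has not closed yet.
                if (r.leftSide) {
                    leftWindowOpen = true;
                } else {
                    rightWindowOpen = true;
                }
            }
            if (leftWindowOpen && rightWindowOpen) {
                break; // both sides still open: nothing further can be emitted
            }
            if (r.leftSide ? leftWindowOpen : rightWindowOpen) {
                continue; // this side is open, but the other side may still have closed records
            }
            emitted.add(r.value);
        }
        return emitted;
    }
}
```

With symmetric 5 ms look-backs and stream time 30, records L@0, R@0, L@10, R@25, L@30 yield L0, R0, L10: R@25 marks the right side open and L@30 then triggers the break. With an asymmetric look-back, an open left side only skips left records, so a closed right record can still be emitted afterwards.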
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1527968309 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -727,7 +801,7 @@ public void testWindowing() { } @Test -public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { +public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { Review Comment: Ok. sorry, can revert this of course.
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1528271251 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -511,14 +511,88 @@ public void testGracePeriod() { // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) } -inputTopic2.pipeInput(0, "dummy", 211); +inputTopic2.pipeInput(0, "dummy", 112); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(1, "null+a1", 0L), new KeyValueTimestamp<>(0, "A0+null", 0L) ); } } +@Test +public void testEmitAllNonJoinedResultsForAsymmetricWindow() { +final StreamsBuilder builder = new StreamsBuilder(); + +final KStream stream1; +final KStream stream2; +final KStream joined; +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream1 = builder.stream(topic1, consumed); +stream2 = builder.stream(topic2, consumed); + +joined = stream1.outerJoin( +stream2, +MockValueJoiner.TOSTRING_JOINER, + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)), +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +joined.process(supplier); + +final Collection> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + +assertEquals(1, copartitionGroups.size()); +assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { +final TestInputTopic inputTopic1 = +driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final TestInputTopic inputTopic2 = +driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final MockApiProcessor processor = 
supplier.theCapturedProcessor(); + +// push one item to the primary stream; this should not produce any items because there are no joins +// and window has not ended +// w1 = {} +// w2 = {} +// --> w1 = { 0:A0 (ts: 29) } +// --> w2 = {} +inputTopic1.pipeInput(0, "A0", 29L); +processor.checkAndClearProcessResult(); + +// push another item to the primary stream; this should not produce any items because there are no joins +// and window has not ended +// w1 = { 0:A0 (ts: 29) } +// w2 = {} +// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } +// --> w2 = {} +inputTopic1.pipeInput(1, "A1", 30L); +processor.checkAndClearProcessResult(); + +// push one item to the other stream; this should not produce any items because there are no joins +// and window has not ended +// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) } +// w2 = {} +// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } +// --> w2 = { 2:a2 (ts: 31) } +inputTopic2.pipeInput(2, "a2", 31L); +processor.checkAndClearProcessResult(); + +// push another item to the other stream; this should produce no joined-items because there are no joins Review Comment: Good idea, added a step with right hand side record on ts=37
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1528270350 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -511,14 +511,88 @@ public void testGracePeriod() { // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) } -inputTopic2.pipeInput(0, "dummy", 211); +inputTopic2.pipeInput(0, "dummy", 112); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(1, "null+a1", 0L), new KeyValueTimestamp<>(0, "A0+null", 0L) ); } } +@Test +public void testEmitAllNonJoinedResultsForAsymmetricWindow() { +final StreamsBuilder builder = new StreamsBuilder(); + +final KStream stream1; +final KStream stream2; +final KStream joined; +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream1 = builder.stream(topic1, consumed); +stream2 = builder.stream(topic2, consumed); + +joined = stream1.outerJoin( +stream2, +MockValueJoiner.TOSTRING_JOINER, + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)), +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +joined.process(supplier); + +final Collection> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + +assertEquals(1, copartitionGroups.size()); +assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { +final TestInputTopic inputTopic1 = +driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final TestInputTopic inputTopic2 = +driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final MockApiProcessor processor = 
supplier.theCapturedProcessor(); + +// push one item to the primary stream; this should not produce any items because there are no joins +// and window has not ended +// w1 = {} +// w2 = {} +// --> w1 = { 0:A0 (ts: 29) } +// --> w2 = {} +inputTopic1.pipeInput(0, "A0", 29L); +processor.checkAndClearProcessResult(); + +// push another item to the primary stream; this should not produce any items because there are no joins +// and window has not ended +// w1 = { 0:A0 (ts: 29) } +// w2 = {} +// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } +// --> w2 = {} +inputTopic1.pipeInput(1, "A1", 30L); +processor.checkAndClearProcessResult(); + +// push one item to the other stream; this should not produce any items because there are no joins +// and window has not ended +// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) } +// w2 = {} +// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } +// --> w2 = { 2:a2 (ts: 31) } +inputTopic2.pipeInput(2, "a2", 31L); +processor.checkAndClearProcessResult(); + Review Comment: Good idea, added a step with right hand side record on ts=36
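The asymmetric window in the new test, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)), gives before = 5 ms and after = 20 ms with no grace. A minimal model of its bounds, assuming the documented JoinWindows semantics (a left record at time t joins right records with timestamps in [t - before, t + after]); AsymmetricWindowSketch and its methods are hypothetical and not part of Kafka Streams:

```java
// Hypothetical model of the bounds of
// JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)); not a Kafka Streams API.
class AsymmetricWindowSketch {
    final long beforeMs;
    final long afterMs;
    final long graceMs;

    AsymmetricWindowSketch(long beforeMs, long afterMs, long graceMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
    }

    // A left record at leftTs joins right records with timestamps in [leftTs - before, leftTs + after].
    boolean joinable(long leftTs, long rightTs) {
        return rightTs >= leftTs - beforeMs && rightTs <= leftTs + afterMs;
    }

    // The latest right record that could join a left record has timestamp leftTs + after;
    // once stream time is past that plus grace, no in-grace right record can still arrive.
    boolean leftWindowClosed(long leftTs, long streamTime) {
        return streamTime > leftTs + afterMs + graceMs;
    }

    // Mirror image for a right record: the latest joinable left record has timestamp rightTs + before.
    boolean rightWindowClosed(long rightTs, long streamTime) {
        return streamTime > rightTs + beforeMs + graceMs;
    }
}
```

Under this model, A0 (ts 29) can still be joined by a right-hand record up to ts 49, so its null-padded result should not be emitted before stream time passes 49, while a2 (ts 31) can only be matched by left-hand records up to ts 36.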
[jira] [Assigned] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-16272: - Assignee: Sagar Rao > Update connect_distributed_test.py to support KIP-848’s group protocol config > - > > Key: KAFKA-16272 > URL: https://issues.apache.org/jira/browse/KAFKA-16272 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Sagar Rao >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{connect_distributed_test.py}} > to support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed.
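The system tests themselves are Python (ducktape), but each new matrix entry ultimately selects a value for the consumer's group.protocol setting. A Java sketch of the two consumer configurations such a matrix would cover, assuming the KIP-848 values "classic" (the existing rebalance protocol) and "consumer" (the new one); GroupProtocolMatrixSketch, the bootstrap address and the group id are placeholders:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: one consumer configuration per group.protocol value.
class GroupProtocolMatrixSketch {
    // Values defined by KIP-848; "classic" is the default.
    static final List<String> GROUP_PROTOCOLS = List.of("classic", "consumer");

    static List<Properties> consumerConfigs(String bootstrapServers, String groupId) {
        List<Properties> configs = new ArrayList<>();
        for (String protocol : GROUP_PROTOCOLS) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", bootstrapServers);
            props.setProperty("group.id", groupId);
            // Selects the rebalance protocol for this matrix entry.
            props.setProperty("group.protocol", protocol);
            configs.add(props);
        }
        return configs;
    }
}
```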
Re: [PR] KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use [kafka]
edoardocomar commented on PR #15530: URL: https://github.com/apache/kafka/pull/15530#issuecomment-2003461536 thanks @showuon !
Re: [PR] KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use [kafka]
edoardocomar merged PR #15530: URL: https://github.com/apache/kafka/pull/15530
Re: [PR] KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use [kafka]
edoardocomar commented on code in PR #15530: URL: https://github.com/apache/kafka/pull/15530#discussion_r1528223058 ## core/src/test/scala/unit/kafka/server/KafkaServerTest.scala: ## @@ -42,6 +43,24 @@ class KafkaServerTest extends QuorumTestHarness { TestUtils.shutdownServers(Seq(server1, server2)) } + @Test + def testListenerPortAlreadyInUse(): Unit = { +val serverSocket = new ServerSocket(0, 0, InetAddress.getLoopbackAddress) + +var kafkaServer : Option[KafkaServer] = None +try { + TestUtils.waitUntilTrue(() => serverSocket.isBound, "Server socket failed to bind.") + // start a server with listener on the port already bound + assertThrows(classOf[RuntimeException], +() => kafkaServer = Option(createServerWithListenerOnPort(serverSocket.getLocalPort)), +"Exepected RuntimeException because of KafkaServer startup failure due do address already in use" Review Comment: fixed and rephrased, thanks
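The condition this test provokes, a listener port that is already bound, can be reproduced with plain java.net sockets. A minimal sketch (PortInUseSketch is a hypothetical name): occupy an ephemeral loopback port the same way the test does with new ServerSocket(0, 0, InetAddress.getLoopbackAddress), then try to bind a second listener to that port and expect a BindException. How KafkaServer surfaces this during startup is not modelled here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;

class PortInUseSketch {
    // Returns true if binding a second listener to an already-bound loopback port fails.
    static boolean secondBindFails() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for a free ephemeral port.
        try (ServerSocket occupier = new ServerSocket(0, 0, loopback)) {
            int port = occupier.getLocalPort();
            try (ServerSocket second = new ServerSocket(port, 0, loopback)) {
                return false; // unexpectedly succeeded
            } catch (BindException e) {
                return true; // "Address already in use"
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```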
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
showuon commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1528195441 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -79,9 +79,34 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testThreeRecordsInSeparateBatch(quorum: String): Unit = { + def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = { +produceMessagesInOneBatch() +verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") +createTopicWithConfig(topicNameWithCustomConfigs, props) +produceMessagesInOneBatch(topic=topicNameWithCustomConfigs) +// In LogAppendTime's case, if the timestamps are the same, we choose the offset of the first record Review Comment: This code verifying the LogAppendTime case is duplicated quite a lot in this test suite. Could we extract it into another method?
Re: [PR] KAFKA-16073: [Tiered] Update localLogStartOffset before deleting segments in memory t… [kafka]
omkreddy commented on PR #15141: URL: https://github.com/apache/kafka/pull/15141#issuecomment-2003349984 If we want this to be in the 3.6.2 release, we need to merge the PR in the next couple of days.
[jira] [Updated] (KAFKA-16322) Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1
[ https://issues.apache.org/jira/browse/KAFKA-16322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-16322: -- Fix Version/s: 3.7.1 > Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1 > -- > > Key: KAFKA-16322 > URL: https://issues.apache.org/jira/browse/KAFKA-16322 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Johnny Hsu >Priority: Major > Fix For: 3.6.2, 3.8.0, 3.7.1 > > > https://devhub.checkmarx.com/cve-details/CVE-2023-50572/
[jira] [Updated] (KAFKA-16322) Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1
[ https://issues.apache.org/jira/browse/KAFKA-16322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-16322: -- Fix Version/s: 3.6.2 > Fix CVE-2023-50572 by updating jline from 3.22.0 to 3.25.1 > -- > > Key: KAFKA-16322 > URL: https://issues.apache.org/jira/browse/KAFKA-16322 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Johnny Hsu >Priority: Major > Fix For: 3.6.2, 3.8.0 > > > https://devhub.checkmarx.com/cve-details/CVE-2023-50572/
[jira] [Updated] (KAFKA-16210) Upgrade jose4j to 0.9.4
[ https://issues.apache.org/jira/browse/KAFKA-16210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-16210: -- Fix Version/s: 3.6.2 > Upgrade jose4j to 0.9.4 > --- > > Key: KAFKA-16210 > URL: https://issues.apache.org/jira/browse/KAFKA-16210 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Major > Fix For: 3.7.0, 3.6.2, 3.8.0 > >
[jira] [Created] (KAFKA-16382) Kafka Streams drop NULL values after reset
Stanislav Spiridonov created KAFKA-16382:
Summary: Kafka Streams drop NULL values after reset
Key: KAFKA-16382
URL: https://issues.apache.org/jira/browse/KAFKA-16382
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.6.1
Reporter: Stanislav Spiridonov
Kafka Streams (KTable) drops null values after a full reset. See [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java] for a sample topology.
Steps to reproduce (requires the NULL-IN, NULL-IN-AUX and NULL-OUT topics):
# Start the example (1st round)
# Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
# Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
# Stop the application
# Run kafka-streams-application-reset
{code:java}
call bin/windows/kafka-streams-application-reset --application-id nullproblem-example^
 --input-topics "NULL-IN,NULL-IN-AUX"^
 --bootstrap-server "localhost:9092"
{code}
# Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - this is OK (the app is not running yet)
# Start the example (2nd round)
# After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
# Expected output: "A1:anull, A1:ab, A1:"
The issue is NOT reproduced if the application is just restarted (i.e. step 5 is skipped). The issue is NOT reproduced if the internal cache is disabled.
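The report notes that the problem does not appear with the internal cache disabled. A sketch of the corresponding Kafka Streams properties, assuming the statestore.cache.max.bytes setting (StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, available since 3.4) and reusing the application id and bootstrap address from the reset command in the report; DisableCacheSketch is a hypothetical name, and this is a diagnostic workaround, not a fix for the bug:

```java
import java.util.Properties;

// Hypothetical helper: Streams properties with the record cache turned off.
class DisableCacheSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.setProperty("application.id", "nullproblem-example");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // A 0-byte cache disables record caching, so KTable updates are forwarded
        // without being buffered in the cache first.
        props.setProperty("statestore.cache.max.bytes", "0");
        return props;
    }
}
```

Without the cache, consecutive updates to the same key are no longer deduplicated, so expect more downstream records.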
Re: [PR] MINOR: Revert to Gradle 8.5 [kafka]
jlprat commented on PR #15553: URL: https://github.com/apache/kafka/pull/15553#issuecomment-2003177508 I accidentally committed the `core/data` files from some failed test run. @dajac feel free to review now. Thanks!
Re: [PR] MINOR: Revert to Gradle 8.5 [kafka]
jlprat commented on PR #15553: URL: https://github.com/apache/kafka/pull/15553#issuecomment-2003170494 Oops, you are right @dajac. I don't know what happened; I'll check.
Re: [PR] MINOR: Revert to Gradle 8.5 [kafka]
dajac commented on PR #15553: URL: https://github.com/apache/kafka/pull/15553#issuecomment-2003168800 @jlprat Thanks for the PR. It looks like the PR contains unwanted files.
Re: [PR] MINOR: Update dependencies [kafka]
jlprat commented on code in PR #15404: URL: https://github.com/apache/kafka/pull/15404#discussion_r1528029938 ## gradle/dependencies.gradle: ## @@ -100,10 +100,10 @@ versions += [ commonsCli: "1.4", commonsValidator: "1.7", dropwizardMetrics: "4.1.12.1", - gradle: "8.5", + gradle: "8.6", Review Comment: https://github.com/apache/kafka/pull/15553 PR reverting to the old version. @pasharik have you reported the bug (if it is indeed a bug in Gradle) to the upstream project?
[PR] MINOR: Revert to Gradle 8.5 [kafka]
jlprat opened a new pull request, #15553: URL: https://github.com/apache/kafka/pull/15553 Rollback to Gradle 8.5. When upgrading to Gradle 8.6, Scala incremental compilation seems to be broken. See https://github.com/apache/kafka/pull/15404#discussion_r1526739190 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)