Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1587154091

## core/src/main/java/kafka/server/TierStateMachine.java:

```
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
      * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
      */
-    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
+                                                    PartitionFetchState currentFetchState) {
+        // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+        return Optional.of(currentFetchState);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset,
+                                        UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+        long nextOffset;
+
+        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch   start-offset
+                    //  0              20
+                    //  1              85
+                    //  <2> - gap, no messages were appended in this leader epoch.
+                    //  3              90
+                    //  4              98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else {
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
+                }
+            }
+
+            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
+
+            if (maybeRlsm.isPresent()) {
+                RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();
```

Review Comment:
Nice refactor! Thanks.
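The epoch-gap example in the comment above can be replayed with a plain `NavigableMap`. This is a minimal, self-contained sketch (not the broker code; class and variable names are invented) showing that the epoch owning offset 89 is 1, not the absent epoch 2:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class EpochForOffsetDemo {
    public static void main(String[] args) {
        // Leader epoch cache from the comment above: epoch -> start offset.
        // Epoch 2 is absent: no messages were appended under it.
        NavigableMap<Integer, Long> epochStartOffsets = new TreeMap<>();
        epochStartOffsets.put(0, 20L);
        epochStartOffsets.put(1, 85L);
        epochStartOffsets.put(3, 90L);
        epochStartOffsets.put(4, 98L);

        // The epoch that owns an offset is the one with the greatest start
        // offset <= that offset. For offset 89 this yields epoch 1, matching
        // the expectation in the diff's comment (not epoch 2, which is a gap).
        long offset = 89L;
        Map.Entry<Integer, Long> owner = null;
        for (Map.Entry<Integer, Long> e : epochStartOffsets.entrySet()) {
            if (e.getValue() <= offset)
                owner = e;
        }
        System.out.println("offset " + offset + " -> epoch " + owner.getKey()); // epoch 1
    }
}
```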
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
AndrewJSchofield commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1587067096

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:

```
@@ -173,15 +173,23 @@ void awaitNotEmpty(Timer timer) {
             // Update the timer before we head into the loop in case it took a while to get the lock.
             timer.update();

-            if (timer.isExpired())
+            if (timer.isExpired()) {
+                // If the thread was interrupted before we start waiting, it still counts as
+                // interrupted from the point of view of the KafkaConsumer.poll(Duration) contract.
+                // We only need to check this when we are not going to wait because waiting
+                // already checks whether the thread is interrupted.
+                if (Thread.interrupted())
+                    throw new InterruptException("Thread interrupted.");
```

Review Comment:
OK. Done.
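For readers unfamiliar with the contract being discussed: an interrupt must surface even when the wait is skipped because the timer already expired. A minimal sketch of that pattern in plain Java (a hypothetical class, not Kafka's FetchBuffer; note that `Condition.await` reports interrupts on its own, which is why only the no-wait path needs the explicit check):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch (not Kafka code): a bounded wait that reports an interrupt
// even when the deadline has already passed and no actual waiting happens.
public final class BoundedWait {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private boolean empty = true;

    void awaitNotEmpty(long deadlineMs) throws InterruptedException {
        lock.lock();
        try {
            while (empty) {
                long remainingMs = deadlineMs - System.currentTimeMillis();
                if (remainingMs <= 0) {
                    // The timer expired, so we will not call await(). Condition.await()
                    // would have thrown InterruptedException for us; since we skip it,
                    // check (and clear) the interrupt flag explicitly here.
                    if (Thread.interrupted())
                        throw new InterruptedException("Thread interrupted.");
                    return; // timed out without being interrupted
                }
                // await() itself throws InterruptedException if the thread is interrupted.
                notEmpty.await(remainingMs, TimeUnit.MILLISECONDS);
            }
        } finally {
            lock.unlock();
        }
    }

    void markNotEmpty() {
        lock.lock();
        try {
            empty = false;
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```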
Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]
showuon commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1587041449

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
```

Review Comment:
I'm not sure if the check `segmentLastEpoch < leaderEpochs.firstKey()` makes sense or not. Suppose:

- leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
- segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}

Now, delete_records is called and the log start offset is incremented to 100, so the new leader-epoch-file-cache will be: {(9, 100)}. When entering this check, it'll fail because segmentLastEpoch (7) will be < leaderEpochs.firstKey() (9). But we still want to delete this segment, right?

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " +
+                    "Remote segment epochs: {} and partition leader epochs: {}",
+                    segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            return false;
+        }
+        // There can be overlapping remote log segments in the remote storage. (eg)
+        // leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
+        // segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}
+        // segment2: offset-range = 14-150, Broker = 1, epochs = {(5, 14), (7, 15), (9, 100)}, after leader-election.
+        // When the segment1 gets deleted, then the log-start-offset = 51 and leader-epoch-file-cache gets updated to: {(7, 51), (9, 100)}.
+        // While validating the segment2, we should ensure the overlapping remote log segments case.
+        Integer segmentFirstEpoch = segmentLeaderEpochs.ceilingKey(leaderEpochs.firstKey());
+        if (segmentFirstEpoch == null || !leaderEpochs.containsKey(segmentFirstEpoch)) {
```

Review Comment:
Same here: if the above case makes sense, this check also fails to delete the segment.

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ###
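To make the reviewer's counter-example concrete, here is a small, self-contained sketch (an assumed simplification, not the real `RemoteLogManager` code) that replays the scenario against the two checks in the diff:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the epoch-lineage checks under discussion (assumed simplification,
// not the actual RemoteLogManager implementation).
public class EpochLineageCheckDemo {
    static boolean passesProposedChecks(NavigableMap<Integer, Long> segmentEpochs,
                                        NavigableMap<Integer, Long> leaderEpochs) {
        int segmentLastEpoch = segmentEpochs.lastKey();
        // Proposed out-of-bounds check from the diff.
        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey())
            return false;
        // Proposed overlapping-segment check from the diff.
        Integer segmentFirstEpoch = segmentEpochs.ceilingKey(leaderEpochs.firstKey());
        return segmentFirstEpoch != null && leaderEpochs.containsKey(segmentFirstEpoch);
    }

    public static void main(String[] args) {
        // segment1: epochs {(5, 10), (7, 15)}, offsets 5-50
        NavigableMap<Integer, Long> segment1 = new TreeMap<>();
        segment1.put(5, 10L);
        segment1.put(7, 15L);

        // After delete_records moved the log start offset to 100, the cache is {(9, 100)}.
        NavigableMap<Integer, Long> leaderEpochs = new TreeMap<>();
        leaderEpochs.put(9, 100L);

        // Prints false: segmentLastEpoch (7) < leaderEpochs.firstKey() (9),
        // so a segment that should be eligible for deletion fails the check,
        // which is exactly the concern raised in the review.
        System.out.println(passesProposedChecks(segment1, leaderEpochs));
    }
}
```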
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1587028687

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:

```
@@ -58,8 +60,14 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+            valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize serdes for sink node %s", name()), e);
```

Review Comment:
Should we split this up further, and have two try-catch blocks, one for the key and one for the value, to narrow it down further and add key/value as information to the error message?
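A rough sketch of what the reviewer is suggesting: two independent try-catch blocks inside `init()`, so the rethrown `ConfigException` can name whether the key or the value serializer failed. This is a fragment under assumptions (the message wording is invented, and the original exception's message is embedded rather than chained, since `ConfigException` may not expose a cause-taking constructor):

```java
// Sketch only (fragment of init()): splits the single try-catch from the diff
// into two, so the rethrown ConfigException names the failing serializer.
try {
    keySerializer = prepareKeySerializer(keySerializer, context, this.name());
} catch (final ConfigException e) {
    throw new ConfigException(String.format(
        "Failed to initialize key serde for sink node %s: %s", name(), e.getMessage()));
}
try {
    valSerializer = prepareValueSerializer(valSerializer, context, this.name());
} catch (final ConfigException e) {
    throw new ConfigException(String.format(
        "Failed to initialize value serde for sink node %s: %s", name(), e.getMessage()));
}
```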
Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]
mjsax commented on PR #13283:
URL: https://github.com/apache/kafka/pull/13283#issuecomment-2089475025

Very happy to see some activity on this PR. The release plan is in the wiki: https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.8.0

Feature freeze is May 29, so there are still 4 weeks until the merge deadline. Would be great to finally close this one out.
Re: [PR] DO NOT MERGE: Isolate Connect tests [kafka]
github-actions[bot] commented on PR #15229:
URL: https://github.com/apache/kafka/pull/15229#issuecomment-2089469362

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]
mjsax commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1587018485

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java:

```
@@ -19,20 +19,31 @@ public enum AssignorError {
     // Note: this error code should be reserved for fatal errors, as the receiving clients are future-proofed
     // to throw an exception upon an unrecognized error code.
-    NONE(0),
-    INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-    VERSION_PROBING(2), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
-    ASSIGNMENT_ERROR(3),
-    SHUTDOWN_REQUESTED(4);
+    NONE(0, "NONE", "NONE"),
+    INCOMPLETE_SOURCE_TOPIC_METADATA(1, "INCOMPLETE_SOURCE_TOPIC_METADATA", "Missing source topics are existed. To check which topics are missing, please look into the logs of the consumer group leader. Only the leaders knows and logs the name of the missing topics."),
+    VERSION_PROBING(2, "VERSION_PROBING", "VERSION_PROBING"), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
+    ASSIGNMENT_ERROR(3, "ASSIGNMENT_ERROR", "Hit an unexpected exception during task assignment phase of rebalance."),
+    SHUTDOWN_REQUESTED(4, "SHUTDOWN_REQUESTED", "Encountered fatal error, and should send shutdown request for the entire application.");
```

Review Comment:
```suggestion
    SHUTDOWN_REQUESTED(4, "SHUTDOWN_REQUESTED", "A KafkaStreams instance encountered a fatal error and requested a shutdown for the entire application.");
```

Review Comment:
```suggestion
    ASSIGNMENT_ERROR(3, "ASSIGNMENT_ERROR", "Internal task assignment error. Check the group leader logs for details."),
```

Review Comment:
```suggestion
    INCOMPLETE_SOURCE_TOPIC_METADATA(1, "INCOMPLETE_SOURCE_TOPIC_METADATA", "Missing metadata for source topics. Check the group leader logs for details."),
```
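For readers following along, the shape under review pairs each error code with a name and a human-readable description. A minimal, self-contained sketch of that enum pattern (class and accessor names are assumptions, not taken from the PR):

```java
// Sketch of the error-code enum pattern under review (assumed names).
public enum AssignorErrorSketch {
    NONE(0, "No error."),
    INCOMPLETE_SOURCE_TOPIC_METADATA(1, "Missing metadata for source topics. Check the group leader logs for details."),
    ASSIGNMENT_ERROR(3, "Internal task assignment error. Check the group leader logs for details.");

    private final int code;
    private final String description;

    AssignorErrorSketch(final int code, final String description) {
        this.code = code;
        this.description = description;
    }

    public int code() {
        return code;
    }

    public String description() {
        return description;
    }

    // Example lookup used when decoding an error code received over the wire.
    public static AssignorErrorSketch fromCode(final int code) {
        for (final AssignorErrorSketch e : values()) {
            if (e.code == code)
                return e;
        }
        throw new IllegalArgumentException("Unknown assignor error code: " + code);
    }
}
```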
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-16260:
    Component/s: clients

> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
> Issue Type: Improvement
> Components: clients, streams
> Reporter: Lucia Cerchie
> Assignee: Lucia Cerchie
> Priority: Major
> Labels: KIP
>
> {{window.size.ms}} and {{window.inner.serde.class}} are not a true KafkaStreams config, and are ignored when set from a KStreams application. Both belong on the client.
> KIP-1020: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-16260:
    Description:
        {{window.size.ms}} and {{window.inner.serde.class}} are not a true KafkaStreams config, and are ignored when set from a KStreams application. Both belong on the client.
        KIP-1020: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
    was:
        {{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
        KIP-1020: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804

> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Lucia Cerchie
> Assignee: Lucia Cerchie
> Priority: Major
> Labels: KIP
>
> {{window.size.ms}} and {{window.inner.serde.class}} are not a true KafkaStreams config, and are ignored when set from a KStreams application. Both belong on the client.
> KIP-1020: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-16260:
    Description:
        {{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
        KIP-1020: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
    was:
        {{window.size.ms}} and `is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
        KIP-1020: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804

> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Lucia Cerchie
> Assignee: Lucia Cerchie
> Priority: Major
> Labels: KIP
>
> {{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
> KIP-1020: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-16260:
    Description:
        {{window.size.ms}} and `is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
        KIP-1020: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
    was:
        {{window.size.ms}} is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
        https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804

> Deprecate window.size.ms and inner.serde.class in StreamsConfig
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Lucia Cerchie
> Assignee: Lucia Cerchie
> Priority: Major
> Labels: KIP
>
> {{window.size.ms}} and `is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
> KIP-1020: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-16260:
    Summary: Deprecate window.size.ms and window.inner.serde.class in StreamsConfig (was: Deprecate window.size.ms and inner.serde.class in StreamsConfig)

> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Lucia Cerchie
> Assignee: Lucia Cerchie
> Priority: Major
> Labels: KIP
>
> {{window.size.ms}} and `is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
> KIP-1020: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-16260:
    Summary: Deprecate window.size.ms and inner.serde.class in StreamsConfig (was: Deprecate window.size.ms in StreamsConfig)

> Deprecate window.size.ms and inner.serde.class in StreamsConfig
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Lucia Cerchie
> Assignee: Lucia Cerchie
> Priority: Major
> Labels: KIP
>
> {{window.size.ms}} is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-16260:
    Description:
        {{window.size.ms}} is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
        https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
    was:
        {{window.size.ms}} is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.

> Deprecate window.size.ms in StreamsConfig
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Lucia Cerchie
> Assignee: Lucia Cerchie
> Priority: Major
> Labels: KIP
>
> {{window.size.ms}} is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-16260:
    Labels: KIP (was: needs-kip)

> Deprecate window.size.ms in StreamsConfig
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Lucia Cerchie
> Assignee: Lucia Cerchie
> Priority: Major
> Labels: KIP
>
> {{window.size.ms}} is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client.
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
chia7712 commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1587006765

## core/src/test/java/kafka/test/ClusterConfig.java:

```
@@ -55,14 +56,21 @@ public class ClusterConfig {
     private final Map<Integer, Map<String, String>> perBrokerOverrideProperties;

     @SuppressWarnings("checkstyle:ParameterNumber")
-    private ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
+    private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
                           SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
                           MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
                           Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
                           Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perBrokerOverrideProperties) {
+        if (brokers < 0) {
+            throw new IllegalArgumentException("Number of brokers must be greater or equal to zero.");
+        }
+        if (controllers <= 0 || disksPerBroker <= 0) {
```

Review Comment:
`controllers <= 0` is acceptable if it is zk mode. Also, `TestKitNodes` guards against that already: https://github.com/apache/kafka/blob/89d8045a15b622805f65c3c6fbfde82606921f65/core/src/test/java/kafka/testkit/TestKitNodes.java#L90

Hence, `ClusterConfig` does not require such a check, and we can add a comment to explain why we don't need to check `controllers` here.

## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:

```
@@ -104,6 +107,26 @@ public void testClusterTests() {
         }
     }

+    @ClusterTests({
+        @ClusterTest(clusterType = Type.ZK),
```

Review Comment:
We should test all types for both cases: "default" + "custom"
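A sketch of the direction the reviewer suggests: validate only what this class is responsible for, and document why the controller count is deliberately left unchecked. This is a fragment with the constructor shape simplified from the diff above, not the merged code:

```java
// Sketch (simplified constructor fragment, not the merged code): validate the
// broker and disk counts here, and deliberately skip the controller check,
// which TestKitNodes already enforces for KRaft and which is 0 in ZK mode.
private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroker /* ... */) {
    if (brokers < 0) {
        throw new IllegalArgumentException("Number of brokers must be greater or equal to zero.");
    }
    if (disksPerBroker <= 0) {
        throw new IllegalArgumentException("Number of disks per broker must be greater than zero.");
    }
    // No check on `controllers`: it is legitimately 0 in ZK mode, and
    // TestKitNodes validates it for KRaft clusters, so re-checking it here
    // would reject valid configurations.
    // ...
}
```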
[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842828#comment-17842828 ]

Phuc Hong Tran commented on KAFKA-15561:

[~kirktrue] just a quick question: why was this moved back to 3.8.0?

> Client support for new SubscriptionPattern based subscription
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
> Issue Type: New Feature
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Phuc Hong Tran
> Priority: Blocker
> Labels: kip-848-client-support, regex
> Fix For: 3.8.0
>
> New consumer should support subscribe with the new SubscriptionPattern introduced in the new consumer group protocol. When subscribing with this regex, the client should provide the regex in the HB request on the SubscribedTopicRegex field, delegating the resolution to the server.
Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]
chia7712 commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1587003306

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:

```
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration timeout) {
                     return position.offset;

                 updateFetchPositions(timer);
+                timer.update();
```

Review Comment:
BTW, please let me know if you have no free time. I'm fine to fix it myself if the bug I described above exists.
Re: [PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]
chia7712 commented on code in PR #15843:
URL: https://github.com/apache/kafka/pull/15843#discussion_r1586996403

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:

```
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration timeout) {
                     return position.offset;

                 updateFetchPositions(timer);
+                timer.update();
```

Review Comment:
(this comment is unrelated to this PR) It seems `AsyncConsumer#position` does not honour `WakeupException`? See the following test:

```scala
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@Timeout(10)
def testPositionRespectsWakeup(quorum: String, groupProtocol: String): Unit = {
  val topicPartition = new TopicPartition("abc", 15)
  val consumer = createConsumer()
  consumer.assign(List(topicPartition).asJava)

  val service = Executors.newSingleThreadExecutor()
  service.execute(() => {
    TimeUnit.SECONDS.sleep(1)
    consumer.wakeup()
  })
  try
    assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(3)))
  finally {
    service.shutdownNow()
    service.awaitTermination(1, TimeUnit.SECONDS)
  }
}
```
[jira] [Commented] (KAFKA-16655) deflake ZKMigrationIntegrationTest.testDualWrite
[ https://issues.apache.org/jira/browse/KAFKA-16655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842820#comment-17842820 ]

Alyssa Huang commented on KAFKA-16655:

https://github.com/apache/kafka/pull/15845/files

> deflake ZKMigrationIntegrationTest.testDualWrite
>
> Key: KAFKA-16655
> URL: https://issues.apache.org/jira/browse/KAFKA-16655
> Project: Kafka
> Issue Type: Improvement
> Reporter: Alyssa Huang
> Assignee: Alyssa Huang
> Priority: Minor
>
> {code:java}
> Failed to map supported failure 'org.opentest4j.AssertionFailedError: expected: not equal but was: <0>' with mapper 'org.gradle.api.internal.tasks.testing.failure.mappers.OpenTestAssertionFailedMapper@59b5251d': Cannot invoke "Object.getClass()" because "obj" is null
>
> > Task :core:test
> kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ClusterInstance)[8] failed, log available in /Users/ahuang/ce-kafka/core/build/reports/testOutput/kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ClusterInstance)[8].test.stdout
>
> Gradle Test Run :core:test > Gradle Test Executor 8 > ZkMigrationIntegrationTest > testDualWrite(ClusterInstance) > testDualWrite [8] Type=ZK, MetadataVersion=3.8-IV0, Security=PLAINTEXT FAILED
>     org.opentest4j.AssertionFailedError: expected: not equal but was: <0>
>         at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:152)
>         at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>         at app//org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:277)
>         at app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:119)
>         at app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:111)
>         at app//org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:2121)
>         at app//kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ZkMigrationIntegrationTest.scala:995)
> {code}
> This test occasionally fails due to stale broker epoch exceptions, which in turn causes allocate producer ids to fail.
> Also fixes {{sendAllocateProducerIds}} erroneously returning 0 as the `producerIdStart` in error cases (because `onComplete` only accounts for timeouts and ignores any other error code)
> {code:java}
> [2024-04-12 18:45:08,820] INFO [ControllerServer id=3000] allocateProducerIds: event failed with StaleBrokerEpochException in 19 microseconds. (org.apache.kafka.controller.QuorumController:765) {code}
[PR] KAFKA-16655 Deflaking ZKMigrationIntegrationTest.testDualWrite [kafka]
ahuang98 opened a new pull request, #15845:
URL: https://github.com/apache/kafka/pull/15845

This test occasionally fails due to stale broker epoch exceptions, which in turn cause allocate producer ids to fail. Adds retries, since a stale broker epoch is a retriable issue, and fixes sendAllocateProducerIds returning 0 as the producerIdStart in error cases.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
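The fix described above follows a common pattern: treat `StaleBrokerEpochException` as retriable and propagate everything else instead of silently returning a zeroed result. A hedged sketch of that pattern (method fragment; the request method, retry count, and backoff are hypothetical, not the actual controller code):

```java
// Hypothetical sketch of retrying a retriable error code instead of
// surfacing a bogus default (e.g. producerIdStart = 0) to the caller.
long allocateProducerIdsWithRetry(int maxAttempts) throws Exception {
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            return sendAllocateProducerIds(); // assumed request method
        } catch (org.apache.kafka.common.errors.StaleBrokerEpochException e) {
            lastFailure = e;              // retriable: the broker epoch will refresh
            Thread.sleep(100L * attempt); // simple linear backoff
        }
        // Any other exception propagates immediately: returning 0 here (as the
        // old onComplete handler effectively did) would hide the failure.
    }
    throw lastFailure;
}
```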
[jira] [Created] (KAFKA-16655) deflake ZKMigrationIntegrationTest.testDualWrite
Alyssa Huang created KAFKA-16655:

    Summary: deflake ZKMigrationIntegrationTest.testDualWrite
    Key: KAFKA-16655
    URL: https://issues.apache.org/jira/browse/KAFKA-16655
    Project: Kafka
    Issue Type: Improvement
    Reporter: Alyssa Huang
    Assignee: Alyssa Huang

{code:java}
Failed to map supported failure 'org.opentest4j.AssertionFailedError: expected: not equal but was: <0>' with mapper 'org.gradle.api.internal.tasks.testing.failure.mappers.OpenTestAssertionFailedMapper@59b5251d': Cannot invoke "Object.getClass()" because "obj" is null

> Task :core:test
kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ClusterInstance)[8] failed, log available in /Users/ahuang/ce-kafka/core/build/reports/testOutput/kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ClusterInstance)[8].test.stdout

Gradle Test Run :core:test > Gradle Test Executor 8 > ZkMigrationIntegrationTest > testDualWrite(ClusterInstance) > testDualWrite [8] Type=ZK, MetadataVersion=3.8-IV0, Security=PLAINTEXT FAILED
    org.opentest4j.AssertionFailedError: expected: not equal but was: <0>
        at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:152)
        at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at app//org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:277)
        at app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:119)
        at app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:111)
        at app//org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:2121)
        at app//kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ZkMigrationIntegrationTest.scala:995)
{code}

This test occasionally fails due to stale broker epoch exceptions, which in turn causes allocate producer ids to fail.

Also fixes {{sendAllocateProducerIds}} erroneously returning 0 as the `producerIdStart` in error cases (because `onComplete` only accounts for timeouts and ignores any other error code)

{code:java}
[2024-04-12 18:45:08,820] INFO [ControllerServer id=3000] allocateProducerIds: event failed with StaleBrokerEpochException in 19 microseconds. (org.apache.kafka.controller.QuorumController:765) {code}
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
chia7712 commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1586968300

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:

```
@@ -173,15 +173,23 @@ void awaitNotEmpty(Timer timer) {
             // Update the timer before we head into the loop in case it took a while to get the lock.
             timer.update();

-            if (timer.isExpired())
+            if (timer.isExpired()) {
+                // If the thread was interrupted before we start waiting, it still counts as
+                // interrupted from the point of view of the KafkaConsumer.poll(Duration) contract.
+                // We only need to check this when we are not going to wait because waiting
+                // already checks whether the thread is interrupted.
+                if (Thread.interrupted())
+                    throw new InterruptException("Thread interrupted.");
```

Review Comment:
Maybe we should use the same exception message `Interrupted waiting for results from fetching records` for consistency.
[PR] KAFKA-16637: KIP-848 does not work well [kafka]
kirktrue opened a new pull request, #15844:
URL: https://github.com/apache/kafka/pull/15844

This issue is related to an optimization for offset fetch logic. When a user calls `Consumer.poll()`, among other things, the consumer performs a network request to fetch any previously-committed offsets so it can determine from where to start fetching new records.

When the user passes in a timeout of zero, it's almost always the case that the offset fetch network request will not be performed within 0 milliseconds. However, the consumer still sends out the request and handles the response when it is received, usually a few milliseconds later. In this first attempt, the lookup fails and the `poll()` loops back around. Given that this timeout is the common case, the consumer caches the offset fetch response/result from the first attempt (even though it timed out) because it knows that the next call to `poll()` is going to attempt the exact same operation. When it is later attempted a second time, the response is already there from the first attempt such that the consumer doesn't need to perform a network request.

The existing consumer has implemented this caching in [PendingCommittedOffsetRequest](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L132). The new consumer has implemented it in [CommitRequestManager](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L510).

The core issue is that the new consumer implementation clears out the first attempt's cached result too aggressively. The effect is that the second (and subsequent) attempts fail to find any previous attempt's cached result, and all submit network requests, which all fail. Thus the consumer never makes any headway.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
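The caching optimization described above can be sketched as follows: keep the first attempt's in-flight future keyed by the requested partitions, and let subsequent `poll()` calls reuse it instead of re-sending the request. This is a simplified, self-contained illustration with invented names, not the actual `CommitRequestManager` logic:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Simplified illustration of reusing a timed-out first attempt across polls.
final class OffsetFetchCache {
    private Set<String> inflightPartitions; // partitions of the pending attempt
    private CompletableFuture<Map<String, Long>> inflightResult;

    synchronized CompletableFuture<Map<String, Long>> fetchCommitted(Set<String> partitions) {
        // Reuse the previous attempt if it targets the same partitions: the
        // network response may already be on its way even though the caller's
        // near-zero timeout expired before it arrived.
        if (inflightResult != null && partitions.equals(inflightPartitions)) {
            return inflightResult;
        }
        inflightPartitions = partitions;
        inflightResult = sendOffsetFetchRequest(partitions); // stand-in for the real request
        return inflightResult;
    }

    // The bug described above amounts to clearing this cache too aggressively
    // (e.g. on every timeout), so each poll() starts a brand-new request.
    // A safer policy only drops the entry once the attempt has completed:
    synchronized void clearOnCompletionOnly() {
        if (inflightResult != null && inflightResult.isDone()) {
            inflightResult = null;
            inflightPartitions = null;
        }
    }

    private CompletableFuture<Map<String, Long>> sendOffsetFetchRequest(Set<String> partitions) {
        return new CompletableFuture<>(); // stand-in for the real network call
    }
}
```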
[jira] [Updated] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16637:
    Priority: Blocker (was: Minor)

> KIP-848 does not work well
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: sanghyeok An
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support
> Fix For: 3.8.0
> Attachments: image-2024-04-30-08-33-06-367.png, image-2024-04-30-08-33-50-435.png
>
> I want to test the next generation of the consumer rebalance protocol (https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes). However, it does not work well. You can check my setup below.
>
> *Docker-compose.yaml*
> https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml
>
> *Consumer Code*
> https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java
>
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck in here...
>
> *Broker logs*
> broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> Stuck in here
[jira] [Commented] (KAFKA-16427) KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER
[ https://issues.apache.org/jira/browse/KAFKA-16427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842817#comment-17842817 ]

Kirk True commented on KAFKA-16427:

This bug can be triggered with an embarrassingly simple integration test (n). But it appears this bug can be fixed with an embarrassingly simple change, so (y)

> KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER
>
> Key: KAFKA-16427
> URL: https://issues.apache.org/jira/browse/KAFKA-16427
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.7.0
> Reporter: Alyssa Huang
> Assignee: Kirk True
> Priority: Critical
> Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
> When `long position(TopicPartition partition, final Duration timeout);` is called on an unknown topic partition (and auto creation is disabled), the method fails to adhere to the timeout supplied.
> e.g. the following warning is logged continuously as metadata fetches are retried
> [2024-03-26 11:03:48,589] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Error while fetching metadata with correlation id 200 : {nonexistingTopic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1313)
Re: [PR] MINOR: Replaced Utils.join() with JDK API. [kafka]
chia7712 commented on code in PR #15823:
URL: https://github.com/apache/kafka/pull/15823#discussion_r1586962262

## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:

```
@@ -399,7 +398,7 @@ public TopicCreationResponse createOrFindTopics(NewTopic... topics) {
             }
         }
         if (topicsByName.isEmpty()) return EMPTY_CREATION;
-        String topicNameList = Utils.join(topicsByName.keySet(), "', '");
+        String topicNameList = String.join("', '", topicsByName.keySet()).replace("[", "").replace("]", "");
```

Review Comment:
Why do we need those `replace` calls?

## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:

```
@@ -630,7 +629,7 @@ public Map describeTopicConfigs(String... topicNames) {
         if (topics.isEmpty()) {
             return Collections.emptyMap();
         }
-        String topicNameList = String.join(", ", topics);
+        String topicNameList = String.join(", ", topics).replace("[", "").replace("]", "");
```

Review Comment:
Ditto.

## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:

```
@@ -475,7 +474,7 @@ public Map describeTopics(String... topics) {
         if (topics == null) {
             return Collections.emptyMap();
         }
-        String topicNameList = String.join(", ", topics);
+        String topicNameList = String.join(", ", topics).replace("[", "").replace("]", "");
```

Review Comment:
Ditto.

## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:

```
@@ -216,7 +217,7 @@ public List consume(TopicPartition topicPartition,
         Function0 messageSupplier = () ->
             String.format("Could not consume %d records of %s from offset %d in %d ms. %d message(s) consumed:%s%s",
                 expectedTotalCount, topicPartition, fetchOffset, timeoutMs, records.size(), sep,
-                Utils.join(records, sep));
+                String.join(sep, Arrays.toString(records.toArray())));
```

Review Comment:
`records.stream().map(Object::toString).collect(Collectors.joining(","))`

## tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:

```
@@ -230,8 +231,8 @@ else if (leader.lastCaughtUpTimestamp().isPresent() && maxLagFollower.lastCaught
             "\nHighWatermark: " + quorumInfo.highWatermark() +
             "\nMaxFollowerLag: " + maxFollowerLag +
             "\nMaxFollowerLagTimeMs: " + maxFollowerLagTimeMs +
-            "\nCurrentVoters: " + Utils.mkString(quorumInfo.voters().stream().map(v -> v.replicaId()), "[", "]", ",") +
-            "\nCurrentObservers: " + Utils.mkString(quorumInfo.observers().stream().map(v -> v.replicaId()), "[", "]", ",")
+            "\nCurrentVoters: " + "[" + quorumInfo.voters().stream().map(QuorumInfo.ReplicaState::replicaId).map(Object::toString).collect(Collectors.joining(",")) + "]" +
+            "\nCurrentObservers: " + "[" + quorumInfo.observers().stream().map(QuorumInfo.ReplicaState::replicaId).map(Objects::toString).collect(Collectors.joining(",")) + "]"
```

Review Comment:
`quorumInfo.voters().stream().map(QuorumInfo.ReplicaState::replicaId).map(Object::toString).collect(Collectors.joining(",", "[", "]"))`

Review Comment:
Ditto.

## trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java:

```
@@ -318,7 +320,7 @@ void log() {
             }
         }
         log.info("{}: consumer waiting for {} message(s), starting with: {}",
-            id, numToReceive, Utils.join(list, ", "));
+            id, numToReceive, String.join(", ", Arrays.toString(list.toArray())));
```

Review Comment:
Ditto.

## tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java:

```
@@ -488,12 +488,12 @@ private static void clearAllThrottles(Admin adminClient,
         targetParts.forEach(t -> brokers.addAll(t.getValue()));
         System.out.printf("Clearing broker-level throttles on
```
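The reviewer's point, illustrated: `String.join` never produces brackets, so the `.replace("[", "")` calls are dead code, and where brackets are wanted, `Collectors.joining(delimiter, prefix, suffix)` is the direct replacement for `Utils.mkString`. A small self-contained demonstration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JoiningDemo {
    public static void main(String[] args) {
        List<Integer> replicaIds = Arrays.asList(1, 2, 3);

        // String.join output contains no brackets, so .replace("[", "") is a
        // no-op; brackets only appear if you stringify a List via toString().
        String joined = String.join(", ", "a", "b", "c");
        System.out.println(joined);                  // a, b, c

        // Direct stream-based replacement for Utils.mkString(stream, "[", "]", ","):
        String bracketed = replicaIds.stream()
                .map(Object::toString)
                .collect(Collectors.joining(",", "[", "]"));
        System.out.println(bracketed);               // [1,2,3]
    }
}
```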
[jira] [Commented] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash
[ https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842816#comment-17842816 ]

Haruki Okada commented on KAFKA-16541:

[~junrao] Yes. My concern now is that only changing renameDir may not be enough, so I'm trying to figure out if we can fix it in another way without checking all call paths.

> Potential leader epoch checkpoint file corruption on OS crash
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 3.7.0
> Reporter: Haruki Okada
> Assignee: Haruki Okada
> Priority: Minor
>
> Pointed out by [~junrao] on [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] got rid of fsync of the leader-epoch checkpoint file in some paths for performance reasons.
> However, since the checkpoint file is now flushed to the device asynchronously by the OS, its content could be corrupted if the OS suddenly crashes (e.g. by power failure or kernel panic) in the middle of the flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.
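For context, the usual way to make a checkpoint file crash-safe is to write a temporary file, fsync it, atomically rename it over the old file, and fsync the parent directory; skipping the data fsync (as the KAFKA-15046 patch did for performance) leaves a window where an OS crash can persist a partially written file. A generic sketch of the durable pattern follows (not Kafka's CheckpointFile code; the directory-fsync trick is Linux-oriented and may throw on other platforms):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Generic crash-safe checkpoint write: temp file + fsync + atomic rename.
public final class DurableCheckpoint {
    public static void write(Path checkpoint, String content) throws IOException {
        Path tmp = checkpoint.resolveSibling(checkpoint.getFileName() + ".tmp");
        try (FileChannel ch = FileChannel.open(tmp,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ch.write(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)));
            ch.force(true); // fsync the data; omit this and a crash may leave garbage
        }
        // Atomic rename: readers see either the old or the new complete file.
        Files.move(tmp, checkpoint, StandardCopyOption.ATOMIC_MOVE,
                StandardCopyOption.REPLACE_EXISTING);
        // Fsync the parent directory so the rename itself survives a crash
        // (works on Linux; some platforms disallow opening a directory).
        try (FileChannel dir = FileChannel.open(checkpoint.getParent(), StandardOpenOption.READ)) {
            dir.force(true);
        }
    }
}
```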
[PR] KAFKA-16427: KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER [kafka]
kirktrue opened a new pull request, #15843:
URL: https://github.com/apache/kafka/pull/15843

The AsyncKafkaConsumer implementation of `position(TopicPartition, Duration)` was not updating its internal `Timer`, causing it to execute the loop forever. Adding a call to update the `Timer` at the bottom of the loop fixes the issue.

An integration test was added to catch this case; it fails without the newly added call to `Timer.update(long)`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
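The bug class here is easy to state: a retry loop bounded by a `Timer` only terminates if something inside the loop refreshes the timer's notion of "now". A minimal sketch of the corrected shape (a fragment with invented helper names, assuming the semantics of `org.apache.kafka.common.utils.Timer`):

```java
// Sketch: a blocking lookup bounded by a Timer. Without timer.update() at the
// bottom of the loop, isExpired() keeps comparing against the stale start time
// and the loop can spin forever -- the bug this PR fixes in position().
long positionWithTimeout(Timer timer) {
    do {
        Long cached = cachedPosition(); // hypothetical cheap local lookup
        if (cached != null)
            return cached;

        updateFetchPositions(timer);    // may block up to timer.remainingMs()
        timer.update();                 // refresh "now" so isExpired() can trip
    } while (timer.notExpired());
    throw new org.apache.kafka.common.errors.TimeoutException(
            "Timed out waiting for the position");
}
```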
[jira] [Resolved] (KAFKA-16647) Remove setMetadataDirectory from BrokerNode/ControllerNode
[ https://issues.apache.org/jira/browse/KAFKA-16647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-16647.
    Fix Version/s: 3.8.0
    Resolution: Fixed

> Remove setMetadataDirectory from BrokerNode/ControllerNode
>
> Key: KAFKA-16647
> URL: https://issues.apache.org/jira/browse/KAFKA-16647
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Kuan Po Tseng
> Priority: Minor
> Fix For: 3.8.0
>
> `TestKitNodes` does not enable callers to define the location of the "base folder". That makes sense to me, since callers should not care about it. That means the location of the metadata folder should be transparent to callers. Hence, the setter of the metadata folder is useless.
Re: [PR] KAFKA-16647: Remove setMetadataDirectory from BrokerNode/ControllerNode [kafka]
chia7712 merged PR #15833:
URL: https://github.com/apache/kafka/pull/15833
Re: [PR] KAFKA-16647: Remove setMetadataDirectory from BrokerNode/ControllerNode [kafka]
chia7712 commented on PR #15833:
URL: https://github.com/apache/kafka/pull/15833#issuecomment-2089361742

I looped the failed tests on my local machine, and they pass.

```
./gradlew cleanTest :connect:runtime:test --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsets --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsOverriddenConsumerGroupId --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted :metadata:test --tests QuorumControllerTest.testBrokerHeartbeatDuringMigration --tests QuorumControllerTest.testFenceMultipleBrokers --tests QuorumControllerTest.testConfigurationOperations :connect:mirror:test --tests IdentityReplicationIntegrationTest.testReplicateFromLatest --tests MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow --tests MirrorConnectorsIntegrationExactlyOnceTest.testSyncTopicConfigs --tests MirrorConnectorsIntegrationTransactionsTest.testReplicateSourceDefault --tests MirrorConnectorsIntegrationTransactionsTest.testSyncTopicConfigs :core:test --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails --tests ZkMigrationIntegrationTest.testDualWrite
```

Also, the changes of this PR should be unrelated to those failures.
Re: [PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (2/3) [kafka]
chia7712 commented on code in PR #15841: URL: https://github.com/apache/kafka/pull/15841#discussion_r1586954293 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java: ## @@ -347,6 +380,165 @@ public void testPutConnectorConfigWithTargetState() throws Exception { verify(configLog).stop(); } +@Test +public void testPutConnectorConfigProducerError() throws Exception { +expectStart(Collections.emptyList(), Collections.emptyMap()); +expectPartitionCount(1); + +configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); +verifyConfigure(); +configStorage.start(); + +when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0))) +.thenReturn(CONFIGS_SERIALIZED.get(0)); +when(configLog.sendWithReceipt(anyString(), any(byte[].class))).thenReturn(producerFuture); + +// Verify initial state +ClusterConfigState configState = configStorage.snapshot(); +assertEquals(-1, configState.offset()); +assertEquals(0, configState.connectors().size()); + +when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow( +new ExecutionException(new TopicAuthorizationException(Collections.singleton("test"; + +// verify that the producer exception from KafkaBasedLog::send is propagated +ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), +SAMPLE_CONFIGS.get(0), null)); +assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka")); Review Comment: Could we verify the `e.getCause()` to make sure the error is caused by what we expect? ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java: ## @@ -347,6 +380,165 @@ public void testPutConnectorConfigWithTargetState() throws Exception { verify(configLog).stop(); } +@Test +public void testPutConnectorConfigProducerError() throws Exception { +expectStart(Collections.emptyList(), Collections.emptyMap()); +expectPartitionCount(1); + +configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); +verifyConfigure(); +configStorage.start(); + +when(converter.fromConnectData(TOPIC, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0))) +.thenReturn(CONFIGS_SERIALIZED.get(0)); +when(configLog.sendWithReceipt(anyString(), any(byte[].class))).thenReturn(producerFuture); + +// Verify initial state +ClusterConfigState configState = configStorage.snapshot(); +assertEquals(-1, configState.offset()); +assertEquals(0, configState.connectors().size()); + +when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow( +new ExecutionException(new TopicAuthorizationException(Collections.singleton("test"; + +// verify that the producer exception from KafkaBasedLog::send is propagated +ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), +SAMPLE_CONFIGS.get(0), null)); +assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka")); + +configStorage.stop(); +verify(configLog).stop(); +} + +@Test +public void testRemoveConnectorConfigSlowProducer() throws Exception { +expectStart(Collections.emptyList(), Collections.emptyMap()); +expectPartitionCount(1); + +configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); +verifyConfigure(); +configStorage.start(); + +@SuppressWarnings("unchecked") +Future connectorConfigProducerFuture = mock(Future.class); + +@SuppressWarnings("unchecked") +Future targetStateProducerFuture = 
mock(Future.class); + +when(configLog.sendWithReceipt(anyString(), isNull())) +// tombstone for the connector config +.thenReturn(connectorConfigProducerFuture) +// tombstone for the connector target state +.thenReturn(targetStateProducerFuture); + + when(connectorConfigProducerFuture.get(eq(READ_WRITE_TOTAL_TIMEOUT_MS), any(TimeUnit.class))) +.thenAnswer((Answer) invocation -> { +time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000); +return null; +}); + +// the future get timeout is expected to be reduced according to how long the previous Future::get took +when(targetStateProducerFuture.get(eq(1000L), any(TimeUnit.class))) +.thenAnswer((Answer) invocation -> { +time.sleep(1000); +
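For illustration, a minimal sketch of the cause check requested in the first review comment above. It reuses the fixtures from the quoted test (configStorage, CONNECTOR_IDS, SAMPLE_CONFIGS); the JUnit 5 assertInstanceOf assertion and the unwrapping of the ExecutionException are assumptions, since the quoted diff does not show how KafkaConfigBackingStore wraps the producer failure.

```java
// Sketch only, reusing fixtures from the quoted test. Whether the store propagates the
// raw ExecutionException or its unwrapped cause is an assumption to check against the
// actual implementation.
ConnectException e = assertThrows(ConnectException.class,
        () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), null));
assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka"));
Throwable cause = e.getCause();
if (cause instanceof ExecutionException) {
    cause = cause.getCause(); // unwrap the future failure if it was propagated as-is
}
assertInstanceOf(TopicAuthorizationException.class, cause);
```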
[jira] [Assigned] (KAFKA-16654) Refactor kafka.test.annotation.Type and ClusterTestExtensions
[ https://issues.apache.org/jira/browse/KAFKA-16654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16654: -- Assignee: TaiJuWu (was: Chia-Ping Tsai) > Refactor kafka.test.annotation.Type and ClusterTestExtensions > - > > Key: KAFKA-16654 > URL: https://issues.apache.org/jira/browse/KAFKA-16654 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TaiJuWu >Priority: Minor > > It seems to me the refactor could include the following tasks. > 1. change `invocationContexts`, the method invoked by `ClusterTemplate`, and > generate-related methods in `ClusterTestExtensions` to return a > java.util.Collection instead of accepting a `java.util.function.Consumer`. > That can bring two benefits. 1) simpler production code: we don't need to > create a List and then pass it to a function to collect stuff. 2) easier > to write unit tests. > 2. separate `provideTestTemplateInvocationContexts` into multiple methods to > handle each annotation. That can help us write tests and make the code more > readable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
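For readers skimming the digest, a self-contained sketch of the Consumer-to-Collection change described in task 1 of the issue; the names here are made up and only illustrate the pattern, not the real ClusterTestExtensions signatures.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

class CollectionReturnExample {
    // Before: every caller must allocate a list and hand in a collector.
    static void contexts(Consumer<String> collector) {
        collector.accept("zk");
        collector.accept("kraft");
    }

    // After: simpler call sites, and unit tests can assert on the result directly.
    static Collection<String> contexts() {
        return Arrays.asList("zk", "kraft");
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        contexts(collected::add);                  // old style
        Collection<String> returned = contexts();  // new style
        System.out.println(collected + " == " + returned);
    }
}
{code}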
Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]
mjsax commented on code in PR #14360: URL: https://github.com/apache/kafka/pull/14360#discussion_r1586935977 ## docs/streams/developer-guide/config-streams.html: ## @@ -240,24 +240,29 @@ num.standby.replicas - acceptable.recovery.lag + acceptable.recovery.lag Medium The maximum acceptable lag (number of offsets to catch up) for an instance to be considered caught-up and ready for the active task. 1 - application.server + application.server Low A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single Kafka Streams application. The value of this must be different for each instance of the application. the empty string - buffered.records.per.partition + buffered.records.per.partition Low The maximum number of records to buffer per partition. 1000 - cache.max.bytes.buffering + statestore.cache.max.bytes +Medium +Maximum number of memory bytes to be used for record caches across all threads. +10485760 + + cache.max.bytes.buffering (Deprecated. Use cache.max.bytes instead.) Review Comment: ```suggestion cache.max.bytes.buffering (Deprecated. Use statestore.cache.max.bytes instead.) ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -564,25 +565,22 @@ public class StreamsConfig extends AbstractConfig { static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the org.apache.kafka.streams.state.DslStoreSuppliers interface."; static final Class DSL_STORE_SUPPLIERS_CLASS_DEFAULT = BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class; -/** {@code default.windowed.key.serde.inner} */ +/** {@code default.windowed.key.serde.inner + * @deprecated since 3.0.0} */ @SuppressWarnings("WeakerAccess") @Deprecated public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner"; private static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed key. Must implement the " + "org.apache.kafka.common.serialization.Serde interface."; -/** {@code default.windowed.value.serde.inner} */ +/** {@code default.windowed.value.serde.inner + * @deprecated since 3.0.0 } */ @SuppressWarnings("WeakerAccess") @Deprecated public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner"; private static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed value. Must implement the " + "org.apache.kafka.common.serialization.Serde interface."; -public static final String WINDOWED_INNER_CLASS_SERDE = "windowed.inner.class.serde"; Review Comment: Seems we cannot just remove it; should it instead only be marked as deprecated? Also, given that deprecation is part of KIP-1020, should this only be done in the KIP-1020 PR, with this PR doing only the docs cleanup? ## docs/streams/developer-guide/config-streams.html: ## @@ -326,8 +331,15 @@ num.standby.replicasDefault serializer/deserializer for the inner class of windowed keys, implementing the Serde interface. Review Comment: Seems this line does not belong to the `dsl.store.suppliers.class` config? ## docs/streams/developer-guide/config-streams.html: ## @@ -300,7 +305,7 @@ num.standby.replicasnull - default.windowed.key.serde.inner + default.windowed.key.serde.inner (Deprecated.) Review Comment: Below is `default.window.value.serde.inner` which was also deprecated, right?
## docs/streams/developer-guide/config-streams.html: ## @@ -326,8 +331,15 @@ num.standby.replicasDefault serializer/deserializer for the inner class of windowed keys, implementing the Serde interface. +null - max.task.idle.ms + default.windowed.value.serde.inner (Deprecated.) Review Comment: It seems `default.windowed.value.serde.inner` exists already? Why do we add it here? ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -647,7 +645,8 @@ public class StreamsConfig extends AbstractConfig { @SuppressWarnings("WeakerAccess") public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; -/** {@code auto.include.jmx.reporter} */ +/** {@code auto.include.jmx.r
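As a usage note on the renamed cache config in the docs table above, a brief migration example. It uses plain string keys so the snippet does not depend on which StreamsConfig constants a given Kafka version exposes; the key names and the 10485760 default come directly from the table.

```java
import java.util.Properties;

final class StreamsCacheConfigExample {
    static Properties cacheProps() {
        Properties props = new Properties();
        // Deprecated name from the docs table above:
        // props.put("cache.max.bytes.buffering", 10485760);
        // Replacement, keeping the 10485760-byte (10 MiB) default shown in the new row:
        props.put("statestore.cache.max.bytes", 10485760);
        return props;
    }
}
```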
[jira] [Commented] (KAFKA-16654) Refactor kafka.test.annotation.Type and ClusterTestExtensions
[ https://issues.apache.org/jira/browse/KAFKA-16654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842808#comment-17842808 ] TaiJuWu commented on KAFKA-16654: - Hi [~chia7712], if you are not working on it, could you assign it to me? > Refactor kafka.test.annotation.Type and ClusterTestExtensions > - > > Key: KAFKA-16654 > URL: https://issues.apache.org/jira/browse/KAFKA-16654 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > It seems to me the refactor could include the following tasks. > 1. change `invocationContexts`, the method invoked by `ClusterTemplate`, and > generate-related methods in `ClusterTestExtensions` to return a > java.util.Collection instead of accepting a `java.util.function.Consumer`. > That can bring two benefits. 1) simpler production code: we don't need to > create a List and then pass it to a function to collect stuff. 2) easier > to write unit tests. > 2. separate `provideTestTemplateInvocationContexts` into multiple methods to > handle each annotation. That can help us write tests and make the code more > readable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842805#comment-17842805 ] Matthias J. Sax commented on KAFKA-16514: - Thanks for the background! Makes sense. > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. > Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16614) Disallow `@ClusterTemplate("")`
[ https://issues.apache.org/jira/browse/KAFKA-16614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16614. Fix Version/s: 3.8.0 Resolution: Fixed > Disallow `@ClusterTemplate("")` > --- > > Key: KAFKA-16614 > URL: https://issues.apache.org/jira/browse/KAFKA-16614 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TaiJuWu >Priority: Minor > Fix For: 3.8.0 > > > `@ClusterTemplate` enables us to create dynamic configs, and it expects to > accept a method name which can create server configs at runtime. It throws > an error when we pass a nonexistent method name, but it works if we pass an > empty name -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]
chia7712 merged PR #15800: URL: https://github.com/apache/kafka/pull/15800 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16654) Refactor kafka.test.annotation.Type and ClusterTestExtensions
Chia-Ping Tsai created KAFKA-16654: -- Summary: Refactor kafka.test.annotation.Type and ClusterTestExtensions Key: KAFKA-16654 URL: https://issues.apache.org/jira/browse/KAFKA-16654 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai It seems to me the refactor could include the following tasks. 1. change `invocationContexts`, the method invoked by `ClusterTemplate`, and generate-related methods in `ClusterTestExtensions` to return a java.util.Collection instead of accepting a `java.util.function.Consumer`. That can bring two benefits. 1) simpler production code: we don't need to create a List and then pass it to a function to collect stuff. 2) easier to write unit tests. 2. separate `provideTestTemplateInvocationContexts` into multiple methods to handle each annotation. That can help us write tests and make the code more readable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586871366 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -2415,17 +2476,30 @@ public void resign(int epoch) { @Override public Optional> createSnapshot( OffsetAndEpoch snapshotId, -long lastContainedLogTime +long lastContainedLogTimestamp ) { -return RecordsSnapshotWriter.createWithHeader( -() -> log.createNewSnapshot(snapshotId), -MAX_BATCH_SIZE_BYTES, -memoryPool, -time, -lastContainedLogTime, -CompressionType.NONE, -serde -); +if (!isInitialized()) { +throw new IllegalStateException("Cannot create snapshot before the replica has been initialized"); +} + +return log.createNewSnapshot(snapshotId).map(writer -> { +long lastContainedLogOffset = snapshotId.offset() - 1; Review Comment: Yes. I have this issue [KAFKA-14620](https://issues.apache.org/jira/browse/KAFKA-14620) to introduce the `SnapshotId` type. I can fix this on that PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14620) Add a type for SnapshotId
[ https://issues.apache.org/jira/browse/KAFKA-14620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14620: --- Description: We have seen issues where the state machine assumes that the offset in the snapshot id is inclusive. I think adding a type that makes this clear would help developers and reviewers catch such issues. The snapshot id type should support the helper function lastContainedLogOffset. was:We have seen issues where the state machine assumes that the offset in the snapshot id is inclusive. I think adding a type that makes this clear would help developers and reviewers catch such issues. > Add a type for SnapshotId > - > > Key: KAFKA-14620 > URL: https://issues.apache.org/jira/browse/KAFKA-14620 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > > We have seen issues where the state machine assumes that the offset in the > snapshot id is inclusive. I think adding a type that makes this clear would > help developers and reviewers catch such issues. > The snapshot id type should support the helper function lastContainedLogOffset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
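A minimal sketch of what the proposed type could look like. Only the name SnapshotId and the lastContainedLogOffset helper come from the issue; the fields and the end-offset-exclusive convention are assumptions drawn from the "snapshotId.offset() - 1" usage in the PR discussion above.

{code:java}
public final class SnapshotId {
    private final long endOffset; // exclusive, matching the "offset() - 1" usage above
    private final int epoch;

    public SnapshotId(long endOffset, int epoch) {
        if (endOffset < 0) {
            throw new IllegalArgumentException("Snapshot end offset must be non-negative");
        }
        this.endOffset = endOffset;
        this.epoch = epoch;
    }

    // The helper the issue asks for: it makes the inclusive/exclusive distinction
    // explicit so callers stop hand-writing endOffset - 1.
    public long lastContainedLogOffset() {
        return endOffset - 1;
    }

    public long offset() {
        return endOffset;
    }

    public int epoch() {
        return epoch;
    }
}
{code}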
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586869606 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -370,8 +363,52 @@ private void maybeFireLeaderChange() { } } -@Override -public void initialize() { +public void initialize( +Map voterAddresses, +String listenerName, +QuorumStateStore quorumStateStore, +Metrics metrics +) { +partitionState = new KRaftControlRecordStateMachine( +Optional.of(VoterSet.fromAddressSpecs(listenerName, voterAddresses)), Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586867970 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -370,8 +363,52 @@ private void maybeFireLeaderChange() { } } -@Override -public void initialize() { +public void initialize( +Map voterAddresses, +String listenerName, +QuorumStateStore quorumStateStore, +Metrics metrics Review Comment: Yeah. I am also not happy with this move. We only do this delayed initialization because of integration tests (`QuorumTestHarness`, `KRaftClusterTestKit`). This is not needed by `**/src/main`. Once we have KIP-853 fully implemented, I should be able to fix the integration tests to not use the static voter set and the delayed initialization. I created [Remove delayed initialization because of static voter set](https://issues.apache.org/jira/browse/KAFKA-16653) to track this work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16653) Remove delayed initialization because of static voter set
José Armando García Sancio created KAFKA-16653: -- Summary: Remove delayed initialization because of static voter set Key: KAFKA-16653 URL: https://issues.apache.org/jira/browse/KAFKA-16653 Project: Kafka Issue Type: Sub-task Components: kraft Reporter: José Armando García Sancio Once KRaft supports the AddVoter RPC, the QuorumTestHarness and KRaftClusterTestKit can be reimplemented to use dynamic voters instead of the static voter set. This should allow us to remove KRaft's support for delayed static voter set initialization. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586861717 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -159,67 +165,76 @@ public class KafkaRaftClient implements RaftClient { private final MemoryPool memoryPool; private final RaftMessageQueue messageQueue; private final QuorumConfig quorumConfig; -private final KafkaRaftMetrics kafkaRaftMetrics; -private final QuorumState quorum; -private final RequestManager requestManager; private final RaftMetadataLogCleanerManager snapshotCleaner; private final Map, ListenerContext> listenerContexts = new IdentityHashMap<>(); private final ConcurrentLinkedQueue> pendingRegistrations = new ConcurrentLinkedQueue<>(); +// These components need to be initialized by the method initialize() because they depend on the voter set +/* + * The key invariant for the kraft control record state machine is that it has always read to the LEO. This is achived by: Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16624: Don't generate useless PartitionChangeRecord on older MV [kafka]
cmccabe commented on code in PR #15810: URL: https://github.com/apache/kafka/pull/15810#discussion_r1586861400 ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -365,45 +369,62 @@ private void tryElection(PartitionChangeRecord record) { } /** - * Trigger a leader epoch bump if one is needed. - * - * We need to bump the leader epoch if: - * 1. The leader changed, or - * 2. The new replica list does not contain all the nodes that the old replica list did. - * - * Changes that do NOT fall in any of these categories will increase the partition epoch, but - * not the leader epoch. Note that if the leader epoch increases, the partition epoch will - * always increase as well; there is no case where the partition epoch increases more slowly - * than the leader epoch. + * Trigger a leader epoch bump if one is needed because of replica reassignment. * - * If the PartitionChangeRecord sets the leader field to something other than - * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of - * case 1. In this function, we check for cases 2 and 3, and handle them by manually - * setting record.leader to the current leader. - * - * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager - * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader - * bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if - * the ISR expanded. + * Note that if the leader epoch increases, the partition epoch will always increase as well; there is no + * case where the partition epoch increases more slowly than the leader epoch. + */ +void triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(PartitionChangeRecord record) { Review Comment: I agree that the name should be revisited. Better to do that in a follow-on PR, though, to avoid making this one too big. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16624: Don't generate useless PartitionChangeRecord on older MV [kafka]
cmccabe commented on PR #15810: URL: https://github.com/apache/kafka/pull/15810#issuecomment-2089185449 > Should we have a new test for that specific case? Good point. Added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586860365 ## raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java: ## @@ -213,20 +213,21 @@ private void completeCurrentBatch() { * * @param valueCreator a function that uses the passed buffer to create the control *batch that will be appended. The memory records returned must contain one - *control batch and that control batch have one record. + *control batch and that control batch have at least one record. */ -private void appendControlMessage(Function valueCreator) { +public void appendControlMessages(Function valueCreator) { Review Comment: Okay, I was trying to avoid decoding the `MemoryRecords`. If we are going to read the first batch, we don't even need `CreatedRecords`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
junrao commented on PR #15673: URL: https://github.com/apache/kafka/pull/15673#issuecomment-2089169862 @clolov: Are you able to address the remaining comments? 3.8.0 code freeze is getting close. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]
cmccabe commented on PR #15838: URL: https://github.com/apache/kafka/pull/15838#issuecomment-2089167707 > Thanks for the patch, @cmccabe. I looked for other usages of this lock and see we're obtaining the write lock in DynamicBrokerConfig#updateBrokerConfig which gets called from DynamicConfigPublisher. Will we still need this locking when we have dropped ZK? I think we can simplify this code a lot once ZK is gone, yes. Having a single updater will help a lot. I suspect we can get rid of the lock at that point, although I'll have to look more later. > Can you add a comment next to the lock indicating what it is guarding? I added a clarification that the lock protects against concurrent reconfiguration operations, but not concurrent additions or removals of reconfigurables. As you mentioned, this is mostly only relevant to ZK mode since there we can have racing threads. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]
cmccabe commented on code in PR #15838: URL: https://github.com/apache/kafka/pull/15838#discussion_r1586844632 ## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ## @@ -1541,6 +1541,36 @@ class KRaftClusterTest { cluster.close() } } + + @Test + def testReduceNumNetworkThreads(): Unit = { Review Comment: I wanted to ensure that no deadlocks are created in the future, by adding this test coverage. I don't think this PR is required to avoid deadlock currently, however. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable [kafka]
cmccabe commented on code in PR #15838: URL: https://github.com/apache/kafka/pull/15838#discussion_r1586843624 ## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ## @@ -303,17 +303,17 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging addBrokerReconfigurable(controller.socketServer) } - def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) { + def addReconfigurable(reconfigurable: Reconfigurable): Unit = { verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala) Review Comment: Good catch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash
[ https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842783#comment-17842783 ] Jun Rao commented on KAFKA-16541: - [~ocadaruma] : Will you be able to work on this soon? The 3.8.0 code freeze is getting close. Thanks. > Potential leader epoch checkpoint file corruption on OS crash > - > > Key: KAFKA-16541 > URL: https://issues.apache.org/jira/browse/KAFKA-16541 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0 >Reporter: Haruki Okada >Assignee: Haruki Okada >Priority: Minor > > Pointed out by [~junrao] on > [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125] > [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] got rid > of fsync of leader-epoch ckeckpoint file in some path for performance reason. > However, since now checkpoint file is flushed to the device asynchronously by > OS, content would corrupt if OS suddenly crashes (e.g. by power failure, > kernel panic) in the middle of flush. > Corrupted checkpoint file could prevent Kafka broker to start-up -- This message was sent by Atlassian Jira (v8.20.10#820010)
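For context, the crash-safe pattern this issue alludes to is write-to-temp, fsync, then atomic rename. A generic sketch under that assumption, not the actual Kafka checkpoint code:

{code:java}
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class CheckpointWriteExample {
    static void writeAtomically(Path target, String contents) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(tmp.toFile())) {
            out.write(contents.getBytes(StandardCharsets.UTF_8));
            out.flush();
            out.getFD().sync(); // without the fsync, an OS crash can leave a torn file
        }
        // Atomic rename: readers see either the old checkpoint or the complete new one.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}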
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
junrao commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1586840549 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Update high watermark with offset metadata. The new high watermark will be lower - * bounded by the log start offset and upper bounded by the log end offset. + * bounded by the local-log-start-offset and upper bounded by the log-end-offset. * * @param highWatermarkMetadata the suggested high watermark with offset metadata * @return the updated high watermark offset */ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = { val endOffsetMetadata = localLog.logEndOffsetMetadata -val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) { - new LogOffsetMetadata(logStartOffset) +val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) { Review Comment: > I'm not clear on this: > > 1. Segments that are eligible for upload to remote storage only when the lastStableOffset moves beyond the segment-to-be-uploaded-end-offset. > 2. When all the replicas loses local data (offline partition), then we consider the data in remote storage also lost. Currently, for this case, we don't have provision to serve the remote data. > 3. When firstUnstableOffsetMetadata is empty, we return highWatermark. With this patch, the highWatermark lower boundary is set to localLogStartOffset so there won't be an issue. > That's true. It's just that that is yet another offset that we need to bound. I am also not sure if there are other side effects of adjusting HWM and LSO. Left some comments on https://github.com/apache/kafka/pull/15825. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1586835182 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java: ## @@ -28,6 +28,7 @@ public final class LogOffsetMetadata { //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L; +public static final long REMOTE_LOG_UNKNOWN_OFFSET = -2L; Review Comment: We probably don't need this. The existing UNIFIED_LOG_UNKNOWN_OFFSET should be enough. ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java: ## @@ -51,6 +57,8 @@ public LogOffsetMetadata(long messageOffset, // check if this offset is already on an older segment compared with the given offset public boolean onOlderSegment(LogOffsetMetadata that) { +if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET) Review Comment: We probably don't need this. If messageOffsetOnly() is true, we can just return false. ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java: ## @@ -65,6 +73,8 @@ private boolean onSameSegment(LogOffsetMetadata that) { // compute the number of bytes between this offset to the given offset // if they are on the same segment and this offset precedes the given offset public int positionDiff(LogOffsetMetadata that) { +if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET) +return 1; Review Comment: It's a bit hacky to do this here. I was thinking of doing this in DelayedFetch. ``` if (endOffset.messageOffset != fetchOffset.messageOffset) { if (endOffset.messageOnly() || fetchOffset.messageOnly()) { accumulatedSize += 1 } else if (endOffset.onOlderSegment(fetchOffset)) { // Case F, this can happen when the new fetch operation is on a truncated leader ... } ``` ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1428,7 +1428,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { checkLogStartOffset(offset) -localLog.convertToOffsetMetadataOrThrow(offset) +if (remoteLogEnabled() && offset < localLogStartOffset()) { + new LogOffsetMetadata(offset, LogOffsetMetadata.REMOTE_LOG_UNKNOWN_OFFSET) +} else { + localLog.convertToOffsetMetadataOrThrow(offset) Review Comment: I was thinking that we could change localLog.convertToOffsetMetadataOrThrow() such that if read() throws an exception, it just returns a message-only LogOffsetMetadata. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
C0urante commented on code in PR #14309: URL: https://github.com/apache/kafka/pull/14309#discussion_r1586591019 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String connName, ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest( connName, connectorType, connectorClass, clientConfigs, clientType); List configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest); -if (configValues != null) { -for (ConfigValue validatedConfigValue : configValues) { -ConfigKey configKey = configKeys.get(validatedConfigValue.name()); -ConfigKeyInfo configKeyInfo = null; -if (configKey != null) { -if (configKey.group != null) { -groups.add(configKey.group); -} -configKeyInfo = convertConfigKey(configKey, prefix); -} -ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(), - validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages()); -if (!configValue.errorMessages().isEmpty()) { -errorCount++; +return prefixedConfigInfos(configDef.configKeys(), configValues, prefix); +} + +private static ConfigInfos prefixedConfigInfos(Map configKeys, List configValues, String prefix) { +int errorCount = 0; +Set groups = new LinkedHashSet<>(); +List configInfos = new ArrayList<>(); + +if (configValues == null) { Review Comment: `ConfigDef::validate` is non-final, and plugin instances may return a subclass from their `config` methods that possibly returns null. I acknowledge that this is extremely unlikely, but it seems like this null guard is the best way to handle that scenario as opposed to, e.g., throwing an error and causing a 500 response to be returned. Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586828276 ## raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java: ## @@ -206,8 +151,16 @@ private static Integer parseVoterId(String idString) { } } -public static Map parseVoterConnections(List voterEntries) { -Map voterMap = new HashMap<>(); +public static Map parseVoterConnections(List voterEntries) { +return parseVoterConnections(voterEntries, true); +} + +public static Set parseVoterIds(List voterEntries) { +return parseVoterConnections(voterEntries, false).keySet(); +} + +private static Map parseVoterConnections(List voterEntries, boolean routableOnly) { Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586825107 ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -181,20 +181,12 @@ class KafkaRaftManager[T]( private val clientDriver = new KafkaRaftClientDriver[T](client, threadNamePrefix, fatalFaultHandler, logContext) def startup(): Unit = { -// Update the voter endpoints (if valid) with what's in RaftConfig -val voterAddresses: util.Map[Integer, AddressSpec] = controllerQuorumVotersFuture.get() -for (voterAddressEntry <- voterAddresses.entrySet.asScala) { - voterAddressEntry.getValue match { -case spec: InetAddressSpec => - netChannel.updateEndpoint(voterAddressEntry.getKey, spec) -case _: UnknownAddressSpec => - info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " + -s"because of non-routable endpoint: ${NON_ROUTABLE_ADDRESS.toString}") -case invalid: AddressSpec => - warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " + -s"destination ID: ${voterAddressEntry.getKey}") - } -} +client.initialize( + controllerQuorumVotersFuture.get(), + config.controllerListenerNames.head, + new FileBasedStateStore(new File(dataDir, "quorum-state")), Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586802072 ## raft/src/main/java/org/apache/kafka/raft/internals/History.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Objects; +import java.util.Optional; + +/** + * A object tracks values of {@code T} at different offsets. + */ +public interface History { Review Comment: Okay. Went with `LogHistory`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
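As an aside for readers following the KIP-853 patches: an offset-keyed history like the one described in the javadoc above is commonly backed by a sorted map. A minimal sketch under that assumption, not the actual kafka.raft.internals implementation:

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class OffsetHistoryExample<T> {
    private final TreeMap<Long, T> history = new TreeMap<>();

    // Values must be appended at strictly increasing offsets, mirroring a log.
    void addAt(long offset, T value) {
        Map.Entry<Long, T> last = history.lastEntry();
        if (last != null && offset <= last.getKey()) {
            throw new IllegalArgumentException("Offset " + offset + " must exceed " + last.getKey());
        }
        history.put(offset, value);
    }

    // Latest value recorded at or before the given offset, if any.
    Optional<T> valueAtOrBefore(long offset) {
        return Optional.ofNullable(history.floorEntry(offset)).map(Map.Entry::getValue);
    }

    // Most recently recorded value, if any.
    Optional<T> lastValue() {
        return Optional.ofNullable(history.lastEntry()).map(Map.Entry::getValue);
    }
}
```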
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on PR #15640: URL: https://github.com/apache/kafka/pull/15640#issuecomment-2089079055 > > > Here I have a comment, I could not put at the right location in the code: > > > On line 1362, in commitSync() the consumer waits on the commitFuture with a timer. I think, it should not wait on a timer there since we already wait on a timer in the background thread. > > > > > > I agree. What about the timed wait in awaitPendingAsyncCommitsAndExecuteCommitCallbacks()? > > Agree we should not wait on the `commitFuture` with a timer because the deadline is contained in the event we submitted, and already enforced by the reaper, and not clear about what the proposed relationship with `awaitPendingAsyncCommitsAndExecuteCommitCallbacks` is?? > > I would expect we only need to call `ConsumerUtils.getResult(commitFuture);`, and that is consistent with how we get results for all other completable events now: > > * we create an event with a deadline > * we call `applicationEventHandler.addAndGet(event)` > > For the commit case that flow has a different shape just because we use `applicationEventHandler.add(event)` [here](https://github.com/apache/kafka/blob/097522abd6b51bca2407ea0de7009ed6a2d970b4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L775), to cater for commit sync and async, but we should still apply the same approach and just call get without any time boundary I would say. Here's my reasoning on the need for a `Timer`-based `get()` in `awaitPendingAsyncCommitsAndExecuteCommitCallbacks()`... The `Future` that's referenced in `lastPendingAsyncCommit` comes from an `AsyncCommitEvent` and has a hard-coded deadline of `Long.MAX_VALUE`. As such, the `CompetableEventReaper` in the network thread will never prune that event. Without a timeout when calling `get()` on the `lastPendingAsyncCommit`, the caller could hang for up to `request.timeout.ms` while we wait for the network I/O request to complete (or timeout). @cadonna @lianetm @lucasbru—does that make sense? CMIIW, please 🙏 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
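A sketch of the distinction argued above; the helper name is made up, and only Future.get is a real API here. An unbounded get() can block for up to request.timeout.ms, while a timer-bounded get() honors the caller's remaining budget even though the async commit event itself carries a Long.MAX_VALUE deadline:

```java
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class BoundedGetExample {
    // Throws java.util.concurrent.TimeoutException once the caller's budget is spent,
    // instead of waiting for the underlying network I/O to finish or time out on its own.
    static <T> T getBounded(Future<T> future, long remainingMs) throws Exception {
        return future.get(remainingMs, TimeUnit.MILLISECONDS);
    }
}
```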
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586782977 ## raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java: ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +final public class VoterSetHistoryTest { Review Comment: Yes. Good catch. Added two more tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586765460 ## raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java: ## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Arrays; +import java.util.Optional; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.KRaftVersionRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.raft.MockLog; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.snapshot.RecordsSnapshotWriter; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; + +final class KRaftControlRecordStateMachineTest { +private static final RecordSerde STRING_SERDE = new StringSerde(); + +private static MockLog buildLog() { +return new MockLog(new TopicPartition("partition", 0), Uuid.randomUuid(), new LogContext()); +} + +private static KRaftControlRecordStateMachine buildPartitionListener(MockLog log, Optional staticVoterSet) { +return new KRaftControlRecordStateMachine( +staticVoterSet, +log, +STRING_SERDE, +BufferSupplier.NO_CACHING, +1024, +new LogContext() +); +} + +@Test +void testEmptyParition() { Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586760874 ## raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java: ## @@ -216,4 +140,132 @@ private void appendBatches(List> batches) { batches.forEach(CompletedBatch::release); } } + +final public static class Builder { +private long lastContainedLogTimestamp = 0; +private CompressionType compressionType = CompressionType.NONE; +private Time time = Time.SYSTEM; +private int maxBatchSize = 1024; +private MemoryPool memoryPool = MemoryPool.NONE; +private short kraftVersion = 0; Review Comment: In practice `KafkaRaftClient` will override all of these defaults. I set the default to what would make sense to use in most cases in `**/src/test`. This is why `maxBatchSize` is 1024 for example. We use a similar strategy in the `metadata` module. The builder's default is what would make sense in most tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842765#comment-17842765 ] Lianet Magrans commented on KAFKA-16514: Just to answer the question above: {quote}what is the purpose of -2 code? In the end, not sending any request, with a large enough session timeout, no rebalance would be triggered anyway? What does change is we send -2 instead of just not sending any leaver group request on close()?{quote} The purpose is just to set the intention explicitly at the protocol level (and not assume it). This is mainly to allow for richer semantics around the static membership leave in the future. It does not make a difference at the moment (over not sending the leave group), but it does allow us to cleanly extend the current logic if we ever want to, and allows static members to leave permanently by sending a -1 epoch on the leave group. That would effectively allow removing a static member from a group (which can currently only be achieved either by waiting for the session timeout to expire, or via the admin api). Anyways, that's just food for thought for now. The KIP extending the consumer close with options seems sensible to solve the current situation, and would align nicely with any future extension of the static leave semantics if we ever go down that path. > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. > Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586731329 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.feature.SupportedVersionRange; + +/** + * A type for representing the set of voters for a topic partition. + * + * It encapsulates static information like a voter's endpoint and their supported kraft.version. + * + * It providees functionality for converting to and from {@code VotersRecord} and for converting + * from the static configuration. + */ +final public class VoterSet { +private final Map voters; + +VoterSet(Map voters) { +if (voters.isEmpty()) { +throw new IllegalArgumentException("Voters cannot be empty"); +} + +this.voters = voters; +} + +/** + * Returns the socket address for a given voter at a given listener. + * + * @param voter the id of the voter + * @param listener the name of the listener + * @return the socket address if it exist, otherwise {@code Optional.empty()} + */ +public Optional voterAddress(int voter, String listener) { +return Optional.ofNullable(voters.get(voter)) +.flatMap(voterNode -> voterNode.address(listener)); +} + +/** + * Returns all of the voter ids. + */ +public Set voterIds() { +return voters.keySet(); +} + +/** + * Adds a voter to the voter set. + * + * This object is immutable. A new voter set is returned if the voter was added. + * + * A new voter can be added to a voter set if its id doesn't already exist in the voter set. + * + * @param voter the new voter to add + * @return a new voter set if the voter was added, otherwise {@code Optional.empty()} + */ +public Optional addVoter(VoterNode voter) { +if (voters.containsKey(voter.id())) { +return Optional.empty(); +} + +HashMap newVoters = new HashMap<>(voters); +newVoters.put(voter.id(), voter); + +return Optional.of(new VoterSet(newVoters)); +} + +/** + * Removew a voter from the voter set. + * + * This object is immutable. A new voter set is returned if the voter was removed. + * + * A voter can be removed from the voter set if its id and uuid match. + * + * @param voterId the voter id Review Comment: Okay. I'll use `VoterKey`. 
I was conflicted about using `VoterKey` instead of `int , Optional` throughout the code, but probably using `VoterKey` as much as possible is the right decision. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
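A hypothetical sketch of the VoterKey value type being discussed (the real KIP-853 code may differ): it bundles the id with the optional directory uuid so the pair is passed and compared as one unit rather than as an int plus an Optional.

```java
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.Uuid;

final class VoterKeyExample {
    private final int id;
    private final Optional<Uuid> uuid;

    VoterKeyExample(int id, Optional<Uuid> uuid) {
        this.id = id;
        this.uuid = Objects.requireNonNull(uuid);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof VoterKeyExample)) return false;
        VoterKeyExample that = (VoterKeyExample) o;
        // A voter matches only if both the id and the uuid match, per the
        // removeVoter contract quoted above.
        return id == that.id && uuid.equals(that.uuid);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, uuid);
    }
}
```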
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586632167 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1169,8 +1129,7 @@ private Map beginningOrEndOffset(Collection offsetAndTimestampMap; offsetAndTimestampMap = applicationEventHandler.addAndGet( -listOffsetsEvent, -timer); +listOffsetsEvent); Review Comment: Moved the `listOffsetsEvent` up to the previous line. Missed it on first read, sorry 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586626516 ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -987,6 +987,7 @@ public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) { @ParameterizedTest @EnumSource(GroupProtocol.class) public void testOffsetIsValidAfterSeek(GroupProtocol groupProtocol) { +Time time = new MockTime(1); Review Comment: Done. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class CompletableEventReaperTest { + +private final LogContext logContext = new LogContext(); +private final Time time = new MockTime(0, 0, 0); Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
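For context on the `MockTime` changes above, here is a minimal illustration of the two constructors used in the diff, assuming the standard `org.apache.kafka.common.utils.MockTime` test utility: `new MockTime(1)` auto-advances the clock by 1 ms on every query, while `new MockTime(0, 0, 0)` pins both the wall clock and the high-resolution clock at zero so the test advances time explicitly.

```java
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

public class MockTimeDemo {
    public static void main(String[] args) {
        // Auto-tick: each query advances the clock by 1 ms, so code that polls
        // the clock in a loop can never spin forever at the same timestamp.
        Time autoTicking = new MockTime(1);
        long t0 = autoTicking.milliseconds();
        long t1 = autoTicking.milliseconds();
        System.out.println(t1 - t0); // 1

        // Pinned: autoTickMs = 0, wall clock starts at 0 ms, high-res clock at
        // 0 ns; sleep() advances the mock clock without any real waiting.
        MockTime pinned = new MockTime(0, 0, 0);
        pinned.sleep(500);
        System.out.println(pinned.milliseconds()); // 500
    }
}
```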
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586622390 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1273,6 +1228,22 @@ private void close(Duration timeout, boolean swallowException) { if (applicationEventHandler != null) closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); closeTimer.update(); + +// close() can be called from inside one of the constructors. In that case, it's possible that neither +// the reaper nor the background event queue were constructed, so check them first to avoid NPE. +if (backgroundEventReaper != null && backgroundEventQueue != null) { +// Copy over the completable events to a separate list, then reap any incomplete +// events on that list. +LinkedList allEvents = new LinkedList<>(); Review Comment: Yes, because it has the `drainTo()` method. However, this code is now gone, so it's moot 🤷♂️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
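For reference, `BlockingQueue.drainTo(Collection)` (the method alluded to here) removes all currently available elements from the queue into a target collection in one call, which is why a concrete `List` was handy at this call site. A minimal JDK-only sketch:

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainToDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("event-1");
        queue.add("event-2");

        // drainTo() moves every currently queued element into the target
        // collection in a single call, avoiding a poll() loop.
        List<String> drained = new LinkedList<>();
        queue.drainTo(drained);

        System.out.println(drained);         // [event-1, event-2]
        System.out.println(queue.isEmpty()); // true
    }
}
```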
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586620323 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java: ## @@ -16,9 +16,118 @@ */ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +import java.time.Duration; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; + +import static java.util.Objects.requireNonNull; +/** + * {@code CompletableEvent} is an interface that is used by both {@link CompletableApplicationEvent} and + * {@link CompletableBackgroundEvent} for common processing and logic. A {@code CompletableEvent} is one that + * allows the caller to get the {@link #future() future} related to the event and the event's + * {@link #deadlineMs() expiration timestamp}. + * + * @param Return type for the event when completed + */ public interface CompletableEvent { +/** + * Returns the {@link CompletableFuture future} associated with this event. Any event will have some related + * logic that is executed on its behalf. The event can complete in one of the following ways: + * + * + * + * Success: when the logic for the event completes successfully, the data generated by that event + * (if applicable) is passed to {@link CompletableFuture#complete(Object)}. In the case where the generic + * bound type is specified as {@link Void}, {@code null} is provided. + * + * Error: when the event logic generates an error, the error is passed to + * {@link CompletableFuture#completeExceptionally(Throwable)}. + * + * + * Timeout: when the time spent executing the event logic exceeds the {@link #deadlineMs() deadline}, an + * instance of {@link TimeoutException} should be created and passed to + * {@link CompletableFuture#completeExceptionally(Throwable)}. + * + * + * Cancelled: when an event remains incomplete when the consumer closes, the future will be + * {@link CompletableFuture#cancel(boolean) cancelled}. Attempts to {@link Future#get() get the result} Review Comment: Good catch! I had been using `cancel()`, but noticed that the message in the exception the caller of `Future.get()` later received was unhelpful. Yes, `cancel()` calls `completeExceptionally(new CancellationException())`, but I wanted the exception to include a (hopefully) meaningful message. Anyhoo... I've updated the documentation to reflect that change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
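The trade-off described in this comment can be shown with plain `CompletableFuture`: `cancel()` surfaces a message-less `CancellationException`, whereas completing the future exceptionally with a hand-built `CancellationException` gives callers of `get()` a descriptive reason. An illustrative sketch (not the PR's exact code):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CancelWithMessageDemo {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Void> future = new CompletableFuture<>();

        // Instead of future.cancel(true), complete exceptionally with a
        // descriptive CancellationException so the waiter sees a useful reason.
        future.completeExceptionally(
            new CancellationException("Event was incomplete when the consumer closed"));

        try {
            future.get();
        } catch (CancellationException e) {
            // CompletableFuture.get() throws the CancellationException directly
            // (unwrapped) when the future completed with one.
            System.out.println(e.getMessage());
        } catch (ExecutionException e) {
            System.out.println("wrapped: " + e.getCause());
        }
    }
}
```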
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586607598 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java: ## @@ -30,12 +29,7 @@ public abstract class CommitEvent extends CompletableApplicationEvent { */ private final Map offsets; -protected CommitEvent(final Type type, final Map offsets, final Timer timer) { -super(type, timer); -this.offsets = validate(offsets); -} - -protected CommitEvent(final Type type, final Map offsets, final long deadlineMs) { +public CommitEvent(final Type type, Map offsets, final long deadlineMs) { Review Comment: Added back `final` and changed back to `protected`. Not sure how/why I changed those 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586603709 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -273,9 +310,18 @@ void cleanup() { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { sendUnsentRequests(timer); + +LinkedList allEvents = new LinkedList<>(); +applicationEventQueue.drainTo(allEvents); +List> completableEvents = allEvents +.stream() +.filter(e -> e instanceof CompletableApplicationEvent) +.map(e -> (CompletableApplicationEvent) e) +.collect(Collectors.toList()); Review Comment: Pulled the logic to `reapIncomplete()` as suggested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
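In isolation, the drain-then-narrow pattern from the diff looks roughly like the following; the event types and the `reapIncomplete()` helper are simplified stand-ins for the real ones, not the merged API:

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

public class ReapOnCloseDemo {

    interface Event { }

    interface CompletableEvent extends Event {
        CompletableFuture<?> future();
    }

    // Stand-in for the reapIncomplete() helper mentioned in the comment:
    // fail whatever never completed before shutdown.
    static void reapIncomplete(List<CompletableEvent> events) {
        events.forEach(e -> e.future().completeExceptionally(
            new CancellationException("Consumer is shutting down")));
    }

    public static void main(String[] args) {
        BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

        // Drain everything left in the queue at shutdown...
        List<Event> all = new LinkedList<>();
        queue.drainTo(all);

        // ...then narrow to the events that carry a future, and reap those.
        List<CompletableEvent> completable = all.stream()
                .filter(e -> e instanceof CompletableEvent)
                .map(e -> (CompletableEvent) e)
                .collect(Collectors.toList());

        reapIncomplete(completable);
    }
}
```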
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
C0urante commented on code in PR #14309: URL: https://github.com/apache/kafka/pull/14309#discussion_r1586591574 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String connName, ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest( connName, connectorType, connectorClass, clientConfigs, clientType); List configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest); -if (configValues != null) { -for (ConfigValue validatedConfigValue : configValues) { -ConfigKey configKey = configKeys.get(validatedConfigValue.name()); -ConfigKeyInfo configKeyInfo = null; -if (configKey != null) { -if (configKey.group != null) { -groups.add(configKey.group); -} -configKeyInfo = convertConfigKey(configKey, prefix); -} -ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(), - validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages()); -if (!configValue.errorMessages().isEmpty()) { -errorCount++; +return prefixedConfigInfos(configDef.configKeys(), configValues, prefix); +} + +private static ConfigInfos prefixedConfigInfos(Map configKeys, List configValues, String prefix) { +int errorCount = 0; +Set groups = new LinkedHashSet<>(); +List configInfos = new ArrayList<>(); + +if (configValues == null) { +return new ConfigInfos("", 0, new ArrayList<>(groups), configInfos); +} + +for (ConfigValue validatedConfigValue : configValues) { +ConfigKey configKey = configKeys.get(validatedConfigValue.name()); +ConfigKeyInfo configKeyInfo = null; +if (configKey != null) { +if (configKey.group != null) { +groups.add(configKey.group); } -ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null); -configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo)); +configKeyInfo = convertConfigKey(configKey, prefix); +} + +ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(), +validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages()); +if (configValue.errorMessages().size() > 0) { +errorCount++; } +ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null); +configInfos.add(new ConfigInfo(configKeyInfo, configValueInfo)); } -return new ConfigInfos(connectorClass.toString(), errorCount, new ArrayList<>(groups), configInfoList); +return new ConfigInfos("", errorCount, new ArrayList<>(groups), configInfos); Review Comment: Yeah, it's a little weird with the empty string here. Hopefully it's fine for now but if we continue augmenting and refactoring this class I agree that it might be worth changing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
C0urante commented on code in PR #14309: URL: https://github.com/apache/kafka/pull/14309#discussion_r1586591136 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -392,6 +399,146 @@ protected Map validateSourceConnectorConfig(SourceConnector return configDef.validateAll(config); } +/** + * General-purpose validation logic for converters that are configured directly + * in a connector config (as opposed to inherited from the worker config). + * @param connectorConfig the configuration for the connector; may not be null + * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config; + * may be null, in which case no validation will be performed under the assumption that the + * connector will inherit the converter settings from the worker + * @param pluginInterface the interface for the plugin type + *(e.g., {@code org.apache.kafka.connect.storage.Converter.class}); + *may not be null + * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef} + * from an instance of the plugin type (e.g., {@code Converter::config}); + * may not be null + * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"}); + * may not be null + * @param pluginProperty the property used to define a custom class for the plugin type + * in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG}); + * may not be null + * @param defaultProperties any default properties to include in the configuration that will be used for + * the plugin; may be null + + * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config, + * or null if no custom validation was performed (possibly because no custom plugin was defined in the connector + * config) + + * @param the plugin class to perform validation for + */ +private ConfigInfos validateConverterConfig( +Map connectorConfig, +ConfigValue pluginConfigValue, +Class pluginInterface, +Function configDefAccessor, +String pluginName, +String pluginProperty, +Map defaultProperties +) { +Objects.requireNonNull(connectorConfig); +Objects.requireNonNull(pluginInterface); +Objects.requireNonNull(configDefAccessor); +Objects.requireNonNull(pluginName); +Objects.requireNonNull(pluginProperty); + +String pluginClass = connectorConfig.get(pluginProperty); + +if (pluginClass == null +|| pluginConfigValue == null +|| !pluginConfigValue.errorMessages().isEmpty() +) { +// Either no custom converter was specified, or one was specified but there's a problem with it. +// No need to proceed any further. +return null; +} + +T pluginInstance; +try { +pluginInstance = Utils.newInstance(pluginClass, pluginInterface); Review Comment: I think this is actually correct. All calls to `validateConverterConfig` take place within a `LoaderSwap` that causes the connector's classloader to be used, which unless I'm mistaken matches the behavior when instantiating tasks (loader swap [here](https://github.com/apache/kafka/blob/4825c89d14e5f1b2da7e1f48dac97888602028d7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L654), converter instantiation [here](https://github.com/apache/kafka/blob/4825c89d14e5f1b2da7e1f48dac97888602028d7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L666-L674)).
It's true that `Plugins::newConverter` and `Plugins::newHeaderConverter` are used instead of `Utils::newInstance` when starting tasks, but when invoking the `Plugins` methods with `classLoaderUsage` set to `CURRENT_CLASSLOADER`, no classloader swapping takes place, so the connector loader is still used. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -392,6 +399,146 @@ protected Map validateSourceConnectorConfig(SourceConnector return configDef.validateAll(config); } +/** + * General-purpose validation logic for converters that are configured directly + * in a connector config (as opposed to inherited from the worker config). + * @param connectorConfig the configuration for the connector; may not be null + * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config; + *
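For readers unfamiliar with the `LoaderSwap` mechanism referenced here: it is a try-with-resources guard that swaps the thread's context classloader and restores it on close. A simplified sketch of the idea (the real class lives in `org.apache.kafka.connect.runtime.isolation`; this is not its exact implementation):

```java
// Simplified sketch: set the target classloader on construction, restore the
// previous one on close(), so a try-with-resources block scopes the swap.
final class LoaderSwapSketch implements AutoCloseable {
    private final ClassLoader previous;

    LoaderSwapSketch(ClassLoader target) {
        this.previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(target);
    }

    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(previous);
    }
}

class ValidationExample {
    void validateWithConnectorLoader(ClassLoader connectorLoader, Runnable validation) {
        try (LoaderSwapSketch swap = new LoaderSwapSketch(connectorLoader)) {
            // Everything here, including class resolution for the converter,
            // sees the connector's classloader as the context classloader.
            validation.run();
        }
    }
}
```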
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
C0urante commented on code in PR #14309: URL: https://github.com/apache/kafka/pull/14309#discussion_r1586590857 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -562,8 +709,13 @@ ConfigInfos validateConnectorConfig( configKeys.putAll(configDef.configKeys()); allGroups.addAll(configDef.groups()); configValues.addAll(config.configValues()); -ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups)); +// do custom converter-specific validation +ConfigInfos headerConverterConfigInfos = validateHeaderConverterConfig(connectorProps, validatedConnectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG)); +ConfigInfos keyConverterConfigInfos = validateKeyConverterConfig(connectorProps, validatedConnectorConfig.get(KEY_CONVERTER_CLASS_CONFIG)); +ConfigInfos valueConverterConfigInfos = validateValueConverterConfig(connectorProps, validatedConnectorConfig.get(VALUE_CONVERTER_CLASS_CONFIG)); Review Comment: Yeah, the alternative was to pass in the entire `validatedConnectorConfig` and let the various `validateXxxConverterConfig` methods pull out the relevant `ConfigValue` field. But this seemed strange considering those methods only require a single `ConfigValue` object. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -392,6 +399,146 @@ protected Map validateSourceConnectorConfig(SourceConnector return configDef.validateAll(config); } +/** + * General-purpose validation logic for converters that are configured directly + * in a connector config (as opposed to inherited from the worker config). + * @param connectorConfig the configuration for the connector; may not be null + * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config; + * may be null, in which case no validation will be performed under the assumption that the + * connector will inherit the converter settings from the worker + * @param pluginInterface the interface for the plugin type + *(e.g., {@code org.apache.kafka.connect.storage.Converter.class}); + *may not be null + * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef} + * from an instance of the plugin type (e.g., {@code Converter::config}); + * may not be null + * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"}); + * may not be null + * @param pluginProperty the property used to define a custom class for the plugin type + * in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG}); + * may not be null + * @param defaultProperties any default properties to include in the configuration that will be used for + * the plugin; may be null + + * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config, + * or null if no custom validation was performed (possibly because no custom plugin was defined in the connector + * config) + + * @param the plugin class to perform validation for + */ +private ConfigInfos validateConverterConfig( +Map connectorConfig, +ConfigValue pluginConfigValue, +Class pluginInterface, +Function configDefAccessor, +String pluginName, +String pluginProperty, +Map defaultProperties +) { +Objects.requireNonNull(connectorConfig); +Objects.requireNonNull(pluginInterface); +Objects.requireNonNull(configDefAccessor); +Objects.requireNonNull(pluginName); +Objects.requireNonNull(pluginProperty); + +String pluginClass = 
connectorConfig.get(pluginProperty); + +if (pluginClass == null +|| pluginConfigValue == null +|| !pluginConfigValue.errorMessages().isEmpty() +) { +// Either no custom converter was specified, or one was specified but there's a problem with it. +// No need to proceed any further. +return null; +} + +T pluginInstance; +try { +pluginInstance = Utils.newInstance(pluginClass, pluginInterface); +} catch (ClassNotFoundException | RuntimeException e) { +log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", pluginName, pluginClass, e); +pluginConfigValue.addErrorMessage("Failed to load class " + plugin
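Reduced to its essentials, the instantiate-or-report pattern in this diff looks like the sketch below. `Utils.newInstance(String, Class)` and `ConfigValue.addErrorMessage(String)` are the real Kafka helpers; the surrounding method is a simplification:

```java
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.storage.Converter;

class ConverterInstantiationSketch {
    // Returns null (meaning "skip further validation") when the class cannot
    // be loaded, after recording the failure on the property's ConfigValue.
    static Converter tryInstantiate(String pluginClass, ConfigValue pluginConfigValue) {
        try {
            return Utils.newInstance(pluginClass, Converter.class);
        } catch (ClassNotFoundException | RuntimeException e) {
            pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass);
            return null;
        }
    }
}
```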
[jira] [Commented] (KAFKA-16022) AsyncKafkaConsumer sometimes complains “No current assignment for partition {}”
[ https://issues.apache.org/jira/browse/KAFKA-16022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842730#comment-17842730 ] Philip Nee commented on KAFKA-16022: hi [~phuctran] - I believe this came up during integration testing. You can try to see if the fetch request manager test also emits this error. > AsyncKafkaConsumer sometimes complains “No current assignment for partition > {}” > --- > > Key: KAFKA-16022 > URL: https://issues.apache.org/jira/browse/KAFKA-16022 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > This seems to be a timing issue: before the member receives any > assignment from the coordinator, the fetcher will try to find the current > position, causing "No current assignment for partition {}". This adds a > small amount of noise to the log. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586559943 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -273,9 +310,18 @@ void cleanup() { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { sendUnsentRequests(timer); + +LinkedList allEvents = new LinkedList<>(); +applicationEventQueue.drainTo(allEvents); +List> completableEvents = allEvents +.stream() +.filter(e -> e instanceof CompletableApplicationEvent) +.map(e -> (CompletableApplicationEvent) e) +.collect(Collectors.toList()); Review Comment: 😆 Yes, I went back and forth on this at least three times during development. I'll look at switching back to the approach you suggest. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586557142 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection topics, Optional T processBackgroundEvents(EventProcessor eventProcessor, + T processBackgroundEvents(EventProcessor eventProcessor, Review Comment: I made the documentation changes you requested and removed the logging to make the logic simpler. When you state that it seems like "overkill to have all this code for something we don't need now, or know if we'll need some day," I'm a bit confused 🤔 Because unsubscribing may require invoking `ConsumerRebalanceListener` callbacks, we need a way to check and run those events that are coming from the background thread, right? I do agree that it's overkill to have this broken out as a separate method since it's only used for the `unsubscribe()` case. IIRC, there was some talk of another use case for this, and it does make unit testing it easier. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
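A hedged sketch of the overall shape of `processBackgroundEvents()` as described: interleave running background events (for example, `ConsumerRebalanceListener` invocations handed over by the background thread) with short waits on the future, up to the user-provided timeout. All names, the queue type, and the poll interval are illustrative, not the merged API:

```java
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ProcessBackgroundEventsSketch {

    static <T> T processBackgroundEvents(Queue<Runnable> backgroundEvents,
                                         Future<T> future,
                                         long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadlineMs = System.currentTimeMillis() + timeoutMs;

        while (System.currentTimeMillis() < deadlineMs) {
            // Run callbacks the background thread handed to the application thread.
            Runnable event;
            while ((event = backgroundEvents.poll()) != null) {
                event.run();
            }

            try {
                // Wait briefly, then loop back to process more background events.
                return future.get(10, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Future not done yet; keep interleaving.
            }
        }

        throw new TimeoutException("Operation could not complete within " + timeoutMs + " ms");
    }
}
```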
Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership [kafka]
chia7712 commented on PR #15055: URL: https://github.com/apache/kafka/pull/15055#issuecomment-2088823785 > I have the correct file still saved on my new forced update but I can't link it to this issue. I don't want to revert because I accidentally pushed all the commits from the past few months as part of this one. maybe you can file a new PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership
[ https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842727#comment-17842727 ] Alexander Aghili commented on KAFKA-16027: -- Hi [~johnnyhsu], I have added the feature but messed up some git things. See: https://github.com/apache/kafka/pull/15055 My fork on my Kafka-16027 branch has the new implementation, so I don't think this is necessary to work on, but if you want to help sort out the issues related to making a pull request, please feel free. > Refactor MetadataTest#testUpdatePartitionLeadership > --- > > Key: KAFKA-16027 > URL: https://issues.apache.org/jira/browse/KAFKA-16027 > Project: Kafka > Issue Type: Improvement >Reporter: Philip Nee >Assignee: Alexander Aghili >Priority: Minor > Labels: newbie > > MetadataTest#testUpdatePartitionLeadership is extremely long. I think it is > pretty close to the 160 line method limit - I tried to modify it but it > would hit the limit when I tried to break things into separate lines. > The test also contains two tests, so it is best to split it into two separate > tests. > We should also move this to ConsumerMetadata.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586553978 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.feature.SupportedVersionRange; + +/** + * A type for representing the set of voters for a topic partition. + * + * It encapsulates static information like a voter's endpoint and their supported kraft.version. + * + * It provides functionality for converting to and from {@code VotersRecord} and for converting + * from the static configuration. + */ +final public class VoterSet { +private final Map voters; + +VoterSet(Map voters) { +if (voters.isEmpty()) { +throw new IllegalArgumentException("Voters cannot be empty"); +} + +this.voters = voters; +} + +/** + * Returns the socket address for a given voter at a given listener. + * + * @param voter the id of the voter + * @param listener the name of the listener + * @return the socket address if it exists, otherwise {@code Optional.empty()} + */ +public Optional voterAddress(int voter, String listener) { +return Optional.ofNullable(voters.get(voter)) +.flatMap(voterNode -> voterNode.address(listener)); +} + +/** + * Returns all of the voter ids. + */ +public Set voterIds() { +return voters.keySet(); +} + +/** + * Adds a voter to the voter set. + * + * This object is immutable. A new voter set is returned if the voter was added. + * + * A new voter can be added to a voter set if its id doesn't already exist in the voter set. + * + * @param voter the new voter to add + * @return a new voter set if the voter was added, otherwise {@code Optional.empty()} + */ +public Optional addVoter(VoterNode voter) { +if (voters.containsKey(voter.id())) { +return Optional.empty(); Review Comment: But we are checking the invariant here. Let me give an example. Let's say that the set of voters is `(0, DirId0), (1, DirId1), (2, DirId2)`. If `(2, DirId2')` (notice that the replica id is the same but the directory id is different) tries to join the set of voters, the leader will call `VoterSet::addVoter((2, DirId2'))`. This call will return `Optional.empty()`, meaning that the voter was not added. I think I understand the source of the confusion.
Maybe `addVoter` should return different values if `replica id` already exists vs `(replica id, replica directory id)` already exists. The KIP doesn't distinguish between these two cases: > DUPLICATE_VOTER - when the request contains a replica id that is already in the committed set of voters. Note that this means that duplicate replica ids are not allowed. This is useful to make automatic voter addition safer. from [AddVoter handling](https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes#KIP853:KRaftControllerMembershipChanges-Handling.3). Do you mind if we revisit this when we implement [AddVoter RPC and request handling](https://issues.apache.org/jira/browse/KAFKA-16535)? I mainly added this so that I can write some of the tests in `VoterSetTest`. This code is not used by `raft/src/main`. Note that to implement the `UpdateVoter` RPC I will probably add a `VoterSet::updateVoter` method that will implement the invariants of that operation. > If the replica id tracked doesn't have a replica directory id, update it with the replica directory id provided in the request. from [UpdateVoter handling](https://cwiki.apache.org/confluence/
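A trimmed illustration of the invariant discussed above: `addVoter` yields `Optional.empty()` whenever the replica id is already present, even when the directory id differs, as in the `(2, DirId2')` example. Types are simplified here (directory ids modeled as strings); this is not the PR's code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class VoterSetSketch {
    private final Map<Integer, String> voters; // replica id -> directory id (simplified)

    VoterSetSketch(Map<Integer, String> voters) {
        this.voters = voters;
    }

    // Immutable add: a new set is returned only when the replica id is new.
    Optional<VoterSetSketch> addVoter(int id, String directoryId) {
        if (voters.containsKey(id)) {
            // (2, "DirId2'") is rejected because replica id 2 already exists,
            // regardless of the directory id, which is the case debated above.
            return Optional.empty();
        }
        Map<Integer, String> newVoters = new HashMap<>(voters);
        newVoters.put(id, directoryId);
        return Optional.of(new VoterSetSketch(newVoters));
    }
}
```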
Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership [kafka]
Alexander-Aghili commented on PR #15055: URL: https://github.com/apache/kafka/pull/15055#issuecomment-2088813846 I have the correct file still saved on my new forced update but I can't link it to this issue. I don't want to revert because I accidentally pushed all the commits from the past few months as part of this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586542049 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1853,6 +1824,40 @@ private void subscribeInternal(Collection topics, Optional processor) { Review Comment: I renamed `process()` as `processBackgroundEvents()`. Is that OK? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586539952 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.Consumer; + +/** + * The {@code CompletableEventReaper} is responsible for tracking any {@link CompletableEvent}s that were processed, + * making sure to reap them if they complete normally or pass their deadline. This is done so that we enforce an upper + * bound on the amount of time the event logic will execute. + */ +public class CompletableEventReaper> { + +private final Logger log; + +/** + * List of tracked events that we are candidates to expire or cancel when reviewed. Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586537358 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.Consumer; + +/** + * The {@code CompletableEventReaper} is responsible for tracking any {@link CompletableEvent}s that were processed, Review Comment: I reworked the comments/documentation to avoid that altogether. PTAL. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842724#comment-17842724 ] Chia-Ping Tsai commented on KAFKA-16223: {quote} I have another set of tests already migrated and plan to open the second PR soon. {quote} This is great! Let's review/merge your PR first. And we will take over the remaining tests if you don't have enough time. > Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest > --- > > Key: KAFKA-16223 > URL: https://issues.apache.org/jira/browse/KAFKA-16223 > Project: Kafka > Issue Type: Sub-task > Components: connect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586526016 ## raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java: ## @@ -261,7 +260,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) { * @param snapshotId the end offset and epoch that identifies the snapshot * @return a writable snapshot if it doesn't already exist */ -Optional storeSnapshot(OffsetAndEpoch snapshotId); +Optional createNewSnapshotUnchecked(OffsetAndEpoch snapshotId); Review Comment: So the use cases are very different, hence why I separated the methods. `createNewSnapshot` is indirectly called by the KRaft state machine. In this case the `ReplicatedLog` makes sure that the provided snapshot id (offset and epoch) is consistent with the log. `createNewSnapshotUnchecked` is used by the local replica to override the entire log with the leader's snapshot. For example, right after downloading the snapshot we fully truncate the log: https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459-L1473 This part of the code calls `truncateToLatestSnapshot` which deletes all of the old snapshots and replaces the log with an empty log that starts at the just-downloaded snapshot (the latest snapshot). In other words `truncateToLatestSnapshot` makes the `ReplicatedLog` consistent after the direct use of `createNewSnapshotUnchecked`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
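One way to picture the split described in this comment (purely illustrative; the real interface returns an optional `RawSnapshotWriter` and the validation is richer): the checked variant verifies the snapshot id against the log before delegating to the unchecked variant, which trusts its caller.

```java
import java.util.Optional;

// Illustrative only: "String" stands in for the real snapshot writer type.
interface SnapshotLogSketch {
    long startOffset();
    long endOffset();

    // Unchecked: trusts the caller, e.g. when materializing a snapshot fetched
    // from the leader before the local log is truncated to match it.
    Optional<String> createNewSnapshotUnchecked(long snapshotEndOffset, int epoch);

    // Checked: called on behalf of the state machine, so the snapshot id must
    // be consistent with the local log before delegating.
    default Optional<String> createNewSnapshot(long snapshotEndOffset, int epoch) {
        if (snapshotEndOffset < startOffset() || snapshotEndOffset > endOffset()) {
            throw new IllegalArgumentException(
                "Snapshot id " + snapshotEndOffset + " is not consistent with the log");
        }
        return createNewSnapshotUnchecked(snapshotEndOffset, epoch);
    }
}
```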
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586524076 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.Consumer; + +/** + * The {@code CompletableEventReaper} is responsible for tracking any {@link CompletableEvent}s that were processed, Review Comment: Unfortunately, the term "processed" is sufficiently ambiguous 😞 So we're _both_ right 😉 Here, I'm referring to events that had been passed to the `EventProcessor`'s [process()](https://github.com/apache/kafka/blob/74a7ed78cc69f0d28bd18139b90c28468058e111/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java#L40) method. Which, sadly, isn't even correct, because they're being `add()`-ed to the reaper _before_ they're passed to `EventProcessor.process()` 🤦♂️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [WIP] KAFKA-16027: refactor MetadataTest [kafka]
johnnychhsu opened a new pull request, #15842: URL: https://github.com/apache/kafka/pull/15842 Jira: https://issues.apache.org/jira/browse/KAFKA-16027 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (2/3) [kafka]
hgeraldino opened a new pull request, #15841: URL: https://github.com/apache/kafka/pull/15841 This is the last remaining Kafka Connect test that needs migration from PowerMock/EasyMock to Mockito. Previous PR: https://github.com/apache/kafka/pull/15520 As usual, I look forward to your comments and feedback @C0urante @gharris1727 @clolov @mukkachaitanya @chia7712 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15309: Add custom error handler to Producer [kafka]
aliehsaeedii closed pull request #15731: KAFKA-15309: Add custom error handler to Producer URL: https://github.com/apache/kafka/pull/15731 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1586499310 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.Consumer; + +/** + * The {@code CompletableEventReaper} is responsible for tracking any {@link CompletableEvent}s that were processed, + * making sure to reap them if they complete normally or pass their deadline. This is done so that we enforce an upper + * bound on the amount of time the event logic will execute. + */ +public class CompletableEventReaper> { + +private final Logger log; + +/** + * List of tracked events that we are candidates to expire or cancel when reviewed. + */ +private final List tracked; + +public CompletableEventReaper(LogContext logContext) { +this.log = logContext.logger(CompletableEventReaper.class); +this.tracked = new ArrayList<>(); +} + +/** + * Adds a new {@link CompletableEvent event} to track for later completion/expiration. + * + * @param event Event to track + */ +public void add(T event) { +tracked.add(Objects.requireNonNull(event, "Event to track must be non-null")); +} + +/** + * This method "completes" any {@link CompletableEvent}s that have either expired or completed normally. So this + * is a two-step process: + * + * + * + * For each tracked event which has exceeded its {@link CompletableEvent#deadlineMs() deadline}, an + * instance of {@link TimeoutException} is created and passed to + * {@link CompletableFuture#completeExceptionally(Throwable)}. + * + * + * For each tracked event of which its {@link CompletableEvent#future() future} is already in the + * {@link CompletableFuture#isDone() done} state, it will be removed from the list of tracked events. + * + * + * + * + * + * This method should be called at regular intervals, based upon the needs of the resource that owns the reaper. 
+ * + * @param currentTimeMs Current time with which to compare against the + * {@link CompletableEvent#deadlineMs() expiration time} + */ +public void reapExpiredAndCompleted(long currentTimeMs) { +log.trace("Reaping expired events"); + +Consumer> timeoutEvent = e -> { +TimeoutException error = new TimeoutException(String.format("%s could not be completed within its timeout", e.getClass().getSimpleName())); +long pastDueMs = currentTimeMs - e.deadlineMs(); +log.debug("Completing event {} exceptionally since it expired {} ms ago", e, pastDueMs); +CompletableFuture f = e.future(); +f.completeExceptionally(error); +}; + +// First, complete (exceptionally) any events that have passed their deadline AND aren't already complete. +tracked.stream() +.filter(e -> !e.future().isDone()) +.filter(e -> currentTimeMs > e.deadlineMs()) +.forEach(timeoutEvent); +// Second, remove any events that are already complete, just to make sure we don't hold references. This will +// include any events that finished successfully as well as any events we just completed exceptionally above. +tracked.removeIf(e -> e.future().isDone()); Review Comment: My first attempt at this resulted in a `ConcurrentModificationException`, since we're removing each entry from the very same list we're iterating over.
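The `ConcurrentModificationException` mentioned here comes from structurally modifying a list while a for-each loop's implicit `Iterator` is active; `removeIf()` (or an explicit `Iterator.remove()`) sidesteps it. A small JDK-only demonstration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RemoveWhileIterating {
    public static void main(String[] args) {
        List<Integer> tracked = new ArrayList<>(Arrays.asList(1, 2, 3, 4));

        // Throws ConcurrentModificationException on the next iteration:
        // for (Integer i : tracked) {
        //     if (i % 2 == 0) tracked.remove(i);
        // }

        // Safe: removeIf() removes through the list's own iterator.
        tracked.removeIf(i -> i % 2 == 0);
        System.out.println(tracked); // [1, 3]

        // Also safe: an explicit Iterator plus Iterator.remove().
        List<Integer> other = new ArrayList<>(Arrays.asList(1, 2, 3, 4));
        for (Iterator<Integer> it = other.iterator(); it.hasNext(); ) {
            if (it.next() % 2 == 0) {
                it.remove();
            }
        }
        System.out.println(other); // [1, 3]
    }
}
```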
[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842717#comment-17842717 ] Hector Geraldino commented on KAFKA-16223: -- Thanks [~chia7712] [~cmukka20] for following up on this. As you've already noticed, the strategy I took was to create a separate [KafkaConfigBackingStoreMockitoTest|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java] and migrate the tests in batches to facilitate reviews. My original plan was to do this in 3 separate batches: the [first batch|https://github.com/apache/kafka/pull/15520] was merged a few weeks ago, and I have another set of tests already migrated and plan to open the second PR soon. If you guys want to pick up the remaining tests, that's fine by me. I haven't had enough time these past weeks to work on this, but I'd love to see this completed (and the migration to JUnit5 started) before 3.8. > Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest > --- > > Key: KAFKA-16223 > URL: https://issues.apache.org/jira/browse/KAFKA-16223 > Project: Kafka > Issue Type: Sub-task > Components: connect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16652) add unit test for ClusterTemplate offering zero ClusterConfig
[ https://issues.apache.org/jira/browse/KAFKA-16652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TaiJuWu reassigned KAFKA-16652: --- Assignee: TaiJuWu (was: Chia-Ping Tsai) > add unit test for ClusterTemplate offering zero ClusterConfig > - > > Key: KAFKA-16652 > URL: https://issues.apache.org/jira/browse/KAFKA-16652 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TaiJuWu >Priority: Minor > > https://github.com/apache/kafka/blob/31355ef8f948f369e240ebc203f889f187116d75/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java#L94 > If `ClusterTemplate` does not generate any `ClusterConfig`, we will throw an > exception. However, we don't have a UT for such a scenario currently. -- This message was sent by Atlassian Jira (v8.20.10#820010)
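[Editor's note] A test for this scenario could plausibly follow the pattern already used in ClusterTestExtensionsUnitTest (see PR #15800 below). The sketch is an assumption: the processClusterTemplate signature, the "emptyGenerator" template name, and the exception type are not verified against trunk.

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.function.Consumer;
    import kafka.test.annotation.ClusterTemplate;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtensionContext;
    import org.junit.jupiter.api.extension.TestTemplateInvocationContext;

    public class ZeroClusterConfigSketch {

        @Test
        @SuppressWarnings("unchecked")
        void failsWhenTemplateYieldsNoClusterConfig() {
            ClusterTestExtensions ext = new ClusterTestExtensions();
            ExtensionContext context = mock(ExtensionContext.class);
            Consumer<TestTemplateInvocationContext> invocations = mock(Consumer.class);
            ClusterTemplate annot = mock(ClusterTemplate.class);
            // "emptyGenerator" is a hypothetical template method that registers
            // zero ClusterConfig instances.
            when(annot.value()).thenReturn("emptyGenerator");

            // Assumed behavior: templates producing no configs are rejected.
            Assertions.assertThrows(IllegalStateException.class,
                    () -> ext.processClusterTemplate(context, annot, invocations));
        }
    }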
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586489784
## clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
## @@ -807,4 +809,62 @@ private static void writeSnapshotFooterRecord(
            builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord);
        }
    }
+
+    public static MemoryRecords withKRaftVersionRecord(
+        long initialOffset,
+        long timestamp,
+        int leaderEpoch,
+        ByteBuffer buffer,
+        KRaftVersionRecord kraftVersionRecord
+    ) {
+        writeKRaftVersionRecord(buffer, initialOffset, timestamp, leaderEpoch, kraftVersionRecord);
+        buffer.flip();
+        return MemoryRecords.readableRecords(buffer);
+    }
+
+    private static void writeKRaftVersionRecord(
+        ByteBuffer buffer,
+        long initialOffset,
+        long timestamp,
+        int leaderEpoch,
+        KRaftVersionRecord kraftVersionRecord
+    ) {
+        try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+                buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+                TimestampType.CREATE_TIME, initialOffset, timestamp,
+                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+                false, true, leaderEpoch, buffer.capacity())
+        ) {
+            builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord);
+        }
+    }
+
+    public static MemoryRecords withVotersRecord(
+        long initialOffset,
+        long timestamp,
+        int leaderEpoch,
+        ByteBuffer buffer,
+        VotersRecord votersRecord
+    ) {
+        writeVotersRecord(buffer, initialOffset, timestamp, leaderEpoch, votersRecord);

Review Comment: Yeah. I see that. Looks like this is an existing issue with existing control record builders. Let me fix the ones that are specific for KRaft. We can fix the other ones in another PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
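[Editor's note] For illustration, a caller of the new factory might look like the sketch below. The setKRaftVersion field name is an assumption based on KIP-853's schema and may differ from the generated KRaftVersionRecord class; the 256-byte capacity is arbitrary.

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.message.KRaftVersionRecord;
    import org.apache.kafka.common.record.MemoryRecords;

    public class KRaftVersionBatchSketch {

        // Builds a single-record control batch carrying the kraft.version
        // feature level, using the factory added in this PR.
        public static MemoryRecords kraftVersionBatch(long offset, long nowMs, int leaderEpoch) {
            KRaftVersionRecord record = new KRaftVersionRecord()
                    .setKRaftVersion((short) 1); // assumed field name (KIP-853)
            return MemoryRecords.withKRaftVersionRecord(
                    offset, nowMs, leaderEpoch, ByteBuffer.allocate(256), record);
        }
    }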
Re: [PR] KAFKA-16614: Disallow @ClusterTemplate("") [kafka]
TaiJuWu commented on code in PR #15800: URL: https://github.com/apache/kafka/pull/15800#discussion_r1586484285
## core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:
## @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.test.annotation.ClusterTemplate;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import java.util.function.Consumer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ClusterTestExtensionsUnitTest {
+    @Test
+    @SuppressWarnings("unchecked")
+    void testProcessClusterTemplate() {
+        ClusterTestExtensions ext = new ClusterTestExtensions();
+        ExtensionContext context = mock(ExtensionContext.class);
+        Consumer<TestTemplateInvocationContext> testInvocations = mock(Consumer.class);
+        ClusterTemplate annot = mock(ClusterTemplate.class);
+        when(annot.value()).thenReturn("");

Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16272: -- Fix Version/s: 3.8.0 (was: 4.0.0) > Update connect_distributed_test.py to support KIP-848’s group protocol config > - > > Key: KAFKA-16272 > URL: https://issues.apache.org/jira/browse/KAFKA-16272 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Sagar Rao >Priority: Major > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{connect_distributed_test.py}} > to support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrices. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16272: -- Priority: Major (was: Blocker) > Update connect_distributed_test.py to support KIP-848’s group protocol config > - > > Key: KAFKA-16272 > URL: https://issues.apache.org/jira/browse/KAFKA-16272 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Sagar Rao >Priority: Major > Labels: kip-848-client-support, system-tests > Fix For: 4.0.0 > > > This task is to update the test method(s) in {{connect_distributed_test.py}} > to support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrices. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
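[Editor's note] For context, the client-side setting these system tests exercise is KIP-848's group.protocol consumer config ("classic" vs. "consumer"). A minimal Java sketch follows; the group id and bootstrap value are placeholders.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class GroupProtocolSketch {
        public static KafkaConsumer<String, String> newConsumer(String bootstrap) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // KIP-848: opt into the new rebalance protocol; the default is "classic".
            props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
            return new KafkaConsumer<>(props);
        }
    }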