Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]
philipnee commented on PR #14680: URL: https://github.com/apache/kafka/pull/14680#issuecomment-1791913330

Hi @dajac @kirktrue - Thanks for offering valuable feedback on this PR, much appreciated. I wonder how you feel about using a local queue to stash the new callbacks? The goal really is to ensure these callbacks can be invoked on commit, poll, and close calls. Previously, David suggested that we could try to use the background queue; however, this is not really a feasible solution because we don't want the consumer to poll the queue during a commit (which could spit out a bunch of errors and rebalance events). I don't have a clean way of doing this unless we make a KIP change to enforce that callbacks are only invoked on poll. That would be ideal, but it is probably out of scope. Any suggestions?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
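A minimal sketch of the local-queue idea being proposed (class and method names are illustrative, not the actual consumer internals): completed async-commit callbacks get stashed in a queue by the background thread, and the application thread drains the queue inside poll(), commitSync(), and close(), instead of polling the shared background queue mid-commit.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch only: stash completed commit callbacks locally and
// invoke them on the application thread during poll/commit/close, rather
// than polling the shared background queue (which could surface unrelated
// errors and rebalance events in the middle of a commit).
public class CallbackStash {
    private final Queue<Runnable> stashed = new ArrayDeque<>();

    // Called when an async commit completes in the background thread.
    public synchronized void enqueue(Runnable callback) {
        stashed.add(callback);
    }

    // Called from poll(), commitSync(), and close() on the app thread;
    // returns how many callbacks were invoked.
    public synchronized int drain() {
        int invoked = 0;
        Runnable cb;
        while ((cb = stashed.poll()) != null) {
            cb.run();
            invoked++;
        }
        return invoked;
    }

    public static void main(String[] args) {
        CallbackStash stash = new CallbackStash();
        int[] calls = {0};
        stash.enqueue(() -> calls[0]++);
        stash.enqueue(() -> calls[0]++);
        int drained = stash.drain();
        System.out.println(drained + " " + calls[0]); // prints "2 2"
    }
}
```

Draining only this local queue keeps commit-callback delivery separate from rebalance and error events, which is the property the comment above is after.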
Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]
jeffkbkim commented on code in PR #14387: URL: https://github.com/apache/kafka/pull/14387#discussion_r1381174247

## checkstyle/import-control.xml:

@@ -240,6 +240,7 @@
+

Review Comment: it's ready
Re: [PR] KAFKA-15769: Fix wrong log with exception [kafka]
GOODBOY008 commented on PR #14683: URL: https://github.com/apache/kafka/pull/14683#issuecomment-1791899817

> LGTM. Can you check in the code base if there are other cases like this?

Yes, I'm willing to check the code base for other cases like this.
[jira] [Created] (KAFKA-15782) Establish concrete project conventions to define public APIs that require a KIP
A. Sophie Blee-Goldman created KAFKA-15782:
-------------------------------------------

             Summary: Establish concrete project conventions to define public APIs that require a KIP
                 Key: KAFKA-15782
                 URL: https://issues.apache.org/jira/browse/KAFKA-15782
             Project: Kafka
          Issue Type: Improvement
            Reporter: A. Sophie Blee-Goldman

There seems to be no concrete definition that establishes project-specific conventions for what is and is not considered a public API change that requires a KIP. This results in frequent drawn-out debates that revisit the same topic and slow things down, and often ends up forcing trivial changes through the KIP process. For a recent example, KIP-998 was required for a one-line change just to add the "protected" access modifier to an otherwise package-private class. See [this comment thread|https://github.com/apache/kafka/pull/14681#discussion_r1378591228] for the full debate on this subject.

It would be beneficial, and in the long run save us all time, to just sit down and hash out the project conventions, such as whether a package-private/protected method on a non-final java class is to be considered a public API, even if the method itself is/was never a public method. This will of course require a KIP, but should help to establish some ground rules to avoid any more superfluous KIPs in the future.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15781: KIP-998, Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
ableegoldman commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r1381158659

## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:

@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }

-    ProducerConfig(Map<?, ?> props, boolean doLog) {
+    protected ProducerConfig(Map<?, ?> props, boolean doLog) {

Review Comment: Here is the KIP for this change: https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access As for establishing the rules going forward, I don't personally have the time to drive that right now, but I did file a Jira ticket for a follow-up KIP to avoid these things in the future: https://issues.apache.org/jira/browse/KAFKA-15782
Re: [PR] MINOR: Small LogValidator clean ups [kafka]
ex172000 commented on code in PR #14697: URL: https://github.com/apache/kafka/pull/14697#discussion_r1381157089

## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:

@@ -399,12 +402,20 @@ class LogValidatorTest {
     assertEquals(i, offsetCounter.value);
     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
+
+    val expectedShallowOffsetOfMaxTimestamp = if (magic >= RecordVersion.V2.value) {

Review Comment: Currently we have up to V2; if this behavior changes in the future, it could break the test. How about making it just equal V2?

## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:

@@ -350,11 +350,14 @@ class LogValidatorTest {
       (RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
         RecordBatch.NO_PARTITION_LEADER_EPOCH)

-    val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, TimestampType.CREATE_TIME, producerId,
-      producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional,
+    val recordList = List(

Review Comment:
[jira] [Updated] (KAFKA-15781) Change ProducerConfig(props, doLog) constructor to protected
[ https://issues.apache.org/jira/browse/KAFKA-15781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-15781:
-------------------------------------------
    Description:
See [https://github.com/apache/kafka/pull/14681]

KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access

  was: See https://github.com/apache/kafka/pull/14681

> Change ProducerConfig(props, doLog) constructor to protected
>
>                 Key: KAFKA-15781
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15781
>             Project: Kafka
>          Issue Type: Improvement
>          Components: producer
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>              Labels: kip
>
> See [https://github.com/apache/kafka/pull/14681]
>
> KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access
[jira] [Created] (KAFKA-15781) Change ProducerConfig(props, doLog) constructor to protected
A. Sophie Blee-Goldman created KAFKA-15781:
-------------------------------------------

             Summary: Change ProducerConfig(props, doLog) constructor to protected
                 Key: KAFKA-15781
                 URL: https://issues.apache.org/jira/browse/KAFKA-15781
             Project: Kafka
          Issue Type: Improvement
          Components: producer
            Reporter: A. Sophie Blee-Goldman
            Assignee: A. Sophie Blee-Goldman

See https://github.com/apache/kafka/pull/14681
Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
ableegoldman commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r138110

## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:

@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }

-    ProducerConfig(Map<?, ?> props, boolean doLog) {
+    protected ProducerConfig(Map<?, ?> props, boolean doLog) {

Review Comment: Fair enough -- I will write up a quick KIP. Thanks for clarifying about the project-specific conventions, or rather, our lack of them. I suppose it would be a good idea to have a discussion around the project conventions and define some concrete rules and/or exceptions, but I will leave that to someone/sometime else.
Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
mjsax commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r1381144344

## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:

@@ -618,7 +618,7 @@ public ProducerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }

-    ProducerConfig(Map<?, ?> props, boolean doLog) {
+    protected ProducerConfig(Map<?, ?> props, boolean doLog) {

Review Comment: I would be in strong favor of clarifying (or changing) the rules. Requiring a KIP for a change like this seems to defeat the purpose of the KIP process. Sophie could do a quick KIP in parallel to unblock this PR if she wants, but to avoid such discussions in the future, and to make it simpler to just "do stuff" that is clearly straightforward, changing/clarifying the rules seems to be worth the effort. @ableegoldman do you want to drive a discussion to clarify the rules?
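For context on what the one-line change above buys: with package-private access, only classes inside org.apache.kafka.clients.producer can reach the doLog constructor; with protected access, any subclass can call it to suppress the duplicate config logging. A self-contained sketch with stub classes (not the real ProducerConfig):

```java
import java.util.Map;

// Stub illustrating the access change discussed in KIP-998; the real
// class is org.apache.kafka.clients.producer.ProducerConfig.
class ProducerConfigStub {
    final boolean doLog;

    public ProducerConfigStub(Map<?, ?> props) {
        this(props, true); // the public constructor always logs
    }

    // Was package-private; making it protected lets subclasses outside
    // the package disable the "ProducerConfig values:" log line.
    protected ProducerConfigStub(Map<?, ?> props, boolean doLog) {
        this.doLog = doLog;
    }
}

// A framework-style subclass (think of Kafka Streams wrapping client
// configs) that opts out of the duplicate config logging.
class QuietProducerConfig extends ProducerConfigStub {
    QuietProducerConfig(Map<?, ?> props) {
        super(props, false);
    }
}

public class Kip998Sketch {
    public static void main(String[] args) {
        ProducerConfigStub cfg = new QuietProducerConfig(Map.of());
        System.out.println(cfg.doLog); // prints "false"
    }
}
```

The subclass compiles only because the two-argument constructor is reachable, which is exactly the visibility question the KIP settles.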
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381143945

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:

@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ *     When a new target assignment is installed but the assignment of the member
+ *     has not changed, the member transitions to the next epoch and remains in
+ *     the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ *     When a new target assignment is installed and a new assignment is computed
+ *     for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ *     If the next assignment contains partitions to be revoked, the member stays
+ *     in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ *     When a new target assignment is installed and all the newly assigned partitions
+ *     are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ *     and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ *     When the assignment is acknowledged, the member transitions to the STABLE
+ *     state if it is fully reconciled.
  *
- * - ASSIGNING:
- *   This state means that the member waits on partitions which are still owned by other members in the
- *   group. It remains in this state until they are all freed up.
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT
+ *     When the assignment is acknowledged, the member remains in the
+ *     UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed.
  *
- * - STABLE:
- *   This state means that the member has received all its assigned partitions.
+ *     If the next assignment contains partitions to be revoked, the member stays
+ *     in his current epoch. Otherwise, he transitions to the target epoch.
+ *
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS
+ *     When the assignment is acknowledged, the member transitions to the
+ *     UNRELEASED_PARTITIONS if newly assigned partitions are not available yet.
+ *
+ * - UNRELEASED_PARTITIONS:
+ *   The member's reconciliation cannot progress because newly assigned partitions
+ *   are still owned by other members in the group. They are not released yet.
  *
- * The reconciliation process is started or re-started whenever a new target assignment is installed;
- * the epoch of the new target assignment is different from the next epoch of the member. In this transient
- * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment
- * are updated. If the partitions pending revocation is not empty, the state machine transitions to
- * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it
- * transitions to STABLE.
+ *   Valid Transitions:
+ *   - UNRELEASED_PARTITIONS -> STABLE
+ *     The member may transition to the STABLE state if the partitions
Re: [PR] [WIP]KAFKA-15444: Native docker image [kafka]
VedarthConfluent commented on code in PR #14556: URL: https://github.com/apache/kafka/pull/14556#discussion_r1381142228

## docker/test/requirements.txt:

@@ -0,0 +1,6 @@
+confluent_kafka

Review Comment: It has been removed. These changes are part of the KIP-975 PR raised here: https://github.com/apache/kafka/pull/14552 (commit 6f64ce54254b985a50b3c5d8d63711cc6b71ab18). They'll reflect here once this PR is updated.
Re: [PR] KAFKA-14957: Update-Description-String [kafka]
mjsax commented on code in PR #13909: URL: https://github.com/apache/kafka/pull/13909#discussion_r1381139811

## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:

@@ -1216,6 +1254,7 @@ public static class ConfigKey {
     public final List<String> dependents;
     public final Recommender recommender;
     public final boolean internalConfig;
+    public final Object overwrittenValue;

Review Comment: Can we find a better name? Or at least add a comment to explain what this is? Given that we use `overwrittenValue` to generate HTML only, should it be of `String` type?

## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:

@@ -1238,6 +1277,31 @@ public ConfigKey(String name, Type type, Object defaultValue, Validator validato
     this.displayName = displayName;
     this.recommender = recommender;
     this.internalConfig = internalConfig;
+    this.overwrittenValue = null;
+}
+
+public ConfigKey(String name, Type type, Object defaultValue, Validator validator,

Review Comment: Do we need a new constructor? Might be simpler to just keep one, and pass in `null` for the new parameter if not used?

## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:

@@ -155,6 +155,29 @@ public ConfigDef define(String name, Type type, Object defaultValue, Validator v
     return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender, false));
 }

+/**
+ * Define a new configuration
+ * @param name             the name of the config parameter
+ * @param type             the type of the config
+ * @param defaultValue     the default value to use if this config isn't present
+ * @param validator        the validator to use in checking the correctness of the config
+ * @param importance       the importance of this config
+ * @param documentation    the documentation string for the config
+ * @param group            the group this config belongs to
+ * @param orderInGroup     the order of this config in the group
+ * @param width            the width of the config
+ * @param displayName      the name suitable for display
+ * @param dependents       the configurations that are dependents of this configuration
+ * @param recommender      the recommender provides valid values given the parent configuration values
+ * @param overwrittenValue the overwritten value of this configuration

Review Comment:
> the overwritten value

Not very descriptive IMHO. Can we explain it better?

## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:

@@ -1265,6 +1329,8 @@ protected String getConfigValue(ConfigKey key, String headerName) {
     if (key.hasDefault()) {
         if (key.defaultValue == null)
             return "null";
+        if (key.overwrittenValue != null)

Review Comment: Would this need to go before `if (key.defaultValue == null)`? (For `state.dir` it does not matter because it does have a default, but for general purposes it seems better to change the order and make `overwrittenValue` the winning parameter if used.)

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:

@@ -820,6 +820,7 @@ public class StreamsConfig extends AbstractConfig {
     .define(STATE_DIR_CONFIG,
         Type.STRING,
         System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams",
+        System.getProperty("kafka.streams.state.dir"),

Review Comment: I think it might be simpler to just hardcode what we want to put into the docs, i.e., just pass in the String `"${java.io.tmpdir}"`.
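The ordering question raised above can be illustrated with a tiny stand-in for the default-column rendering (illustrative names, not the actual ConfigDef API): if the docs-only override is checked after the `defaultValue == null` short-circuit, a config whose runtime default is null would render "null" and never show its override, so the override check should come first.

```java
public class ConfigDocSketch {
    // Illustrative stand-in for ConfigDef.getConfigValue's rendering of
    // the "default" column: a docs-only override, when present, must win
    // over the runtime default, including a null runtime default.
    static String renderDefault(Object defaultValue, Object overwrittenValue) {
        if (overwrittenValue != null)
            return overwrittenValue.toString();
        if (defaultValue == null)
            return "null";
        return defaultValue.toString();
    }

    public static void main(String[] args) {
        // Runtime default is machine-specific; docs show a stable placeholder.
        System.out.println(renderDefault(
            System.getProperty("java.io.tmpdir") + "/kafka-streams",
            "${java.io.tmpdir}/kafka-streams"));
        // Without an override, the plain default (or "null") is rendered.
        System.out.println(renderDefault(null, null)); // prints "null"
    }
}
```

With the checks in the opposite order, `renderDefault(null, "${java.io.tmpdir}")` would return "null", which is the bug the review comment is pointing at.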
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381121312

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:

@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ *     When a new target assignment is installed but the assignment of the member
+ *     has not changed, the member transitions to the next epoch and remains in
+ *     the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ *     When a new target assignment is installed and a new assignment is computed
+ *     for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ *     If the next assignment contains partitions to be revoked, the member stays
+ *     in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ *     When a new target assignment is installed and all the newly assigned partitions
+ *     are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ *     and waits until at least one of them is available.

Review Comment: small question for my understanding, why are we just waiting for _at least_ one of them to be released?
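The branching described in the quoted javadoc (revocations keep the member on its current epoch; otherwise it advances to the target epoch, or waits if the new partitions are still held elsewhere) can be distilled into a tiny decision function. This is an illustrative simplification, not the coordinator's actual code, which also tracks epochs and partition sets:

```java
public class ReconciliationSketch {
    enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    // Illustrative decision logic distilled from the quoted javadoc:
    // - partitions to revoke          -> wait for the ack (current epoch kept)
    // - new partitions, none released -> wait for a release
    // - new partitions available      -> wait for the ack (target epoch)
    // - nothing changed               -> stable
    static MemberState nextState(boolean hasRevocations,
                                 boolean hasNewPartitions,
                                 boolean newPartitionsReleased) {
        if (hasRevocations)
            return MemberState.UNACKNOWLEDGED_ASSIGNMENT;
        if (hasNewPartitions && !newPartitionsReleased)
            return MemberState.UNRELEASED_PARTITIONS;
        if (hasNewPartitions)
            return MemberState.UNACKNOWLEDGED_ASSIGNMENT;
        return MemberState.STABLE;
    }

    public static void main(String[] args) {
        System.out.println(nextState(true, false, false));  // prints "UNACKNOWLEDGED_ASSIGNMENT"
        System.out.println(nextState(false, true, false));  // prints "UNRELEASED_PARTITIONS"
        System.out.println(nextState(false, false, false)); // prints "STABLE"
    }
}
```

Note that UNRELEASED_PARTITIONS is only reached when every newly assigned partition is still held by another member; as soon as one is released, reconciliation can make partial progress, which is why the javadoc says the member waits for at least one of them.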
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381125394

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:

@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ *     When a new target assignment is installed but the assignment of the member
+ *     has not changed, the member transitions to the next epoch and remains in
+ *     the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ *     When a new target assignment is installed and a new assignment is computed
+ *     for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ *     If the next assignment contains partitions to be revoked, the member stays
+ *     in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ *     When a new target assignment is installed and all the newly assigned partitions
+ *     are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ *     and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ *     When the assignment is acknowledged, the member transitions to the STABLE

Review Comment: Small question for my understanding, is the ack sent by the client once the new target assignment is received/reconciliation starts, or once the reconciliation is complete? Or once partial reconciliation is complete, without the revoked partitions?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381127886

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:

@@ -251,144 +260,52 @@ private ConsumerGroupMember transitionToNewTargetAssignmentState() {
         }

         if (!newPartitionsPendingRevocation.isEmpty()) {
-            // If the partition pending revocation set is not empty, we transition the
-            // member to revoking and keep the current epoch. The transition to the new
-            // state is done when the member is updated.
+            // If there are partitions to be revoked, the member remains in its current
+            // epoch and requests the revocation of those partitions. It transitions to
+            // the UNACKNOWLEDGED_ASSIGNMENT state to wait until the new assignment is
+            // acknowledged.
             return new ConsumerGroupMember.Builder(member)
+                .setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT)
                 .setAssignedPartitions(newAssignedPartitions)
-                .setPartitionsPendingRevocation(newPartitionsPendingRevocation)
-                .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
-                .setTargetMemberEpoch(targetAssignmentEpoch)
+                .setRevokedPartitions(newPartitionsPendingRevocation)
                 .build();
-        } else {
-            if (!newPartitionsPendingAssignment.isEmpty()) {
-                // If the partitions pending assignment set is not empty, we check
-                // if some or all partitions are free to use. If they are, we move
-                // them to the partitions assigned set.
-                maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment);
-            }
-
-            // We transition to the target epoch. If the partitions pending assignment
-            // set is empty, the member transition to stable, otherwise to assigning.
-            // The transition to the new state is done when the member is updated.
+        } else if (!newPartitionsPendingAssignment.isEmpty()) {
+            // If there are partitions to be assigned, the member transitions to the
+            // target epoch and requests the assignment of those partitions. Note that
+            // the partitions are directly added to the assigned partitions set. The
+            // member transitions to the UNACKNOWLEDGED_ASSIGNMENT state to wait until
+            // the new assignment is acknowledged.
+            newPartitionsPendingAssignment.forEach((topicId, partitions) -> {
+                newAssignedPartitions
+                    .computeIfAbsent(topicId, __ -> new HashSet<>())
+                    .addAll(partitions);
+            });
             return new ConsumerGroupMember.Builder(member)
+                .setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT)
+                .updateMemberEpoch(targetAssignmentEpoch)
                 .setAssignedPartitions(newAssignedPartitions)
-                .setPartitionsPendingRevocation(Collections.emptyMap())
-                .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
-                .setPreviousMemberEpoch(member.memberEpoch())
-                .setMemberEpoch(targetAssignmentEpoch)
-                .setTargetMemberEpoch(targetAssignmentEpoch)
+                .setRevokedPartitions(Collections.emptyMap())
                 .build();
-        }
-    }
-
-    /**
-     * Tries to transition from Revoke to Assigning or Stable. This is only
-     * possible when the member acknowledges that it only owns the partition
-     * in the assigned partitions.
-     *
-     * @return A new ConsumerGroupMember with the new state or the current one
-     *         if the member stays in the current state.
-     */
-    private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() {
-        if (member.partitionsPendingRevocation().isEmpty() || matchesAssignedPartitions(ownedTopicPartitions)) {
-            Map<Uuid, Set<Integer>> newAssignedPartitions = deepCopy(member.assignedPartitions());
-            Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = deepCopy(member.partitionsPendingAssignment());
-
-            if (!newPartitionsPendingAssignment.isEmpty()) {
-                // If the partitions pending assignment set is not empty, we check
-                // if some or all partitions are free to use. If they are, we move
-                // them to the assigned set.
-                maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment);
-            }
-
-            // We transition to the target epoch. If the partitions pending assignment
-            // set is empty, the member transition to stable, otherwise to assigning.
-            // The transition to the new state is done when the member is updated.
+
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381123599 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS Review Comment: I'm confused about this as well, I would think it would go from stable -> unack (when new assignment computed) -> unreleased (if there's unreleased partitions). If it's possible to ack the assignment even when there's unreleased partitions, we're transitioning from unack to unreleased, right? So how does it get to stable with unreleased partitions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
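The transitions described in the Javadoc diff above can be sketched as a plain Java enum with a transition function. This is a simplified illustration of the documented rules only, not the actual CurrentAssignmentBuilder implementation; the class and method names here are invented for the sketch.

```java
// Simplified model of the member states documented in the diff above.
// NOT the real CurrentAssignmentBuilder code; names are illustrative.
public class MemberStateSketch {
    enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    // Next state for a STABLE member when a new target assignment is installed.
    static MemberState onNewTargetAssignment(boolean assignmentChanged,
                                             boolean anyNewPartitionAvailable) {
        if (!assignmentChanged) {
            // STABLE -> STABLE: member moves to the next epoch, nothing to reconcile.
            return MemberState.STABLE;
        }
        if (!anyNewPartitionAvailable) {
            // STABLE -> UNRELEASED_PARTITIONS: all newly assigned partitions are
            // still owned by other members; wait until at least one is released.
            return MemberState.UNRELEASED_PARTITIONS;
        }
        // STABLE -> UNACKNOWLEDGED_ASSIGNMENT: a new assignment was computed and
        // must be acknowledged within the rebalance timeout.
        return MemberState.UNACKNOWLEDGED_ASSIGNMENT;
    }

    // Next state once the member acknowledges its current assignment.
    static MemberState onAcknowledge(boolean fullyReconciled, boolean newAssignmentComputed) {
        if (fullyReconciled) return MemberState.STABLE;
        return newAssignmentComputed
            ? MemberState.UNACKNOWLEDGED_ASSIGNMENT
            : MemberState.UNRELEASED_PARTITIONS;
    }

    public static void main(String[] args) {
        System.out.println(onNewTargetAssignment(false, false)); // STABLE
        System.out.println(onNewTargetAssignment(true, true));   // UNACKNOWLEDGED_ASSIGNMENT
        System.out.println(onNewTargetAssignment(true, false));  // UNRELEASED_PARTITIONS
    }
}
```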
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381125394 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. * - * - Partitions Pending Assignment: - * The set of partitions that the member will eventually receive. The partitions in this set are - * still owned by other members in the group. + * - UNACKNOWLEDGED_ASSIGNMENT: + * The member has received a new assignment from the group coordinator but + * he has not acknowledged it yet. The member is removed from the group if + * he does not acknowledge it within the rebalance timeout. * - * The state machine has three states: - * - REVOKING: - * This state means that the member must revoke partitions before it can transition to the next epoch - * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions - * are committed with the current epoch. The member transitions to the next state only when it has - * acknowledged the revocation. + * Valid Transitions: + * - UNACKNOWLEDGED_ASSIGNMENT -> STABLE + * When the assignment is acknowledged, the member transitions to the STABLE Review Comment: Small question for my understanding, is the ack sent by the client once the new target assignment is received/reconciliation starts or once the reconciliation is complete? Or once partial reconciliation is complete without the revoked partitions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15776) Configurable delay timeout for DelayedRemoteFetch request
[ https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15776: - Description: We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the given amount of time when there is no data available to serve the FETCH request. {code:java} The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes. {code} [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41] Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user on how to configure optimal value for each purpose. Moreover, the config is of *LOW* importance and most of the users won't configure it and use the default value of 500 ms. Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to higher number of expired delayed remote fetch requests when the remote storage have any degradation. We should introduce one config (preferably server config) to define the delay timeout for DelayedRemoteFetch requests (or) take it from client similar to {{request.timeout.ms}}. was: We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the given amount of time when there is no data available to serve the FETCH request. {code:java} The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes. {code} [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41] Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user on how to configure optimal value for each purpose. 
Moreover, the config is of *LOW* importance and most of the users won't configure it and use the default value of 500 ms. Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to higher number of expired delayed remote fetch request when the remote storage have degrades to serve within the timeout. We should introduce one config (preferably server config) to define the delay timeout for DelayedRemoteFetch requests (or) take it from client similar to {{request.timeout.ms}}. > Configurable delay timeout for DelayedRemoteFetch request > - > > Key: KAFKA-15776 > URL: https://issues.apache.org/jira/browse/KAFKA-15776 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > > We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for > DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the > given amount of time when there is no data available to serve the FETCH > request. > {code:java} > The maximum amount of time the server will block before answering the fetch > request if there isn't sufficient data to immediately satisfy the requirement > given by fetch.min.bytes. > {code} > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41] > Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the > user on how to configure optimal value for each purpose. Moreover, the config > is of *LOW* importance and most of the users won't configure it and use the > default value of 500 ms. > Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to > higher number of expired delayed remote fetch requests when the remote > storage have any degradation. > We should introduce one config (preferably server config) to define the delay > timeout for DelayedRemoteFetch requests (or) take it from client similar to > {{request.timeout.ms}}. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
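The fallback behavior the ticket proposes can be sketched as follows: a dedicated server-side delay timeout for DelayedRemoteFetch that, when unset, falls back to the existing `fetch.max.wait.ms` value. The property name `remote.fetch.max.wait.ms` is hypothetical — the ticket does not name the new config.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed config fallback. "remote.fetch.max.wait.ms" is a
// hypothetical name for illustration, not an actual Kafka configuration.
public class RemoteFetchTimeoutSketch {
    static long remoteFetchWaitMs(Map<String, String> serverProps, long fetchMaxWaitMs) {
        String v = serverProps.get("remote.fetch.max.wait.ms"); // hypothetical config name
        return v != null ? Long.parseLong(v) : fetchMaxWaitMs;  // fall back to fetch.max.wait.ms
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Unset: reuse fetch.max.wait.ms (default 500 ms), which is the current,
        // problematic behavior the ticket describes.
        System.out.println(remoteFetchWaitMs(props, 500L)); // 500
        // Set: remote fetches get their own, typically larger, delay timeout.
        props.put("remote.fetch.max.wait.ms", "5000");
        System.out.println(remoteFetchWaitMs(props, 500L)); // 5000
    }
}
```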
[jira] [Updated] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer
[ https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15777: - Description: A consumer can configure the amount of local bytes to read from each partition in the FETCH request. {{max.fetch.bytes}} = 50 MB {{max.partition.fetch.bytes}} = 1 MB Similar to this, the consumer should be able to configure {{max.remote.partition.fetch.bytes}} = 4 MB. While handling the {{FETCH}} request, if we encounter a partition to read data from remote storage, then rest of the partitions in the request are ignored. Essentially, we are serving only 1 MB of remote data per FETCH request when all the partitions in the request are to be served from the remote storage. Providing one more configuration to the client help the user to tune the values depending on their storage plugin. The user might want to optimise the number of calls to remote storage vs amount of bytes returned back to the client in the FETCH response. [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454] was: A consumer can configure the amount of local bytes to read from each partition in the FETCH request. {{max.fetch.bytes}} = 50 MB {{max.partition.fetch.bytes}} = 1 MB Similar to this, the consumer should be able to configure {{max.remote.partition.fetch.bytes}} = 4 MB. While handling the {{FETCH}} request, if we encounter a partition to read data from remote storage, then rest of the partitions in the request are ignored. Essentially, we are serving only 1 MB of remote data per FETCH request when all the partitions in the request are to be served from the remote storage. Providing one more configuration to the client help the user to tune the values depends on their storage plugin. The user might want to optimise the number of calls to remote storage vs amount of bytes returned back to the client in the FETCH response. 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454] > Configurable remote fetch bytes per partition from Consumer > --- > > Key: KAFKA-15777 > URL: https://issues.apache.org/jira/browse/KAFKA-15777 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > > A consumer can configure the amount of local bytes to read from each > partition in the FETCH request. > {{max.fetch.bytes}} = 50 MB > {{max.partition.fetch.bytes}} = 1 MB > Similar to this, the consumer should be able to configure > {{max.remote.partition.fetch.bytes}} = 4 MB. > While handling the {{FETCH}} request, if we encounter a partition to read > data from remote storage, then rest of the partitions in the request are > ignored. Essentially, we are serving only 1 MB of remote data per FETCH > request when all the partitions in the request are to be served from the > remote storage. > Providing one more configuration to the client help the user to tune the > values depending on their storage plugin. The user might want to optimise the > number of calls to remote storage vs amount of bytes returned back to the > client in the FETCH response. > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454] -- This message was sent by Atlassian Jira (v8.20.10#820010)
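The per-partition limit the ticket asks for can be sketched like this: remote reads get their own byte cap instead of reusing `max.partition.fetch.bytes`. The config name `max.remote.partition.fetch.bytes` and the 1 MB / 4 MB values mirror the examples in the ticket; the config does not exist in Kafka as of this discussion.

```java
// Sketch of the proposed per-partition byte limit for remote fetches.
// max.remote.partition.fetch.bytes is the hypothetical config from the ticket.
public class RemoteFetchBytesSketch {
    static final int MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;        // 1 MB, existing config
    static final int MAX_REMOTE_PARTITION_FETCH_BYTES = 4 * 1024 * 1024; // 4 MB, proposed

    // Pick the byte cap for a partition depending on where its data lives.
    static int partitionFetchLimit(boolean servedFromRemoteStorage) {
        return servedFromRemoteStorage ? MAX_REMOTE_PARTITION_FETCH_BYTES
                                       : MAX_PARTITION_FETCH_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(partitionFetchLimit(false)); // 1048576
        System.out.println(partitionFetchLimit(true));  // 4194304
    }
}
```

This matters because, as the ticket notes, a FETCH request that hits a remote partition ignores the remaining partitions, so the per-partition cap effectively bounds the whole response for remote-heavy requests.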
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381121312 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. Review Comment: small question for my understanding, why are we just waiting for _at least_ one of them to be released? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381118918 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. Review Comment: Do we want to transition to unacknowledged assignment every time a new assignment is computed and sent? I'm confused about the "when a new target assignment is installed" prerequisite -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380922729 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member Review Comment: Just for my understanding, when we say new target assignment installed do we mean the assignment reconciliation on the client aka assignment installed on the client? Also does installed mean that the assignment is already acknowledged ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15740: KRaft support in DeleteOffsetsConsumerGroupCommandIntegrationTest [kafka]
linzihao1999 commented on code in PR #14669: URL: https://github.com/apache/kafka/pull/14669#discussion_r1381106437 ## core/src/test/scala/unit/kafka/admin/DeleteOffsetsConsumerGroupCommandIntegrationTest.scala: ## @@ -20,7 +20,7 @@ package kafka.admin import java.util.Properties import kafka.server.Defaults -import kafka.utils.TestUtils +import kafka.utils.{TestInfoUtils, TestUtils} Review Comment: yes, more pretty -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14957: Update-Description-String [kafka]
Owen-CH-Leung commented on PR #13909: URL: https://github.com/apache/kafka/pull/13909#issuecomment-1791810020 @mjsax Thanks. It really clarifies a lot! I've modified the code to allow overriding default value for `state.dir` by setting the system property `kafka.streams.state.dir`. So in `build.gradle`, if we set the following: ```shell task genStreamsConfigDocs(type: JavaExec) { classpath = sourceSets.main.runtimeClasspath mainClass = 'org.apache.kafka.streams.StreamsConfig' if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } standardOutput = new File(generatedDocsDir, "streams_config.html").newOutputStream() systemProperty 'kafka.streams.state.dir', '/some/random/path' } ``` then the output html page will have the default value shown as `/some/random/path`. If we set it as empty string, the output will be an empty string also. If not set, the default logic `System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams"` will be applied Let me know what you think. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
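The resolution order described in the comment above can be sketched as a small helper: the `kafka.streams.state.dir` system property wins when set (including when set to an empty string), otherwise the usual `java.io.tmpdir`-based default applies. This is a simplified illustration of the PR's described behavior, not the actual StreamsConfig code.

```java
import java.io.File;

// Sketch of the state.dir default resolution described in the PR comment above.
// Simplified; not the actual StreamsConfig implementation.
public class StateDirDefaultSketch {
    static String defaultStateDir() {
        String override = System.getProperty("kafka.streams.state.dir");
        if (override != null) {
            // Property wins, even when it is an empty string (rendered as-is in docs).
            return override;
        }
        // Default logic when the property is not set.
        return System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams";
    }

    public static void main(String[] args) {
        System.out.println(defaultStateDir()); // tmpdir-based default
        System.setProperty("kafka.streams.state.dir", "/some/random/path");
        System.out.println(defaultStateDir()); // /some/random/path
    }
}
```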
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954663 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. * - * - Partitions Pending Assignment: - * The set of partitions that the member will eventually receive. The partitions in this set are - * still owned by other members in the group. + * - UNACKNOWLEDGED_ASSIGNMENT: + * The member has received a new assignment from the group coordinator but + * he has not acknowledged it yet. The member is removed from the group if + * he does not acknowledge it within the rebalance timeout. * - * The state machine has three states: - * - REVOKING: - * This state means that the member must revoke partitions before it can transition to the next epoch - * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions - * are committed with the current epoch. The member transitions to the next state only when it has - * acknowledged the revocation. + * Valid Transitions: + * - UNACKNOWLEDGED_ASSIGNMENT -> STABLE + * When the assignment is acknowledged, the member transitions to the STABLE + * state if it is fully reconciled. * - * - ASSIGNING: - * This state means that the member waits on partitions which are still owned by other members in the - * group. It remains in this state until they are all freed up. + * - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT + * When the assignment is acknowledged, the member remains in the + * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed. * - * - STABLE: - * This state means that the member has received all its assigned partitions. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. 
+ * + * - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS + * When the assignment is acknowledged, the member transitions to the + * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet. + * + * - UNRELEASED_PARTITIONS: + * The member's reconciliation cannot progress because newly assigned partitions + * are still owned by other members in the group. They are not released yet. * - * The reconciliation process is started or re-started whenever a new target assignment is installed; - * the epoch of the new target assignment is different from the next epoch of the member. In this transient - * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment - * are updated. If the partitions pending revocation is not empty, the state machine transitions to - * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it - * transitions to STABLE. + * Valid Transitions: + * - UNRELEASED_PARTITIONS -> STABLE + * The member may transition to the STABLE state if the partitions
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954345 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. * - * - Partitions Pending Assignment: - * The set of partitions that the member will eventually receive. The partitions in this set are - * still owned by other members in the group. + * - UNACKNOWLEDGED_ASSIGNMENT: + * The member has received a new assignment from the group coordinator but + * he has not acknowledged it yet. The member is removed from the group if + * he does not acknowledge it within the rebalance timeout. * - * The state machine has three states: - * - REVOKING: - * This state means that the member must revoke partitions before it can transition to the next epoch - * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions - * are committed with the current epoch. The member transitions to the next state only when it has - * acknowledged the revocation. + * Valid Transitions: + * - UNACKNOWLEDGED_ASSIGNMENT -> STABLE + * When the assignment is acknowledged, the member transitions to the STABLE + * state if it is fully reconciled. * - * - ASSIGNING: - * This state means that the member waits on partitions which are still owned by other members in the - * group. It remains in this state until they are all freed up. + * - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT + * When the assignment is acknowledged, the member remains in the + * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed. * - * - STABLE: - * This state means that the member has received all its assigned partitions. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. 
+ * + * - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS + * When the assignment is acknowledged, the member transitions to the + * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet. + * + * - UNRELEASED_PARTITIONS: + * The member's reconciliation cannot progress because newly assigned partitions + * are still owned by other members in the group. They are not released yet. * - * The reconciliation process is started or re-started whenever a new target assignment is installed; - * the epoch of the new target assignment is different from the next epoch of the member. In this transient - * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment - * are updated. If the partitions pending revocation is not empty, the state machine transitions to - * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it - * transitions to STABLE. + * Valid Transitions: + * - UNRELEASED_PARTITIONS -> STABLE + * The member may transition to the STABLE state if the partitions
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954188 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. + * - STABLE -> UNRELEASED_PARTITIONS Review Comment: For this transition, I'm confused which one is the default first transition. 
I.e., if the client has not yet acknowledged AND we have unreleased partitions, which one do we go to first? And is going between these two both ways only possible if a new assignment occurs in the middle? I'm struggling to understand how all transitions are valid. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
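The transitions out of STABLE that the quoted javadoc describes can be sketched as a small decision function. This is a toy reading of the documented transitions only, not the actual `CurrentAssignmentBuilder` logic, and the predicate names (`assignmentChanged`, `anyNewPartitionAvailable`) are assumptions:

```java
public class MemberTransitionSketch {
    enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    // Toy decision for a STABLE member when a new target assignment is installed,
    // per the quoted javadoc; not the coordinator's real code.
    static MemberState nextStateFromStable(boolean assignmentChanged,
                                           boolean anyNewPartitionAvailable) {
        if (!assignmentChanged) {
            // STABLE -> STABLE: new target epoch, same partitions for this member.
            return MemberState.STABLE;
        }
        if (!anyNewPartitionAvailable) {
            // STABLE -> UNRELEASED_PARTITIONS: all newly assigned partitions are
            // still owned by other members; wait until at least one is released.
            return MemberState.UNRELEASED_PARTITIONS;
        }
        // STABLE -> UNACKNOWLEDGED_ASSIGNMENT: a new assignment was computed;
        // the member must acknowledge it within the rebalance timeout.
        return MemberState.UNACKNOWLEDGED_ASSIGNMENT;
    }
}
```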
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380949384 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member Review Comment: I think this is the case where we get a new assignment server side, but nothing changed for the member? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380943451 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout( } /** - * Schedules a revocation timeout for the member. + * Schedules a rebalance timeout for the member. * - * @param groupId The group id. - * @param memberId The member id. - * @param revocationTimeoutMs The revocation timeout. - * @param expectedMemberEpoch The expected member epoch. + * @param groupId The group id. + * @param expectedMemberThe expected member. */ -private void scheduleConsumerGroupRevocationTimeout( +private void scheduleConsumerGroupRebalanceTimeout( String groupId, -String memberId, -long revocationTimeoutMs, -int expectedMemberEpoch +ConsumerGroupMember expectedMember ) { -String key = consumerGroupRevocationTimeoutKey(groupId, memberId); -timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +String key = consumerGroupRebalanceTimeoutKey(groupId, expectedMember.memberId()); +timer.schedule(key, expectedMember.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, true, () -> { try { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); -ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(expectedMember.memberId(), false); -if (member.state() != ConsumerGroupMember.MemberState.REVOKING || -member.memberEpoch() != expectedMemberEpoch) { -log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " + -"state does not match the expected state.", groupId, memberId); +if (!member.assignmentEquals(expectedMember)) { Review Comment: I also wasn't sure about the states being the same. Is it possible when we are unstable we will be not equal to the expected member? Or am I not understanding when we transition to stable correctly. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380936875 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -301,11 +322,9 @@ public String toString() { private final int previousMemberEpoch; /** - * The next member epoch. This corresponds to the target - * assignment epoch used to compute the current assigned, - * revoking and assigning partitions. + * The member state. */ -private final int targetMemberEpoch; +private final MemberState state; Review Comment: Is the idea with the new code that we just want to match the desired state (ie, add the assigned and remove the revoked) If we change the assignment then we will just bump the epoch and fence older ones? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380929415 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout( } /** - * Schedules a revocation timeout for the member. + * Schedules a rebalance timeout for the member. * - * @param groupId The group id. - * @param memberId The member id. - * @param revocationTimeoutMs The revocation timeout. - * @param expectedMemberEpoch The expected member epoch. + * @param groupId The group id. + * @param expectedMemberThe expected member. */ -private void scheduleConsumerGroupRevocationTimeout( +private void scheduleConsumerGroupRebalanceTimeout( String groupId, -String memberId, -long revocationTimeoutMs, -int expectedMemberEpoch +ConsumerGroupMember expectedMember ) { -String key = consumerGroupRevocationTimeoutKey(groupId, memberId); -timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +String key = consumerGroupRebalanceTimeoutKey(groupId, expectedMember.memberId()); +timer.schedule(key, expectedMember.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, true, () -> { try { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); -ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(expectedMember.memberId(), false); -if (member.state() != ConsumerGroupMember.MemberState.REVOKING || -member.memberEpoch() != expectedMemberEpoch) { -log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " + -"state does not match the expected state.", groupId, memberId); +if (!member.assignmentEquals(expectedMember)) { Review Comment: So this works because we removed the pending assignment fields from the equality check? 
In other words, this should only be false when the fields that change indicate the assignment is no longer valid and not that the assignment just failed to complete. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
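The `assignmentEquals` check under discussion can be sketched roughly as follows. The fields compared here (member epoch and assigned partitions, with any pending/unreleased bookkeeping deliberately excluded) are an assumption drawn from the thread, not the PR's exact code:

```java
import java.util.Set;

// Illustrative member snapshot: equality deliberately ignores pending-assignment
// state, so the scheduled rebalance-timeout task fires only when the
// assignment-relevant state is unchanged since the timer was scheduled.
final class MemberSnapshot {
    final int memberEpoch;
    final Set<Integer> assignedPartitions;

    MemberSnapshot(int memberEpoch, Set<Integer> assignedPartitions) {
        this.memberEpoch = memberEpoch;
        this.assignedPartitions = assignedPartitions;
    }

    boolean assignmentEquals(MemberSnapshot other) {
        return memberEpoch == other.memberEpoch
            && assignedPartitions.equals(other.assignedPartitions);
    }
}
```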
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380923649 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed Review Comment: Small question, irrespective of anything else, whenever there's a new assignment sent to the member we transition to stable, is that right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380923297 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout( } /** - * Schedules a revocation timeout for the member. + * Schedules a rebalance timeout for the member. * - * @param groupId The group id. Review Comment: should we have removed this? It is still a param below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380922729 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member Review Comment: Just for my understanding, when we say new target assignment installed do we mean the assignment reconciliation on the client aka assignment installed on the client? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15774: introduce internal StoreFactory [kafka]
ableegoldman commented on code in PR #14659: URL: https://github.com/apache/kafka/pull/14659#discussion_r1380921878 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java: ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.internals.SessionStoreBuilder; +import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.WindowStoreBuilder; + +public class StoreBuilderWrapper implements StoreFactory { Review Comment: OK, one more thing: the javadocs explaining the StoreFactory are great, but I think this needs javadocs as well. Is it just the default implementation of the StoreFactory class? Is it just here to cover the window/session store cases because they don't yet have a Materializer? Etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380913612 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -934,32 +934,25 @@ private CoordinatorResult consumerGr // 3. Reconcile the member's assignment with the target assignment. This is only required if // the member is not stable or if a new target assignment has been installed. boolean assignmentUpdated = false; -if (updatedMember.state() != ConsumerGroupMember.MemberState.STABLE || updatedMember.targetMemberEpoch() != targetAssignmentEpoch) { +if (!updatedMember.isReconciledTo(targetAssignmentEpoch)) { ConsumerGroupMember prevMember = updatedMember; updatedMember = new CurrentAssignmentBuilder(updatedMember) .withTargetAssignment(targetAssignmentEpoch, targetAssignment) .withCurrentPartitionEpoch(group::currentPartitionEpoch) .withOwnedTopicPartitions(ownedTopicPartitions) .build(); -// Checking the reference is enough here because a new instance -// is created only when the state has changed. -if (updatedMember != prevMember) { Review Comment: Is this due to the change with assignments or just a cleanup we wanted to do? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
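The reference check that the diff above removes relied on a common copy-on-write pattern: a builder returns the same instance when nothing changed, so `updated != previous` is a cheap "did anything change" test. A generic sketch of that pattern (not the coordinator's code; the class and field names are invented for illustration):

```java
// Copy-on-write sketch: the with-er returns the original object when nothing
// changed, so a reference comparison detects whether an update happened.
final class Config {
    final int value;

    Config(int value) { this.value = value; }

    Config withValue(int newValue) {
        return newValue == this.value ? this : new Config(newValue);
    }
}
```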
Re: [PR] KAFKA-15774: introduce internal StoreFactory [kafka]
ableegoldman commented on code in PR #14659: URL: https://github.com/apache/kafka/pull/14659#discussion_r1380912197 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.Map; +import java.util.Set; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyConfig; +import org.apache.kafka.streams.processor.StateStore; + +/** + * What! Another mechanism for obtaining a {@link StateStore}? This isn't just + * an abuse of Java-isms... there's good reason for it. Here's how they are + * interconnected: + * + * + * {@link org.apache.kafka.streams.state.StoreBuilder} is the innermost + * layer to provide state stores and is used directly by the PAPI. + * + * {@link org.apache.kafka.streams.state.StoreSupplier} is used by the + * DSL to provide preconfigured state stores as well as type-safe stores + * (e.g. {@link org.apache.kafka.streams.state.KeyValueBytesStoreSupplier}. + * Before passing the {@code StoreSupplier} into the Review Comment: incomplete sentence? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15774: introduce internal StoreFactory [kafka]
ableegoldman commented on code in PR #14659: URL: https://github.com/apache/kafka/pull/14659#discussion_r1379396433 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java: ## @@ -27,26 +31,44 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + /** * Materializes a key-value store as either a {@link TimestampedKeyValueStoreBuilder} or a * {@link VersionedKeyValueStoreBuilder} depending on whether the store is versioned or not. */ -public class KeyValueStoreMaterializer { +public class KeyValueStoreMaterializer implements StoreFactory { private static final Logger LOG = LoggerFactory.getLogger(KeyValueStoreMaterializer.class); private final MaterializedInternal> materialized; +private final Set connectedProcessorNames = new HashSet<>(); + +private Materialized.StoreType defaultStoreType = Materialized.StoreType.ROCKS_DB; Review Comment: nit: I don't love that we define the default store type in this random hard-to-find location. If we ever wanted to change it going forward I think this would be easily missed (or at the least, a pain to debug/locate). Can we at least define a variable for the default defaultStoreType (so...defaultDefaultStoreType? ) and put that in a sensible location? I guess maybe where the storeType itself is defined?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on PR #14673: URL: https://github.com/apache/kafka/pull/14673#issuecomment-1791722750 > When partitions are revoked, the state machine cannot advance until the consumer acknowledges the revocation. However, when partitions are assigned, the state machine does not wait on receiving an acknowledgement at all. This means that the next assignment can be delivered anytime. Just for my understanding, we decided that the assignment should be the same as revocation -- that we must acknowledge before taking on anything new. Is this because the other way (advancing the state machine on revocations) was not possible?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380898562 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: I was saying that appendEntries queues up a log record and we could use it there. I think Artem was not necessarily disagreeing with that idea but rather questioning whether a lock should be used at all. My understanding of his comment was that we should move to a model where changes are pending and those state transitions block further work. I think we also discussed that the ordering of the log is not the important part but rather committing stale data. It was unclear to me from Jason's comments if this validation is fully necessary due to how we can sometimes write stale data to an offset partition when a rebalance happens after offsets are written as part of the transaction but before the transaction is committed. Basically there are two questions I see: 1. Can we make this code more async friendly to avoid locking? (And does the new group coordinator do this?) 2. Do we really need the lock if we can write inconsistent data outside the typical locking mechanisms and we handle it correctly in those cases?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380899701 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: > I was saying that appendEntries queues up a log record and we could use it there. > > I think Artem was not necessarily disagreeing with that idea but rather questioning whether a lock should be used at all. My understanding of his comment was that we should move to a model where changes are pending and those state transitions block further work. > > I think we also discussed that the ordering of the log is not the important part but rather committing stale data. It was unclear to me from Jason's comments if this validation is fully necessary due to how we can sometimes write stale data to a transaction when a rebalance happens after records are written but before the transaction is committed. > > Basically there are two questions I see: > > 1. Can we make this code more async friendly to avoid locking? (And does the new group coordinator do this?) > 2. Do we really need the lock if we can write inconsistent data outside the typical locking mechanisms and we handle it correctly in those cases?
Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
ijuma commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r1380892982 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -618,7 +618,7 @@ public ProducerConfig(Map props) { super(CONFIG, props); } -ProducerConfig(Map props, boolean doLog) { +protected ProducerConfig(Map props, boolean doLog) { Review Comment: The KIP page states: > Any change that impacts the public interfaces of the project Then it lists examples: > Any class for which the build generates Javadoc Hopefully these parts are not controversial. Then the next question is which parts of a class are part of the public interface. This is defined by Java accessibility rules combined with project specific conventions. When it comes to Java accessibility rules, public classes and methods are obvious. The next bucket are protected methods of non-final public classes - those are also part of the public interface since one can call such methods from a subclass. Newer Java versions also have the concept of sealed classes, but we can ignore those for now. Finally, you have project-specific conventions. Some projects state explicitly which classes are designed for inheritance and all other ones are not supposed to be inherited (and hence their protected methods are not part of the public interface). Kafka (as far as I know) hasn't done this yet, but we probably should. One example of a project specific convention we have is the InterfaceStability annotation (it relaxes compatibility guarantees, but doesn't affect what's considered part of the public interface). Going back to the original question, does this need a KIP? If we are to follow the rules of the project, absolutely. If we want to change the rules, we should have the discussion and update the KIP documentation. Otherwise, it's difficult to be fair and consistent.
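ijuma's point that protected members of a non-final public class are reachable from any subclass can be shown with a small sketch. The classes below are hypothetical stand-ins for illustration, not the real `ProducerConfig`:

```java
import java.util.Map;

// A stand-in for a public config class with a package-private-turned-protected
// constructor, mirroring the shape of the change under discussion.
class Config {
    public Config(Map<String, Object> props) {
        this(props, true);
    }

    protected Config(Map<String, Object> props, boolean doLog) {
        if (doLog) {
            System.out.println("configs: " + props);
        }
    }
}

// A user-defined subclass can reach the protected constructor, which is the
// sense in which protected members become part of the public interface.
class QuietConfig extends Config {
    QuietConfig(Map<String, Object> props) {
        super(props, false); // suppress config logging
    }
}
```

Because user code can compile against the protected constructor this way, changing its signature later would be a breaking change for those subclasses, which is the compatibility argument being made.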
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380888754 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Yes, the current flow in group coordinator is updating the state, queuing up a corresponding log record for commit, and calling the callback when the log record is committed. It needs the log records for a group to be queued up and committed in the same order as the updating of the state so that (1) the latest state can be replayed from the log records correctly in case of failure and (2) the callback can be called in the updating order. Currently, this is achieved through holding the group lock during updating the state and queuing up of the log record (implemented through `ReplicaManager.append`). The expectation is that queuing up of the log record can be done synchronously. Adding async txn validation dependency inside `ReplicaManager.append` breaks that expectation. To address this issue, we could move the async txn validation dependency out of `ReplicaManager.append`. Alternatively, we could implement another way of synchronously queuing up a log record since `ReplicaManager.append` does more than just queuing up a record. I am not sure holding the group lock just for updating the state is enough for ensuring the ordering of the log records.
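The ordering invariant described above, where the in-memory state update and the queuing of its log record happen under the same lock so the log order matches the state-transition order, can be sketched as a toy model. All names below are illustrative, not the actual group coordinator code:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model: because both steps happen under the group lock, replaying the
// queued records in order always reproduces the latest in-memory state.
public class GroupStateSketch {
    private final Object groupLock = new Object();
    private int state = 0;
    private final Queue<Integer> pendingRecords = new ArrayDeque<>();

    public void update(int newState) {
        synchronized (groupLock) {
            state = newState;             // 1. update the in-memory state
            pendingRecords.add(newState); // 2. synchronously queue the matching record
        }
    }

    // Replaying the queued records must yield the latest state.
    public int replay() {
        int replayed = 0;
        for (int record : pendingRecords) {
            replayed = record;
        }
        return replayed;
    }

    public int state() {
        synchronized (groupLock) {
            return state;
        }
    }
}
```

If step 2 became asynchronous (as with the txn validation dependency discussed above), records could be queued out of order relative to the state updates and the replay guarantee would be lost.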
Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
ableegoldman commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r1380884002 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -618,7 +618,7 @@ public ProducerConfig(Map props) { super(CONFIG, props); } -ProducerConfig(Map props, boolean doLog) { +protected ProducerConfig(Map props, boolean doLog) { Review Comment: Personally I suppose I can see the argument that `protected` APIs in a public and non-final class are themselves part of the public API, as one is technically free to extend that class and would then need to (or at least be able to) leverage those APIs. However...I think one could also argue that the public API is only what is directly accessible to the user, and that if you need to extend a public class in order to access a given API, then that API is not strictly speaking part of the API as it is only indirectly accessible from a custom user-defined class. Imo this is more of a hack/breach of internal APIs than a legitimate use of the public API -- as a user, I would not expect my custom implementation to have any guarantees or to be covered by the public contract, and would consider it my own responsibility/fault if my code failed to compile after an upgrade because there was a change to the `protected` API I was sneakily using via inheritance. Just my own two cents about the situation: that frankly, accessing protected classes via inheritance is closer in spirit to accessing them (or even private methods) via reflection, than it is to accessing a true public API covered by the public contract. Just because one can leverage a feature of Java to gain access to an API, that doesn't make it a public API in itself. All that said, I am still happy to do a KIP if that will be the path of least resistance. @ijuma just let me know because I don't want to merge something that you would feel the need to revert
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380879868 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: Review Comment: nit: Could we also include a list of all the valid states at the beginning? It might make the full set of states easier to follow.
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380878154 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. Review Comment: nit: could we change all the he's to it maybe?
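The two transitions quoted in the diff above (STABLE -> STABLE and STABLE -> UNACKNOWLEDGED_ASSIGNMENT) can be summarized in a tiny sketch. The enum below is illustrative only; it models just the states named in this review, not the full coordinator state machine:

```java
// Minimal model of the transitions described in the javadoc under review:
// installing a new target assignment keeps a STABLE member STABLE when its
// assignment is unchanged, and moves it to UNACKNOWLEDGED_ASSIGNMENT when a
// new assignment was computed for it.
public class MemberStateSketch {
    public enum State { STABLE, UNACKNOWLEDGED_ASSIGNMENT }

    public static State onNewTargetAssignment(State current, boolean assignmentChanged) {
        if (current == State.STABLE) {
            return assignmentChanged ? State.UNACKNOWLEDGED_ASSIGNMENT : State.STABLE;
        }
        return current; // other states are out of scope for this sketch
    }
}
```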
Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]
ableegoldman commented on code in PR #14681: URL: https://github.com/apache/kafka/pull/14681#discussion_r1380876249 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -618,7 +618,7 @@ public ProducerConfig(Map props) { super(CONFIG, props); } -ProducerConfig(Map props, boolean doLog) { +protected ProducerConfig(Map props, boolean doLog) { Review Comment: @ijuma I can do a very quick KIP if you would like (and kick off the voting immediately as I think the change itself is uncontroversial). Obviously I would prefer not to, but am happy with whatever gets this through the fastest, even if that means just doing a KIP to avoid a long drawn-out debate. Also, just to take a quick step back, we are not saying that "a change to javadocs of a public class" is what defines the public API, but rather anything that changes the javadocs of what is established as a public API then needs a KIP? In other words, the question is not really about the javadocs, but simply whether `protected` APIs are part of the public API? (I hadn't refreshed the page and so didn't see any of the responses after Bruno's initial comment when I wrote my reply above, but I still would like to clarify this concretely)
[PR] MINOR: Small LogValidator clean ups [kafka]
hachikuji opened a new pull request, #14697: URL: https://github.com/apache/kafka/pull/14697 1. Set shallowOffsetOfMaxTimestamp consistently as the last offset in the batch for v2 compressed and non-compressed data. 2. Rename `RecordConversionStats` to `RecordValidationStats` since one of its fields `temporaryMemoryBytes` does not depend on conversion. 3. Rename `batchIndex` to `recordIndex` in loops over the records in each batch inside `LogValidator`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[PR] [KAFKA-15022] optimization: detect negative cycle from one source [kafka]
lihaosky opened a new pull request, #14696: URL: https://github.com/apache/kafka/pull/14696 ### Description Introduce a dummy node connected to every other node and run Bellman-Ford from the dummy node once instead of from every node in the graph. ### Testing Existing test ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
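For reference, the dummy-source trick reads roughly as follows. This is a generic sketch of the technique (one Bellman-Ford pass from a virtual node with zero-weight edges to every real node detects any negative cycle), not the Kafka Streams code in this PR:

```java
import java.util.Arrays;

// Generic negative-cycle detection: instead of running Bellman-Ford from every
// node, add a dummy source with a zero-weight edge to each real node and run
// Bellman-Ford once from the dummy.
public class NegativeCycleSketch {
    // edges[i] = {from, to, weight}; nodes are 0..n-1
    public static boolean hasNegativeCycle(int n, int[][] edges) {
        int dummy = n; // extra node reaching every other node with weight 0
        int[] dist = new int[n + 1];
        Arrays.fill(dist, Integer.MAX_VALUE / 2); // avoid overflow on addition
        dist[dummy] = 0;
        // n + 1 nodes => n relaxation rounds
        for (int round = 0; round < n; round++) {
            // implicit zero-weight edges dummy -> v
            for (int v = 0; v < n; v++) {
                if (dist[dummy] < dist[v]) dist[v] = dist[dummy];
            }
            for (int[] e : edges) {
                if (dist[e[0]] + e[2] < dist[e[1]]) dist[e[1]] = dist[e[0]] + e[2];
            }
        }
        // One more pass: any further improvement implies a negative cycle.
        for (int[] e : edges) {
            if (dist[e[0]] + e[2] < dist[e[1]]) return true;
        }
        return false;
    }
}
```

Since every node is reachable from the dummy source, a negative cycle anywhere in the graph is found in a single run, which is the optimization the PR description outlines.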
Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]
philipnee commented on code in PR #14680: URL: https://github.com/apache/kafka/pull/14680#discussion_r1380850627 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -1096,4 +1114,47 @@ private void subscribeInternal(Collection topics, Optional 0) { +invoker.executeCallbacks(); +} +} + +// Visible for testing +int callbacks() { +return invoker.callbackQueue.size(); +} + +/** + * Utility class that helps the application thread to invoke user registered callbacks such as + * {@link OffsetCommitCallback}. This is achieved by having the background thread register a runnable with the + * invoker upon future completion, and executing the callbacks when the user polls the consumer. + */ +private static class CallbackInvoker {​ +// Thread-safe queue to store callbacks +private final BlockingQueue callbackQueue = new LinkedBlockingQueue<>(); Review Comment: In fact, using the backgroundEventQueue would not be feasible. I just realized the callbacks need to be invoked on all commit calls, and poll. I don't know if there's a better way to do it.
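The pattern under discussion, a thread-safe queue that the background thread fills and the application thread drains, can be sketched generically. The class and method names below are illustrative, not the actual consumer internals:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the callback-invoker idea: the background thread enqueues
// completed-commit callbacks, and the application thread drains and runs them
// from poll(), commit calls, and close(), so user callbacks never execute on
// the background thread.
public class CallbackInvokerSketch {
    private final BlockingQueue<Runnable> callbackQueue = new LinkedBlockingQueue<>();

    // Called from the background thread when a commit future completes.
    public void enqueue(Runnable callback) {
        callbackQueue.add(callback);
    }

    // Called from the application thread; returns the number of callbacks run.
    public int executeCallbacks() {
        int invoked = 0;
        Runnable callback;
        while ((callback = callbackQueue.poll()) != null) {
            callback.run();
            invoked++;
        }
        return invoked;
    }
}
```

Because only the application thread ever calls `executeCallbacks()`, this keeps the stricter ordering and threading guarantees the discussion is after without polling the shared background queue during a commit.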
Re: [PR] KAFKA-15578: Migrating other system tests to use the group coordinator [kafka]
rreddy-22 commented on PR #14582: URL: https://github.com/apache/kafka/pull/14582#issuecomment-1791632833 https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5930/
[PR] KAFKA-15780: Wait for consistent KRaft metadata when creating topics [kafka]
splett2 opened a new pull request, #14695: URL: https://github.com/apache/kafka/pull/14695

### What
TestUtils.createTopicWithAdmin calls `waitForAllPartitionsMetadata`, which waits for the partition(s) to be present in each broker's metadata cache. This is a sufficient check in ZK mode because the controller sends an LISR request before sending an UpdateMetadataRequest, which means the partition in the `ReplicaManager` is updated before the metadata cache. In KRaft mode, the metadata cache is updated first, so the check may return before partitions and other metadata listeners are fully initialized.

### Testing
Insert a `Thread.sleep(100)` in `BrokerMetadataPublisher.onMetadataUpdate` after
```
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)
```
and run `EdgeCaseRequestTest.testProduceRequestWithNullClientId`; the test fails locally nearly deterministically. After the change(s), the test no longer fails.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
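The fix described above amounts to waiting on a stronger condition before returning from topic creation. A generic retry-until-true helper in the spirit of Kafka's test utilities — the names, polling interval, and condition here are illustrative, not the actual `TestUtils` API:

```java
import java.util.function.BooleanSupplier;

// Minimal retry-until-true helper; illustrative only.
public class WaitUtil {
    public static void waitUntilTrue(BooleanSupplier condition, String message, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline)
                throw new AssertionError(message);
            try {
                Thread.sleep(10); // back off before re-checking the broker state
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in for "the partition is present in the ReplicaManager",
        // which in KRaft mode may lag the metadata cache update.
        waitUntilTrue(() -> System.currentTimeMillis() - start >= 50,
                "partition never appeared", 5_000);
        System.out.println("condition satisfied");
    }
}
```

The point of the PR is precisely that the *condition* matters: waiting on the metadata cache alone is not enough in KRaft mode, so the wait must cover the state the test actually depends on.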
Re: [PR] KAFKA 14515: Optimized Uniform Rack Aware Assignor [kafka]
rreddy-22 commented on code in PR #14416: URL: https://github.com/apache/kafka/pull/14416#discussion_r1380818617 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java: ## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * The assignment builder is used to construct the target assignment based on the members' subscriptions. + * + * This class contains common utility methods and a class for obtaining and storing rack information. + */ +public abstract class AbstractUniformAssignmentBuilder { +protected abstract GroupAssignment buildAssignment(); + +/** + * Determines if rack-aware assignment is appropriate based on the provided rack information. 
+ * + * @param memberRacks Racks where members are located. + * @param allPartitionRacks Racks where partitions are located. + * @param racksPerPartition Map of partitions to their associated racks. + * + * @return {@code true} if rack-aware assignment should be applied; {@code false} otherwise. + */ +protected static boolean useRackAwareAssignment( +Set memberRacks, +Set allPartitionRacks, +Map> racksPerPartition +) { +if (memberRacks.isEmpty() || Collections.disjoint(memberRacks, allPartitionRacks)) +return false; +else { +return !racksPerPartition.values().stream().allMatch(allPartitionRacks::equals); +} +} + +/** + * Adds the topic's partition to the member's target assignment. + */ +protected static void addPartitionToAssignment( +Map memberAssignments, +String memberId, +Uuid topicId, +int partition +) { +memberAssignments.get(memberId) +.targetPartitions() +.computeIfAbsent(topicId, __ -> new HashSet<>()) +.add(partition); +} + +/** + * Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts. + * + * @param topicIds The topic Ids. + * @param subscribedTopicDescriber Utility to fetch the partition count for a given topic. + * + * @return Set of {@code TopicIdPartition} including all the provided topic Ids. + */ +protected static Set topicIdPartitions( +Collection topicIds, +SubscribedTopicDescriber subscribedTopicDescriber +) { +return topicIds.stream() +.flatMap(topic -> +IntStream.range(0, subscribedTopicDescriber.numPartitions(topic)) +.mapToObj(i -> new TopicIdPartition(topic, i)) +).collect(Collectors.toSet()); +} + +/** + * Represents the rack information of members and partitions along with utility methods + * to facilitate rack-aware assignment strategies for a given consumer group. + */ +protected static class RackInfo { +/** + * Map of every member to its rack. + */ +protected final Map memberRacks; + +/** + * Map of every partition to a list of its racks. 
+ */ +protected final Map> partitionRacks; + +/** + * List of members with the same rack as the partition. + */ +protected final Map> membersWithSameRackAsPartition; + +/** + * Indicates if a rack aware assignment can be done. + * True if racks are defined for both members and partitions and there is an intersection between the sets. + */ +protected final boolean useRackStrategy; + +/** + * Constructs rack information based on the assignment
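The `useRackAwareAssignment` check quoted in the diff above can be exercised standalone. In this sketch the generic types (stripped by the mail archive) are filled in as assumptions — racks as `String`s, and an `Integer` partition id standing in for `TopicIdPartition`:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Reconstruction of the useRackAwareAssignment logic; types are assumptions.
public class RackCheckDemo {
    public static boolean useRackAwareAssignment(
        Set<String> memberRacks,
        Set<String> allPartitionRacks,
        Map<Integer, Set<String>> racksPerPartition
    ) {
        // No rack info for members, or no overlap with partition racks:
        // rack-aware assignment cannot help.
        if (memberRacks.isEmpty() || Collections.disjoint(memberRacks, allPartitionRacks))
            return false;
        // If every partition is replicated on every rack, rack matching is moot.
        return !racksPerPartition.values().stream().allMatch(allPartitionRacks::equals);
    }

    public static void main(String[] args) {
        Set<String> allRacks = new HashSet<>(Arrays.asList("r1", "r2"));
        Map<Integer, Set<String>> perPartition = new HashMap<>();
        perPartition.put(0, Collections.singleton("r1")); // partition 0 only on r1
        perPartition.put(1, allRacks);                    // partition 1 on both racks

        System.out.println(useRackAwareAssignment(Collections.emptySet(), allRacks, perPartition));      // false
        System.out.println(useRackAwareAssignment(Collections.singleton("r1"), allRacks, perPartition)); // true
    }
}
```

The second case returns `true` because the racks overlap and replica placement is uneven, so matching members to same-rack partitions can actually reduce cross-zone traffic.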
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380469707 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: That makes sense to me. So maybe the verification would go through a different flow where the callback would include both the state update and the write via appendEntries. The main question I have is whether we should consider this as a separate JIRA and do in a follow-up PR. I am inclined to go with this option. ~(But do it ASAP for 3.6.1)~ We don't need this for 3.6.1 since the offsets are not in 3.6.1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380813788 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Hmm, I think it would break the abstraction of appendRecords being a fundamentally asynchronous call: it has the syntax of being asynchronous (the completion callback is surely called in a different thread), yet a concrete implementation still expects a specific part of the call to be synchronous, and the code gives no indication which part (maybe some day we'd use async I/O and make it more asynchronous than the surrounding code expects). Why would the group coordinator need to know about the transaction verification flow, which has its own trickiness with the verification guard and may well (likely) differ for KIP-890 part 2? For separation of concerns, I think it would be easier to hold the lock only around in-memory state updates and transition the state into a "pending" state, so that new updates are queued or failed (with a retriable error) until the async call is done.
Then we wouldn't have a situation where the lock's syntax looks like "lock around in-memory update + call" while the desired semantics are "[lock] [in-memory update] [some part of call] [unlock] [some other part of call]", with the place of [unlock] buried fairly deep in logic that gives no indication that an unrelated action (making the code as asynchronous as needed to avoid holding threads) could break those semantics.
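The "pending state" idea suggested above — hold the lock only for the in-memory transition and let the write be as asynchronous as it likes — can be sketched generically. This is illustrative Java, not Kafka's Scala code; the class and method names are invented:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch of the pending-state pattern: the lock guards only the
// in-memory transition; the write runs outside the lock, and concurrent
// updates fail with a retriable-style error while one is in flight.
public class PendingStateExample {
    private boolean pending = false;

    synchronized void beginUpdate() {
        if (pending)
            throw new IllegalStateException("update in flight; retry"); // stand-in for a retriable error
        pending = true; // lock held only for this in-memory transition
    }

    synchronized void complete() {
        pending = false;
    }

    CompletableFuture<Void> update(Runnable asyncWrite) {
        beginUpdate();
        // The write itself may be as asynchronous as the I/O layer wants;
        // nothing here assumes which part of it runs synchronously.
        return CompletableFuture.runAsync(asyncWrite)
                .whenComplete((v, e) -> complete());
    }

    public static void main(String[] args) {
        PendingStateExample state = new PendingStateExample();
        state.update(() -> System.out.println("writing")).join();
        System.out.println("update completed; state is stable again");
    }
}
```

The lock is never held across the asynchronous write, so the "[unlock] buried deep in the call" problem described above cannot arise: unlock always happens at the end of the short in-memory transition.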
[jira] [Created] (KAFKA-15780) Wait for consistent kraft metadata when creating topics in tests
David Mao created KAFKA-15780: - Summary: Wait for consistent kraft metadata when creating topics in tests Key: KAFKA-15780 URL: https://issues.apache.org/jira/browse/KAFKA-15780 Project: Kafka Issue Type: Test Reporter: David Mao Tests occasionally flake when not retrying stale metadata in KRaft mode. I suspect that the root cause is that TestUtils.createTopicWithAdmin waits for partitions to be present in the metadata cache but does not wait for the metadata to be fully published to the broker. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA 14515: Optimized Uniform Rack Aware Assignor [kafka]
rreddy-22 commented on code in PR #14416: URL: https://github.com/apache/kafka/pull/14416#discussion_r1380781750 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java: ## @@ -43,27 +45,62 @@ public void testAttribute() { new TopicMetadata(topicId, topicName, 5, partitionRacks) ); } -assertEquals(topicMetadataMap, new SubscribedTopicMetadata(topicMetadataMap).topicMetadata()); + subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap); +} + + +@Test +public void testAttribute() { +setup(); Review Comment: thanks!
Re: [PR] KAFKA 14515: Optimized Uniform Rack Aware Assignor [kafka]
rreddy-22 commented on code in PR #14416: URL: https://github.com/apache/kafka/pull/14416#discussion_r1380777612 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.lang.Math.min; + +/** + * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with + * all its members subscribed to the same set of topics. + * It is optimized since the assignment can be done in fewer, less complicated steps compared to when + * the subscriptions are different across the members. 
+ * + * Assignments are done according to the following principles: + * + * Balance: Ensure partitions are distributed equally among all members. + *The difference in assignments sizes between any two members + *should not exceed one partition. + * Rack Matching:When feasible, aim to assign partitions to members + *located on the same rack thus avoiding cross-zone traffic. + * Stickiness: Minimize partition movements among members by retaining + *as much of the existing assignment as possible. + * + * The assignment builder prioritizes the properties in the following order: + * Balance > Rack Matching > Stickiness. + */ +public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { +private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); + +/** + * The assignment specification which includes member metadata. + */ +private final AssignmentSpec assignmentSpec; + +/** + * The topic and partition metadata describer. + */ +private final SubscribedTopicDescriber subscribedTopicDescriber; + +/** + * The set of topic Ids that the consumer group is subscribed to. + */ +private final Set subscribedTopicIds; + +/** + * Rack information and helper methods. + */ +private final RackInfo rackInfo; + +/** + * The number of members to receive an extra partition beyond the minimum quota. + * Minimum Quota = Total Partitions / Total Members + * Example: If there are 11 partitions to be distributed among 3 members, + * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition. + */ +private int remainingMembersToGetAnExtraPartition; + +/** + * Members mapped to the remaining number of partitions needed to meet the minimum quota. + * Minimum quota = total partitions / total members. + */ +private Map potentiallyUnfilledMembers; + +/** + * The partitions that still need to be assigned. + * Initially this contains all the subscribed topics' partitions. 
+ */ +private Set unassignedPartitions; + +/** + * The target assignment. + */ +private final Map targetAssignment; + +/** + * Tracks the existing owner of each partition. + * Only populated when the rack awareness strategy is used. + */ +private final Map currentPartitionOwners; + +OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.assignmentSpec = assignmentSpec; +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscribedTopicIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); +this.rackInfo = new
Re: [PR] [WIP]KAFKA-15444: Native docker image [kafka]
ijuma commented on code in PR #14556: URL: https://github.com/apache/kafka/pull/14556#discussion_r1380771281 ## docker/test/requirements.txt: ## @@ -0,0 +1,6 @@ +confluent_kafka Review Comment: Can we remove `confluent_kafka` from here then? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA 14515: Optimized Uniform Rack Aware Assignor [kafka]
rreddy-22 commented on code in PR #14416: URL: https://github.com/apache/kafka/pull/14416#discussion_r1380768591 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java: ## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * The assignment builder is used to construct the target assignment based on the members' subscriptions. + * + * This class contains common utility methods and a class for obtaining and storing rack information. + */ +public abstract class AbstractUniformAssignmentBuilder { +protected abstract GroupAssignment buildAssignment(); + +/** + * Determines if rack-aware assignment is appropriate based on the provided rack information. 
+ * + * @param memberRacks Racks where members are located. + * @param allPartitionRacks Racks where partitions are located. + * @param racksPerPartition Map of partitions to their associated racks. + * + * @return {@code true} if rack-aware assignment should be applied; {@code false} otherwise. + */ +protected static boolean useRackAwareAssignment( +Set memberRacks, +Set allPartitionRacks, +Map> racksPerPartition +) { +if (memberRacks.isEmpty() || Collections.disjoint(memberRacks, allPartitionRacks)) +return false; +else { +return !racksPerPartition.values().stream().allMatch(allPartitionRacks::equals); +} +} + +/** + * Adds the topic's partition to the member's target assignment. + */ +protected static void addPartitionToAssignment( +Map memberAssignments, +String memberId, +Uuid topicId, +int partition +) { +memberAssignments.get(memberId) +.targetPartitions() +.computeIfAbsent(topicId, __ -> new HashSet<>()) +.add(partition); +} + +/** + * Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts. + * + * @param topicIds The topic Ids. + * @param subscribedTopicDescriber Utility to fetch the partition count for a given topic. + * + * @return Set of {@code TopicIdPartition} including all the provided topic Ids. + */ +protected static Set topicIdPartitions( +Collection topicIds, +SubscribedTopicDescriber subscribedTopicDescriber +) { +return topicIds.stream() +.flatMap(topic -> +IntStream.range(0, subscribedTopicDescriber.numPartitions(topic)) +.mapToObj(i -> new TopicIdPartition(topic, i)) Review Comment: oh yep changed it, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
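The `IntStream.range(...).flatMap` expansion confirmed in this review thread can be demonstrated with a plain map of topic to partition count standing in for `SubscribedTopicDescriber` (the types are assumptions, and the real element type, `TopicIdPartition`, is replaced by a "topic-partition" string):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustrative expansion of topics into (topic, partition) pairs.
public class TopicExpansionDemo {
    static Set<String> topicIdPartitions(Map<String, Integer> partitionCounts) {
        return partitionCounts.keySet().stream()
            .flatMap(topic -> IntStream.range(0, partitionCounts.get(topic))
                .mapToObj(i -> topic + "-" + i))
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("topicA", 2);
        counts.put("topicB", 1);
        // Expands to {topicA-0, topicA-1, topicB-0} (set order unspecified).
        System.out.println(topicIdPartitions(counts));
    }
}
```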
Re: [PR] KAFKA 14515: Optimized Uniform Rack Aware Assignor [kafka]
rreddy-22 commented on code in PR #14416: URL: https://github.com/apache/kafka/pull/14416#discussion_r1380756376 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.lang.Math.min; + +/** + * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with + * all its members subscribed to the same set of topics. 
+ * It is optimized since the assignment can be done in fewer, less complicated steps compared to when + * the subscriptions are different across the members. + * + * Assignments are done according to the following principles: + * + * + * Balance: Ensure partitions are distributed equally among all members. + *The difference in assignments sizes between any two members + *should not exceed one partition. + * Rack Matching:When feasible, aim to assign partitions to members + *located on the same rack thus avoiding cross-zone traffic. + * Stickiness: Minimize partition movements among members by retaining + *as much of the existing assignment as possible. + * + * The assignment builder prioritizes the properties in the following order: + * Balance > Rack Matching > Stickiness. + */ +public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { +private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); + +/** + * The assignment specification which includes member metadata. + */ +private final AssignmentSpec assignmentSpec; + +/** + * The topic and partition metadata describer. + */ +private final SubscribedTopicDescriber subscribedTopicDescriber; + +/** + * The set of topic Ids that the consumer group is subscribed to. + */ +private final Set subscriptionIds; + +/** + * Rack information and helper methods. + */ +private final RackInfo rackInfo; + +/** + * The number of members to receive an extra partition beyond the minimum quota. + * Minimum Quota = Total Partitions / Total Members + * Example: If there are 11 partitions to be distributed among 3 members, + * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition. + */ +private int remainingMembersToGetAnExtraPartition; + +/** + * Members mapped to the remaining number of partitions needed to meet the minimum quota. + * Minimum quota = total partitions / total members. 
+ */ +private Map potentiallyUnfilledMembers; + +/** + * The partitions that still need to be assigned. + * Initially this contains all the subscribed topics' partitions. + */ +private Set unassignedPartitions; + +/** + * The target assignment. + */ +private final Map targetAssignment; + +/** + * Tracks the existing owner of each partition. + * Only populated when the rack awareness strategy is used. + */ +private final Map currentPartitionOwners; + +OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.assignmentSpec = assignmentSpec; +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); +
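The min-quota arithmetic in the javadoc above is easy to check with the example it gives (11 partitions, 3 members):

```java
// Min-quota arithmetic from the javadoc: 11 partitions across 3 members gives
// each member 11 / 3 = 3 partitions, and 11 % 3 = 2 members get one extra.
public class QuotaDemo {
    static int minQuota(int totalPartitions, int totalMembers) {
        return totalPartitions / totalMembers;
    }

    static int membersWithExtraPartition(int totalPartitions, int totalMembers) {
        return totalPartitions % totalMembers;
    }

    public static void main(String[] args) {
        System.out.println(minQuota(11, 3));                  // 3
        System.out.println(membersWithExtraPartition(11, 3)); // 2
    }
}
```

Every partition is assigned exactly once, since minQuota × members + remainder equals the total partition count.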
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380752726 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: This is effectively what we are doing, right? appendEntries is just the replica manager append without further txn validation. But maybe I misunderstood the suggestion, since we still need the callback logic.
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380747077 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java: ## @@ -67,17 +67,11 @@ public void testOverrideClientId() { @Test public void testOverrideEnableAutoCommit() { ConsumerConfig config = new ConsumerConfig(properties); -boolean overrideEnableAutoCommit = config.maybeOverrideEnableAutoCommit(); -assertFalse(overrideEnableAutoCommit); +//boolean overrideEnableAutoCommit = InternalConsumerConfig.maybeOverrideEnableAutoCommit(config); Review Comment: Agreed. I revised this test method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380746726 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java: ## @@ -0,0 +1,2554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Deserializer; +import 
org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; +import org.slf4j.event.Level; + +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors; +import static org.apache.kafka.common.utils.Utils.closeQuietly; +import static org.apache.kafka.common.utils.Utils.isBlank; +import static
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1380724965 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: I guess the proposal is that when serving an `OffsetCommit` request, `KafkaApis` first determines the partitions for txn validation. It then initiates the txn validation asynchronously. Once the validation is done, it calls `ReplicaManager.append` w/o further txn validation. If we do this, I am wondering if we should just do the same for regular produce requests too. This way, we don't need the callback logic for txn validation in `ReplicaManager.append`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
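For illustration, the flow proposed above — verify the transactional state first, then call append without further validation — can be sketched with `CompletableFuture`. All class and method names below are hypothetical stand-ins, not the actual `KafkaApis`/`ReplicaManager` code:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hedged sketch of the proposed flow: initiate txn verification
// asynchronously, and only once it completes call append, which then
// performs no further validation. Names are illustrative only.
public class AsyncVerifyThenAppend {

    // Stand-in for the asynchronous transaction verification step.
    static CompletableFuture<Boolean> verifyTransactions(List<String> partitions) {
        return CompletableFuture.supplyAsync(() -> !partitions.isEmpty());
    }

    // Stand-in for an append call that does no txn validation of its own.
    static String append(List<String> partitions) {
        return "appended " + partitions.size() + " partition(s)";
    }

    public static String handleOffsetCommit(List<String> partitions) {
        return verifyTransactions(partitions)
                .thenApply(ok -> ok ? append(partitions) : "rejected")
                .join(); // block only for this demo; the server stays async
    }

    public static void main(String[] args) {
        System.out.println(handleOffsetCommit(List.of("__consumer_offsets-0")));
    }
}
```

The appeal of this shape is that the validation step composes the same way for offset commits and regular produce requests, which is exactly why the comment asks whether both paths could share it.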
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380719222 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig { */ static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close"; -/** - * internal.throw.on.fetch.stable.offset.unsupported - * Whether or not the consumer should throw when the new stable offset feature is supported. - * If set to true then the client shall crash upon hitting it. - * The purpose of this flag is to prevent unexpected broker downgrade which makes - * the offset fetch protection against pending commit invalid. The safest approach - * is to fail fast to avoid introducing correctness issue. - * - * - * Note: this is an internal configuration and could be changed in the future in a backward incompatible way - * - */ -static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported"; Review Comment: I have reverted this change. I have left `THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED` in `ConsumerConfig` but I've added a string constant in `ConsumerUtils` just so it can be referenced in the `internals` package. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380718362 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -982,23 +987,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) { } } -// This is here temporary as we don't have public access to the ConsumerConfig in this module. -public static Map appendDeserializerToConfig(Map configs, - Deserializer keyDeserializer, - Deserializer valueDeserializer) { -// validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value -Map newConfigs = new HashMap<>(configs); -if (keyDeserializer != null) -newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass()); -else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null) -throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null."); -if (valueDeserializer != null) -newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass()); -else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null) -throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null."); -return newConfigs; -} - Review Comment: This appears to be outdated. This is not in `ConsumerUtils` but stays in `ConsumerConfig`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380717397 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -666,19 +651,6 @@ else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null) return newConfigs; } -boolean maybeOverrideEnableAutoCommit() { -Optional groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG)); -boolean enableAutoCommit = getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); -if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided -if (!originals().containsKey(ENABLE_AUTO_COMMIT_CONFIG)) { -enableAutoCommit = false; -} else if (enableAutoCommit) { -throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used."); -} -} -return enableAutoCommit; -} - Review Comment: This is now outdated. It's back in `ConsumerConfig`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
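For reference, the rule that `maybeOverrideEnableAutoCommit` encodes (and which, per the comment, stays in `ConsumerConfig`) can be shown in isolation. The sketch below is a simplified stand-in using raw string keys, not the actual `ConsumerConfig` implementation:

```java
import java.util.Map;
import java.util.Optional;

// Illustration of the enable.auto.commit override rule: with no group.id,
// auto-commit silently defaults to false unless the user explicitly set it
// to true, which is a configuration error. Simplified stand-in only.
public class AutoCommitRule {

    static boolean resolveEnableAutoCommit(Optional<String> groupId,
                                           Map<String, Object> originals) {
        // enable.auto.commit defaults to true when not explicitly provided
        boolean enableAutoCommit = (boolean) originals.getOrDefault("enable.auto.commit", true);
        if (!groupId.isPresent()) {
            if (!originals.containsKey("enable.auto.commit")) {
                enableAutoCommit = false; // overwrite for the default (null) group id
            } else if (enableAutoCommit) {
                throw new IllegalArgumentException(
                        "enable.auto.commit cannot be set to true when default group id (null) is used.");
            }
        }
        return enableAutoCommit;
    }

    public static void main(String[] args) {
        System.out.println(resolveEnableAutoCommit(Optional.empty(), Map.of())); // false
    }
}
```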
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380717923 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -250,7 +255,7 @@ public PrototypeAsyncConsumer(final Time time, // no coordinator will be constructed for the default (null) group id if (!groupId.isPresent()) { config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); - //config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); + config.ignore(ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); Review Comment: I've moved the core variable back to `ConsumerConfig` but left a copy in `ConsumerUtils` with an explanation of why it's redefined there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
AndrewJSchofield commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380705419 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java: ## @@ -0,0 +1,2554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Deserializer; +import 
org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; +import org.slf4j.event.Level; + +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors; +import static org.apache.kafka.common.utils.Utils.closeQuietly; +import static org.apache.kafka.common.utils.Utils.isBlank; +import static
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1380683789 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,33 +242,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoinGroup(); +}); } /** * {@inheritDoc} */ @Override public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. +memberEpoch = -1; +invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + +transitionTo(MemberState.FATAL); +} + +/** + * {@inheritDoc} + */ +@Override +public void transitionToJoinGroup() { +resetEpoch(); +transitionTo(MemberState.JOINING); +} + +/** + * {@inheritDoc} + */ +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. 
+subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. (Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; } +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +callbackResult = CompletableFuture.completedFuture(null); +} else { +// Release assignment +if (memberEpoch > 0) { +// Member is part of the group. Invoke onPartitionsRevoked. +callbackResult = revokePartitions(droppedPartitions); +} else { +// Member is not part of the group anymore. Invoke onPartitionsLost. 
+callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); +} +} +return callbackResult; +} + +/** + * Reset member epoch to the value required for the leave the group heartbeat request, and + * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat + * request is sent out with it. + */ +private void transitionToSendingLeaveGroup() { +memberEpoch = leaveGroupEpoch(); +currentAssignment = new HashSet<>(); +targetAssignment = Optional.empty(); +transitionTo(MemberState.SENDING_LEAVE_REQUEST); +}
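The structure of `leaveGroup()` above — run the revocation callbacks, perform cleanup in `whenComplete` so it happens whether the callbacks succeed or fail, and return the callback future rather than waiting for the heartbeat — can be exercised in isolation. This is a stand-alone sketch with illustrative names, not the `MembershipManagerImpl` code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal illustration of the "cleanup regardless of outcome" pattern:
// whenComplete runs the cleanup on both success and failure, and the
// original callback future is handed back to the caller unchanged.
public class LeaveGroupPattern {

    static CompletableFuture<Void> leaveGroup(CompletableFuture<Void> callbackResult,
                                              Runnable cleanup) {
        callbackResult.whenComplete((result, error) -> cleanup.run());
        return callbackResult; // caller waits only on the callbacks
    }

    public static void main(String[] args) {
        AtomicBoolean cleaned = new AtomicBoolean(false);

        // Even a callback future that fails still triggers the cleanup step.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        leaveGroup(failed, () -> cleaned.set(true));
        failed.completeExceptionally(new RuntimeException("callback failed"));

        System.out.println("cleanup ran: " + cleaned.get());
    }
}
```

This mirrors why the diff clears the subscription and transitions to sending the leave-group heartbeat inside `whenComplete`: those steps must happen even when the user's rebalance callback throws.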
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1380694833 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -73,30 +84,55 @@ public interface MembershipManager { /** * @return Current assignment for the member. */ -ConsumerGroupHeartbeatResponseData.Assignment currentAssignment(); +Set currentAssignment(); /** - * Update the assignment for the member, indicating that the provided assignment is the new - * current assignment. + * @return Assignment that the member received from the server but hasn't finished processing + * yet. */ -void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment); +Optional targetAssignment(); Review Comment: Totally, it seemed it was needed here at some point but not anymore. Removing it & cleaning up, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1380693973 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -260,42 +743,16 @@ public Optional serverAssignor() { * {@inheritDoc} */ @Override -public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() { +public Set currentAssignment() { return this.currentAssignment; } /** * @return Assignment that the member received from the server but hasn't completely processed - * yet. Visible for testing. Review Comment: Good catch, I needed it public at some point but it ended up not being needed in the end. So putting it back to package-private and "Visible for testing" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15779) Define broker configurations and exceptions
Apoorv Mittal created KAFKA-15779: - Summary: Define broker configurations and exceptions Key: KAFKA-15779 URL: https://issues.apache.org/jira/browse/KAFKA-15779 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal KIP-714 adds two exceptions and one server configuration: Exceptions: {{TELEMETRY_TOO_LARGE}} (NEW) {{UNKNOWN_SUBSCRIPTION_ID}} (NEW) Configuration: {{telemetry.max.bytes}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15778) Implement ClientMetricsManager to process request
Apoorv Mittal created KAFKA-15778: - Summary: Implement ClientMetricsManager to process request Key: KAFKA-15778 URL: https://issues.apache.org/jira/browse/KAFKA-15778 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal A manager to handle client telemetry request/responses along with cache to persist client instance state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Log a warning when connectors generate greater than tasks.max task configs [kafka]
C0urante commented on PR #14694: URL: https://github.com/apache/kafka/pull/14694#issuecomment-1791420759 @gharris1727 @mimaison given your activity on [KAFKA-15575](https://issues.apache.org/jira/browse/KAFKA-15575), this may interest you. We can also backport this change (which is unlikely for any of the approaches discussed on that ticket so far). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Log a warning when connectors generate greater than tasks.max task configs [kafka]
C0urante opened a new pull request, #14694: URL: https://github.com/apache/kafka/pull/14694 The discussion around [KIP-987](https://cwiki.apache.org/confluence/display/KAFKA/KIP-987%3A+Connect+Static+Assignments) has highlighted some strange behavior in the Connect runtime: if a connector's [taskConfigs method](https://kafka.apache.org/36/javadoc/org/apache/kafka/connect/connector/Connector.html#taskConfigs(int)) returns more tasks than the value for its configuration's [tasks.max property](https://kafka.apache.org/documentation.html#sourceconnectorconfigs_tasks.max), nothing happens (i.e., every task the connector generated a config for is brought up). In [KAFKA-15575](https://issues.apache.org/jira/browse/KAFKA-15575), we discuss the possibility of taking action in this situation: failing the connector, only bringing up `tasks.max` tasks, and others. We may adopt one of these approaches in the future. For now, this warning message should at least get the ball rolling and decrease the likelihood of buggy connectors causing problems for Connect clusters that use static assignment (if/when that feature lands). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
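The condition this PR introduces is, in spirit, a straightforward size comparison between the generated task configs and `tasks.max`. A hedged sketch (the method name and message text below are illustrative, not the Connect runtime's actual code):

```java
import java.util.Collections;
import java.util.List;

// Hedged sketch of the warning condition: compare the number of task
// configs a connector returned against its tasks.max and produce a
// warning message when the connector over-generates. Illustrative only.
public class TaskConfigCheck {

    static String checkTaskConfigs(String connectorName, List<?> taskConfigs, int tasksMax) {
        if (taskConfigs.size() > tasksMax) {
            return "Connector " + connectorName + " generated " + taskConfigs.size()
                    + " task configs, which is greater than tasks.max=" + tasksMax;
        }
        return null; // within limits; nothing to warn about
    }

    public static void main(String[] args) {
        System.out.println(checkTaskConfigs("my-connector", Collections.nCopies(3, new Object()), 2));
    }
}
```

Because the change only logs rather than failing the connector or truncating the task list, it is safe to backport, which is the point made in the description.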
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380667322 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +/** + * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer} + * can remain the top-level facade for implementations, but allow different implementations to co-exist under + * the covers. 
+ * + * + * + * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if + * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol. + * This is based on the presence and value of the {@code group.protocol} configuration value. If the value is present + * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be returned. Otherwise, the + * {@link LegacyKafkaConsumer} will be returned. + * + * + * + * This is not to be called by end users and callers should not attempt to determine the underlying implementation + * as this will make such code very brittle. Users of this facility should honor the top-level {@link Consumer} API + * contract as-is. + */ +public class ConsumerDelegateCreator { + +/** + * This is it! This is the core logic. It's extremely rudimentary. + */ +private static boolean useNewConsumer(Map configs) { +Object groupProtocol = configs.get("group.protocol"); + +// Takes care of both the null and type checks. +if (!(groupProtocol instanceof String)) +return false; + +return ((String) groupProtocol).equalsIgnoreCase("consumer"); Review Comment: Agreed. This code was written before the `ConsumerConfig` changes were in. I've updated it to use the `ConsumerConfig` directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
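The quoted check can be exercised on its own. The sketch below mirrors the raw-map version shown in the diff (per the comment, the merged code reads the value through `ConsumerConfig` instead of the raw map):

```java
import java.util.Map;

// Isolated version of the group.protocol check quoted above: null-safe,
// type-safe (the instanceof covers both), and case-insensitive.
public class GroupProtocolCheck {

    static boolean useNewConsumer(Map<String, Object> configs) {
        Object groupProtocol = configs.get("group.protocol");

        // Takes care of both the null and type checks.
        if (!(groupProtocol instanceof String))
            return false;

        return ((String) groupProtocol).equalsIgnoreCase("consumer");
    }

    public static void main(String[] args) {
        System.out.println(useNewConsumer(Map.of("group.protocol", "CONSUMER"))); // true
        System.out.println(useNewConsumer(Map.of()));                             // false
    }
}
```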
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r138030 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +/** + * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer} + * can remain the top-level facade for implementations, but allow different implementations to co-exist under + * the covers. 
+ * + * + * + * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if + * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol. + * This is based on the presence and value of the {@code group.protocol} configuration value. If the value is present + * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be returned. Otherwise, the + * {@link LegacyKafkaConsumer} will be returned. + * + * + * + * This is not to be called by end users and callers should not attempt to determine the underlying implementation + * as this will make such code very brittle. Users of this facility should honor the top-level {@link Consumer} API + * contract as-is. + */ +public class ConsumerDelegateCreator { + +/** + * This is it! This is the core logic. It's extremely rudimentary. + */ +private static boolean useNewConsumer(Map configs) { +Object groupProtocol = configs.get("group.protocol"); Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380667051 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +/** + * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer} + * can remain the top-level facade for implementations, but allow different implementations to co-exist under + * the covers. 
+ * + * + * + * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if + * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol. + * This is based on the presence and value of the {@code group.protocol} configuration value. If the value is present + * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be returned. Otherwise, the + * {@link LegacyKafkaConsumer} will be returned. + * + * + * + * This is not to be called by end users and callers should not attempt to determine the underlying implementation + * as this will make such code very brittle. Users of this facility should honor the top-level {@link Consumer} API + * contract as-is. + */ +public class ConsumerDelegateCreator { + +/** + * This is it! This is the core logic. It's extremely rudimentary. + */ +private static boolean useNewConsumer(Map configs) { +Object groupProtocol = configs.get("group.protocol"); + +// Takes care of both the null and type checks. +if (!(groupProtocol instanceof String)) +return false; Review Comment: This code was written before the `ConsumerConfig` changes were in. I've updated it to use the `ConsumerConfig` directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380664987 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java: ## @@ -2582,7 +2581,7 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, return fetchResponse(Collections.singletonMap(partition, fetchInfo)); } -private KafkaConsumer newConsumer(Time time, +private LegacyKafkaConsumer newConsumer(Time time, KafkaClient client, Review Comment: Thanks. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380663703 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java: ## @@ -164,7 +163,7 @@ * Note to future authors in this class. If you close the consumer, close with DURATION.ZERO to reduce the duration of * the test. */ -public class KafkaConsumerTest { +public class LegacyKafkaConsumerTest { Review Comment: We have similar tests. My other attempt at the delegate work was much _deeper_ so that we could reuse this test and run it against both implementations, but it got out of hand. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380662917 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java: ## @@ -0,0 +1,2623 @@ +/* Review Comment: Yes. The only changes are the removal of the extra constructors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380662469 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -108,13 +106,19 @@ import static org.apache.kafka.common.utils.Utils.propsToMap; /** - * This prototype consumer uses an {@link ApplicationEventHandler event handler} to process - * {@link ApplicationEvent application events} so that the network IO can be processed in a dedicated + * This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process + * {@link ApplicationEvent application events} so that the network I/O can be processed in a dedicated * {@link ConsumerNetworkThread network thread}. Visit * https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor;>this document - * for detail implementation. + * for implementation detail. + * + * + * + * Note: this {@link Consumer} implementation is part of the revised consumer group protocol from KIP-848. + * This class should not be invoked directly; users should instead create a {@link KafkaConsumer} as before. + * This consumer implements the new consumer group protocol and is intended to be the default in coming releases. */ -public class PrototypeAsyncConsumer implements Consumer { +public class AsyncKafkaConsumer implements Consumer { Review Comment: Done. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -148,30 +152,38 @@ public class PrototypeAsyncConsumer implements Consumer { private boolean cachedSubscriptionHasAllFetchPositions; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); -public PrototypeAsyncConsumer(final Properties properties, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer) { +public AsyncKafkaConsumer(Map configs) { Review Comment: Yep-done. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380661924 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +/** + * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer} + * can remain the top-level facade for implementations, but allow different implementations to co-exist under + * the covers. 
+ * + * + * + * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if + * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol. + * This is based on the presence and value of the {@code group.protocol} configuration value. If the value is present + * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be returned. Otherwise, the + * {@link LegacyKafkaConsumer} will be returned. + * + * + * + * This is not to be called by end users and callers should not attempt to determine the underlying implementation + * as this will make such code very brittle. Users of this facility should honor the top-level {@link Consumer} API + * contract as-is. + */ +public class ConsumerDelegateCreator { + +/** + * This is it! This is the core logic. It's extremely rudimentary. + */ +private static boolean useNewConsumer(Map configs) { +Object groupProtocol = configs.get("group.protocol"); + +// Takes care of both the null and type checks. 
+if (!(groupProtocol instanceof String)) +return false; + +return ((String) groupProtocol).equalsIgnoreCase("consumer"); +} + +public Consumer create(Map configs) { +return createInternal(() -> { +if (useNewConsumer(configs)) { +return new AsyncKafkaConsumer<>(configs); +} else { +return new LegacyKafkaConsumer<>(configs); +} +}); +} + +public Consumer create(Properties properties) { +return createInternal(() -> { +if (useNewConsumer(properties)) { +return new AsyncKafkaConsumer<>(properties); +} else { +return new LegacyKafkaConsumer<>(properties); +} +}); +} + +public Consumer create(Properties properties, +Deserializer keyDeserializer, +Deserializer valueDeserializer) { +return createInternal(() -> { +if (useNewConsumer(properties)) { +return new AsyncKafkaConsumer<>(properties, keyDeserializer, valueDeserializer); +} else { +return new LegacyKafkaConsumer<>(properties, keyDeserializer, valueDeserializer); +} +}); +} + +public Consumer create(Map configs, +Deserializer keyDeserializer, +Deserializer valueDeserializer) { +return createInternal(() -> { +if (useNewConsumer(configs)) { +return new AsyncKafkaConsumer<>(configs, keyDeserializer, valueDeserializer); +} else { +return new LegacyKafkaConsumer<>(configs, keyDeserializer, valueDeserializer); +} +}); +} + +public Consumer create(ConsumerConfig config, +Deserializer keyDeserializer, +Deserializer
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1380661746 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +/** + * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer} + * can remain the top-level facade for implementations, but allow different implementations to co-exist under + * the covers. 
+ * + * + * + * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if + * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol. + * This is based on the presence and value of the {@code group.protocol} configuration value. If the value is present + * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be returned. Otherwise, the + * {@link LegacyKafkaConsumer} will be returned. + * + * + * + * This is not to be called by end users and callers should not attempt to determine the underlying implementation + * as this will make such code very brittle. Users of this facility should honor the top-level {@link Consumer} API + * contract as-is. + */ +public class ConsumerDelegateCreator { + +/** + * This is it! This is the core logic. It's extremely rudimentary. + */ +private static boolean useNewConsumer(Map configs) { Review Comment: I've refactored the code to use fewer constructors in the delegates and pass a fully-constructed `ConsumerConfig` to the delegates. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +/** + * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the + * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer} + * can remain the top-level facade for implementations, but allow different implementations to co-exist under + * the covers. + * + * + * + * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if + * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol. + * This is based on the
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1380660843 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -24,21 +24,34 @@ public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member is not part of the group. This could be the case when it has never joined (no call + * has been made to the subscribe API), or when the member intentionally leaves the group + * after a call to the unsubscribe API. */ -UNJOINED, +NOT_IN_GROUP, Review Comment: You're right, this is the case where a consumer, with a groupId, is not part of the group (either it hasn't called subscribe, or it has called unsubscribe) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1380659584 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -729,6 +729,13 @@ protected MetadataRequest.Builder newMetadataRequestBuilderForNewTopics() { return null; } +/** + * @return Mapping from topic IDs to topic names for all topics in the cache. + */ +public synchronized Map topicNames() { Review Comment: I moved it to the internals for now, as it's truly for internal use (even though I expect we might want something similar later on as we use topicId more in the client code). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
AndrewJSchofield commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1380393617 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -48,26 +61,50 @@ public enum MemberState { * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the * broker. This is a recoverable state, where the member * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then - * transitions to {@link #UNJOINED} to rejoin the group as a new member. + * transitions to {@link #JOINING} to rejoin the group as a new member. */ FENCED, /** - * The member failed with an unrecoverable error + * The member transitions to this state when it is leaving the group after a call to + * unsubscribe. It stays in this state while releasing its assignment (calling user's callback + * for partitions revoked or lost), until the callback completes and a heartbeat request is + * sent out to effectively leave the group (without waiting for a response). + */ +LEAVING_GROUP, + +/** + * Member has completed releasing its assignment, and stays in this state until the next + * heartbeat request is sent out to leave the group. */ -FAILED; +SENDING_LEAVE_REQUEST, +/** + * The member failed with an unrecoverable error. + */ +FATAL; + +/** + * Valid state transitions + */ static { -// Valid state transitions -STABLE.previousValidStates = Arrays.asList(UNJOINED, RECONCILING); -RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED); +STABLE.previousValidStates = Arrays.asList(JOINING, ACKNOWLEDGING_RECONCILED_ASSIGNMENT); + +RECONCILING.previousValidStates = Arrays.asList(STABLE, JOINING); + +ACKNOWLEDGING_RECONCILED_ASSIGNMENT.previousValidStates = Arrays.asList(RECONCILING); + +FATAL.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING_RECONCILED_ASSIGNMENT); Review Comment: That's OK. I was just asking an innocent question. Makes sense to me. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
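The `previousValidStates` idiom in the diff above can be distilled into a small self-contained example. This is a minimal sketch, not the actual Kafka `MemberState` enum: the state set is trimmed down (e.g. `ACKNOWLEDGING` abbreviates `ACKNOWLEDGING_RECONCILED_ASSIGNMENT`) and only a few transitions are wired up, but the mechanism is the same: each state declares the states it may legally be entered from, and a transition is valid only if the current state appears on the target's list.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Minimal sketch of the transition-validation idiom from the review: each
// state lists its legal predecessor states in a static block, and a
// transition check consults that list. State names are simplified.
enum MemberStateSketch {
    JOINING, RECONCILING, ACKNOWLEDGING, STABLE, FENCED, FATAL;

    private List<MemberStateSketch> previousValidStates = Collections.emptyList();

    static {
        // Wired up after all constants exist, mirroring the pattern in the diff.
        STABLE.previousValidStates = Arrays.asList(JOINING, ACKNOWLEDGING);
        RECONCILING.previousValidStates = Arrays.asList(STABLE, JOINING);
        ACKNOWLEDGING.previousValidStates = Arrays.asList(RECONCILING);
        FATAL.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING);
    }

    boolean canTransitionFrom(MemberStateSketch current) {
        return previousValidStates.contains(current);
    }

    public static void main(String[] args) {
        System.out.println(STABLE.canTransitionFrom(JOINING));     // true
        System.out.println(STABLE.canTransitionFrom(RECONCILING)); // false in this sketch
    }
}
```

Declaring predecessors rather than successors is the notable design choice here: the check runs on the state being entered, which keeps each state's invariants ("who may reach me") in one place.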