Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]

2023-11-02 Thread via GitHub


philipnee commented on PR #14680:
URL: https://github.com/apache/kafka/pull/14680#issuecomment-1791913330

   Hi @dajac @kirktrue - Thanks for the valuable feedback on this PR, 
much appreciated. How do you feel about using a local queue to stash 
the new callbacks? The goal is to ensure these callbacks can be invoked 
on commit, poll, and close calls. Previously, David suggested that we could 
try to use the background queue; however, this is not really a feasible 
solution because we don't want the consumer to poll the queue during a commit 
(which could spit out a bunch of errors and rebalance events).
   
   I don't have a clean way of doing this unless we make a KIP change to 
enforce that callbacks are only invoked on poll. That would be ideal, but it is 
probably out of scope. Any suggestions?
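A minimal Java sketch of the local-queue idea, assuming the background thread records each finished async commit and the application thread drains the queue at the start of commit, poll, and close. `PendingCallbacks` and `CompletedCommit` are hypothetical names, and the `BiConsumer<String, Exception>` callback is a simplified stand-in for `OffsetCommitCallback`; this is not the code in the PR.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;

// Hypothetical record of one finished async commit: the user's callback plus the
// offsets and the exception (null on success) reported by the background thread.
final class CompletedCommit {
    final BiConsumer<String, Exception> callback; // stand-in for OffsetCommitCallback
    final String offsets;                          // stand-in for the committed offsets map
    final Exception error;

    CompletedCommit(BiConsumer<String, Exception> callback, String offsets, Exception error) {
        this.callback = callback;
        this.offsets = offsets;
        this.error = error;
    }
}

// Hypothetical local queue, kept apart from the background event queue so that
// draining it runs only commit callbacks and nothing else.
final class PendingCallbacks {
    private final ConcurrentLinkedQueue<CompletedCommit> queue = new ConcurrentLinkedQueue<>();

    // Background thread: stash the result when a commit request completes.
    void enqueue(CompletedCommit completed) {
        queue.add(completed);
    }

    // Application thread: call from commit, poll and close to run every stashed
    // callback in completion order. Returns how many callbacks were invoked.
    int invokeAll() {
        int invoked = 0;
        CompletedCommit next;
        while ((next = queue.poll()) != null) {
            next.callback.accept(next.offsets, next.error);
            invoked++;
        }
        return invoked;
    }
}
```

Because this queue is separate from the background event queue, draining it during a commit only runs user callbacks and never surfaces rebalance events.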
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-02 Thread via GitHub


jeffkbkim commented on code in PR #14387:
URL: https://github.com/apache/kafka/pull/14387#discussion_r1381174247


##
checkstyle/import-control.xml:
##
@@ -240,6 +240,7 @@
   
   
   
+

Review Comment:
   it's ready






Re: [PR] KAFKA-15769: Fix wrong log with exception [kafka]

2023-11-02 Thread via GitHub


GOODBOY008 commented on PR #14683:
URL: https://github.com/apache/kafka/pull/14683#issuecomment-1791899817

   > LGTM, Can you check in the code base if there are other cases like this?
   
   Yes, I'm willing to check the rest of the code base for cases like this.





[jira] [Created] (KAFKA-15782) Establish concrete project conventions to define public APIs that require a KIP

2023-11-02 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-15782:
--

 Summary: Establish concrete project conventions to define public APIs that require a KIP
 Key: KAFKA-15782
 URL: https://issues.apache.org/jira/browse/KAFKA-15782
 Project: Kafka
  Issue Type: Improvement
Reporter: A. Sophie Blee-Goldman


There seems to be no concrete definition that establishes project-specific 
conventions for what is and is not considered a public API change that requires 
a KIP. This results in frequent drawn-out debates that revisit the same topic 
and slow things down, and often ends up forcing trivial changes through the KIP 
process. For a recent example, KIP-998 was required for a one-line change just 
to add the "protected" access modifier to an otherwise package-private class. 
See [this comment 
thread|https://github.com/apache/kafka/pull/14681#discussion_r1378591228] for 
the full debate on this subject.

It would be beneficial, and in the long run save us all time, to just sit down 
and hash out the project conventions, such as whether a 
package-private/protected method on a non-final Java class is to be considered 
a public API, even if the method itself is/was never a public method. This will 
of course require a KIP, but should help to establish some ground rules to 
avoid any more superfluous KIPs in the future.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15781: KIP-998, Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-02 Thread via GitHub


ableegoldman commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1381158659


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map props) {
 super(CONFIG, props);
 }
 
-ProducerConfig(Map props, boolean doLog) {
+protected ProducerConfig(Map props, boolean doLog) {

Review Comment:
   Here is the KIP for this change: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access
   
   As for establishing the rules going forward, I don't personally have the 
time to drive that right now, but I did file a Jira ticket for a follow-up KIP to 
avoid these things in the future: 
https://issues.apache.org/jira/browse/KAFKA-15782
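To illustrate why widening a constructor from package-private to `protected` is visible to users, here is a small sketch with stand-in classes (`BaseConfig` and `QuietConfig` are hypothetical, not Kafka classes). In a real project the subclass would live in a different package, which is exactly the case that package-private access forbids and `protected` access allows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a config class such as ProducerConfig.
class BaseConfig {
    private final Map<String, Object> values;
    private final boolean logged;

    public BaseConfig(Map<String, Object> props) {
        this(props, true);
    }

    // Widened from package-private to protected: subclasses in other packages
    // can now call this constructor, so it becomes part of the visible API.
    protected BaseConfig(Map<String, Object> props, boolean doLog) {
        this.values = new HashMap<>(props);
        this.logged = doLog;
        if (doLog) {
            System.out.println("Config values: " + values);
        }
    }

    boolean wasLogged() {
        return logged;
    }
}

// A subclass (in a real codebase, in a different package) that suppresses logging.
class QuietConfig extends BaseConfig {
    QuietConfig(Map<String, Object> props) {
        super(props, false);
    }
}
```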






Re: [PR] MINOR: Small LogValidator clean ups [kafka]

2023-11-02 Thread via GitHub


ex172000 commented on code in PR #14697:
URL: https://github.com/apache/kafka/pull/14697#discussion_r1381157089


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -399,12 +402,20 @@ class LogValidatorTest {
 assertEquals(i, offsetCounter.value);
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
-assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
+
+val expectedShallowOffsetOfMaxTimestamp = if (magic >= RecordVersion.V2.value) {

Review Comment:
   Currently we only go up to V2; if this behavior changes in a future version, 
couldn't that break the test? How about checking for equality with V2 instead?



##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -350,11 +350,14 @@ class LogValidatorTest {
 (RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
   RecordBatch.NO_PARTITION_LEADER_EPOCH)
 
-val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, TimestampType.CREATE_TIME, producerId,
-  producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional,
+val recordList = List(

Review Comment:
    







[jira] [Updated] (KAFKA-15781) Change ProducerConfig(props, doLog) constructor to protected

2023-11-02 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15781:
---
Description: 
See [https://github.com/apache/kafka/pull/14681]

 

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access

  was:See https://github.com/apache/kafka/pull/14681


> Change ProducerConfig(props, doLog) constructor to protected
> 
>
> Key: KAFKA-15781
> URL: https://issues.apache.org/jira/browse/KAFKA-15781
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: kip
>
> See [https://github.com/apache/kafka/pull/14681]
>  
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access





[jira] [Created] (KAFKA-15781) Change ProducerConfig(props, doLog) constructor to protected

2023-11-02 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-15781:
--

 Summary: Change ProducerConfig(props, doLog) constructor to protected
 Key: KAFKA-15781
 URL: https://issues.apache.org/jira/browse/KAFKA-15781
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman


See https://github.com/apache/kafka/pull/14681





Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-02 Thread via GitHub


ableegoldman commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r138110


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map props) {
 super(CONFIG, props);
 }
 
-ProducerConfig(Map props, boolean doLog) {
+protected ProducerConfig(Map props, boolean doLog) {

Review Comment:
   Fair enough -- I will write up a quick KIP. Thanks for clarifying about the 
project-specific conventions, or rather, our lack of them. I suppose it would 
be a good idea to have a discussion around the project conventions and define 
some concrete rules and/or exceptions, but I will leave that to 
someone/sometime else.






Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-02 Thread via GitHub


mjsax commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1381144344


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map props) {
 super(CONFIG, props);
 }
 
-ProducerConfig(Map props, boolean doLog) {
+protected ProducerConfig(Map props, boolean doLog) {

Review Comment:
   I would be strongly in favor of clarifying (or changing) the rules. Requiring 
a KIP for a change like this seems to defeat the purpose of the KIP process.
   
   Sophie could do a quick KIP in parallel to unblock this PR if she wants, but 
to avoid such discussions in the future and to make it simpler to just "do 
stuff" that is clearly straightforward, changing/clarifying the rules seems to 
be worth the effort.
   
   @ableegoldman do you want to drive a discussion to clarify the rules?






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381143945


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE
+ * state if it is fully reconciled.
  *
- * - ASSIGNING:
- *   This state means that the member waits on partitions which are still owned by other members in the
- *   group. It remains in this state until they are all freed up.
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT
+ * When the assignment is acknowledged, the member remains in the
+ * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed.
  *
- * - STABLE:
- *   This state means that the member has received all its assigned partitions.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
+ *
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS
+ * When the assignment is acknowledged, the member transitions to the
+ * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet.
+ *
+ * - UNRELEASED_PARTITIONS:
+ *   The member's reconciliation cannot progress because newly assigned partitions
+ *   are still owned by other members in the group. They are not released yet.
  *
- * The reconciliation process is started or re-started whenever a new target assignment is installed;
- * the epoch of the new target assignment is different from the next epoch of the member. In this transient
- * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment
- * are updated. If the partitions pending revocation is not empty, the state machine transitions to
- * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it
- * transitions to STABLE.
+ *   Valid Transitions:
+ *   - UNRELEASED_PARTITIONS -> STABLE
+ * The member may transition to the STABLE state if the partitions 

Re: [PR] [WIP]KAFKA-15444: Native docker image [kafka]

2023-11-02 Thread via GitHub


VedarthConfluent commented on code in PR #14556:
URL: https://github.com/apache/kafka/pull/14556#discussion_r1381142228


##
docker/test/requirements.txt:
##
@@ -0,0 +1,6 @@
+confluent_kafka

Review Comment:
   It has been removed. These changes are part of the KIP-975 PR raised here: 
https://github.com/apache/kafka/pull/14552 (commit 
6f64ce54254b985a50b3c5d8d63711cc6b71ab18).
   They will be reflected here once this PR is updated.






Re: [PR] KAFKA-14957: Update-Description-String [kafka]

2023-11-02 Thread via GitHub


mjsax commented on code in PR #13909:
URL: https://github.com/apache/kafka/pull/13909#discussion_r1381139811


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1216,6 +1254,7 @@ public static class ConfigKey {
 public final List dependents;
 public final Recommender recommender;
 public final boolean internalConfig;
+public final Object overwrittenValue;

Review Comment:
   Can we find a better name, or at least add a comment explaining what this is?
   
   Given that we use `overwrittenValue` only to generate HTML, should it be of type 
`String`?



##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1238,6 +1277,31 @@ public ConfigKey(String name, Type type, Object defaultValue, Validator validato
 this.displayName = displayName;
 this.recommender = recommender;
 this.internalConfig = internalConfig;
+this.overwrittenValue = null;
+}
+
+public ConfigKey(String name, Type type, Object defaultValue, Validator validator,

Review Comment:
   Do we need a new constructor? It might be simpler to keep just one and pass in 
`null` for the new parameter when it is not used.
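A sketch of the single-constructor alternative, under the assumption that the documentation-only value is typed as `String` as suggested above. `KeyWithDocDefault` and `docDefault` are hypothetical names; the real `ConfigKey` takes many more parameters.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for ConfigDef.ConfigKey with one constructor.
final class KeyWithDocDefault {
    final String name;
    final Object defaultValue;
    // Text to show in the generated HTML instead of defaultValue; null means "use defaultValue".
    final String docDefault;

    KeyWithDocDefault(String name, Object defaultValue, String docDefault) {
        this.name = Objects.requireNonNull(name, "name");
        this.defaultValue = defaultValue;
        this.docDefault = docDefault;
    }
}
```

Existing call sites would simply pass `null` for the new argument; only a config such as `state.dir` would pass a string.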



##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -155,6 +155,29 @@ public ConfigDef define(String name, Type type, Object defaultValue, Validator v
 return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender, false));
 }
 
+/**
+ * Define a new configuration
+ * @param name              the name of the config parameter
+ * @param type              the type of the config
+ * @param defaultValue      the default value to use if this config isn't present
+ * @param validator         the validator to use in checking the correctness of the config
+ * @param importance        the importance of this config
+ * @param documentation     the documentation string for the config
+ * @param group             the group this config belongs to
+ * @param orderInGroup      the order of this config in the group
+ * @param width             the width of the config
+ * @param displayName       the name suitable for display
+ * @param dependents        the configurations that are dependents of this configuration
+ * @param recommender       the recommender provides valid values given the parent configuration values
+ * @param overwrittenValue  the overwritten value of this configuration

Review Comment:
   > the overwritten value
   
   Not very descriptive IMHO. Can we explain it better?



##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1265,6 +1329,8 @@ protected String getConfigValue(ConfigKey key, String headerName) {
 if (key.hasDefault()) {
 if (key.defaultValue == null)
 return "null";
+if (key.overwrittenValue != null)

Review Comment:
   Should this go before `if (key.defaultValue == null)`? (For `state.dir` it 
does not matter because it has a default, but in general it seems better to change 
the order and make `overwrittenValue` win whenever it is set.)
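The ordering suggested above can be sketched as a small helper. This is a simplified stand-in for the "Default" branch of `getConfigValue`, not the actual method: the parameters replace the `ConfigKey` fields of the same names, and `NO_DEFAULT` only mimics the role of `ConfigDef.NO_DEFAULT_VALUE`.

```java
// Hypothetical, simplified model of how the "Default" column could be rendered.
final class DefaultColumn {
    // Sentinel meaning "this key has no default", like ConfigDef.NO_DEFAULT_VALUE.
    static final Object NO_DEFAULT = new Object();

    static String render(Object defaultValue, Object overwrittenValue) {
        if (defaultValue == NO_DEFAULT) {
            return ""; // simplified: keys without a default document nothing here
        }
        // Checked first, so the override wins even when the real default is null.
        if (overwrittenValue != null) {
            return overwrittenValue.toString();
        }
        if (defaultValue == null) {
            return "null";
        }
        return defaultValue.toString();
    }
}
```

With the original order, a key whose real default is `null` would always render `"null"` and the override would never be shown.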



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -820,6 +820,7 @@ public class StreamsConfig extends AbstractConfig {
 .define(STATE_DIR_CONFIG,
 Type.STRING,
System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams",
+System.getProperty("kafka.streams.state.dir"),

Review Comment:
   I think it might be simpler to just hardcode what we want to put into the 
docs, i.e., just pass in the String `"${java.io.tmpdir}"`.
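A sketch of why a hardcoded string may be simpler for the docs: the computed default depends on the JVM that generates the HTML, and `System.getProperty("kafka.streams.state.dir")` returns `null` unless the JVM was started with a matching `-D` flag. `StateDirDefaults` is a hypothetical helper, not part of Kafka Streams.

```java
import java.io.File;

// Hypothetical helper contrasting the runtime default of state.dir with a
// hardcoded placeholder for the generated docs.
final class StateDirDefaults {
    // What a running application gets; machine-specific (often "/tmp/kafka-streams" on Linux).
    static String runtimeDefault() {
        return System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams";
    }

    // Fixed text for the docs, as the review suggests; identical wherever the docs are built.
    static final String DOCUMENTED_DEFAULT = "${java.io.tmpdir}";

    // Null unless the JVM was started with -Dkafka.streams.state.dir=...
    static String customStateDirProperty() {
        return System.getProperty("kafka.streams.state.dir");
    }
}
```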






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381121312


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.

Review Comment:
   Small question for my understanding: why do we only wait for _at least_ 
one of them to be released?






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381125394


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE

Review Comment:
   Small question for my understanding: is the ack sent by the client once the 
new target assignment is received (i.e., when reconciliation starts), or once 
reconciliation is complete? Or once a partial reconciliation, excluding the 
revoked partitions, is complete?
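For reference while reading these questions, the transitions listed in the quoted javadoc can be written down as a lookup table. This is a hypothetical encoding (`MemberStateSketch` is not a coordinator class), and because the quoted javadoc is cut off, only `UNRELEASED_PARTITIONS -> STABLE` is recorded for the last state.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical table of the member-state transitions described in the javadoc diff.
enum MemberStateSketch {
    STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS;

    private static final Map<MemberStateSketch, Set<MemberStateSketch>> VALID =
        new EnumMap<>(MemberStateSketch.class);

    static {
        VALID.put(STABLE, EnumSet.of(STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS));
        VALID.put(UNACKNOWLEDGED_ASSIGNMENT,
            EnumSet.of(STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS));
        // Only this transition is visible before the quoted javadoc is truncated.
        VALID.put(UNRELEASED_PARTITIONS, EnumSet.of(STABLE));
    }

    boolean canTransitionTo(MemberStateSketch next) {
        return VALID.get(this).contains(next);
    }
}
```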






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381127886


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -251,144 +260,52 @@ private ConsumerGroupMember transitionToNewTargetAssignmentState() {
 }
 
 if (!newPartitionsPendingRevocation.isEmpty()) {
-// If the partition pending revocation set is not empty, we transition the
-// member to revoking and keep the current epoch. The transition to the new
-// state is done when the member is updated.
+// If there are partitions to be revoked, the member remains in its current
+// epoch and requests the revocation of those partitions. It transitions to
+// the UNACKNOWLEDGED_ASSIGNMENT state to wait until the new assignment is
+// acknowledged.
 return new ConsumerGroupMember.Builder(member)
+.setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT)
 .setAssignedPartitions(newAssignedPartitions)
-.setPartitionsPendingRevocation(newPartitionsPendingRevocation)
-.setPartitionsPendingAssignment(newPartitionsPendingAssignment)
-.setTargetMemberEpoch(targetAssignmentEpoch)
+.setRevokedPartitions(newPartitionsPendingRevocation)
 .build();
-} else {
-if (!newPartitionsPendingAssignment.isEmpty()) {
-// If the partitions pending assignment set is not empty, we check
-// if some or all partitions are free to use. If they are, we move
-// them to the partitions assigned set.
-maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment);
-}
-
-// We transition to the target epoch. If the partitions pending assignment
-// set is empty, the member transition to stable, otherwise to assigning.
-// The transition to the new state is done when the member is updated.
+} else if (!newPartitionsPendingAssignment.isEmpty()) {
+// If there are partitions to be assigned, the member transitions to the
+// target epoch and requests the assignment of those partitions. Note that
+// the partitions are directly added to the assigned partitions set. The
+// member transitions to the UNACKNOWLEDGED_ASSIGNMENT state to wait until
+// the new assignment is acknowledged.
+newPartitionsPendingAssignment.forEach((topicId, partitions) -> {
+newAssignedPartitions
+.computeIfAbsent(topicId, __ -> new HashSet<>())
+.addAll(partitions);
+});
 return new ConsumerGroupMember.Builder(member)
+.setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT)
+.updateMemberEpoch(targetAssignmentEpoch)
 .setAssignedPartitions(newAssignedPartitions)
-.setPartitionsPendingRevocation(Collections.emptyMap())
-.setPartitionsPendingAssignment(newPartitionsPendingAssignment)
-.setPreviousMemberEpoch(member.memberEpoch())
-.setMemberEpoch(targetAssignmentEpoch)
-.setTargetMemberEpoch(targetAssignmentEpoch)
+.setRevokedPartitions(Collections.emptyMap())
 .build();
-}
-}
-
-/**
- * Tries to transition from Revoke to Assigning or Stable. This is only
- * possible when the member acknowledges that it only owns the partition
- * in the assigned partitions.
- *
- * @return A new ConsumerGroupMember with the new state or the current one
- * if the member stays in the current state.
- */
-private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() {
-if (member.partitionsPendingRevocation().isEmpty() || matchesAssignedPartitions(ownedTopicPartitions)) {
-Map> newAssignedPartitions = deepCopy(member.assignedPartitions());
-Map> newPartitionsPendingAssignment = deepCopy(member.partitionsPendingAssignment());
-
-if (!newPartitionsPendingAssignment.isEmpty()) {
-// If the partitions pending assignment set is not empty, we check
-// if some or all partitions are free to use. If they are, we move
-// them to the assigned set.
-maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment);
-}
-
-// We transition to the target epoch. If the partitions pending assignment
-// set is empty, the member transition to stable, otherwise to assigning.
-// The transition to the new state is done when the member is updated.
+  
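The `+` lines in this hunk fold the partitions pending assignment into the assigned set using `computeIfAbsent`. A minimal standalone Java sketch of just that merge step follows; the `PendingAssignmentMerge` class is hypothetical, and topic ids are modelled as `String` for self-containment, whereas the coordinator uses `Uuid`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, standalone sketch of the merge step in the hunk above.
// Topic ids are modelled as Strings here; the coordinator itself uses Uuid.
public class PendingAssignmentMerge {

    // Returns a fresh map holding every partition of `assigned` plus every
    // partition of `pending`, grouped by topic id. Neither input is modified.
    public static Map<String, Set<Integer>> merge(
            Map<String, Set<Integer>> assigned,
            Map<String, Set<Integer>> pending) {
        Map<String, Set<Integer>> result = new HashMap<>();
        // Deep-copy the current assignment so the caller's sets stay untouched.
        assigned.forEach((topicId, partitions) -> result.put(topicId, new HashSet<>(partitions)));
        // Same pattern as the patch: create the per-topic set lazily, then add.
        pending.forEach((topicId, partitions) ->
            result.computeIfAbsent(topicId, __ -> new HashSet<>()).addAll(partitions));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> assigned = Map.of("topic-a", Set.of(0, 1));
        Map<String, Set<Integer>> pending = Map.of("topic-a", Set.of(2), "topic-b", Set.of(0));
        // Iteration order of the result is unspecified, so no exact output is shown.
        System.out.println(merge(assigned, pending));
    }
}
```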

Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381123599


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS

Review Comment:
   I'm confused about this as well. I would expect it to go from stable ->
unack (when a new assignment is computed) -> unreleased (if there are unreleased
partitions). If it's possible to ack the assignment even when there are
unreleased partitions, we're transitioning from unack to unreleased, right? So
how does it get to stable with unreleased partitions?
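To make the question concrete, here is a small hypothetical Java sketch of the three STABLE transitions as the javadoc words them (the UNRELEASED_PARTITIONS condition is taken from the fuller javadoc hunk quoted in other messages on this PR). It is one reading of the comments, not the coordinator's implementation; in particular, checking UNRELEASED_PARTITIONS before UNACKNOWLEDGED_ASSIGNMENT is an assumption.

```java
// Hypothetical sketch of the three STABLE transitions listed in the javadoc hunk.
// This is one reading of the comments, not the coordinator's code; the order in
// which the conditions are checked is an assumption.
public class StableTransitions {

    public enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    // assignmentChanged: the member's assignment differs under the new target assignment.
    // newPartitions: the new assignment adds partitions the member does not have yet.
    // anyNewPartitionAvailable: at least one of those partitions is already released.
    public static MemberState nextFromStable(boolean assignmentChanged,
                                             boolean newPartitions,
                                             boolean anyNewPartitionAvailable) {
        if (!assignmentChanged) {
            // STABLE -> STABLE: only the member epoch is bumped.
            return MemberState.STABLE;
        }
        if (newPartitions && !anyNewPartitionAvailable) {
            // STABLE -> UNRELEASED_PARTITIONS: wait until at least one is released.
            return MemberState.UNRELEASED_PARTITIONS;
        }
        // STABLE -> UNACKNOWLEDGED_ASSIGNMENT: a new assignment is sent to the member.
        return MemberState.UNACKNOWLEDGED_ASSIGNMENT;
    }

    public static void main(String[] args) {
        System.out.println(nextFromStable(false, false, false)); // prints STABLE
        System.out.println(nextFromStable(true, true, false));   // prints UNRELEASED_PARTITIONS
        System.out.println(nextFromStable(true, true, true));    // prints UNACKNOWLEDGED_ASSIGNMENT
    }
}
```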






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381125394


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE

Review Comment:
   Small question for my understanding: is the ack sent by the client once the
new target assignment is received (i.e., when reconciliation starts), or once
the reconciliation is complete? Or once a partial reconciliation without the
revoked partitions is complete?
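The javadoc's epoch rule ("If the next assignment contains partitions to be revoked, the member stays in his current epoch. Otherwise, he transitions to the target epoch.") can be sketched as follows. `EpochSelection` and its parameters are hypothetical, and topic ids are modelled as `String` rather than `Uuid`.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the epoch rule in the javadoc hunk: if the next assignment
// revokes partitions, the member stays in its current epoch; otherwise it moves to
// the target epoch. Topic ids are modelled as Strings rather than Uuid.
public class EpochSelection {

    public static int nextMemberEpoch(int currentEpoch,
                                      int targetEpoch,
                                      Map<String, Set<Integer>> owned,
                                      Map<String, Set<Integer>> next) {
        // A partition must be revoked if the member owns it but `next` no longer contains it.
        boolean mustRevoke = owned.entrySet().stream().anyMatch(entry ->
            !next.getOrDefault(entry.getKey(), Set.of()).containsAll(entry.getValue()));
        return mustRevoke ? currentEpoch : targetEpoch;
    }

    public static void main(String[] args) {
        // Partition 1 of "t" is revoked, so the epoch stays at 5.
        System.out.println(nextMemberEpoch(5, 6, Map.of("t", Set.of(0, 1)), Map.of("t", Set.of(0)))); // prints 5
        // Only a partition is added, so the member moves to epoch 6.
        System.out.println(nextMemberEpoch(5, 6, Map.of("t", Set.of(0)), Map.of("t", Set.of(0, 1)))); // prints 6
    }
}
```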






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381125394


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE

Review Comment:
   Small question for my understanding: is the ack sent once the assignment is
received, or once the reconciliation is complete? Or once a partial
reconciliation without the revoked partitions is complete?
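The javadoc also says a member in UNACKNOWLEDGED_ASSIGNMENT is removed if it does not acknowledge within the rebalance timeout. A hypothetical deadline tracker illustrating that rule is sketched below; the class, its method names, and the choice to expire at exactly `sent + rebalanceTimeoutMs` are assumptions, and the real coordinator may rely on its own timer mechanism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "the member is removed from the group if he does not
// acknowledge it within the rebalance timeout". Names and bookkeeping are illustrative.
public class AckTimeoutTracker {

    private final long rebalanceTimeoutMs;
    // memberId -> absolute time (ms) by which the pending assignment must be acknowledged.
    private final Map<String, Long> deadlines = new HashMap<>();

    public AckTimeoutTracker(long rebalanceTimeoutMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    // A new assignment was delivered: the member enters UNACKNOWLEDGED_ASSIGNMENT.
    public void onAssignmentSent(String memberId, long nowMs) {
        deadlines.put(memberId, nowMs + rebalanceTimeoutMs);
    }

    // The member acknowledged its assignment, so it is no longer at risk of removal.
    public void onAcknowledged(String memberId) {
        deadlines.remove(memberId);
    }

    // Members whose deadline has been reached; these would be removed from the group.
    public List<String> expired(long nowMs) {
        List<String> result = new ArrayList<>();
        deadlines.forEach((memberId, deadline) -> {
            if (nowMs >= deadline) {
                result.add(memberId);
            }
        });
        result.sort(null); // natural order, for deterministic output
        return result;
    }

    public static void main(String[] args) {
        AckTimeoutTracker tracker = new AckTimeoutTracker(1_000);
        tracker.onAssignmentSent("member-1", 0);
        tracker.onAssignmentSent("member-2", 0);
        tracker.onAcknowledged("member-2");
        System.out.println(tracker.expired(999));   // prints []
        System.out.println(tracker.expired(1_000)); // prints [member-1]
    }
}
```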






[jira] [Updated] (KAFKA-15776) Configurable delay timeout for DelayedRemoteFetch request

2023-11-02 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15776:
-
Description: 
We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait for 
the given amount of time when there is no data available to serve the FETCH request.
{code:java}
The maximum amount of time the server will block before answering the fetch 
request if there isn't sufficient data to immediately satisfy the requirement 
given by fetch.min.bytes.
{code}
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]

Using the same timeout in the DelayedRemoteFetchPurgatory can confuse users about 
how to configure an optimal value for each purpose. Moreover, the config is of 
*LOW* importance, so most users won't configure it and will use the default 
value of 500 ms.

Having a delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to a 
higher number of expired delayed remote fetch requests when the remote storage 
has any degradation.

We should either introduce a new config (preferably a server config) to define 
the delay timeout for DelayedRemoteFetch requests, or take it from the client, 
similar to {{request.timeout.ms}}.

  was:
We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
given amount of time when there is no data available to serve the FETCH request.
{code:java}
The maximum amount of time the server will block before answering the fetch 
request if there isn't sufficient data to immediately satisfy the requirement 
given by fetch.min.bytes.
{code}
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]

Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user 
on how to configure optimal value for each purpose. Moreover, the config is of 
*LOW* importance and most of the users won't configure it and use the default 
value of 500 ms.

Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
higher number of expired delayed remote fetch request when the remote storage 
have degrades to serve within the timeout.

We should introduce one config (preferably server config) to define the delay 
timeout for DelayedRemoteFetch requests (or) take it from client similar to 
{{request.timeout.ms}}.


> Configurable delay timeout for DelayedRemoteFetch request
> -
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait 
> for the given amount of time when there is no data available to serve the 
> FETCH request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse users 
> about how to configure an optimal value for each purpose. Moreover, the config 
> is of *LOW* importance, so most users won't configure it and will use the 
> default value of 500 ms.
> Having a delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to a 
> higher number of expired delayed remote fetch requests when the remote 
> storage has any degradation.
> We should either introduce a new config (preferably a server config) to define 
> the delay timeout for DelayedRemoteFetch requests, or take it from the client, 
> similar to {{request.timeout.ms}}.
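A minimal Java sketch of the proposal, assuming a dedicated, optional timeout that overrides the FETCH request's max wait when it is set. The issue does not name the new config, so `RemoteFetchDelay` and its methods are hypothetical, and treating expiry as "remote read latency exceeds the delay" is a simplification of how the purgatory behaves.

```java
import java.util.OptionalLong;

// Hypothetical sketch for KAFKA-15776; no config name is implied.
public class RemoteFetchDelay {

    // Today: the delayed remote fetch reuses the FETCH request's max wait
    // (fetch.max.wait.ms on the consumer, 500 ms by default).
    // Proposal sketch: a dedicated timeout overrides it when configured.
    public static long delayMs(long requestMaxWaitMs, OptionalLong dedicatedTimeoutMs) {
        return dedicatedTimeoutMs.orElse(requestMaxWaitMs);
    }

    // Simplification: a delayed remote fetch expires if the remote read
    // has not completed within the delay.
    public static boolean expires(long remoteReadLatencyMs, long delayMs) {
        return remoteReadLatencyMs > delayMs;
    }

    public static void main(String[] args) {
        long withoutConfig = delayMs(500, OptionalLong.empty()); // 500
        long withConfig = delayMs(500, OptionalLong.of(2_000));  // 2000
        // An illustrative 800 ms remote read expires under the default but not with the override.
        System.out.println(expires(800, withoutConfig) + " " + expires(800, withConfig)); // prints "true false"
    }
}
```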



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer

2023-11-02 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15777:
-
Description: 
A consumer can configure the maximum number of local bytes to read in a FETCH 
request, both overall and per partition.

{{fetch.max.bytes}} = 50 MB
{{max.partition.fetch.bytes}} = 1 MB

Similar to this, the consumer should be able to configure 
{{max.remote.partition.fetch.bytes}} = 4 MB.

While handling the {{FETCH}} request, if we encounter a partition whose data must 
be read from remote storage, then the rest of the partitions in the request are 
ignored. Essentially, we are serving only 1 MB of remote data per FETCH request 
when all the partitions in the request are to be served from the remote storage.

Providing one more configuration to the client helps users tune the values 
depending on their storage plugin. Users might want to optimise the trade-off 
between the number of calls to remote storage and the amount of bytes returned 
to the client in the FETCH response.

[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]

  was:
A consumer can configure the amount of local bytes to read from each partition 
in the FETCH request.

{{max.fetch.bytes}} = 50 MB
{{max.partition.fetch.bytes}} = 1 MB

Similar to this, the consumer should be able to configure 
{{max.remote.partition.fetch.bytes}} = 4 MB.

While handling the {{FETCH}} request, if we encounter a partition to read data 
from remote storage, then rest of the partitions in the request are ignored. 
Essentially, we are serving only 1 MB of remote data per FETCH request when all 
the partitions in the request are to be served from the remote storage.

Providing one more configuration to the client help the user to tune the values 
depends on their storage plugin. The user might want to optimise the number of 
calls to remote storage vs amount of bytes returned back to the client in the 
FETCH response.

[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]


> Configurable remote fetch bytes per partition from Consumer
> ---
>
> Key: KAFKA-15777
> URL: https://issues.apache.org/jira/browse/KAFKA-15777
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> A consumer can configure the maximum number of local bytes to read in a 
> FETCH request, both overall and per partition.
> {{fetch.max.bytes}} = 50 MB
> {{max.partition.fetch.bytes}} = 1 MB
> Similar to this, the consumer should be able to configure 
> {{max.remote.partition.fetch.bytes}} = 4 MB.
> While handling the {{FETCH}} request, if we encounter a partition whose data 
> must be read from remote storage, then the rest of the partitions in the 
> request are ignored. Essentially, we are serving only 1 MB of remote data per 
> FETCH request when all the partitions in the request are to be served from the 
> remote storage.
> Providing one more configuration to the client helps users tune the values 
> depending on their storage plugin. Users might want to optimise the trade-off 
> between the number of calls to remote storage and the amount of bytes returned 
> to the client in the FETCH response.
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]
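The budget described above can be modelled with a short Java sketch (Java 16+ for the record). `RemoteFetchBudget` is hypothetical and simplifies the real `ReplicaManager` flow: local partitions are capped by {{max.partition.fetch.bytes}}, a remote partition by the proposed {{max.remote.partition.fetch.bytes}}, the whole response by {{fetch.max.bytes}} (assumed to apply to remote reads too), and partitions after the first remote one are skipped, as the issue describes.

```java
import java.util.List;

// Hypothetical model of the FETCH handling described in KAFKA-15777.
// A simplification for illustration, not ReplicaManager's actual logic.
public class RemoteFetchBudget {

    // One partition in a FETCH request: whether it must be read from remote storage,
    // and how many bytes it could return.
    public record PartitionRead(String name, boolean remote, long availableBytes) {}

    public static final long MB = 1024L * 1024L;

    // Local partitions are capped by maxPartitionBytes, a remote partition by
    // maxRemotePartitionBytes, and the whole response by fetchMaxBytes. Once a
    // remote partition is served, the remaining partitions are ignored.
    public static long bytesServed(List<PartitionRead> partitions,
                                   long fetchMaxBytes,
                                   long maxPartitionBytes,
                                   long maxRemotePartitionBytes) {
        long total = 0;
        for (PartitionRead p : partitions) {
            long perPartitionLimit = p.remote() ? maxRemotePartitionBytes : maxPartitionBytes;
            long take = Math.min(Math.min(perPartitionLimit, p.availableBytes()), fetchMaxBytes - total);
            total += take;
            if (p.remote() || total >= fetchMaxBytes) {
                break;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Every partition in this request is served from remote storage.
        List<PartitionRead> allRemote = List.of(
            new PartitionRead("t-0", true, 10 * MB),
            new PartitionRead("t-1", true, 10 * MB),
            new PartitionRead("t-2", true, 10 * MB));
        // Today the remote partition falls under max.partition.fetch.bytes (1 MB).
        long today = bytesServed(allRemote, 50 * MB, 1 * MB, 1 * MB);
        // With the proposed max.remote.partition.fetch.bytes = 4 MB.
        long proposed = bytesServed(allRemote, 50 * MB, 1 * MB, 4 * MB);
        System.out.println(today + " " + proposed); // prints 1048576 4194304
    }
}
```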





Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381123599


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS

Review Comment:
   I'm confused about this as well. I would expect it to go from stable ->
unack (when a new assignment is computed) -> unreleased (if there are unreleased
partitions). If it's possible to ack the assignment even when there are
unreleased partitions, we're transitioning from unack to unreleased, right?






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381121312


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.

Review Comment:
   Small question for my understanding: why are we just waiting for _at least_
one of them to be released?
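A hypothetical Java sketch of the UNRELEASED_PARTITIONS condition as the javadoc states it: the member waits while none of its newly assigned partitions has been released, and can move on once at least one has. This mirrors the older `maybeAssignPendingPartitions` comment in this digest, which moves "some or all" free partitions to the assigned set; it does not claim to be the coordinator's code, and partitions are modelled as `"topic-partition"` strings.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; partitions are modelled as "topic-partition" strings.
public class UnreleasedPartitions {

    // Newly assigned partitions that no other member still owns.
    public static Set<String> available(Set<String> newlyAssigned, Set<String> ownedByOthers) {
        Set<String> result = new HashSet<>(newlyAssigned);
        result.removeAll(ownedByOthers);
        return result;
    }

    // Per the javadoc, the member stays in UNRELEASED_PARTITIONS while none of the
    // newly assigned partitions is available, and can move on once at least one is.
    public static boolean staysUnreleased(Set<String> newlyAssigned, Set<String> ownedByOthers) {
        return !newlyAssigned.isEmpty() && available(newlyAssigned, ownedByOthers).isEmpty();
    }

    public static void main(String[] args) {
        Set<String> assigned = Set.of("t-0", "t-1");
        System.out.println(staysUnreleased(assigned, Set.of("t-0", "t-1"))); // prints true
        // t-1 was released, so it can be handed out while t-0 is still pending.
        System.out.println(available(assigned, Set.of("t-0")));             // prints [t-1]
    }
}
```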






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381118918


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.

Review Comment:
   Do we want to transition to unacknowledged assignment every time a new
assignment is computed and sent? I'm confused about the "when a new target
assignment is installed" prerequisite.






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381121312


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.

Review Comment:
   Small question for my understanding: why are we waiting for just _at least_
one of them to be released?






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380922729


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member

Review Comment:
   Just for my understanding: when we say "new target assignment installed", do
we mean the assignment reconciliation on the client, i.e., the assignment
installed on the client?
   Also, does "installed" mean that the assignment is already acknowledged?






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381118918


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.

Review Comment:
   Do we want to transition to UNACKNOWLEDGED_ASSIGNMENT every time a new
assignment is computed and sent? I'm confused about why the "when a new
target assignment is installed" prerequisite is necessary.



Re: [PR] KAFKA-15740: KRaft support in DeleteOffsetsConsumerGroupCommandIntegrationTest [kafka]

2023-11-02 Thread via GitHub


linzihao1999 commented on code in PR #14669:
URL: https://github.com/apache/kafka/pull/14669#discussion_r1381106437


##
core/src/test/scala/unit/kafka/admin/DeleteOffsetsConsumerGroupCommandIntegrationTest.scala:
##
@@ -20,7 +20,7 @@ package kafka.admin
 import java.util.Properties
 
 import kafka.server.Defaults
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}

Review Comment:
   Yes, it's prettier.



Re: [PR] KAFKA-14957: Update-Description-String [kafka]

2023-11-02 Thread via GitHub


Owen-CH-Leung commented on PR #13909:
URL: https://github.com/apache/kafka/pull/13909#issuecomment-1791810020

   @mjsax Thanks. It really clarifies a lot!
   
   I've modified the code to allow overriding the default value of `state.dir` by
setting the system property `kafka.streams.state.dir`. So, in `build.gradle`, if
we set the following:
   
   ```groovy
   task genStreamsConfigDocs(type: JavaExec) {
       classpath = sourceSets.main.runtimeClasspath
       mainClass = 'org.apache.kafka.streams.StreamsConfig'
       if (!generatedDocsDir.exists()) { generatedDocsDir.mkdirs() }
       standardOutput = new File(generatedDocsDir, "streams_config.html").newOutputStream()
       systemProperty 'kafka.streams.state.dir', '/some/random/path'
   }
   ```
   
   then the generated HTML page will show `/some/random/path` as the default value.
If we set it to an empty string, the output will also be an empty string. If it is
not set, the default logic
`System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams"` will
be applied.
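How the PR reads the property is not shown here. A minimal sketch, assuming the default is resolved with `System.getProperty(key, fallback)`, which yields the three behaviours described above (explicit path, explicit empty string, unset). `StateDirDefaultSketch` and `defaultStateDir` are hypothetical names, not `StreamsConfig` code.

```java
import java.io.File;

// Hypothetical sketch of the described default resolution; not the PR's StreamsConfig code.
final class StateDirDefaultSketch {

    // System property name taken from the comment above.
    static final String STATE_DIR_PROPERTY = "kafka.streams.state.dir";

    static String defaultStateDir() {
        String fallback = System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams";
        // The fallback is used only when the property is unset; an explicitly
        // empty value is returned unchanged as "".
        return System.getProperty(STATE_DIR_PROPERTY, fallback);
    }

    public static void main(String[] args) {
        System.out.println(defaultStateDir());
    }
}
```

Using the two-argument `getProperty` keeps "unset" and "set to empty" distinct, which matches the behaviour described for the generated docs.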
   
   Let me know what you think. Thanks!


Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954663


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE
+ * state if it is fully reconciled.
  *
- * - ASSIGNING:
- *   This state means that the member waits on partitions which are still owned by other members in the
- *   group. It remains in this state until they are all freed up.
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT
+ * When the assignment is acknowledged, the member remains in the
+ * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed.
  *
- * - STABLE:
- *   This state means that the member has received all its assigned partitions.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
+ *
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS
+ * When the assignment is acknowledged, the member transitions to the
+ * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet.
+ *
+ * - UNRELEASED_PARTITIONS:
+ *   The member's reconciliation cannot progress because newly assigned partitions
+ *   are still owned by other members in the group. They are not released yet.
  *
- * The reconciliation process is started or re-started whenever a new target assignment is installed;
- * the epoch of the new target assignment is different from the next epoch of the member. In this transient
- * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment
- * are updated. If the partitions pending revocation is not empty, the state machine transitions to
- * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it
- * transitions to STABLE.
+ *   Valid Transitions:
+ *   - UNRELEASED_PARTITIONS -> STABLE
+ * The member may transition to the STABLE state if the partitions 

Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954345


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE
+ * state if it is fully reconciled.
  *
- * - ASSIGNING:
- *   This state means that the member waits on partitions which are still owned by other members in the
- *   group. It remains in this state until they are all freed up.
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT
+ * When the assignment is acknowledged, the member remains in the
+ * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed.
  *
- * - STABLE:
- *   This state means that the member has received all its assigned partitions.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
+ *
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS
+ * When the assignment is acknowledged, the member transitions to the
+ * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet.
+ *
+ * - UNRELEASED_PARTITIONS:
+ *   The member's reconciliation cannot progress because newly assigned partitions
+ *   are still owned by other members in the group. They are not released yet.
  *
- * The reconciliation process is started or re-started whenever a new target assignment is installed;
- * the epoch of the new target assignment is different from the next epoch of the member. In this transient
- * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment
- * are updated. If the partitions pending revocation is not empty, the state machine transitions to
- * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it
- * transitions to STABLE.
+ *   Valid Transitions:
+ *   - UNRELEASED_PARTITIONS -> STABLE
+ * The member may transition to the STABLE state if the partitions 

Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954188


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS

Review Comment:
   For this transition, I'm confused about which one is the default first transition.
I.e., if the client has not yet acknowledged AND we have unreleased partitions,
which one do we go to first? And is moving between these two in both directions
only possible if a new assignment occurs in the middle? I'm struggling to
understand how all of these transitions are valid.
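The "Valid Transitions" lists in the quoted Javadoc can be collected into one table, which may make the question above easier to discuss. This is a minimal sketch, not the PR's code: `MemberStateTransitions`, `VALID` and `isValid` are hypothetical names. It records only which transitions the Javadoc allows, not which one is taken first when several conditions hold, and because the quote is cut off after `UNRELEASED_PARTITIONS -> STABLE`, that row may be incomplete.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not the CurrentAssignmentBuilder implementation.
final class MemberStateTransitions {

    // The three states named in the quoted Javadoc.
    enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    // "Valid Transitions" as listed in the quoted Javadoc.
    static final Map<MemberState, Set<MemberState>> VALID = new EnumMap<>(MemberState.class);

    static {
        VALID.put(MemberState.STABLE, EnumSet.of(
            MemberState.STABLE,                     // new target, member assignment unchanged
            MemberState.UNACKNOWLEDGED_ASSIGNMENT,  // new assignment computed for the member
            MemberState.UNRELEASED_PARTITIONS));    // newly assigned partitions not released yet
        VALID.put(MemberState.UNACKNOWLEDGED_ASSIGNMENT, EnumSet.of(
            MemberState.STABLE,                     // acknowledged and fully reconciled
            MemberState.UNACKNOWLEDGED_ASSIGNMENT,  // acknowledged, another new assignment computed
            MemberState.UNRELEASED_PARTITIONS));    // acknowledged, new partitions not available yet
        // The quoted Javadoc is truncated after this transition, so this row may be incomplete.
        VALID.put(MemberState.UNRELEASED_PARTITIONS, EnumSet.of(MemberState.STABLE));
    }

    static boolean isValid(MemberState from, MemberState to) {
        return VALID.get(from).contains(to);
    }

    public static void main(String[] args) {
        for (MemberState from : MemberState.values()) {
            System.out.println(from + " -> " + VALID.get(from));
        }
    }
}
```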



Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380949384


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member

Review Comment:
   I think this is the case where we get a new assignment server side, but 
nothing changed for the member?



Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380943451


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout(
 }
 
 /**
- * Schedules a revocation timeout for the member.
+ * Schedules a rebalance timeout for the member.
  *
- * @param groupId   The group id.
- * @param memberId  The member id.
- * @param revocationTimeoutMs   The revocation timeout.
- * @param expectedMemberEpoch   The expected member epoch.
+ * @param groupId   The group id.
+ * @param expectedMember  The expected member.
  */
-private void scheduleConsumerGroupRevocationTimeout(
+private void scheduleConsumerGroupRebalanceTimeout(
 String groupId,
-String memberId,
-long revocationTimeoutMs,
-int expectedMemberEpoch
+ConsumerGroupMember expectedMember
 ) {
-String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
-timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
+String key = consumerGroupRebalanceTimeoutKey(groupId, expectedMember.memberId());
+timer.schedule(key, expectedMember.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, true, () -> {
 try {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+ConsumerGroupMember member = group.getOrMaybeCreateMember(expectedMember.memberId(), false);
 
-if (member.state() != ConsumerGroupMember.MemberState.REVOKING ||
-member.memberEpoch() != expectedMemberEpoch) {
-log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " +
-"state does not match the expected state.", groupId, memberId);
+if (!member.assignmentEquals(expectedMember)) {

Review Comment:
   I also wasn't sure about the states being the same. Is it possible that, while
we are unstable, we will not be equal to the expected member? Or am I not
understanding correctly when we transition to STABLE?



Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380936875


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -301,11 +322,9 @@ public String toString() {
 private final int previousMemberEpoch;
 
 /**
- * The next member epoch. This corresponds to the target
- * assignment epoch used to compute the current assigned,
- * revoking and assigning partitions.
+ * The member state.
  */
-private final int targetMemberEpoch;
+private final MemberState state;

Review Comment:
   Is the idea with the new code that we just want to match the desired state
(i.e., add the assigned partitions and remove the revoked ones)?
   
   If we change the assignment, will we just bump the epoch and fence older
ones?
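The quoted Javadoc states the epoch rule as: if the next assignment contains partitions to be revoked, the member stays in its current epoch; otherwise it transitions to the target epoch. A minimal sketch of just that rule, assuming partitions are plain integers and that "partitions to be revoked" means owned partitions absent from the target; `EpochRuleSketch` and its methods are hypothetical names, and fencing of older epochs is not modeled.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the epoch rule stated in the quoted Javadoc.
final class EpochRuleSketch {

    // Partitions the member owns now but that are absent from the target assignment.
    static Set<Integer> partitionsToRevoke(Set<Integer> owned, Set<Integer> target) {
        Set<Integer> revoke = new HashSet<>(owned);
        revoke.removeAll(target);
        return revoke;
    }

    // Stay in the current epoch while something must be revoked; otherwise
    // move to the target epoch.
    static int nextMemberEpoch(int currentEpoch, int targetEpoch, Set<Integer> owned, Set<Integer> target) {
        return partitionsToRevoke(owned, target).isEmpty() ? targetEpoch : currentEpoch;
    }

    public static void main(String[] args) {
        System.out.println(nextMemberEpoch(5, 6, Set.of(0, 1), Set.of(1, 2))); // partition 0 must be revoked
        System.out.println(nextMemberEpoch(5, 6, Set.of(1), Set.of(1, 2)));    // nothing to revoke
    }
}
```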



Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380929415


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout(
 }
 
 /**
- * Schedules a revocation timeout for the member.
+ * Schedules a rebalance timeout for the member.
  *
- * @param groupId   The group id.
- * @param memberId  The member id.
- * @param revocationTimeoutMs   The revocation timeout.
- * @param expectedMemberEpoch   The expected member epoch.
+ * @param groupId   The group id.
+ * @param expectedMember  The expected member.
  */
-private void scheduleConsumerGroupRevocationTimeout(
+private void scheduleConsumerGroupRebalanceTimeout(
 String groupId,
-String memberId,
-long revocationTimeoutMs,
-int expectedMemberEpoch
+ConsumerGroupMember expectedMember
 ) {
-String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
-timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
+String key = consumerGroupRebalanceTimeoutKey(groupId, expectedMember.memberId());
+timer.schedule(key, expectedMember.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, true, () -> {
 try {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+ConsumerGroupMember member = group.getOrMaybeCreateMember(expectedMember.memberId(), false);
 
-if (member.state() != ConsumerGroupMember.MemberState.REVOKING ||
-member.memberEpoch() != expectedMemberEpoch) {
-log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " +
-"state does not match the expected state.", groupId, memberId);
+if (!member.assignmentEquals(expectedMember)) {

Review Comment:
   So this works because we removed the pending assignment fields from the
equality check? In other words, this should only be false when the changed fields
indicate that the assignment is no longer valid, not when the assignment has
merely failed to complete.
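In the diff, the timeout handler no longer checks `state() == REVOKING` and the epoch; it compares the current member against the `expectedMember` snapshot captured when the timeout was scheduled. A minimal sketch of that "snapshot, then re-check when the timer fires" pattern follows, assuming a plain `ScheduledExecutorService` in place of the coordinator's timer. `Member`, `shouldFence` and the fields compared in `assignmentEquals` are hypothetical; which fields the PR's `assignmentEquals` actually compares is the open question above.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the snapshot-and-recheck timeout pattern; not GroupMetadataManager code.
final class RebalanceTimeoutSketch {

    // Hypothetical, heavily simplified member; the real ConsumerGroupMember has many more fields.
    record Member(String memberId, int memberEpoch, Set<Integer> assignedPartitions, long rebalanceTimeoutMs) {
        // Stand-in for assignmentEquals: ASSUMED to compare only the epoch and the
        // assigned partitions; the PR's actual field list is not shown in the diff.
        boolean assignmentEquals(Member other) {
            return memberEpoch == other.memberEpoch
                && assignedPartitions.equals(other.assignedPartitions);
        }
    }

    final Map<String, Member> members = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    // Fence only if the member still exists and still holds the assignment that was
    // captured when the timeout was scheduled; otherwise the timeout is stale.
    static boolean shouldFence(Member expected, Member current) {
        return current != null && current.assignmentEquals(expected);
    }

    void scheduleRebalanceTimeout(Member expected) {
        // Assumption: acknowledging the assignment cancels this timer elsewhere.
        timer.schedule(() -> {
            Member current = members.get(expected.memberId());
            if (shouldFence(expected, current)) {
                members.remove(expected.memberId(), current); // fence (remove) the member
            }
        }, expected.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS);
    }

    void close() {
        timer.shutdownNow();
    }
}
```

The comparison is against a snapshot rather than a single state flag, so any change to the compared fields between scheduling and firing makes the timeout a no-op.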



Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380923649


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed

Review Comment:
   Small question: irrespective of anything else, whenever a new assignment is
sent to the member, we transition to stable, is that right?



Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380923297


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout(
 }
 
 /**
- * Schedules a revocation timeout for the member.
+ * Schedules a rebalance timeout for the member.
  *
- * @param groupId   The group id.

Review Comment:
   Should we have removed this? It is still a param below.



Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380922729


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member

Review Comment:
   Just for my understanding, when we say new target assignment installed do we 
mean the assignment reconciliation on the client aka assignment installed on 
the client? 






Re: [PR] KAFKA-15774: introduce internal StoreFactory [kafka]

2023-11-02 Thread via GitHub


ableegoldman commented on code in PR #14659:
URL: https://github.com/apache/kafka/pull/14659#discussion_r1380921878


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java:
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
+
+public class StoreBuilderWrapper implements StoreFactory {

Review Comment:
   Ok one more thing: the javadocs explaining the StoreFactory are great, but I 
think this needs javadocs as well. Is it just the default implementation of the 
StoreFactory class? Is it just here to cover the window/session store cases 
because they don't yet have a Materializer? etc
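One reading of the question is that `StoreBuilderWrapper` adapts an existing `StoreBuilder` to the new `StoreFactory` interface. The sketch below illustrates only that adapter idea, under heavy simplification: `SimpleStore`, `SimpleStoreBuilder`, `SimpleStoreFactory`, and `BuilderWrapperSketch` are hypothetical, and the real Kafka Streams interfaces have more methods than shown. The connected-processor tracking mirrors the `connectedProcessorNames` set that the same PR adds to `KeyValueStoreMaterializer`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified interfaces (not the Kafka Streams API).
interface SimpleStore {
    String name();
}

interface SimpleStoreBuilder<S extends SimpleStore> {
    S build();
    String name();
}

interface SimpleStoreFactory {
    SimpleStore build();
    String name();
    void connectProcessorName(String processorName);
    Set<String> connectedProcessorNames();
}

// Adapter: exposes an existing builder through the factory interface and
// records which processors are connected to the store.
final class BuilderWrapperSketch implements SimpleStoreFactory {
    private final SimpleStoreBuilder<?> builder;
    private final Set<String> connectedProcessorNames = new HashSet<>();

    BuilderWrapperSketch(SimpleStoreBuilder<?> builder) {
        this.builder = builder;
    }

    @Override
    public SimpleStore build() {
        return builder.build(); // delegate construction to the wrapped builder
    }

    @Override
    public String name() {
        return builder.name();
    }

    @Override
    public void connectProcessorName(String processorName) {
        connectedProcessorNames.add(processorName);
    }

    @Override
    public Set<String> connectedProcessorNames() {
        return connectedProcessorNames;
    }
}
```

If this reading is right, a class-level javadoc could say so in one sentence: the wrapper is the `StoreFactory` for stores that arrive as a plain `StoreBuilder`.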






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380913612


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -934,32 +934,25 @@ private CoordinatorResult consumerGr
 // 3. Reconcile the member's assignment with the target assignment. This is only required if
 // the member is not stable or if a new target assignment has been installed.
 boolean assignmentUpdated = false;
-if (updatedMember.state() != ConsumerGroupMember.MemberState.STABLE || updatedMember.targetMemberEpoch() != targetAssignmentEpoch) {
+if (!updatedMember.isReconciledTo(targetAssignmentEpoch)) {
 ConsumerGroupMember prevMember = updatedMember;
 updatedMember = new CurrentAssignmentBuilder(updatedMember)
 .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
 .withCurrentPartitionEpoch(group::currentPartitionEpoch)
 .withOwnedTopicPartitions(ownedTopicPartitions)
 .build();
 
-// Checking the reference is enough here because a new instance
-// is created only when the state has changed.
-if (updatedMember != prevMember) {

Review Comment:
   Is this due to the change with assignments or just a cleanup we wanted to do?
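Part of this refactor replaces the inline condition with `isReconciledTo`. The excerpt does not show that method's body; the sketch below assumes it is the exact negation of the removed check, so `!isReconciledTo(epoch)` behaves like the old `if`. `MemberSketch` and its non-STABLE states are hypothetical placeholders, not Kafka classes.

```java
// Hypothetical, simplified stand-in for ConsumerGroupMember. Only the two
// fields read by the removed condition are modelled; the non-STABLE states
// are placeholders.
final class MemberSketch {
    enum MemberState { STABLE, REVOKING, ASSIGNING }

    private final MemberState state;
    private final int targetMemberEpoch;

    MemberSketch(MemberState state, int targetMemberEpoch) {
        this.state = state;
        this.targetMemberEpoch = targetMemberEpoch;
    }

    // The removed inline check: reconcile when the member is not stable or
    // its target epoch differs from the group's target assignment epoch.
    boolean needsReconciliationOld(int targetAssignmentEpoch) {
        return state != MemberState.STABLE || targetMemberEpoch != targetAssignmentEpoch;
    }

    // Assumed shape of isReconciledTo: the De Morgan negation of the check above.
    boolean isReconciledTo(int targetAssignmentEpoch) {
        return state == MemberState.STABLE && targetMemberEpoch == targetAssignmentEpoch;
    }
}
```

If the real `isReconciledTo` compares different fields, the two conditions are no longer interchangeable, which is worth confirming alongside the removal of the `updatedMember != prevMember` reference check.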






Re: [PR] KAFKA-15774: introduce internal StoreFactory [kafka]

2023-11-02 Thread via GitHub


ableegoldman commented on code in PR #14659:
URL: https://github.com/apache/kafka/pull/14659#discussion_r1380912197


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyConfig;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * What! Another mechanism for obtaining a {@link StateStore}? This isn't just
+ * an abuse of Java-isms... there's good reason for it. Here's how they are
+ * interconnected:
+ *
+ * 
+ * {@link org.apache.kafka.streams.state.StoreBuilder} is the innermost
+ * layer to provide state stores and is used directly by the PAPI.
+ *
+ * {@link org.apache.kafka.streams.state.StoreSupplier} is used by the
+ * DSL to provide preconfigured state stores as well as type-safe stores
+ * (e.g. {@link org.apache.kafka.streams.state.KeyValueBytesStoreSupplier}.
+ * Before passing the {@code StoreSupplier} into the 

Review Comment:
   incomplete sentence?






Re: [PR] KAFKA-15774: introduce internal StoreFactory [kafka]

2023-11-02 Thread via GitHub


ableegoldman commented on code in PR #14659:
URL: https://github.com/apache/kafka/pull/14659#discussion_r1379396433


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##
@@ -27,26 +31,44 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * Materializes a key-value store as either a {@link TimestampedKeyValueStoreBuilder} or a
  * {@link VersionedKeyValueStoreBuilder} depending on whether the store is versioned or not.
  */
-public class KeyValueStoreMaterializer {
+public class KeyValueStoreMaterializer implements StoreFactory {
 private static final Logger LOG = LoggerFactory.getLogger(KeyValueStoreMaterializer.class);
 
 private final MaterializedInternal> materialized;
+private final Set connectedProcessorNames = new HashSet<>();
+
+private Materialized.StoreType defaultStoreType = Materialized.StoreType.ROCKS_DB;

Review Comment:
   nit: I don't love that we define the default store type in this random 
hard-to-find location. If we ever wanted to change it going forward I think 
this would be easily missed (or at the least, a pain to debug/locate).
   
   Can we at least define a variable for the default defaultStoreType 
(so... defaultDefaultStoreType?) and put that in a sensible location? I guess 
maybe where the storeType itself is defined?
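A minimal sketch of the suggestion, assuming the built-in default is hoisted next to the store-type enum. `StoreTypeSketch`, its `DEFAULT` constant, `MaterializerSketch`, and its `configure` method are hypothetical names for illustration; `Materialized.StoreType` itself is not modified here.

```java
// Hypothetical stand-in for Materialized.StoreType with the default defined
// alongside the values it chooses between.
enum StoreTypeSketch {
    ROCKS_DB,
    IN_MEMORY;

    // Single, easy-to-find place for the built-in default.
    static final StoreTypeSketch DEFAULT = ROCKS_DB;
}

// Hypothetical materializer that starts from the shared constant instead of
// a hard-coded literal and may be overridden by configuration.
final class MaterializerSketch {
    private StoreTypeSketch defaultStoreType = StoreTypeSketch.DEFAULT;

    StoreTypeSketch defaultStoreType() {
        return defaultStoreType;
    }

    // A null argument means "not configured": keep the built-in default.
    void configure(StoreTypeSketch configured) {
        if (configured != null) {
            defaultStoreType = configured;
        }
    }
}
```

Changing the default later would then mean editing one line next to the enum values, rather than hunting through materializer fields.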






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on PR #14673:
URL: https://github.com/apache/kafka/pull/14673#issuecomment-1791722750

   > When partitions are revoked, the state machine cannot advance until the 
consumer acknowledges the revocation. However, when partitions are assigned, 
the state machine does not wait on receiving an acknowledgement at all. This 
means that the next assignment can be delivered anytime.
   
   Just for my understanding, we decided that the assignment should be the same 
as revocation -- that we must acknowledge before taking on anything new. Is 
this because the other way (advancing the state machine on revocations) was not 
possible?





Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380898562


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I was saying that appendEntries queues up a log record and we could use it 
there.
   
   I think Artem was not necessarily disagreeing with that idea but rather that 
a lock should be used at all. My understanding of his comment was that we 
should move to a model where changes are pending and those state transitions 
block further work. 
   
   I think we also discussed that the ordering of the log is not the important 
part but rather committing stale data. It was unclear to me from Jason's 
comments if this validation is fully necessary due to how we can sometimes 
write stale data to an offset partition when a rebalance happens after offsets 
are written as part of the transaction but before the transaction is 
committed.
   
   Basically there are two questions I see:
   
   1. Can we make this code more async friendly to avoid locking? (And does the 
new group coordinator do this?)
   2. Do we really need the lock if we can write inconsistent data outside the 
typical locking mechanisms and we handle it correctly in those cases? 
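The "pending transition" model attributed to Artem could look roughly like the single-threaded sketch below: instead of holding a lock across an async step, the group records that a transition is in flight and defers new work until it completes. This is one possible interpretation, not the design from the PR; `PendingGroupSketch` and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of a "pending transition" model on a single-threaded
// event loop: no lock is held across the async step; new work is deferred
// while a transition is in flight.
final class PendingGroupSketch {
    private final Queue<Runnable> deferred = new ArrayDeque<>();
    private boolean transitionPending = false;
    private int state = 0;

    // Starts work that ends in an async completion. Returns false if another
    // transition is in flight and the work was queued instead.
    boolean begin(Runnable work) {
        if (transitionPending) {
            deferred.add(work);
            return false;
        }
        transitionPending = true;
        work.run();
        return true;
    }

    // Invoked when the async step (e.g. verification) finishes: apply the
    // change, clear the pending flag, and start the next deferred work item.
    void complete(int newState) {
        state = newState;
        transitionPending = false;
        Runnable next = deferred.poll();
        if (next != null) {
            begin(next);
        }
    }

    int state() { return state; }
    boolean isPending() { return transitionPending; }
    int deferredCount() { return deferred.size(); }
}
```

Because later work cannot start until the earlier transition completes, ordering comes from the pending flag rather than from a lock held across the async call.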






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380899701


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   > I was saying that appendEntries queues up a log record and we could use it 
there.
   > 
   > I think Artem was not necessarily disagreeing with that idea but rather 
that a lock should be used at all. My understanding of his comment was that we 
should move to a model where changes are pending and those state transitions 
block further work.
   > 
   > I think we also discussed that the ordering of the log is not the 
important part but rather committing stale data. It was unclear to me from 
Jason's comments if this validation is fully necessary due to how we can 
sometimes write stale data to a transaction when a rebalance happens after 
records are written but before the transaction is committed.
   > 
   > Basically there are two questions I see:
   > 
   > 1. Can we make this code more async friendly to avoid locking? (And does 
the new group coordinator do this?)
   > 2. Do we really need the lock if we can write inconsistent data outside 
the typical locking mechanisms and we handle it correctly in those cases?
   
   






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380898562


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I was saying that appendEntries queues up a log record and we could use it 
there.
   
   I think Artem was not necessarily disagreeing with that idea but rather that 
a lock should be used at all. My understanding of his comment was that we 
should move to a model where changes are pending and those state transitions 
block further work. 
   
   I think we also discussed that the ordering of the log is not the important 
part but rather committing stale data. It was unclear to me from Jason's 
comments if this validation is fully necessary due to how we can sometimes 
write stale data to a transaction when a rebalance happens after records are 
written but before the transaction is committed.
   
   Basically there are two questions I see:
   
   1. Can we make this code more async friendly to avoid locking? (And does the 
new group coordinator do this?)
   2. Do we really need the lock if we can write inconsistent data outside the 
typical locking mechanisms and we handle it correctly in those cases? 






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380898562


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I was saying that appendEntries queues up a log record and we could use it 
there.
   
   I think Artem was not necessarily disagreeing with that idea but rather that 
a lock should be used at all.
   
   I think we also discussed that the ordering of the log is not the important 
part but rather committing stale data. It was unclear to me from Jason's 
comments if this validation is fully necessary due to how we can sometimes 
write stale data to a transaction when a rebalance happens after records are 
written but before the transaction is committed.
   
   Basically there are two questions I see:
   
   1. Can we make this code more async friendly to avoid locking? (And does the 
new group coordinator do this?)
   2. Do we really need the lock if we can write inconsistent data outside the 
typical locking mechanisms and we handle it correctly in those cases? 






Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-02 Thread via GitHub


ijuma commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1380892982


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map props) {
 super(CONFIG, props);
 }
 
-ProducerConfig(Map props, boolean doLog) {
+protected ProducerConfig(Map props, boolean doLog) {

Review Comment:
   The KIP page states:
   
   > Any change that impacts the public interfaces of the project
   
   Then it lists examples:
   
   > Any class for which the build generates Javadoc
   
   Hopefully these parts are not controversial. Then the next question is which 
parts of a class are part of the public interface. This is defined by Java 
accessibility rules combined with project specific conventions.
   
   When it comes to Java accessibility rules, public classes and methods are 
obvious. The next bucket are protected methods of non-final public classes - 
those are also part of the public interface since one can call such methods 
from a subclass. Newer Java versions also have the concept of sealed classes, 
but we can ignore those for now.
   
   Finally, you have project-specific conventions. Some projects state 
explicitly which classes are designed for inheritance and all other ones are 
not supposed to be inherited (and hence their protected methods are not part of 
the public interface). Kafka (as far as I know) hasn't done this yet, but we 
probably should. One example of a project specific convention we have is the 
InterfaceStability annotation (it relaxes compatibility guarantees, but doesn't 
affect what's considered part of the public interface).
   
   Going back to the original question, does this need a KIP? If we are to 
follow the rules of the project, absolutely. If we want to change the rules, we 
should have the discussion and update the KIP documentation. Otherwise, it's 
difficult to be fair and consistent.
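To make the accessibility point concrete, here is a hypothetical stand-in shaped like the `ProducerConfig` change: a public constructor plus a `protected` one taking `doLog`. `ConfigSketch` and `QuietConfigSketch` are illustrative, not Kafka classes. Because both live in one file they share a package, so this compiles regardless of `protected`; the argument above is that a subclass in a different package could make the same `super(props, false)` call and would break if that constructor later changed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical public, non-final class mirroring the shape of the change.
class ConfigSketch {
    final Map<String, Object> props;
    final boolean logged;

    public ConfigSketch(Map<String, Object> props) {
        this(props, true);
    }

    // Visible to subclasses in any package once marked protected.
    protected ConfigSketch(Map<String, Object> props, boolean doLog) {
        this.props = new HashMap<>(props);
        this.logged = doLog;
    }
}

// A user-defined subclass that depends on the protected constructor; a change
// to that constructor's signature would break this class at compile time.
class QuietConfigSketch extends ConfigSketch {
    QuietConfigSketch(Map<String, Object> props) {
        super(props, false);
    }
}
```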






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380888754


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Yes, the current flow in group coordinator is updating the state, queuing up 
a corresponding log record for commit, and calling the callback when the log 
record is committed. It needs the log records for a group to be queued up and 
committed in the same order as the updating of the state so that (1) the latest 
state can be replayed from the log records correctly in case of failure and (2) 
the callback can be called in the updating order. Currently, this is achieved 
by holding the group lock while updating the state and queuing up the 
log record (implemented through `ReplicaManager.append`). The expectation is 
that queuing up of the log record can be done synchronously. Adding async txn 
validation dependency inside `ReplicaManager.append` breaks that expectation.
   
   To address this issue, we could move the async txn validation dependency out 
of `ReplicaManager.append`. Alternatively, we could implement another way of 
synchronously queuing up a log record since `ReplicaManager.append` does more 
than just queuing up a record.
   
   I am not sure holding the group lock just for updating the state is enough 
for ensuring the ordering of the log records. 
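A minimal sketch of the flow described above, assuming a single in-memory group and a simulated log: the state update and the enqueue of the matching record happen in one critical section, so queue order equals update order, and callbacks fire in that order on commit. `GroupLogSketch` is hypothetical and does not model `ReplicaManager.append`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical sketch: state update + synchronous enqueue under one lock.
final class GroupLogSketch {
    private final ReentrantLock groupLock = new ReentrantLock();
    private final List<String> queuedRecords = new ArrayList<>();
    private final List<Consumer<String>> queuedCallbacks = new ArrayList<>();
    private final List<String> committedLog = new ArrayList<>();
    private String state = "";

    // (1) update the state and (2) enqueue its record in the same critical
    // section, so record order always matches update order.
    void update(String newState, Consumer<String> onCommit) {
        groupLock.lock();
        try {
            state = newState;
            queuedRecords.add(newState);
            queuedCallbacks.add(onCommit);
        } finally {
            groupLock.unlock();
        }
    }

    // Simulates the log committing everything queued so far, in queue order,
    // invoking each callback after its record is committed.
    void commitQueued() {
        groupLock.lock();
        try {
            for (int i = 0; i < queuedRecords.size(); i++) {
                committedLog.add(queuedRecords.get(i));
                queuedCallbacks.get(i).accept(queuedRecords.get(i));
            }
            queuedRecords.clear();
            queuedCallbacks.clear();
        } finally {
            groupLock.unlock();
        }
    }

    // Replaying the committed log (here: taking the last record) recovers the latest state.
    String replay() {
        return committedLog.isEmpty() ? "" : committedLog.get(committedLog.size() - 1);
    }

    String state() {
        groupLock.lock();
        try {
            return state;
        } finally {
            groupLock.unlock();
        }
    }
}
```

If step (2) became asynchronous, for example waiting on transaction verification before enqueuing, another update could take the lock and enqueue first, so committed order could diverge from update order. That is the expectation the comment says the async validation breaks.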






Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-02 Thread via GitHub


ableegoldman commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1380884002


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map props) {
 super(CONFIG, props);
 }
 
-ProducerConfig(Map props, boolean doLog) {
+protected ProducerConfig(Map props, boolean doLog) {

Review Comment:
   Personally I suppose I can see the argument that `protected` APIs in a 
public and non-final class are themselves part of the public API, as one is 
technically free to extend that class and would then need to (or at least be 
able to) leverage those APIs. 
   
   However...I think one could also argue that the public API is only what is 
directly accessible to the user, and that if you need to extend a public class 
in order to access a given API, then that API is not strictly speaking part of 
the API as it is only indirectly accessible from a custom user-defined class. 
Imo this is more of a hack/breach of internal APIs than a legitimate use of the 
public API -- as a user, I would not expect my custom implementation to have 
any guarantees or to be covered by the public contract, and would consider it 
my own responsibility/fault if my code failed to compile after an upgrade 
because there was a change to the `protected` API I was sneakily using via 
inheritance. 
   
   Just my own two cents about the situation: that frankly, accessing protected 
classes via inheritance is closer in spirit to accessing them (or even private 
methods) via reflection, than it is to accessing a true public API covered by 
the public contract. Just because one can leverage a feature of Java to gain 
access to an API, that doesn't make it a public API in itself.
   
   All that said, I am still happy to do a KIP if that will be the path of 
least resistance. @ijuma, just let me know, because I don't want to merge 
something that you would feel the need to revert.






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380879868


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:

Review Comment:
   nit: Could we also list all the valid states at the beginning? It might make 
the full set of states easier to follow.






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380878154


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.

Review Comment:
   nit: could we change all the "he"s to "it"?
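The two transitions quoted in the Javadoc above can be sketched as follows. This is a hypothetical model, not `CurrentAssignmentBuilder`'s code: `MemberState`, `MemberSnapshot` and `StableTransitions` are illustrative names, assignments are modeled as topic name to partition set, and only the STABLE transitions described in the diff are covered.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: models only the two STABLE transitions quoted in the diff.
enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT }

// Immutable snapshot of a member: state, epoch, and assigned partitions per topic.
final class MemberSnapshot {
    final MemberState state;
    final int epoch;
    final Map<String, Set<Integer>> assignment;

    MemberSnapshot(MemberState state, int epoch, Map<String, Set<Integer>> assignment) {
        this.state = state;
        this.epoch = epoch;
        this.assignment = assignment;
    }
}

final class StableTransitions {
    // Called when a new target assignment with epoch `targetEpoch` is installed
    // and `computed` is the assignment derived for this member.
    static MemberSnapshot onNewTargetAssignment(
            MemberSnapshot current, int targetEpoch, Map<String, Set<Integer>> computed) {
        if (current.state != MemberState.STABLE) {
            throw new IllegalStateException("sketch only covers STABLE members");
        }
        if (computed.equals(current.assignment)) {
            // STABLE -> STABLE: assignment unchanged, the member just moves to the next epoch.
            return new MemberSnapshot(MemberState.STABLE, targetEpoch, current.assignment);
        }
        // STABLE -> UNACKNOWLEDGED_ASSIGNMENT: the new assignment must be acknowledged first.
        // Keeping the old epoch here is an assumption; the quoted Javadoc does not say.
        return new MemberSnapshot(MemberState.UNACKNOWLEDGED_ASSIGNMENT, current.epoch, computed);
    }
}
```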






Re: [PR] MINOR: Change the ProducerConfig constructor that allows disabling logging to "protected" [kafka]

2023-11-02 Thread via GitHub


ableegoldman commented on code in PR #14681:
URL: https://github.com/apache/kafka/pull/14681#discussion_r1380876249


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -618,7 +618,7 @@ public ProducerConfig(Map props) {
 super(CONFIG, props);
 }
 
-ProducerConfig(Map props, boolean doLog) {
+protected ProducerConfig(Map props, boolean doLog) {

Review Comment:
   @ijuma I can do a very quick KIP if you would like (and kick off the voting 
immediately, as I think the change itself is uncontroversial). Obviously I would 
prefer not to, but am happy with whatever gets this through the fastest, even 
if that means just doing a KIP to avoid a long drawn-out debate.
   
   Also, just to take a quick step back, we are not saying that "a change to 
javadocs of a public class" is what defines the public API, but rather anything 
that changes the javadocs of what is established as a public API then needs a 
KIP? In other words, the question is not really about the javadocs, but simply 
whether `protected` APIs are part of the public API? (I hadn't refreshed the 
page and so didn't see any of the responses after Bruno's initial comment when 
I wrote my reply above, but I still would like to clarify this concretely)
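To make the question concrete, here is a minimal sketch with hypothetical classes (`BaseConfig` and `QuietConfig` are illustrative, not Kafka's `ProducerConfig`): once a constructor is `protected`, any subclass can call it, so in a real codebase a user class in another package could start depending on its signature. In this single-file sketch both classes share a package, so it only shows the call pattern, not the cross-package access itself.

```java
import java.util.Map;

// Hypothetical stand-in for a config class; not Kafka's ProducerConfig.
class BaseConfig {
    final Map<String, Object> props;
    final boolean logged; // here `doLog` is simply recorded as a flag

    public BaseConfig(Map<String, Object> props) {
        this(props, true);
    }

    // Widening this from package-private to protected lets subclasses
    // in any package invoke it via super(...).
    protected BaseConfig(Map<String, Object> props, boolean doLog) {
        this.props = props;
        this.logged = doLog;
    }
}

// A user-defined subclass: with a protected constructor, code like this
// compiles from outside the package, so the signature becomes something
// users can depend on.
class QuietConfig extends BaseConfig {
    QuietConfig(Map<String, Object> props) {
        super(props, false);
    }
}
```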






[PR] MINOR: Small LogValidator clean ups [kafka]

2023-11-02 Thread via GitHub


hachikuji opened a new pull request, #14697:
URL: https://github.com/apache/kafka/pull/14697

   1. Set shallowOffsetOfMaxTimestamp consistently as the last offset in the 
batch for v2 compressed and non-compressed data.
   2. Rename `RecordConversionStats` to `RecordValidationStats` since one of 
its fields `temporaryMemoryBytes` does not depend on conversion.
   3. Rename `batchIndex` to `recordIndex` in loops over the records in each 
batch inside `LogValidator`.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] [KAFKA-15022] optimization: detect negative cycle from one source [kafka]

2023-11-02 Thread via GitHub


lihaosky opened a new pull request, #14696:
URL: https://github.com/apache/kafka/pull/14696

   ### Description
   Introduce a dummy node connected to every other node and run Bellman-Ford 
from the dummy node once instead of from every node in the graph.
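A minimal sketch of the single-source technique described above, assuming a plain edge-list graph (the `NegativeCycleDetector` and `Edge` names are hypothetical, not the classes touched by this PR): a dummy node with zero-weight edges to every node makes all nodes reachable, so one Bellman-Ford run detects a negative cycle anywhere in the graph, including in components that no single real node can reach.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NegativeCycleDetector {
    // A directed edge with a (possibly negative) weight.
    static final class Edge {
        final int from;
        final int to;
        final long weight;

        Edge(int from, int to, long weight) {
            this.from = from;
            this.to = to;
            this.weight = weight;
        }
    }

    // Returns true if the graph on nodes 0..n-1 contains any negative-weight cycle.
    static boolean hasNegativeCycle(int n, List<Edge> edges) {
        final long inf = Long.MAX_VALUE / 4;
        int dummy = n; // the extra source node
        List<Edge> all = new ArrayList<>(edges);
        for (int v = 0; v < n; v++) {
            all.add(new Edge(dummy, v, 0)); // zero-weight edge: dummy reaches every node
        }
        long[] dist = new long[n + 1];
        Arrays.fill(dist, inf);
        dist[dummy] = 0;

        // n + 1 vertices in total, so n rounds of relaxation suffice for shortest paths.
        for (int round = 0; round < n; round++) {
            boolean changed = false;
            for (Edge e : all) {
                if (dist[e.from] != inf && dist[e.from] + e.weight < dist[e.to]) {
                    dist[e.to] = dist[e.from] + e.weight;
                    changed = true;
                }
            }
            if (!changed) {
                return false; // converged early: no negative cycle anywhere
            }
        }
        // Any edge that can still be relaxed proves a negative cycle exists.
        for (Edge e : all) {
            if (dist[e.from] != inf && dist[e.from] + e.weight < dist[e.to]) {
                return true;
            }
        }
        return false;
    }
}
```

Running from one source costs O(V·E) instead of the O(V²·E) needed to run Bellman-Ford from every node.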
   
   ### Testing
   Existing test
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]

2023-11-02 Thread via GitHub


philipnee commented on code in PR #14680:
URL: https://github.com/apache/kafka/pull/14680#discussion_r1380850627


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -1096,4 +1114,47 @@ private void subscribeInternal(Collection 
topics, Optional 0) {
+invoker.executeCallbacks();
+}
+}
+
+// Visible for testing
+int callbacks() {
+return invoker.callbackQueue.size();
+}
+
+/**
+ * Utility class that helps the application thread to invoke user registered callbacks such as
+ * {@link OffsetCommitCallback}.  This is achieved by having the background thread to register a runnable to the
+ * invoker upon the future completion, and execute the callbacks when user polls the consumer.
+ */
+private static class CallbackInvoker {
+// Thread-safe queue to store callbacks
+private final BlockingQueue callbackQueue = new LinkedBlockingQueue<>();

Review Comment:
   In fact, using the backgroundEventQueue would not be feasible: I just 
realized the callbacks need to be invoked on every commit call as well as on 
poll. I don't know if there's a better way to do it.
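One way to read the local-queue idea is sketched below, under stated assumptions: `CallbackInvokerSketch` is a hypothetical class, callbacks are plain `Runnable`s, and the commit future is a `CompletableFuture`. The background thread only enqueues; the application thread drains the queue from whichever public call it is in (commit, poll, or close), so user callbacks never run on the background thread and the background event queue is left untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not PrototypeAsyncConsumer's implementation.
final class CallbackInvokerSketch {
    // Thread-safe: written by the background thread, drained by the application thread.
    private final BlockingQueue<Runnable> callbackQueue = new LinkedBlockingQueue<>();

    // Background thread: called when a commit future completes.
    void enqueue(Runnable callback) {
        callbackQueue.add(callback);
    }

    // Application thread: called from commitSync/commitAsync/poll/close.
    // Returns how many callbacks were executed.
    int executeCallbacks() {
        List<Runnable> drained = new ArrayList<>();
        callbackQueue.drainTo(drained); // take a snapshot; later arrivals wait for the next call
        for (Runnable callback : drained) {
            callback.run();
        }
        return drained.size();
    }

    int pending() {
        return callbackQueue.size();
    }
}
```

Draining into a local list means a callback that itself triggers another commit cannot loop forever inside one `executeCallbacks()` call.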






Re: [PR] KAFKA-15578: Migrating other system tests to use the group coordinator [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on PR #14582:
URL: https://github.com/apache/kafka/pull/14582#issuecomment-1791632833

   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5930/





[PR] KAFKA-15780: Wait for consistent KRaft metadata when creating topics [kafka]

2023-11-02 Thread via GitHub


splett2 opened a new pull request, #14695:
URL: https://github.com/apache/kafka/pull/14695

   ### What
   TestUtils.createTopicWithAdmin calls `waitForAllPartitionsMetadata`, which 
waits for partition(s) to be present in each broker's metadata cache. This is a 
sufficient check in ZK mode because the controller sends an LISR request before 
sending an UpdateMetadataRequest, which means that the partition in the 
`ReplicaManager` will be updated before the metadata cache.
   
   In KRaft mode, the metadata cache is updated first, so the check may return 
before partitions and other metadata listeners are fully initialized.
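A sketch of a stricter wait, assuming a hypothetical test-harness `BrokerView` that exposes both the metadata cache and the `ReplicaManager` (the real test utilities may expose neither this way, and the PR's actual fix may differ): instead of returning as soon as the cache has the partition, the helper polls until every broker reports it in both places.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical view of one broker's state in a test harness.
interface BrokerView {
    boolean inMetadataCache(String topicPartition);
    boolean inReplicaManager(String topicPartition);
}

final class WaitUtil {
    // True only when every broker has the partition in its metadata cache AND its ReplicaManager.
    static boolean allBrokersReady(List<? extends BrokerView> brokers, String topicPartition) {
        return brokers.stream().allMatch(
            b -> b.inMetadataCache(topicPartition) && b.inReplicaManager(topicPartition));
    }

    // Polls `condition` every `pollMs` until it holds or `timeoutMs` elapses.
    static void waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs, String message) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting: " + message, e);
            }
        }
    }
}
```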
   
   ### Testing
   Insert a `Thread.sleep(100)` in `BrokerMetadataPublisher.onMetadataUpdate` 
after 
   ```
 // Publish the new metadata image to the metadata cache.
 metadataCache.setImage(newImage)
   ```
   and run `EdgeCaseRequestTest.testProduceRequestWithNullClientId` and the 
test will fail locally nearly deterministically. After the change(s), the test 
no longer fails.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA 14515: Optimized Uniform Rack Aware Assignor [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14416:
URL: https://github.com/apache/kafka/pull/14416#discussion_r1380818617


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * The assignment builder is used to construct the target assignment based on the members' subscriptions.
+ *
+ * This class contains common utility methods and a class for obtaining and storing rack information.
+ */
+public abstract class AbstractUniformAssignmentBuilder {
+protected abstract GroupAssignment buildAssignment();
+
+/**
+ * Determines if rack-aware assignment is appropriate based on the provided rack information.
+ *
+ * @param memberRacks   Racks where members are located.
+ * @param allPartitionRacks Racks where partitions are located.
+ * @param racksPerPartition Map of partitions to their associated racks.
+ *
+ * @return {@code true} if rack-aware assignment should be applied; {@code false} otherwise.
+ */
+protected static boolean useRackAwareAssignment(
+Set memberRacks,
+Set allPartitionRacks,
+Map> racksPerPartition
+) {
+if (memberRacks.isEmpty() || Collections.disjoint(memberRacks, allPartitionRacks))
+return false;
+else {
+return !racksPerPartition.values().stream().allMatch(allPartitionRacks::equals);
+}
+}
+
+/**
+ * Adds the topic's partition to the member's target assignment.
+ */
+protected static void addPartitionToAssignment(
+Map memberAssignments,
+String memberId,
+Uuid topicId,
+int partition
+) {
+memberAssignments.get(memberId)
+.targetPartitions()
+.computeIfAbsent(topicId, __ -> new HashSet<>())
+.add(partition);
+}
+
+/**
+ * Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts.
+ *
+ * @param topicIds  The topic Ids.
+ * @param subscribedTopicDescriber  Utility to fetch the partition count for a given topic.
+ *
+ * @return Set of {@code TopicIdPartition} including all the provided topic Ids.
+ */
+protected static Set topicIdPartitions(
+Collection topicIds,
+SubscribedTopicDescriber subscribedTopicDescriber
+) {
+return topicIds.stream()
+.flatMap(topic ->
+IntStream.range(0, subscribedTopicDescriber.numPartitions(topic))
+.mapToObj(i -> new TopicIdPartition(topic, i))
+).collect(Collectors.toSet());
+}
+
+/**
+ * Represents the rack information of members and partitions along with utility methods
+ * to facilitate rack-aware assignment strategies for a given consumer group.
+ */
+protected static class RackInfo {
+/**
+ * Map of every member to its rack.
+ */
+protected final Map memberRacks;
+
+/**
+ * Map of every partition to a list of its racks.
+ */
+protected final Map> partitionRacks;
+
+/**
+ * List of members with the same rack as the partition.
+ */
+protected final Map> membersWithSameRackAsPartition;
+
+/**
+ * Indicates if a rack aware assignment can be done.
+ * True if racks are defined for both members and partitions and there is an intersection between the sets.
+ */
+protected final boolean useRackStrategy;
+
+/**
+ * Constructs rack information based on the assignment 

Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380469707


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   That makes sense to me. So maybe the verification would go through a 
different flow where the callback would include both the state update and the 
write via appendEntries. The main question I have is whether we should consider 
this a separate JIRA and do it in a follow-up PR. I am inclined to go with this 
option. 
   ~(But do it ASAP for 3.6.1)~ We don't need this for 3.6.1 since the offsets 
are not in 3.6.1.






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380813788


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   Hmm, I think it would break the abstraction of appendRecords being a 
fundamentally asynchronous call: it has the syntax of an asynchronous call 
(i.e. the completion callback can be called in a different thread), yet a 
concrete implementation expects that a specific part of the call is 
synchronous, and the code gives no indication of which part (maybe some day we'd 
use async IO and make it more asynchronous than the code around it would 
expect). For example, why would the group coordinator need to know about the 
transaction verification flow, which has its own trickiness with the 
verification guard and may actually (likely) be different for KIP-890 part 2?
   
   I think it would be better for separation of concerns if the lock were held 
only around in-memory state updates that just transition the state into a 
"pending state", so that new updates would be queued or failed (with a 
retriable error) until the async call is done.
   
   Then we wouldn't have a situation where a lock's syntax looks like "lock 
around in-memory update + call" while the desired semantics are "[lock] 
[in-memory update] [some part of call] [unlock] [some other part of call]", 
with the [unlock] placed fairly deep in logic that gives no indication that an 
unrelated change (making the code as asynchronous as needed to avoid holding 
threads) could actually break the desired semantics.
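A minimal sketch of the "pending state" alternative described above, with hypothetical names (`PendingStateHolder`, a single `int` standing in for group state, and `RetriableException` standing in for a retriable error code): the lock covers only in-memory transitions, the async write runs outside it, and a concurrent update is failed with a retriable error while a write is in flight.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical sketch of "lock only around in-memory state + pending state".
final class PendingStateHolder {
    // Stand-in for a retriable error returned while a write is in flight.
    static final class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private int committed = 0;       // last successfully written value
    private boolean pending = false; // true while an async write is in flight

    CompletableFuture<Integer> update(int newValue, Supplier<CompletableFuture<Void>> asyncWrite) {
        // 1. Short critical section: only the in-memory transition to "pending".
        lock.lock();
        try {
            if (pending) {
                return CompletableFuture.failedFuture(new RetriableException("update in progress"));
            }
            pending = true;
        } finally {
            lock.unlock();
        }
        // 2. The async call runs outside the lock, so which parts of it are
        //    synchronous (or which threads it uses) does not affect correctness.
        CompletableFuture<Void> write;
        try {
            write = asyncWrite.get();
        } catch (RuntimeException e) {
            finish(false, newValue);
            return CompletableFuture.failedFuture(e);
        }
        // 3. On completion, re-take the lock briefly to leave the pending state.
        return write
            .whenComplete((ignored, error) -> finish(error == null, newValue))
            .thenApply(ignored -> newValue);
    }

    private void finish(boolean success, int newValue) {
        lock.lock();
        try {
            pending = false;
            if (success) {
                committed = newValue;
            }
        } finally {
            lock.unlock();
        }
    }

    int committedValue() {
        lock.lock();
        try {
            return committed;
        } finally {
            lock.unlock();
        }
    }
}
```

Queuing the second update instead of failing it would be an equally valid reading of the comment; failing with a retriable error keeps the sketch shorter.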






Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380469707


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   That makes sense to me. So maybe the verification would go through a 
different flow where the callback would include both the state update and the 
write via appendEntries. The main question I have is whether we should consider 
this a separate JIRA and do it in a follow-up PR. I am inclined to go with this 
option. 
   ~(But do it ASAP for 3.6.1)~ We don't need this since the offsets are not in 
3.6.1.






[jira] [Created] (KAFKA-15780) Wait for consistent kraft metadata when creating topics in tests

2023-11-02 Thread David Mao (Jira)
David Mao created KAFKA-15780:
-

 Summary: Wait for consistent kraft metadata when creating topics in tests
 Key: KAFKA-15780
 URL: https://issues.apache.org/jira/browse/KAFKA-15780
 Project: Kafka
  Issue Type: Test
Reporter: David Mao


Tests occasionally flake when not retrying stale metadata in KRaft mode.
I suspect that the root cause is that TestUtils.createTopicWithAdmin waits 
for partitions to be present in the metadata cache but does not wait for the 
metadata to be fully published to the broker.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA 14515: Optimized Uniform Rack Aware Assignor [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14416:
URL: https://github.com/apache/kafka/pull/14416#discussion_r1380781750


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java:
##
@@ -43,27 +45,62 @@ public void testAttribute() {
 new TopicMetadata(topicId, topicName, 5, partitionRacks)
 );
 }
-assertEquals(topicMetadataMap, new SubscribedTopicMetadata(topicMetadataMap).topicMetadata());
+ subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap);
+}
+
+
+@Test
+public void testAttribute() {
+setup();

Review Comment:
   thanks






Re: [PR] KAFKA 14515: Optimized Uniform Rack Aware Assignor [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14416:
URL: https://github.com/apache/kafka/pull/14416#discussion_r1380777612


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.lang.Math.min;
+
+/**
+ * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with
+ * all its members subscribed to the same set of topics.
+ * It is optimized since the assignment can be done in fewer, less complicated steps compared to when
+ * the subscriptions are different across the members.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *  Balance:  Ensure partitions are distributed equally among all members.
+ *The difference in assignments sizes between any two members
+ *should not exceed one partition. 
+ *  Rack Matching:When feasible, aim to assign partitions to members
+ *located on the same rack thus avoiding cross-zone traffic. 
+ *  Stickiness:   Minimize partition movements among members by retaining
+ *as much of the existing assignment as possible. 
+ *
+ * The assignment builder prioritizes the properties in the following order:
+ *  Balance > Rack Matching > Stickiness.
+ */
+public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
+private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+
+/**
+ * The assignment specification which includes member metadata.
+ */
+private final AssignmentSpec assignmentSpec;
+
+/**
+ * The topic and partition metadata describer.
+ */
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+
+/**
+ * The set of topic Ids that the consumer group is subscribed to.
+ */
+private final Set subscribedTopicIds;
+
+/**
+ * Rack information and helper methods.
+ */
+private final RackInfo rackInfo;
+
+/**
+ * The number of members to receive an extra partition beyond the minimum quota.
+ * Minimum Quota = Total Partitions / Total Members
+ * Example: If there are 11 partitions to be distributed among 3 members,
+ *  each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
+ */
+private int remainingMembersToGetAnExtraPartition;
+
+/**
+ * Members mapped to the remaining number of partitions needed to meet the minimum quota.
+ * Minimum quota = total partitions / total members.
+ */
+private Map potentiallyUnfilledMembers;
+
+/**
+ * The partitions that still need to be assigned.
+ * Initially this contains all the subscribed topics' partitions.
+ */
+private Set unassignedPartitions;
+
+/**
+ * The target assignment.
+ */
+private final Map targetAssignment;
+
+/**
+ * Tracks the existing owner of each partition.
+ * Only populated when the rack awareness strategy is used.
+ */
+private final Map currentPartitionOwners;
+
+OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+this.assignmentSpec = assignmentSpec;
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.subscribedTopicIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+this.rackInfo = new 
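The quota arithmetic in the `remainingMembersToGetAnExtraPartition` Javadoc can be checked with a short sketch. `QuotaMath` is a hypothetical helper; giving the extra partitions to the first members is an arbitrary choice for illustration, since the builder decides which members actually receive them.

```java
// Hypothetical helper illustrating minQuota and the "extra partition" count.
final class QuotaMath {
    // Returns the target assignment size of each member, in member order.
    static int[] targetSizes(int totalPartitions, int numMembers) {
        if (numMembers <= 0) {
            throw new IllegalArgumentException("need at least one member");
        }
        int minQuota = totalPartitions / numMembers;              // e.g. 11 / 3 = 3
        int membersWithExtra = totalPartitions % numMembers;      // e.g. 11 % 3 = 2
        int[] sizes = new int[numMembers];
        for (int i = 0; i < numMembers; i++) {
            // The first `membersWithExtra` members get minQuota + 1 (illustrative choice).
            sizes[i] = minQuota + (i < membersWithExtra ? 1 : 0);
        }
        return sizes;
    }
}
```

For the Javadoc's example, `targetSizes(11, 3)` gives `{4, 4, 3}`: the sizes sum to 11 and differ by at most one, matching the balance principle.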

Re: [PR] [WIP]KAFKA-15444: Native docker image [kafka]

2023-11-02 Thread via GitHub


ijuma commented on code in PR #14556:
URL: https://github.com/apache/kafka/pull/14556#discussion_r1380771281


##
docker/test/requirements.txt:
##
@@ -0,0 +1,6 @@
+confluent_kafka

Review Comment:
   Can we remove `confluent_kafka` from here then?






Re: [PR] KAFKA 14515: Optimized Uniform Rack Aware Assignor [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14416:
URL: https://github.com/apache/kafka/pull/14416#discussion_r1380768591


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * The assignment builder is used to construct the target assignment based on the members' subscriptions.
+ *
+ * This class contains common utility methods and a class for obtaining and storing rack information.
+ */
+public abstract class AbstractUniformAssignmentBuilder {
+protected abstract GroupAssignment buildAssignment();
+
+/**
+ * Determines if rack-aware assignment is appropriate based on the provided rack information.
+ *
+ * @param memberRacks   Racks where members are located.
+ * @param allPartitionRacks Racks where partitions are located.
+ * @param racksPerPartition Map of partitions to their associated racks.
+ *
+ * @return {@code true} if rack-aware assignment should be applied; {@code false} otherwise.
+ */
+protected static boolean useRackAwareAssignment(
+Set memberRacks,
+Set allPartitionRacks,
+Map> racksPerPartition
+) {
+if (memberRacks.isEmpty() || Collections.disjoint(memberRacks, allPartitionRacks))
+return false;
+else {
+return !racksPerPartition.values().stream().allMatch(allPartitionRacks::equals);
+}
+}
+
+/**
+ * Adds the topic's partition to the member's target assignment.
+ */
+protected static void addPartitionToAssignment(
+Map memberAssignments,
+String memberId,
+Uuid topicId,
+int partition
+) {
+memberAssignments.get(memberId)
+.targetPartitions()
+.computeIfAbsent(topicId, __ -> new HashSet<>())
+.add(partition);
+}
+
+/**
+ * Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts.
+ *
+ * @param topicIds  The topic Ids.
+ * @param subscribedTopicDescriber  Utility to fetch the partition count for a given topic.
+ *
+ * @return Set of {@code TopicIdPartition} including all the provided topic Ids.
+ */
+protected static Set topicIdPartitions(
+Collection topicIds,
+SubscribedTopicDescriber subscribedTopicDescriber
+) {
+return topicIds.stream()
+.flatMap(topic ->
+IntStream.range(0, subscribedTopicDescriber.numPartitions(topic))
+.mapToObj(i -> new TopicIdPartition(topic, i))

Review Comment:
   oh yep changed it, thanks!
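For reference, the two static helpers quoted in this diff can be re-created in a self-contained form. The archive stripped the generic type parameters, so the types here are assumptions: racks and topic ids are `String`s, partitions in `racksPerPartition` are keyed by `String`, `Map.Entry<String, Integer>` stands in for `TopicIdPartition`, and a `ToIntFunction` replaces `SubscribedTopicDescriber`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical re-creation of the quoted helpers with assumed generic types.
final class UniformAssignmentHelpers {
    // Rack-aware assignment only helps if members have racks, those racks overlap
    // the partitions' racks, and at least one partition is NOT replicated on every rack.
    static boolean useRackAwareAssignment(
            Set<String> memberRacks,
            Set<String> allPartitionRacks,
            Map<String, Set<String>> racksPerPartition) {
        if (memberRacks.isEmpty() || Collections.disjoint(memberRacks, allPartitionRacks)) {
            return false;
        }
        return !racksPerPartition.values().stream().allMatch(allPartitionRacks::equals);
    }

    // Expands each topic into (topic, partition) pairs for partitions 0..numPartitions-1.
    static Set<Map.Entry<String, Integer>> topicIdPartitions(
            Collection<String> topicIds, ToIntFunction<String> numPartitions) {
        return topicIds.stream()
            .flatMap(topic -> IntStream.range(0, numPartitions.applyAsInt(topic))
                .mapToObj(i -> Map.entry(topic, i)))
            .collect(Collectors.toSet());
    }
}
```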






Re: [PR] KAFKA 14515: Optimized Uniform Rack Aware Assignor [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14416:
URL: https://github.com/apache/kafka/pull/14416#discussion_r1380756376


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.lang.Math.min;
+
+/**
+ * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with
+ * all its members subscribed to the same set of topics.
+ * It is optimized since the assignment can be done in fewer, less complicated steps compared to when
+ * the subscriptions are different across the members.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *
+ *  Balance:  Ensure partitions are distributed equally among all members.
+ *The difference in assignment sizes between any two members
+ *should not exceed one partition. 
+ *  Rack Matching: When feasible, aim to assign partitions to members
+ *located on the same rack, thus avoiding cross-zone traffic. 
+ *  Stickiness:   Minimize partition movements among members by retaining
+ *as much of the existing assignment as possible. 
+ *
+ * The assignment builder prioritizes the properties in the following order:
+ *  Balance > Rack Matching > Stickiness.
+ */
+public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
+private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+
+/**
+ * The assignment specification which includes member metadata.
+ */
+private final AssignmentSpec assignmentSpec;
+
+/**
+ * The topic and partition metadata describer.
+ */
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+
+/**
+ * The set of topic Ids that the consumer group is subscribed to.
+ */
+private final Set subscriptionIds;
+
+/**
+ * Rack information and helper methods.
+ */
+private final RackInfo rackInfo;
+
+/**
+ * The number of members to receive an extra partition beyond the minimum quota.
+ * Minimum Quota = Total Partitions / Total Members
+ * Example: If there are 11 partitions to be distributed among 3 members,
+ *  each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition.
+ */
+private int remainingMembersToGetAnExtraPartition;
+
+/**
+ * Members mapped to the remaining number of partitions needed to meet the minimum quota.
+ * Minimum quota = total partitions / total members.
+ */
+private Map potentiallyUnfilledMembers;
+
+/**
+ * The partitions that still need to be assigned.
+ * Initially this contains all the subscribed topics' partitions.
+ */
+private Set unassignedPartitions;
+
+/**
+ * The target assignment.
+ */
+private final Map targetAssignment;
+
+/**
+ * Tracks the existing owner of each partition.
+ * Only populated when the rack awareness strategy is used.
+ */
+private final Map currentPartitionOwners;
+
+OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+this.assignmentSpec = assignmentSpec;
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.subscriptionIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+
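
The quota arithmetic in the `OptimizedUniformAssignmentBuilder` Javadoc (minimum quota = total partitions / total members, and the remainder, total partitions % total members, is the number of members that receive one extra partition) can be checked with a small Java sketch. The class and method names are hypothetical, and the sketch only models assignment sizes, not which partitions go to which member.

```java
final class UniformQuotaSketch {
    // Minimum number of partitions every member receives.
    static int minQuota(int totalPartitions, int totalMembers) {
        return totalPartitions / totalMembers;
    }

    // Number of members that receive one partition beyond the minimum quota.
    static int membersWithExtraPartition(int totalPartitions, int totalMembers) {
        return totalPartitions % totalMembers;
    }

    // Per-member target sizes: the first `extra` members get minQuota + 1,
    // the rest get exactly minQuota.
    static int[] targetSizes(int totalPartitions, int totalMembers) {
        int min = minQuota(totalPartitions, totalMembers);
        int extra = membersWithExtraPartition(totalPartitions, totalMembers);
        int[] sizes = new int[totalMembers];
        for (int i = 0; i < totalMembers; i++) {
            sizes[i] = i < extra ? min + 1 : min;
        }
        return sizes;
    }
}
```

With 11 partitions and 3 members this gives target sizes 4, 4 and 3, so no two members differ by more than one partition, which is the balance property listed first in the Javadoc.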

Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380752726


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   This is effectively what we are doing, right? `appendEntries` is just the replica manager append w/o further txn validation. But maybe I misunderstood the suggestion, since we still need the callback logic.




Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380752726


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   This is effectively what we are doing, right? `appendEntries` is just the replica manager append w/o further txn validation.




Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380747077


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -67,17 +67,11 @@ public void testOverrideClientId() {
 @Test
 public void testOverrideEnableAutoCommit() {
 ConsumerConfig config = new ConsumerConfig(properties);
-boolean overrideEnableAutoCommit = config.maybeOverrideEnableAutoCommit();
-assertFalse(overrideEnableAutoCommit);
+//boolean overrideEnableAutoCommit = InternalConsumerConfig.maybeOverrideEnableAutoCommit(config);

Review Comment:
   Agreed. I revised this test method.




Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380746726


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static 

Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-02 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1380724965


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, VerificationGuard],

Review Comment:
   I guess the proposal is that when serving an `OffsetCommit` request, `KafkaApis` first determines the partitions for txn validation. It then initiates the txn validation asynchronously. Once the validation is done, it calls `ReplicaManager.append` w/o further txn validation.
   
   If we do this, I am wondering if we should just do the same for regular produce requests too. This way, we don't need the callback logic for txn validation in `ReplicaManager.append`.
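
The flow proposed above (determine which partitions need transaction validation, run that validation asynchronously, then call the plain append path without further validation) could look roughly like the following Java sketch. It is a hypothetical model only: the real code is Scala in `KafkaApis`/`ReplicaManager`, partitions are plain strings here, and `verifyAsync` and `Appender` are invented stand-ins, with `verifyAsync` assumed to report the partitions that failed verification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class ValidateThenAppendSketch {
    // Hypothetical stand-in for an append path that performs no transaction validation.
    interface Appender {
        void append(List<String> partitions);
    }

    static CompletableFuture<List<String>> validateThenAppend(
            List<String> partitions,
            Set<String> needsVerification,
            Function<List<String>, CompletableFuture<Set<String>>> verifyAsync,
            Appender appender) {
        // Step 1: pick out the partitions that need transaction verification.
        List<String> toVerify = new ArrayList<>();
        for (String p : partitions) {
            if (needsVerification.contains(p)) {
                toVerify.add(p);
            }
        }
        // Step 2: verify asynchronously. Step 3: once verification completes,
        // append every partition that did not fail, with no further validation.
        return verifyAsync.apply(toVerify).thenApply(failed -> {
            List<String> ok = new ArrayList<>();
            for (String p : partitions) {
                if (!failed.contains(p)) {
                    ok.add(p);
                }
            }
            appender.append(ok);
            return ok;
        });
    }
}
```

Under this shape the append step never needs its own validation callback, which is the simplification the comment suggests for regular produce requests as well.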




Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380719222


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig {
  */
 static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
 
-/**
- * internal.throw.on.fetch.stable.offset.unsupported
- * Whether or not the consumer should throw when the new stable offset feature is supported.
- * If set to true then the client shall crash upon hitting it.
- * The purpose of this flag is to prevent unexpected broker downgrade which makes
- * the offset fetch protection against pending commit invalid. The safest approach
- * is to fail fast to avoid introducing correctness issue.
- *
- * 
- * Note: this is an internal configuration and could be changed in the future in a backward incompatible way
- *
- */
-static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";

Review Comment:
   I have reverted this change. I have left `THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED` in `ConsumerConfig` but I've added a string constant in `ConsumerUtils` just so it can be referenced in the `internals` package.




Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380718362


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -982,23 +987,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
 }
 }
 
-// This is here temporary as we don't have public access to the ConsumerConfig in this module.
-public static Map appendDeserializerToConfig(Map configs,
-Deserializer keyDeserializer,
-Deserializer valueDeserializer) {
-// validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value
-Map newConfigs = new HashMap<>(configs);
-if (keyDeserializer != null)
-newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
-else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
-throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-if (valueDeserializer != null)
-newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
-else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
-throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-return newConfigs;
-}
-

Review Comment:
   This appears to be outdated. This is not in `ConsumerUtils` but stays in `ConsumerConfig`.
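
The `appendDeserializerToConfig` helper in the hunk above encodes a simple rule: a non-null deserializer instance overrides the config with its class; otherwise the config must already name a deserializer class. The following self-contained Java sketch illustrates that rule. It assumes plain `Object` instances in place of `Deserializer<?>` and `IllegalArgumentException` in place of Kafka's `ConfigException`. The two key strings are Kafka's `key.deserializer` and `value.deserializer` config names.

```java
import java.util.HashMap;
import java.util.Map;

final class DeserializerConfigSketch {
    static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
    static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";

    // Returns a copy of the configs; the caller's map is never mutated.
    static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
                                                          Object keyDeserializer,
                                                          Object valueDeserializer) {
        Map<String, Object> newConfigs = new HashMap<>(configs);
        putOrRequire(newConfigs, KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        putOrRequire(newConfigs, VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        return newConfigs;
    }

    // A passed-in instance wins; without one, the config must already hold a value.
    private static void putOrRequire(Map<String, Object> configs, String name, Object instance) {
        if (instance != null) {
            configs.put(name, instance.getClass());
        } else if (configs.get(name) == null) {
            throw new IllegalArgumentException(name + " must be non-null.");
        }
    }
}
```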




Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380717397


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -666,19 +651,6 @@ else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
 return newConfigs;
 }
 
-boolean maybeOverrideEnableAutoCommit() {
-Optional groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
-boolean enableAutoCommit = getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
-if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided
-if (!originals().containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
-enableAutoCommit = false;
-} else if (enableAutoCommit) {
-throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
-}
-}
-return enableAutoCommit;
-}
-

Review Comment:
   This is now outdated. It's back in `ConsumerConfig`.
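
The `maybeOverrideEnableAutoCommit` logic in the hunk above reduces to a small decision table when no group id is configured. An unset `enable.auto.commit` is forced to `false`, an explicit `true` is rejected, and an explicit `false` is kept. The standalone Java sketch below models that table. It replaces the config lookups with plain parameters, which is an assumption for illustration, and uses `IllegalArgumentException` in place of `InvalidConfigurationException`.

```java
import java.util.Optional;

final class AutoCommitOverrideSketch {
    // groupId: the configured group.id, if any.
    // explicitlySet: whether enable.auto.commit appears in the user's original configs.
    // enableAutoCommit: the resolved value (the default applies when not explicitly set).
    static boolean maybeOverrideEnableAutoCommit(Optional<String> groupId,
                                                 boolean explicitlySet,
                                                 boolean enableAutoCommit) {
        if (!groupId.isPresent()) {
            if (!explicitlySet) {
                // No group id and no explicit setting: auto-commit is disabled.
                return false;
            } else if (enableAutoCommit) {
                // No group id but auto-commit explicitly enabled: reject the configuration.
                throw new IllegalArgumentException(
                    "enable.auto.commit cannot be set to true when default group id (null) is used.");
            }
        }
        return enableAutoCommit;
    }
}
```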




Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380717923


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -250,7 +255,7 @@ public PrototypeAsyncConsumer(final Time time,
 // no coordinator will be constructed for the default (null) group id
 if (!groupId.isPresent()) {
 config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-//config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+config.ignore(ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);

Review Comment:
   I've moved the core variable back to `ConsumerConfig` but left a copy in `ConsumerUtils` with an explanation of why it's redefined there.




Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


AndrewJSchofield commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380705419


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static 

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-02 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1380683789


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,33 +242,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while releasing assignment " +
+"after member got fenced. Member will rejoin the group anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoinGroup();
+});
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
 public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = -1;
+invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+transitionTo(MemberState.FATAL);
+}
+
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void transitionToJoinGroup() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+}
+
+/**
+ * {@inheritDoc}
+ */
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.LEAVING);
+
+CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to effectively leave the
+// group (even in the case where the member had no assignment to release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+// Return callback future to indicate that the leave group is done when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
 }
 
+/**
+ * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member is intentionally
+ * leaving the group (after a call to unsubscribe).
+ *
+ * If the member is not part of the group (epoch <= 0), this will invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+callbackResult = CompletableFuture.completedFuture(null);
+} else {
+// Release assignment
+if (memberEpoch > 0) {
+// Member is part of the group. Invoke onPartitionsRevoked.
+callbackResult = revokePartitions(droppedPartitions);
+} else {
+// Member is not part of the group anymore. Invoke onPartitionsLost.
+callbackResult = invokeOnPartitionsLostCallback(droppedPartitions);
+}
+}
+return callbackResult;
+}
+
+/**
+ * Reset member epoch to the value required for the leave group heartbeat request, and
+ * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat
+ */
+private void transitionToSendingLeaveGroup() {
+memberEpoch = leaveGroupEpoch();
+currentAssignment = new HashSet<>();
+targetAssignment = Optional.empty();
+transitionTo(MemberState.SENDING_LEAVE_REQUEST);
+}
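
The branching in `invokeOnPartitionsRevokedOrLostToReleaseAssignment` can be modelled in isolation. If no partitions are assigned, no callback runs. A positive member epoch means `onPartitionsRevoked`, and an epoch of zero or less means `onPartitionsLost`. The following minimal Java sketch assumes partitions are plain strings, sorted naturally rather than with the PR's `COMPARATOR`, and that the callbacks are passed in as functions; the names are hypothetical.

```java
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class ReleaseAssignmentSketch {
    enum Callback { NONE, ON_PARTITIONS_REVOKED, ON_PARTITIONS_LOST }

    // Nothing assigned -> no callback; still in the group (epoch > 0) -> revoked;
    // otherwise (epoch <= 0, e.g. after being fenced) -> lost.
    static Callback choose(int memberEpoch, Collection<String> assignedPartitions) {
        if (assignedPartitions.isEmpty()) {
            return Callback.NONE;
        }
        return memberEpoch > 0 ? Callback.ON_PARTITIONS_REVOKED : Callback.ON_PARTITIONS_LOST;
    }

    // Invokes the chosen callback on a sorted copy of the assignment, or returns an
    // already-completed future when there is nothing to release.
    static CompletableFuture<Void> release(
            int memberEpoch,
            Collection<String> assignedPartitions,
            Function<SortedSet<String>, CompletableFuture<Void>> onRevoked,
            Function<SortedSet<String>, CompletableFuture<Void>> onLost) {
        SortedSet<String> dropped = new TreeSet<>(assignedPartitions);
        switch (choose(memberEpoch, dropped)) {
            case ON_PARTITIONS_REVOKED:
                return onRevoked.apply(dropped);
            case ON_PARTITIONS_LOST:
                return onLost.apply(dropped);
            default:
                return CompletableFuture.completedFuture(null);
        }
    }
}
```

In `leaveGroup()` and `transitionToFenced()` in the diff, the returned future is then chained with `whenComplete`, so the subscription is cleared and the next state transition happens whether the callback succeeded or failed.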

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-02 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1380694833


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -73,30 +84,55 @@ public interface MembershipManager {
 /**
  * @return Current assignment for the member.
  */
-ConsumerGroupHeartbeatResponseData.Assignment currentAssignment();
+Set currentAssignment();
 
 /**
- * Update the assignment for the member, indicating that the provided assignment is the new
- * current assignment.
+ * @return Assignment that the member received from the server but hasn't finished processing
+ * yet.
  */
-void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment);
+Optional targetAssignment();

Review Comment:
   Totally, it seemed it was needed here at some point but not anymore. Removing it & cleaning up, thanks!




Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-02 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1380693973


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -260,42 +743,16 @@ public Optional serverAssignor() {
  * {@inheritDoc}
  */
 @Override
-public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
+public Set currentAssignment() {
 return this.currentAssignment;
 }
 
 
 /**
  * @return Assignment that the member received from the server but hasn't completely processed
- * yet. Visible for testing.

Review Comment:
   Good catch. I needed it public at some point, but it turned out not to be 
needed in the end, so I'm putting it back to package-private with the "Visible 
for testing" note.






[jira] [Created] (KAFKA-15779) Define broker configurations and exceptions

2023-11-02 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15779:
-

 Summary: Define broker configurations and exceptions
 Key: KAFKA-15779
 URL: https://issues.apache.org/jira/browse/KAFKA-15779
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal


KIP-714 adds two exceptions and one server configuration:

{{TELEMETRY_TOO_LARGE}}  (NEW)

{{UNKNOWN_SUBSCRIPTION_ID}} (NEW)

Configuration:

{{telemetry.max.bytes}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-02 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1380683789


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,33 +242,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while releasing assignment " +
+"after member got fenced. Member will rejoin the group anyway.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoinGroup();
+});
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
 public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = -1;
+invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+transitionTo(MemberState.FATAL);
+}
+
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void transitionToJoinGroup() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
+}
+
+/**
+ * {@inheritDoc}
+ */
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.LEAVING);
+
+CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to effectively leave the
+// group (even in the case where the member had no assignment to release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+// Return callback future to indicate that the leave group is done when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
 }
 
+/**
+ * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+ * onPartitionsLost.
+ * 
+ * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced.
+ * 
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+callbackResult = CompletableFuture.completedFuture(null);
+} else {
+// Release assignment
+if (memberEpoch > 0) {
+// Member is part of the group. Invoke onPartitionsRevoked.
+callbackResult = revokePartitions(droppedPartitions);
+} else {
+// Member is not part of the group anymore. Invoke onPartitionsLost.
+callbackResult = invokeOnPartitionsLostCallback(droppedPartitions);
+}
+}
+return callbackResult;
+}
+
+/**
+ * Reset member epoch to the value required for the leave group heartbeat request, and
+ * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat
+ * request is sent out with it.
+ */
+private void transitionToSendingLeaveGroup() {
+memberEpoch = leaveGroupEpoch();
+currentAssignment = new HashSet<>();
+targetAssignment = Optional.empty();
+transitionTo(MemberState.SENDING_LEAVE_REQUEST);
+}
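A minimal, self-contained sketch of two rules in the diff above: the choice made in invokeOnPartitionsRevokedOrLostToReleaseAssignment (no callback when nothing is assigned, onPartitionsRevoked while the epoch is positive, onPartitionsLost otherwise), and the leaveGroup ordering, where the move to SENDING_LEAVE_REQUEST happens whether or not the callback future fails. The class, both enums and the string partition names are hypothetical stand-ins, not the MembershipManagerImpl API.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

public class ReleaseAssignmentSketch {

    // Hypothetical, trimmed-down states; the real MemberState enum has more values.
    enum State { STABLE, LEAVING, SENDING_LEAVE_REQUEST }

    // Which user callback would be triggered when releasing the assignment.
    enum Callback { NONE, ON_PARTITIONS_REVOKED, ON_PARTITIONS_LOST }

    // Nothing assigned -> no callback; epoch > 0 (still in the group) -> revoked;
    // epoch <= 0 (fenced or failed) -> lost.
    static Callback callbackFor(int memberEpoch, Set<String> assigned) {
        if (assigned.isEmpty()) {
            return Callback.NONE;
        }
        return memberEpoch > 0 ? Callback.ON_PARTITIONS_REVOKED : Callback.ON_PARTITIONS_LOST;
    }

    State state = State.STABLE;
    final Set<String> assigned = new TreeSet<>();

    // The returned future tracks only the callback; clearing the assignment and
    // moving to SENDING_LEAVE_REQUEST run on both success and failure.
    CompletableFuture<Void> leaveGroup(CompletableFuture<Void> callbackResult) {
        state = State.LEAVING;
        callbackResult.whenComplete((result, error) -> {
            assigned.clear();
            state = State.SENDING_LEAVE_REQUEST;
        });
        return callbackResult;
    }

    public static void main(String[] args) {
        System.out.println(callbackFor(5, Set.of("t-0")));  // ON_PARTITIONS_REVOKED
        System.out.println(callbackFor(-1, Set.of("t-0"))); // ON_PARTITIONS_LOST
        System.out.println(callbackFor(5, Set.of()));       // NONE

        ReleaseAssignmentSketch member = new ReleaseAssignmentSketch();
        member.assigned.add("t-0");
        CompletableFuture<Void> failing = new CompletableFuture<>();
        CompletableFuture<Void> done = member.leaveGroup(failing);
        // Simulate a user callback that throws.
        failing.completeExceptionally(new RuntimeException("callback failed"));
        System.out.println(done.isCompletedExceptionally() + " " + member.state + " " + member.assigned);
        // true SENDING_LEAVE_REQUEST []
    }
}
```

Returning the callback future rather than the one produced by whenComplete mirrors the diff: callers learn when the callbacks finish, while the leave heartbeat is best effort.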

[jira] [Created] (KAFKA-15778) Implement ClientMetricsManager to process request

2023-11-02 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15778:
-

 Summary: Implement ClientMetricsManager to process request
 Key: KAFKA-15778
 URL: https://issues.apache.org/jira/browse/KAFKA-15778
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal


A manager to handle client telemetry requests and responses, along with a cache 
to persist client instance state.





Re: [PR] MINOR: Log a warning when connectors generate greater than tasks.max task configs [kafka]

2023-11-02 Thread via GitHub


C0urante commented on PR #14694:
URL: https://github.com/apache/kafka/pull/14694#issuecomment-1791420759

   @gharris1727 @mimaison given your activity on 
[KAFKA-15575](https://issues.apache.org/jira/browse/KAFKA-15575), this may 
interest you. We can also backport this change, which is unlikely to be 
possible for any of the approaches discussed on that ticket so far.





[PR] MINOR: Log a warning when connectors generate greater than tasks.max task configs [kafka]

2023-11-02 Thread via GitHub


C0urante opened a new pull request, #14694:
URL: https://github.com/apache/kafka/pull/14694

   The discussion around 
[KIP-987](https://cwiki.apache.org/confluence/display/KAFKA/KIP-987%3A+Connect+Static+Assignments)
 has highlighted some strange behavior in the Connect runtime: if a connector's 
[taskConfigs 
method](https://kafka.apache.org/36/javadoc/org/apache/kafka/connect/connector/Connector.html#taskConfigs(int))
 returns more tasks than the value for its configuration's [tasks.max 
property](https://kafka.apache.org/documentation.html#sourceconnectorconfigs_tasks.max),
 nothing happens (i.e., every task the connector generated a config for is 
brought up).
   
   In [KAFKA-15575](https://issues.apache.org/jira/browse/KAFKA-15575), we 
discuss the possibility of taking action in this situation: failing the 
connector, only bringing up `tasks.max` tasks, and others. We may adopt one of 
these approaches in the future. For now, this warning message should at least 
get the ball rolling and decrease the likelihood of buggy connectors causing 
problems for Connect clusters that use static assignment (if/when that feature 
lands).
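A rough sketch of the kind of check this PR describes: compare the number of task configs a connector returned with its tasks.max and produce a warning, while still letting every generated task start. The class and method names, the message text, and printing to System.out instead of the runtime's logger are all assumptions for illustration, not the code in #14694.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class TasksMaxCheckSketch {

    // Returns a warning when a connector produced more task configs than tasks.max,
    // or empty otherwise. It only reports the mismatch; no tasks are dropped.
    static Optional<String> checkTaskCount(String connName, int tasksMax,
                                           List<Map<String, String>> taskConfigs) {
        int generated = taskConfigs.size();
        if (generated <= tasksMax) {
            return Optional.empty();
        }
        return Optional.of(String.format(
            "Connector %s generated %d task configs, which is greater than its tasks.max of %d",
            connName, generated, tasksMax));
    }

    public static void main(String[] args) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Map<String, String> taskConfig = new HashMap<>();
            taskConfig.put("task.index", Integer.toString(i)); // hypothetical key
            configs.add(taskConfig);
        }
        // Three configs against tasks.max=2: a warning is produced.
        checkTaskCount("my-connector", 2, configs).ifPresent(System.out::println);
        // Three configs against tasks.max=3: no warning.
        System.out.println(checkTaskCount("my-connector", 3, configs).isPresent()); // false
    }
}
```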
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380667322


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");
+
+// Takes care of both the null and type checks.
+if (!(groupProtocol instanceof String))
+return false;
+
+return ((String) groupProtocol).equalsIgnoreCase("consumer");

Review Comment:
   Agreed. This code was written before the `ConsumerConfig` changes were in. 
I've updated it to use the `ConsumerConfig` directly.
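A self-contained sketch of the group.protocol check in useNewConsumer, pulled out so it runs without the Kafka client on the classpath. The GROUP_PROTOCOL_CONFIG constant and the Delegate enum are assumptions for illustration; per the review comment, the revised PR reads the value through ConsumerConfig instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class GroupProtocolSelectorSketch {

    // Config key named in the ConsumerDelegateCreator javadoc.
    static final String GROUP_PROTOCOL_CONFIG = "group.protocol";

    // Hypothetical stand-in for the two delegate implementations.
    enum Delegate { ASYNC_KAFKA_CONSUMER, LEGACY_KAFKA_CONSUMER }

    // Same rule as useNewConsumer: only a String equal to "consumer"
    // (case-insensitive) selects the new implementation; a missing or
    // non-String value falls back to the legacy one.
    static Delegate select(Map<?, ?> configs) {
        Object groupProtocol = configs.get(GROUP_PROTOCOL_CONFIG);
        if (groupProtocol instanceof String
                && ((String) groupProtocol).equalsIgnoreCase("consumer")) {
            return Delegate.ASYNC_KAFKA_CONSUMER;
        }
        return Delegate.LEGACY_KAFKA_CONSUMER;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        System.out.println(select(configs)); // LEGACY_KAFKA_CONSUMER (key absent)

        configs.put(GROUP_PROTOCOL_CONFIG, "CONSUMER");
        System.out.println(select(configs)); // ASYNC_KAFKA_CONSUMER

        // Properties is a Map<Object, Object>, so the same check accepts it;
        // a non-String value is treated as "not the new protocol".
        Properties props = new Properties();
        props.put(GROUP_PROTOCOL_CONFIG, 42);
        System.out.println(select(props));   // LEGACY_KAFKA_CONSUMER
    }
}
```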






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r138030


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");

Review Comment:
   Done.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380667051


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");
+
+// Takes care of both the null and type checks.
+if (!(groupProtocol instanceof String))
+return false;

Review Comment:
   This code was written before the `ConsumerConfig` changes were in. I've 
updated it to use the `ConsumerConfig` directly.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380664987


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java:
##
@@ -2582,7 +2581,7 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset,
 return fetchResponse(Collections.singletonMap(partition, fetchInfo));
 }
 
-private KafkaConsumer newConsumer(Time time,
+private LegacyKafkaConsumer newConsumer(Time time,
   KafkaClient client,

Review Comment:
   Thanks. Fixed.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380663703


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java:
##
@@ -164,7 +163,7 @@
  * Note to future authors in this class. If you close the consumer, close with DURATION.ZERO to reduce the duration of
  * the test.
  */
-public class KafkaConsumerTest {
+public class LegacyKafkaConsumerTest {

Review Comment:
   We have similar tests. My other attempt at the delegate work was much 
_deeper_ so that we could reuse this test and run it against both 
implementations, but it got out of hand.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380662917


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2623 @@
+/*

Review Comment:
   Yes. The only changes are the removal of the extra constructors.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380662469


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -108,13 +106,19 @@
 import static org.apache.kafka.common.utils.Utils.propsToMap;
 
 /**
- * This prototype consumer uses an {@link ApplicationEventHandler event handler} to process
- * {@link ApplicationEvent application events} so that the network IO can be processed in a dedicated
+ * This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process
+ * {@link ApplicationEvent application events} so that the network I/O can be processed in a dedicated
  * {@link ConsumerNetworkThread network thread}. Visit
  * <a href="https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor">this document</a>
- * for detail implementation.
+ * for implementation detail.
+ *
+ * 
+ *
+ * Note: this {@link Consumer} implementation is part of the revised consumer group protocol from KIP-848.
+ * This class should not be invoked directly; users should instead create a {@link KafkaConsumer} as before.
+ * This consumer implements the new consumer group protocol and is intended to be the default in coming releases.
  */
-public class PrototypeAsyncConsumer implements Consumer {
+public class AsyncKafkaConsumer implements Consumer {

Review Comment:
   Done.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -148,30 +152,38 @@ public class PrototypeAsyncConsumer implements Consumer {
 private boolean cachedSubscriptionHasAllFetchPositions;
 private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
 
-public PrototypeAsyncConsumer(final Properties properties,
-  final Deserializer keyDeserializer,
-  final Deserializer valueDeserializer) {
+public AsyncKafkaConsumer(Map configs) {

Review Comment:
   Yep, done.
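To illustrate the threading model the AsyncKafkaConsumer javadoc describes, here is a deliberately simplified sketch: the application thread enqueues an event and gets back a future, and one dedicated thread drains the queue and completes it. AppEvent, EventHandlerSketch and the string results are hypothetical; the real ApplicationEventHandler and ConsumerNetworkThread are far more involved (actual network I/O, request managers, shutdown handling).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EventHandlerSketch {

    // Hypothetical application event: a name plus a future the caller can wait on.
    static final class AppEvent {
        final String name;
        final CompletableFuture<String> result = new CompletableFuture<>();
        AppEvent(String name) { this.name = name; }
    }

    private final BlockingQueue<AppEvent> queue = new LinkedBlockingQueue<>();
    private final Thread networkThread;
    private volatile boolean running = true;

    EventHandlerSketch() {
        // Stand-in for the dedicated network thread: it drains events until
        // close() is called and the queue is empty.
        networkThread = new Thread(() -> {
            while (running || !queue.isEmpty()) {
                try {
                    AppEvent event = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (event != null) {
                        // Placeholder for the real work (sending requests, handling responses).
                        event.result.complete("processed " + event.name);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "sketch-network-thread");
        networkThread.start();
    }

    // Called from the application thread: enqueue the event and hand back its future.
    CompletableFuture<String> add(String name) {
        AppEvent event = new AppEvent(name);
        queue.add(event);
        return event.result;
    }

    void close() throws InterruptedException {
        running = false;
        networkThread.join();
    }

    public static void main(String[] args) throws Exception {
        EventHandlerSketch handler = new EventHandlerSketch();
        CompletableFuture<String> commit = handler.add("commit");
        System.out.println(commit.get(1, TimeUnit.SECONDS)); // processed commit
        handler.close();
    }
}
```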






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380661924


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");
+
+// Takes care of both the null and type checks.
+if (!(groupProtocol instanceof String))
+return false;
+
+return ((String) groupProtocol).equalsIgnoreCase("consumer");
+}
+
+public  Consumer create(Map configs) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs);
+} else {
+return new LegacyKafkaConsumer<>(configs);
+}
+});
+}
+
+public  Consumer create(Properties properties) {
+return createInternal(() -> {
+if (useNewConsumer(properties)) {
+return new AsyncKafkaConsumer<>(properties);
+} else {
+return new LegacyKafkaConsumer<>(properties);
+}
+});
+}
+
+public  Consumer create(Properties properties,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+return createInternal(() -> {
+if (useNewConsumer(properties)) {
+return new AsyncKafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
+} else {
+return new LegacyKafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
+}
+});
+}
+
+public  Consumer create(Map configs,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs, keyDeserializer, valueDeserializer);
+} else {
+return new LegacyKafkaConsumer<>(configs, keyDeserializer, valueDeserializer);
+}
+});
+}
+
+public  Consumer create(ConsumerConfig config,
+Deserializer keyDeserializer,
+Deserializer 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380661746


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {

Review Comment:
   I've refactored the code to use fewer constructors in the delegates and pass 
a fully-constructed `ConsumerConfig` to the delegates.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-02 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1380660843


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -24,21 +24,34 @@
 public enum MemberState {
 
 /**
- * Member has not joined a consumer group yet, or has been fenced and needs to re-join.
+ * Member is not part of the group. This could be the case when it has never joined (no call
+ * has been made to the subscribe API), or when the member intentionally leaves the group
+ * after a call to the unsubscribe API.
  */
-UNJOINED,
+NOT_IN_GROUP,

Review Comment:
   You're right, this is the case where a consumer with a groupId is not part of the group (either it hasn't called subscribe, or it has called unsubscribe).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-02 Thread via GitHub


lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1380659584


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -729,6 +729,13 @@ protected MetadataRequest.Builder newMetadataRequestBuilderForNewTopics() {
 return null;
 }
 
+/**
+ * @return Mapping from topic IDs to topic names for all topics in the cache.
+ */
+public synchronized Map topicNames() {

Review Comment:
   I moved it to the internals for now, as it's truly for internal use (even 
though I expect we might want something similar later on as we use topicId more 
in the client code).
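The contract in that Javadoc (a snapshot mapping topic IDs to names, read under the same lock as the rest of the cache via `synchronized`) can be illustrated with a minimal sketch. It assumes the cache is keyed by topic name as a name-to-ID map and uses `java.util.UUID` in place of Kafka's own `Uuid` type; `TopicIdCacheSketch` and `update` are hypothetical names, not the `Metadata` API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class TopicIdCacheSketch {

    // Hypothetical cache keyed by topic name; java.util.UUID stands in for Kafka's Uuid.
    private final Map<String, UUID> topicIds = new HashMap<>();

    public synchronized void update(String topicName, UUID topicId) {
        topicIds.put(topicName, topicId);
    }

    /**
     * @return an immutable snapshot mapping topic IDs to topic names for all cached topics.
     */
    public synchronized Map<UUID, String> topicNames() {
        Map<UUID, String> names = new HashMap<>();
        // Invert name -> ID into ID -> name; assumes each topic has a unique ID.
        for (Map.Entry<String, UUID> entry : topicIds.entrySet()) {
            names.put(entry.getValue(), entry.getKey());
        }
        return Collections.unmodifiableMap(names);
    }

    public static void main(String[] args) {
        TopicIdCacheSketch cache = new TopicIdCacheSketch();
        UUID ordersId = UUID.randomUUID();
        cache.update("orders", ordersId);
        System.out.println(cache.topicNames().get(ordersId)); // prints "orders"
    }
}
```

Returning a copy rather than a live view keeps later cache updates from changing a map the caller is still iterating over.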






Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-02 Thread via GitHub


AndrewJSchofield commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1380393617


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##
@@ -48,26 +61,50 @@ public enum MemberState {
  * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the
  * broker. This is a recoverable state, where the member
  * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then
- * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+ * transitions to {@link #JOINING} to rejoin the group as a new member.
  */
 FENCED,
 
 /**
- * The member failed with an unrecoverable error
+ * The member transitions to this state when it is leaving the group after a call to
+ * unsubscribe. It stays in this state while releasing its assignment (calling user's callback
+ * for partitions revoked or lost), until the callback completes and a heartbeat request is
+ * sent out to effectively leave the group (without waiting for a response).
+ */
+LEAVING_GROUP,
+
+/**
+ * Member has completed releasing its assignment, and stays in this state until the next
+ * heartbeat request is sent out to leave the group.
  */
-FAILED;
+SENDING_LEAVE_REQUEST,
 
+/**
+ * The member failed with an unrecoverable error.
+ */
+FATAL;
+
+/**
+ * Valid state transitions
+ */
 static {
-// Valid state transitions
-STABLE.previousValidStates = Arrays.asList(UNJOINED, RECONCILING);
 
-RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED);
+STABLE.previousValidStates = Arrays.asList(JOINING, ACKNOWLEDGING_RECONCILED_ASSIGNMENT);
+
+RECONCILING.previousValidStates = Arrays.asList(STABLE, JOINING);
+
+ACKNOWLEDGING_RECONCILED_ASSIGNMENT.previousValidStates = Arrays.asList(RECONCILING);
+
+FATAL.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING_RECONCILED_ASSIGNMENT);

Review Comment:
   That's OK. I was just asking an innocent question. Makes sense to me.
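The transition table in this diff can be exercised with a small standalone sketch. The lists for `STABLE`, `RECONCILING`, `ACKNOWLEDGING_RECONCILED_ASSIGNMENT`, and `FATAL` are copied from the diff; the `JOINING` and `SENDING_LEAVE_REQUEST` entries are assumptions inferred from the Javadoc (`FENCED` transitions to `JOINING`, a member starts from `NOT_IN_GROUP`, and `LEAVING_GROUP` is followed by `SENDING_LEAVE_REQUEST`). States whose lists the excerpt does not show accept no incoming transition here. The `Member` tracker and its `transitionTo` method are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MemberStateSketch {

    enum State {
        NOT_IN_GROUP,
        JOINING,
        RECONCILING,
        ACKNOWLEDGING_RECONCILED_ASSIGNMENT,
        STABLE,
        FENCED,
        LEAVING_GROUP,
        SENDING_LEAVE_REQUEST,
        FATAL;

        // States from which a transition into this state is allowed (empty means none).
        private List<State> previousValidStates = Collections.emptyList();

        static {
            // Copied from the diff excerpt.
            STABLE.previousValidStates = Arrays.asList(JOINING, ACKNOWLEDGING_RECONCILED_ASSIGNMENT);
            RECONCILING.previousValidStates = Arrays.asList(STABLE, JOINING);
            ACKNOWLEDGING_RECONCILED_ASSIGNMENT.previousValidStates = Arrays.asList(RECONCILING);
            FATAL.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING_RECONCILED_ASSIGNMENT);
            // Assumptions inferred from the Javadoc; not shown in the excerpt.
            JOINING.previousValidStates = Arrays.asList(NOT_IN_GROUP, FENCED);
            SENDING_LEAVE_REQUEST.previousValidStates = Arrays.asList(LEAVING_GROUP);
        }

        boolean canTransitionFrom(State current) {
            return previousValidStates.contains(current);
        }
    }

    // Hypothetical tracker that rejects any transition not listed in the table.
    static final class Member {
        private State state = State.NOT_IN_GROUP;

        State state() {
            return state;
        }

        void transitionTo(State next) {
            if (!next.canTransitionFrom(state)) {
                throw new IllegalStateException("Invalid transition " + state + " -> " + next);
            }
            state = next;
        }
    }

    public static void main(String[] args) {
        Member member = new Member();
        member.transitionTo(State.JOINING);
        member.transitionTo(State.RECONCILING);
        member.transitionTo(State.ACKNOWLEDGING_RECONCILED_ASSIGNMENT);
        member.transitionTo(State.STABLE);
        System.out.println(member.state()); // prints "STABLE"
        try {
            member.transitionTo(State.RECONCILING);
            // STABLE only accepts JOINING or ACKNOWLEDGING_RECONCILED_ASSIGNMENT as predecessors.
            member.transitionTo(State.STABLE);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Invalid transition RECONCILING -> STABLE"
        }
    }
}
```

Storing the valid predecessors on each target state, as the diff does, keeps the check to a single `contains` call and makes it easy to see every way a member can reach a given state.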





