[jira] [Created] (KAFKA-16320) CreateTopics, DeleteTopics and CreatePartitions differences between Zookeeper and KRaft

2024-03-01 Thread Emanuele Sabellico (Jira)
Emanuele Sabellico created KAFKA-16320:
--

 Summary: CreateTopics, DeleteTopics and CreatePartitions 
differences between Zookeeper and KRaft
 Key: KAFKA-16320
 URL: https://issues.apache.org/jira/browse/KAFKA-16320
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Emanuele Sabellico


Test 0081 in librdkafka, which exercises these operations, is failing when using 
KRaft but not when using Zookeeper. The test sets the operation timeout to 0 
and expects those operations to be executed asynchronously. With Zookeeper the 
broker returns REQUEST_TIMED_OUT, which librdkafka converts to NO_ERROR when 
the operation timeout is <= 0.
With KRaft, NO_ERROR is returned directly, but the topics aren't actually 
created or deleted.
Also, passing an invalid configuration option returns NO_ERROR instead of 
INVALID_CONFIG, which is what happens with Zookeeper, or with KRaft when the 
operation timeout is > 0.

https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L5174C9-L5174C29

{code:java}
/* For non-blocking CreateTopicsRequests the broker
 * will returned REQUEST_TIMED_OUT for topics
 * that were triggered for creation -
 * we hide this error code from the application
 * since the topic creation is in fact in progress. */
if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
    rd_kafka_confval_get_int(&rko_req->rko_u.admin_request
                                  .options.operation_timeout) <= 0) {
        error_code  = RD_KAFKA_RESP_ERR_NO_ERROR;
        this_errstr = NULL;
}
{code}
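
For reference, this is roughly the pattern the failing test exercises, sketched with the public librdkafka Admin API (broker address, topic name, partition/replica counts and the invalid config property are placeholders; error handling is omitted):

{code:c}
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr));
        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                      errstr, sizeof(errstr));
        rd_kafka_queue_t *q = rd_kafka_queue_new(rk);

        rd_kafka_AdminOptions_t *options =
            rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
        /* Operation timeout 0: the broker should only trigger the creation
         * and return immediately (REQUEST_TIMED_OUT with Zookeeper, which
         * librdkafka then hides as shown above). */
        rd_kafka_AdminOptions_set_operation_timeout(options, 0,
                                                    errstr, sizeof(errstr));

        rd_kafka_NewTopic_t *newt =
            rd_kafka_NewTopic_new("test_topic", 3, 1, errstr, sizeof(errstr));
        /* An invalid config property should give INVALID_CONFIG, not NO_ERROR. */
        rd_kafka_NewTopic_set_config(newt, "not.a.valid.property", "1234");

        rd_kafka_CreateTopics(rk, &newt, 1, options, q);
        /* ...wait for the RD_KAFKA_EVENT_CREATETOPICS_RESULT event on q,
         * inspect the per-topic error codes, and verify that the topic
         * eventually shows up in metadata... */

        rd_kafka_NewTopic_destroy(newt);
        rd_kafka_AdminOptions_destroy(options);
        rd_kafka_queue_destroy(q);
        rd_kafka_destroy(rk);
        return 0;
}
{code}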






[jira] [Created] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-02-28 Thread Emanuele Sabellico (Jira)
Emanuele Sabellico created KAFKA-16310:
--

 Summary: ListOffsets doesn't report the offset with maxTimestamp 
anymore
 Key: KAFKA-16310
 URL: https://issues.apache.org/jira/browse/KAFKA-16310
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Emanuele Sabellico


The last offset is reported instead.
A test in librdkafka (0081/do_test_ListOffsets) is failing: it checks that the 
offset with the maximum timestamp is the middle one, not the last one. The test 
passes with 3.6.0 and previous versions.
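
For context, the query the test performs boils down to something like this sketch with the librdkafka Admin API (rk is an existing client handle; topic and partition are placeholders, and the result accessors, not shown here, may differ by client version):

{code:c}
#include <librdkafka/rdkafka.h>

/* Sketch: ask ListOffsets for the offset of the record with the maximum
 * timestamp in partition 0. rk is an existing client handle and the topic
 * name is a placeholder. */
static void query_max_timestamp_offset(rd_kafka_t *rk) {
        rd_kafka_topic_partition_list_t *parts =
            rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_t *p =
            rd_kafka_topic_partition_list_add(parts, "test_topic", 0);
        p->offset = RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP; /* offset spec, not an offset */

        rd_kafka_queue_t *q = rd_kafka_queue_new(rk);
        rd_kafka_ListOffsets(rk, parts, NULL /* default options */, q);

        /* ...wait for the ListOffsets result event on q and check that the
         * returned offset is the one whose record has the max timestamp
         * (the middle message in the test), not simply the last offset... */

        rd_kafka_topic_partition_list_destroy(parts);
        rd_kafka_queue_destroy(q);
}
{code}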





[jira] [Created] (KAFKA-16147) Partition is assigned to two members at the same time

2024-01-16 Thread Emanuele Sabellico (Jira)
Emanuele Sabellico created KAFKA-16147:
--

 Summary: Partition is assigned to two members at the same time
 Key: KAFKA-16147
 URL: https://issues.apache.org/jira/browse/KAFKA-16147
 Project: Kafka
  Issue Type: Sub-task
Reporter: Emanuele Sabellico


While running test 0113 of librdkafka, the _u_multiple_subscription_changes_ 
subtest received this error, saying that a partition is assigned to two members 
at the same time.

{code:java}
Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] 
which is already assigned to consumer C_5#consumer-8 {code}
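
For context, the subscription changes the subtest performs boil down to calls like the following sketch (rk is the consumer handle; T1 and T2 stand for the generated test topics):

{code:c}
#include <librdkafka/rdkafka.h>

/* Sketch: subscribe to T1 first, then widen the subscription to {T1, T2},
 * which is what triggers the revocation/assignment sequence below. */
static void change_subscription(rd_kafka_t *rk) {
        rd_kafka_topic_partition_list_t *subscription =
            rd_kafka_topic_partition_list_new(2);
        rd_kafka_topic_partition_list_add(subscription, "T1",
                                          RD_KAFKA_PARTITION_UA);
        rd_kafka_subscribe(rk, subscription);      /* subscribed to T1 */

        rd_kafka_topic_partition_list_add(subscription, "T2",
                                          RD_KAFKA_PARTITION_UA);
        rd_kafka_subscribe(rk, subscription);      /* now subscribed to T1, T2 */

        rd_kafka_topic_partition_list_destroy(subscription);
}
{code}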
I've reconstructed this sequence:

C_5 SUBSCRIBES TO T1

{code:java}
%7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
"rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id 
"(null)", current assignment "", subscribe topics 
"rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{code}

C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12


{code:java}
[2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 topic=__consumer_offsets 
partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member 
RaTCu6RXQH-FiSl95iZzdw transitioned from CurrentAssignment(memberEpoch=6, 
previousMemberEpoch=0, targetMemberEpoch=6, state=assigning, 
assignedPartitions={}, partitionsPendingRevocation={}, 
partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to 
CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=14, 
state=stable, assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
partitionsPendingRevocation={}, partitionsPendingAssignment={}). 
(org.apache.kafka.coordinator.group.GroupMetadataManager){code}

C_5 RECEIVES TARGET ASSIGNMENT

{code:java}
%7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: 
GroupCoordinator/1: Heartbeat response received target assignment 
"(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
(null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{code}


C_5 ACKS TARGET ASSIGNMENT

{code:java}
%7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
"rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
"NULL", current assignment 
"rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], 
rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], 
rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", subscribe 
topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" 
%7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat response received target assignment 
"(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
(null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{code}


C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are pending 
{code:java}
%7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
"rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
"NULL", current assignment "NULL", subscribe topics 
"rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], 
rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" [2024-01-16 12:10:52,615] 
INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] [GroupId 
rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw updated its 
subscribed topics to: [rdkafkatest_rnd550f20623daba04c_0113u_2, 
rdkafkatest_rnd5a91902462d61c2e_0113u_1]. 
(org.apache.kafka.coordinator.group.GroupMetadataManager) [2024-01-16 
12:10:52,616] INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] 
[GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw 
transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
targetMemberEpoch=14, state=stable, 
assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
partitionsPendingRevocation={}, partitionsPendingAssignment={}) to 
CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=16, 
state=revoking, assignedPartitions={}, 
partitionsPendingRevocation={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
partitionsPendingAssignment={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 8, 9]}). 
(org.apache.kafka.coordinator.group.GroupMetadataManager) 
%7|1705403452.618|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
Heartbeat response received target assignment ""{code}

C_5 FINISHES REVOCATION

{code:java}
%7|1705403452.618|CGRPJOINSTATE|C_5#consumer-8| [thrd:main]: Group 
"rdkafkatest_rnd53b4eb0c2de343_0113u" changed join state wait-assign-call -> 
steady (

[jira] [Created] (KAFKA-15997) Ensure fairness in the uniform assignor

2023-12-12 Thread Emanuele Sabellico (Jira)
Emanuele Sabellico created KAFKA-15997:
--

 Summary: Ensure fairness in the uniform assignor
 Key: KAFKA-15997
 URL: https://issues.apache.org/jira/browse/KAFKA-15997
 Project: Kafka
  Issue Type: Sub-task
Reporter: Emanuele Sabellico


 

 

Fairness has to be ensured in the uniform assignor, as it was in the 
cooperative-sticky one.

In librdkafka test 0113, subtest u_multiple_subscription_changes, 8 consumers 
subscribe to the same topic and the test verifies that all of them get 2 
partitions assigned. With the new protocol, however, it seems two consumers get 
3 partitions assigned and one gets zero partitions. The test doesn't configure 
any client.rack.
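
A minimal sketch of the per-consumer check the test performs (the consumer handles and the expected count of 2 stand in for the test's own state):

{code:c}
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: verify every consumer in the group got the expected number of
 * partitions. `consumers` and `expected` are placeholders for the test's
 * handles and expectation. */
static void check_assignment_counts(rd_kafka_t **consumers, int consumer_cnt,
                                    int expected) {
        for (int i = 0; i < consumer_cnt; i++) {
                rd_kafka_topic_partition_list_t *assignment = NULL;
                rd_kafka_assignment(consumers[i], &assignment);
                printf("%s has %d assigned partitions, expecting %d\n",
                       rd_kafka_name(consumers[i]), assignment->cnt, expected);
                rd_kafka_topic_partition_list_destroy(assignment);
        }
}
{code}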


{code:java}
[0113_cooperative_rebalance  /478.183s] Consumer assignments 
(subscription_variation 0) (stabilized) (no rebalance cb):
[0113_cooperative_rebalance  /478.183s] Consumer C_0#consumer-3 assignment (2): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [5] (2000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [8] (4000msgs)
[0113_cooperative_rebalance  /478.183s] Consumer C_1#consumer-4 assignment (3): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [0] (1000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [3] (2000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [13] (1000msgs)
[0113_cooperative_rebalance  /478.184s] Consumer C_2#consumer-5 assignment (2): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [6] (1000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [10] (2000msgs)
[0113_cooperative_rebalance  /478.184s] Consumer C_3#consumer-6 assignment (2): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [7] (1000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [9] (2000msgs)
[0113_cooperative_rebalance  /478.184s] Consumer C_4#consumer-7 assignment (2): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [11] (1000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [14] (3000msgs)
[0113_cooperative_rebalance  /478.184s] Consumer C_5#consumer-8 assignment (3): 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [1] (2000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [2] (2000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [4] (1000msgs)
[0113_cooperative_rebalance  /478.184s] Consumer C_6#consumer-9 assignment (0): 
[0113_cooperative_rebalance  /478.184s] Consumer C_7#consumer-10 assignment 
(2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [12] (1000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [15] (1000msgs)
[0113_cooperative_rebalance  /478.184s] 16/32 partitions assigned
[0113_cooperative_rebalance  /478.184s] Consumer C_0#consumer-3 has 2 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_1#consumer-4 has 3 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_2#consumer-5 has 2 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_3#consumer-6 has 2 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_4#consumer-7 has 2 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_5#consumer-8 has 3 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_6#consumer-9 has 0 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[0113_cooperative_rebalance  /478.184s] Consumer C_7#consumer-10 has 2 assigned 
partitions (1 subscribed topic(s)), expecting 2 assigned partitions
[                      /479.057s] 1 test(s) running: 
0113_cooperative_rebalance
[                      /480.057s] 1 test(s) running: 
0113_cooperative_rebalance
[                      /481.057s] 1 test(s) running: 
0113_cooperative_rebalance
[0113_cooperative_rebalance  /482.498s] TEST FAILURE
### Test "0113_cooperative_rebalance (u_multiple_subscription_changes:2390: 
use_rebalance_cb: 0, subscription_variation: 0)" failed at 
test.c:1243:check_test_timeouts() at Thu Dec  7 15:52:15 2023: ###
Test 0113_cooperative_rebalance (u_multiple_subscription_changes:2390: 
use_rebalance_cb: 0, subscription_variation: 0) timed out (timeout set to 480 
seconds)
./run-test.sh: line 62: 3512920 Killed                  $TEST $ARGS
###
### Test ./test-runner in bare mode FAILED! (return code 137) ###
###{code}
 

 





[jira] [Created] (KAFKA-15989) Upgrade existing generic group to consumer group

2023-12-08 Thread Emanuele Sabellico (Jira)
Emanuele Sabellico created KAFKA-15989:
--

 Summary: Upgrade existing generic group to consumer group
 Key: KAFKA-15989
 URL: https://issues.apache.org/jira/browse/KAFKA-15989
 Project: Kafka
  Issue Type: Sub-task
Reporter: Emanuele Sabellico


It should be possible to upgrade an existing generic group to a new consumer 
group, whether it was using the previous generic protocol or manual partition 
assignment and commit.
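
From the client side, the upgrade scenario roughly corresponds to rejoining the same group with the new protocol enabled. A minimal sketch assuming the librdkafka group.protocol property from the KIP-848 client preview (broker address and group id are placeholders; the property name and value may differ in other clients or versions):

{code:c}
#include <librdkafka/rdkafka.h>

/* Sketch: a consumer rejoining an already existing ("generic") group with
 * the new consumer group protocol enabled. Broker and group id are
 * placeholders; "group.protocol" is assumed from the KIP-848 client preview. */
static rd_kafka_t *new_kip848_consumer(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "existing-generic-group",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "group.protocol", "consumer",
                          errstr, sizeof(errstr));
        return rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
}
{code}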





RE: Re: [DISCUSS] KIP-951: Leader discovery optimisations for the client

2023-07-17 Thread Emanuele Sabellico
The downside of bumping the version is that clients have to have all 
the latest features implemented before being able to benefit from this 
performance improvement.
One of the benefits of using a tagged field is to make the field 
available to previous versions too.

Choosing a minimum value for taggedVersions could be an alternative.
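
For example, something along these lines (an illustrative fragment only; the 
minimum version 12 is taken from Mayank's point quoted below, the field name 
and tag from the proposed schema):

{ "name": "LeaderEndpoints", "type": "[]Leader", "versions": "12+",
  "taggedVersions": "12+", "tag": 3, ... }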

Emanuele

On 2023/07/13 17:30:45 Andrew Schofield wrote:
> Hi Mayank,
> If we bump the version, the broker can tell whether it’s worth providing the leader
> endpoint information to the client when the leader has changed. That’s my reasoning.
>
> Thanks,
> Andrew
>
> > On 13 Jul 2023, at 18:02, Mayank Shekhar Narula wrote:
> >
> > Thanks both for looking into this.
> >
> > Jose,
> >
> > 1/2 & 4(changes for PRODUCE) & 5 makes sense, will follow
> >
> > 3. If I understood this correctly, certain replicas "aren't" brokers, what
> > are they then?
> >
> > Also how about replacing "Replica" with "Leader", this is more readable on
> > the client. so, how about this?
> > { "name": "LeaderEndpoints", "type": "[]Leader", "versions": "15+",
> > "taggedVersions": "15+", "tag": 3,
> > "about": "Endpoints for all current leaders enumerated in
> > PartitionData.", "fields": [
> > { "name": "NodeId", "type": "int32", "versions": "15+",
> > "mapKey": true, "entityType": "brokerId", "about": "The ID of the
> > associated leader"},
> > { "name": "Host", "type": "string", "versions": "15+",
> > "about": "The leader's hostname." },
> > { "name": "Port", "type": "int32", "versions": "15+",
> > "about": "The leader's port." },
> > { "name": "Rack", "type": "string", "versions": "15+", "ignorable":
> > true, "default": "null",
> > "about": "The rack of the leader, or null if it has not been
> > assigned to a rack." }
> > ]}
> >
> > Andrew
> >
> > 6. I wonder if non-Kafka clients might benefit from not bumping the
> > version. If versions are bumped, say for FetchResponse to 16, I believe
> > that client would have to support all versions until 16 to fully utilise
> > this feature. Whereas, if not bumped, they can simply support until version
> > 12 (will change to version:12 for tagged fields), and non-AK clients can
> > then implement this feature. What do you think? I am inclined to not bump.
> >
> > On Thu, Jul 13, 2023 at 5:21 PM Andrew Schofield <
> > andrew_schofield_j...@outlook.com> wrote:
> >
> >> Hi José,
> >> Thanks. Sounds good.
> >>
> >> Andrew
> >>
> >>> On 13 Jul 2023, at 16:45, José Armando García Sancio
> >> wrote:
> >>>
> >>> Hi Andrew,
> >>>
> >>> On Thu, Jul 13, 2023 at 8:35 AM Andrew Schofield
> >>> wrote:
>  I have a question about José’s comment (2). I can see that it’s
> >> possible for multiple
>  partitions to change leadership to the same broker/node and it’s
> >> wasteful to repeat
>  all of the connection information for each topic-partition. But, I
> >> think it’s important to
>  know which partitions are now lead by which node. That information at
> >> least needs to be
>  per-partition I think. I may have misunderstood, but it sounded like
> >> your comment
>  suggestion lost that relationship.
> >>>
> >>> Each partition in both the FETCH response and the PRODUCE response
> >>> will have the CurrentLeader, the tuple leader id and leader epoch.
> >>> Clients can use this information to update their partition to leader
> >>> id and leader epoch mapping.
> >>>
> >>> They can also use the NodeEndpoints to update their mapping from
> >>> replica id to the tuple host, port and rack so that they can connect
> >>> to the correct node for future FETCH requests and PRODUCE requests.
> >>>
> >>> Thanks,
> >>> --
> >>> -José
> >>
> >>
> >
> > --
> > Regards,
> > Mayank Shekhar Narula
>
>