[jira] [Created] (KAFKA-14078) Replica fetches to follower should return NOT_LEADER error

2022-07-14 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14078:
---

 Summary: Replica fetches to follower should return NOT_LEADER error
 Key: KAFKA-14078
 URL: https://issues.apache.org/jira/browse/KAFKA-14078
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 3.3.0


After the fix for KAFKA-13837, if a follower receives a request from another 
replica, it will return UNKNOWN_LEADER_EPOCH even if the leader epoch matches. 
We need to do leader/epoch validation first, before checking whether we have a 
valid replica.
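
A rough sketch of the intended check ordering (hypothetical method, not the 
actual ReplicaManager code):

{code}
// Hypothetical sketch of the intended check ordering; illustrative only,
// not the actual Kafka implementation.
import org.apache.kafka.common.protocol.Errors;

class FetchValidation {
    static Errors validateReplicaFetch(int requestEpoch, int currentEpoch, boolean isLeader) {
        // 1. Validate the leader epoch first.
        if (requestEpoch < currentEpoch)
            return Errors.FENCED_LEADER_EPOCH;
        if (requestEpoch > currentEpoch)
            return Errors.UNKNOWN_LEADER_EPOCH;
        // 2. Only then check whether this broker is actually the leader.
        if (!isLeader)
            return Errors.NOT_LEADER_OR_FOLLOWER;
        return Errors.NONE;
    }
}
{code}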





Re: [DISCUSS] Apache Kafka 3.3.0 Release

2022-07-14 Thread Jason Gustafson
Hey Jose,

Thanks for volunteering to manage the release! KIP-833 is currently slotted
for 3.3. We've been getting some help from Jack Vanlightly to validate the
raft implementation in TLA+ and with frameworks like Jepsen. The
specification is written here if anyone is interested:
https://github.com/Vanlightly/raft-tlaplus/blob/main/specifications/pull-raft/KRaft.tla.
The main gap that this work uncovered in our implementation is documented
here: https://issues.apache.org/jira/browse/KAFKA-14077. I do believe that
KIP-833 depends on fixing this issue, so I wanted to see how you feel about
giving us a little more time to address it?

Thanks,
Jason

On Wed, Jul 13, 2022 at 10:01 AM Sagar  wrote:

> Hey Jose,
>
> Well actually I have 2 approved PRs from Kafka Connect:
>
> https://github.com/apache/kafka/pull/12321
> https://github.com/apache/kafka/pull/12309
>
> Not sure how to get these merged, though, but I think they can go into the
> 3.3 release.
>
> Thanks!
> Sagar.
>
>
> On Wed, Jul 13, 2022 at 5:03 PM Divij Vaidya 
> wrote:
>
> > Hey Jose
> >
> > A few of my PRs are pending review for quite some time, which I was hoping
> > to merge into 3.3. I have already marked them with "Fix version=3.3.0" so
> > that you can track them using the JIRA filter you shared earlier
> > <https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%203.3.0%20AND%20status%20not%20in%20(resolved%2C%20closed)%20ORDER%20BY%20priority%20DESC%2C%20status%20DESC%2C%20updated%20DESC>
> > in this thread. Would you have some time to review them?
> >
> > Notable amongst them would be:
> > 1. Fix the rate window size calculation for edge cases -
> > https://github.com/apache/kafka/pull/12184
> > 2. Fix resource leaks - https://github.com/apache/kafka/pull/12228
> >
> > And the complete list would be at:
> >
> >
> > https://github.com/search?q=is%3Aopen+is%3Apr+author%3Adivijvaidya+is%3Apr+repo%3Aapache%2Fkafka+created%3A2022-04-01..2022-07-30&type=Issues
> >
> >
> > --
> > Divij Vaidya
> >
> >
> >
> > On Mon, Jul 11, 2022 at 5:12 PM José Armando García Sancio
> >  wrote:
> >
> > > Hi all,
> > >
> > > I created the branch for 3.3
> > > (https://github.com/apache/kafka/tree/3.3). If you have bug fixes for
> > > the 3.3.0 release, please make sure to cherry-pick them to that branch.
> > >
> > > Thanks
> > >
> >
>


KAFKA-13572 Negative preferred replica imbalance metric

2022-07-14 Thread Haruki Okada
Hi, Kafka.

We found that a race in the topic-deletion procedure can cause the
preferred replica imbalance metric to go negative.
This can easily happen when many topics are deleted at once, and
since we use the metric for monitoring, we have to restart the controller
to fix it every time this happens.

I submitted a patch to fix it: https://github.com/apache/kafka/pull/12405
It'd be appreciated if anyone could review the PR.


Thanks,

-- 

Okada Haruki
ocadar...@gmail.com



[jira] [Created] (KAFKA-14077) KRaft should support recovery from failed disk

2022-07-14 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14077:
---

 Summary: KRaft should support recovery from failed disk
 Key: KAFKA-14077
 URL: https://issues.apache.org/jira/browse/KAFKA-14077
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
 Fix For: 3.3.0


If one of the nodes in the metadata quorum has a disk failure, there is 
currently no way to safely bring the node back into the quorum. When we lose 
disk state, we are at risk of losing committed data even if the failure only 
affects a minority of the cluster.

Here is an example. Suppose that a metadata quorum has 3 members: v1, v2, and 
v3. Initially, v1 is the leader and writes a record at offset 1. After v2 
acknowledges replication of the record, it becomes committed. Suppose that v1 
fails before v3 has a chance to replicate this record. As long as v1 remains 
down, the raft protocol guarantees that only v2 can become leader, so the 
record cannot be lost. The raft protocol expects that when v1 returns, it will 
still have that record. But what if a disk failure occurs, the state cannot be 
recovered, and v1 nevertheless participates in leader election? Then we would 
have committed data on only a minority of the voters. The main problem here is 
how we recover from this impaired state without risking the loss of that data.

Consider a naive solution which brings v1 back with an empty disk. Since the 
node has lost its prior knowledge of the state of the quorum, it will vote for 
any candidate that comes along. If v3 becomes a candidate, it will vote for 
itself and then needs only the vote from v1 to become leader. If that happens, 
the committed data on v2 will be lost.

This is just one scenario. In general, the invariants that the raft protocol is 
designed to preserve go out the window when disk state is lost. For example, it 
is also possible to contrive a scenario where the loss of disk state leads to 
multiple leaders. There is a good reason why raft requires that any vote cast 
by a voter is written to disk: otherwise the voter may vote for different 
candidates in the same epoch.

Many systems solve this problem with a unique identifier which is generated 
automatically and stored on disk. This identifier is then committed to the raft 
log. If a disk changes, we would see a new identifier and can prevent the node 
from breaking raft invariants. Recovery from a failed disk then requires a 
quorum reconfiguration. We need something like this in KRaft to make disk 
recovery possible.
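
As a rough illustration of the identifier idea (the file name and helper names 
below are assumptions, not the actual KRaft design):

{code}
// Hypothetical sketch of a stored disk identifier; the file name and helper
// names are assumptions, not the actual KRaft mechanism.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

class DiskIdentifier {
    // Load the identifier stored in the log directory, generating one on
    // first start (or after a disk replacement).
    static UUID loadOrCreate(Path logDir) throws IOException {
        Path idFile = logDir.resolve("directory.id");
        if (Files.exists(idFile))
            return UUID.fromString(Files.readString(idFile).trim());
        UUID id = UUID.randomUUID();
        Files.writeString(idFile, id.toString());
        return id;
    }

    // If the on-disk identifier no longer matches the one committed to the
    // raft log, the node has lost its state and must be reconfigured back
    // into the quorum instead of being allowed to vote.
    static boolean mayVote(UUID onDisk, UUID committedToLog) {
        return onDisk.equals(committedToLog);
    }
}
{code}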

 

 





[jira] [Resolved] (KAFKA-13846) Add an overloaded metricOrElseCreate function in Metrics

2022-07-14 Thread Guozhang Wang (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-13846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13846.
---
Fix Version/s: 3.3.0
   Resolution: Fixed

> Add an overloaded metricOrElseCreate function in Metrics
> 
>
> Key: KAFKA-13846
> URL: https://issues.apache.org/jira/browse/KAFKA-13846
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: newbie
> Fix For: 3.3.0
>
>
> The `Metrics` registry is often used by concurrent threads; however, its 
> get/create APIs are not well suited for that. A common pattern for users 
> today is:
> {code}
> metric = metrics.metric(metricName);
> if (metric == null) {
>   try {
>     metrics.createMetric(..);
>   } catch (IllegalArgumentException e) {
>     // another thread may have created the metric in the meantime
>   }
> }
> {code}
> Otherwise the caller would need to synchronize the whole block trying to get 
> the metric. However, the `createMetric` function call itself does synchronize 
> internally when updating the metric map.
> So we could consider adding a metricOrElseCreate function which is similar to 
> createMetric, but instead of throwing an IllegalArgumentException within the 
> internal synchronization block, it would just return the already existing 
> metric.
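> A rough sketch of the desired semantics (hypothetical signature; only the 
> existing Metrics#metric and Metrics#addMetric calls are assumed):
> {code}
> // Hypothetical helper illustrating the desired semantics; the real method
> // would live inside Metrics itself.
> public static KafkaMetric metricOrElseCreate(Metrics metrics, MetricName name,
>                                              MetricValueProvider<?> provider) {
>     KafkaMetric existing = metrics.metric(name);
>     if (existing != null)
>         return existing;
>     try {
>         metrics.addMetric(name, provider);
>     } catch (IllegalArgumentException e) {
>         // another thread registered the metric in the meantime; fall through
>     }
>     return metrics.metric(name);
> }
> {code}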





Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-14 Thread Guozhang Wang
Thanks Hector! Yes, making the group "type" templated, with extensible
handling logic, is part of the motivation for this rebalance protocol.


Guozhang

On Thu, Jul 14, 2022 at 10:35 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
hgerald...@bloomberg.net> wrote:

> Kudos David, Guozhang, and Jason for putting together such a great
> proposal.
>
> I don't want to hijack the discussion, just wanted to mention that it
> would be great if the final design is made extensible enough, so other use
> cases (like Kafka Connect, Schema Registry, etc.) can be added later on.
>
> I can see how the concept of different group "types" in the group
> coordinator can be leveraged to support such cases. On KIP-795, I wanted to
> add public APIs for the AbstractCoordinator with the intent of formalizing
> the use of the Group Membership Protocol for resource management use cases.
> I'll probably close this KIP and wait to see what comes out of this
> redesign of the protocol.
>
> Thanks
>
> -
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator
>
> From: dev@kafka.apache.org At: 07/06/22 04:44:59 UTC-4:00 To:
> dev@kafka.apache.org
> Subject: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance
> Protocol
>
> Hi all,
>
> I would like to start a discussion thread on KIP-848: The Next
> Generation of the Consumer Rebalance Protocol. With this KIP, we aim
> to make the rebalance protocol (for consumers) more reliable, more
> scalable, easier to implement for clients, and easier to debug for
> operators.
>
> The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.
>
> Please take a look and let me know what you think.
>
> Best,
> David
>
> PS: I will be away from July 18th to August 8th. That gives you a bit
> of time to read and digest this long KIP.
>
>
>

-- 
-- Guozhang


[jira] [Created] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-14 Thread Jim Hughes (Jira)
Jim Hughes created KAFKA-14076:
--

 Summary: Fix issues with KafkaStreams.CloseOptions
 Key: KAFKA-14076
 URL: https://issues.apache.org/jira/browse/KAFKA-14076
 Project: Kafka
  Issue Type: Bug
Reporter: Jim Hughes


The new `close(CloseOptions)` function has a few bugs.  
(https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)

Notably, it needs to remove consumer group members per StreamThread.





[jira] [Created] (KAFKA-14075) Consumer Group deletion does not delete pending transactional offset commits

2022-07-14 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-14075:


 Summary: Consumer Group deletion does not delete pending 
transactional offset commits
 Key: KAFKA-14075
 URL: https://issues.apache.org/jira/browse/KAFKA-14075
 Project: Kafka
  Issue Type: Bug
Reporter: Jeff Kim
Assignee: Jeff Kim


In 
[GroupMetadata.removeAllOffsets()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L729-L740]
 we pass the offsets cache in when deleting pendingTransactionalOffsetCommits 
upon group deletion, so only transactional offset commits for topic partitions 
already present in the offsets cache are deleted.

However, we add a transactional offset commit to the offsets cache only after 
the commit/abort marker is written to the log in 
[GroupMetadata.completePendingTxnOffsetCommit()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L692]

So even after a group deletion, we can still have pending transactional offset 
commits for a group that is supposed to be deleted. The group metadata manager 
will then throw an IllegalStateException 
[here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L740]
 while loading the group into memory, and we will hit this exception on every 
load of the group for as long as the hanging transaction never completes.

We should delete all pending transactional offset commits (instead of only 
those for topic partitions that exist in the offsets cache) when a group is 
deleted in GroupMetadata.removeOffsets()
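
A hedged sketch of the intended behavior (illustrative Java only; the real 
change would be in the Scala GroupMetadata class):

{code}
// Illustrative sketch only: on group deletion, clear every pending
// transactional offset commit, not just those whose partitions already
// appear in the offsets cache.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class GroupOffsets {
    // committed offsets by topic-partition name
    final Map<String, Long> offsets = new HashMap<>();
    // pending transactional commits keyed by producerId
    final Map<Long, Map<String, Long>> pendingTxnOffsets = new HashMap<>();

    // Returns the producerIds whose pending commits were dropped, so the
    // caller can write the corresponding tombstones.
    Set<Long> removeAllOffsets() {
        Set<Long> droppedProducers = new HashSet<>(pendingTxnOffsets.keySet());
        pendingTxnOffsets.clear(); // previously filtered by `offsets`, leaving hanging entries
        offsets.clear();
        return droppedProducers;
    }
}
{code}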





Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-14 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Kudos David, Guozhang, and Jason for putting together such a great proposal.

I don't want to hijack the discussion, just wanted to mention that it would be 
great if the final design is made extensible enough, so other use cases (like 
Kafka Connect, Schema Registry, etc.) can be added later on.

I can see how the concept of different group "types" in the group coordinator 
can be leveraged to support such cases. On KIP-795, I wanted to add public APIs 
for the AbstractCoordinator with the intent of formalizing the use of the Group 
Membership Protocol for resource management use cases. I'll probably close this 
KIP and wait to see what comes out of this redesign of the protocol.

Thanks

- 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator

From: dev@kafka.apache.org At: 07/06/22 04:44:59 UTC-4:00 To:
dev@kafka.apache.org
Subject: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance 
Protocol

Hi all,

I would like to start a discussion thread on KIP-848: The Next
Generation of the Consumer Rebalance Protocol. With this KIP, we aim
to make the rebalance protocol (for consumers) more reliable, more
scalable, easier to implement for clients, and easier to debug for
operators.

The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.

Please take a look and let me know what you think.

Best,
David

PS: I will be away from July 18th to August 8th. That gives you a bit
of time to read and digest this long KIP.




[GitHub] [kafka-site] mumrah merged pull request #423: Add a new signing key for David Arthur

2022-07-14 Thread GitBox


mumrah merged PR #423:
URL: https://github.com/apache/kafka-site/pull/423





Re: [DISCUSS] KIP-844: Transactional State Stores

2022-07-14 Thread Alexander Sorokoumov
Hi,

I updated the KIP with the following changes:
* Replaced in-memory batches with the secondary-store approach as the
default implementation to address the feedback about memory pressure as
suggested by Sagar and Bruno.
* Introduced StateStore#commit and StateStore#recover methods as an
extension of the rollback idea. @Guozhang, please see the comment below on
why I took a slightly different approach than you suggested.
* Removed mentions of changes to IQv1 and IQv2. Transactional state stores
enable reading committed data in IQ, but that is really an independent feature
that deserves its own KIP. Conflating them unnecessarily increases the
scope for discussion, implementation, and testing in a single unit of work.

I also published a prototype - https://github.com/apache/kafka/pull/12393 -
that implements the changes described in the proposal.

Regarding explicit rollback, I think it is a powerful idea that allows
other StateStore implementations to take a different path to the
transactional behavior rather than keeping 2 state stores. Instead of
introducing a new commit token, I suggest using the changelog offset, which
already corresponds 1:1 to the materialized state. This works nicely
because Kafka Streams first commits the AK transaction and only then
checkpoints the state store, so we can use the changelog offset to commit
the state store transaction.

I called the method StateStore#recover rather than StateStore#rollback
because a state store might either roll back or forward depending on the
specific point of the crash failure. Consider the write algorithm in Kafka
Streams:
1. write stuff to the state store
2. producer.sendOffsetsToTransaction(token); producer.commitTransaction();
3. flush
4. checkpoint

Let's consider 3 cases:
1. If the crash failure happens between #2 and #3, the state store rolls
back and replays the uncommitted transaction from the changelog.
2. If the crash failure happens during #3, the state store can roll forward
and finish the flush/commit.
3. If the crash failure happens between #3 and #4, the state store should
do nothing during recovery and just proceed with the checkpoint.
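
To make that decision concrete, here is a rough sketch of the recover
semantics (all names are illustrative, not the proposed API surface):

{code}
// Rough sketch of the StateStore#recover idea; names are illustrative.
interface TransactionalStore {
    long committedOffset();        // changelog offset the store last durably committed
    boolean hasInFlightCommit();   // a flush started but did not finish
    void finishCommit();           // roll forward: complete the interrupted flush
    void discardUncommitted();     // roll back: drop writes of the open transaction

    // checkpointedOffset is the changelog offset recorded when the Kafka
    // transaction committed.
    default void recover(long checkpointedOffset) {
        if (committedOffset() == checkpointedOffset)
            return;                // case 3: crash between flush and checkpoint
        if (hasInFlightCommit())
            finishCommit();        // case 2: crash during flush, roll forward
        else
            discardUncommitted();  // case 1: roll back; changelog replay catches up
    }
}
{code}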

Looking forward to your feedback,
Alexander

On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov <
asorokou...@confluent.io> wrote:

> Hi,
>
> As a status update, I did the following changes to the KIP:
> * replaced configuration via the top-level config with configuration via
> Stores factory and StoreSuppliers,
> * added IQv2 and elaborated how readCommitted will work when the store is
> not transactional,
> * removed claims about ALOS.
>
> I am going to be OOO in the next couple of weeks and will resume working
> on the proposal and responding to the discussion in this thread starting
> June 27. My next top priorities are:
> 1. Prototype the rollback approach as suggested by Guozhang.
> 2. Replace in-memory batches with the secondary-store approach as the
> default implementation to address the feedback about memory pressure as
> suggested by Sagar and Bruno.
> 3. Adjust Stores methods to make transactional implementations pluggable.
> 4. Publish the POC for the first review.
>
> Best regards,
> Alex
>
> On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang  wrote:
>
>> Alex,
>>
>> Thanks for your replies! That is very helpful.
>>
>> Just to broaden our discussions a bit here, I think there are some other
>> approaches in parallel to the idea of "enforce to only persist upon
>> explicit flush" and I'd like to throw one here -- not really advocating
>> it,
>> but just for us to compare the pros and cons:
>>
>> 1) We let the StateStore's `flush` function return a token instead of
>> `void`.
>> 2) We add another `rollback(token)` interface of StateStore which would
>> effectively rollback the state as indicated by the token to the snapshot
>> when the corresponding `flush` is called.
>> 3) We encode the token and commit as part of
>> `producer#sendOffsetsToTransaction`.
>>
>> Users could optionally implement the new functions, or they can just not
>> return the token at all and not implement the second function. Again, the
>> APIs are just for the sake of illustration, not feeling they are the most
>> natural :)
>>
>> Then the procedure would be:
>>
>> 1. the previous checkpointed offset is 100
>> ...
>> 3. flush store, make sure all writes are persisted; get the returned token
>> that indicates the snapshot of 200.
>> 4. producer.sendOffsetsToTransaction(token); producer.commitTransaction();
>> 5. Update the checkpoint file (say, the new value is 200).
>>
>> Then if there's a failure, say between 3/4, we would get the token from
>> the last committed txn, and first do the restoration (which may get the
>> state to somewhere between 100 and 200), then call `store.rollback(token)`
>> to roll back to the snapshot at offset 100.
>>
>> The pro is that we would then not need to force the state stores to not
>> persist any data during the txn: for stores that may not be able to
>> implement the 

[jira] [Created] (KAFKA-14074) Restarting a broker during re-assignment can leave log directory entries

2022-07-14 Thread Adrian Preston (Jira)
Adrian Preston created KAFKA-14074:
--

 Summary: Restarting a broker during re-assignment can leave log 
directory entries
 Key: KAFKA-14074
 URL: https://issues.apache.org/jira/browse/KAFKA-14074
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.0, 2.8.0
Reporter: Adrian Preston


Re-starting a broker while replicas are being assigned away from the broker can 
result in topic partition directories being left in the broker’s log directory. 
This can trigger further problems if such a topic is deleted and re-created. 
These problems occur when replicas for the new topic are placed on a broker 
that hosts a “stale” topic partition directory of the same name, causing the 
on-disk topic partition state held by different brokers in the cluster to 
diverge.

We have also been able to reproduce variants of this problem using Kafka 2.8 
and 3.1, as well as Kafka built from the head of the apache/kafka repository 
(at the time of writing this is commit 94d4fdeb28b3cd4d474d943448a7ef653eaa145d). 
We have *not* been able to reproduce this problem with Kafka running in KRaft 
mode.

A minimal reproduction for topic directories being left on disk is as follows:
 # Start ZooKeeper and a broker (both using the sample config)
 # Create 100 topics: each with 1 partition, and with replication factor 1
 # Add a second broker to the Kafka cluster (with minor edits to the sample 
config for: {{broker.id}}, {{listeners}}, and {{log.dirs}})
 # Issue a re-assignment that moves all of the topic partition replicas from 
the first broker to the second broker
 # While this re-assignment is taking place, shut down the first broker (you 
need to be quick with only two brokers and 100 topics…)
 # Wait a few seconds for the re-assignment to stall
 # Restart the first broker and wait for the re-assignment to complete and for 
it to remove any partially deleted topics (e.g. those with a “-delete” suffix).

Inspecting the logs directory for the first broker should show directories 
corresponding to topic partitions that are owned by the second broker. These 
are not cleaned up when the re-assignment completes, and also remain in the 
logs directory even if the first broker is restarted.  Deleting the topic also 
does not clean up the topic partitions left behind on the first broker - which 
leads to a second potential problem.

For topics that have more than one replica: a new topic that has the same name 
as a previously deleted topic might have replicas created on a broker with 
“stale” topic partition directories. If this happens these topics will remain 
in an under-replicated state.

A minimal reproduction for this is as follows:
 # Create a three node Kafka cluster (backed by ZK) based off the sample config 
(to avoid confusion let’s call these kafka-0, kafka-1, and kafka-2)
 # Create 100 topics: each with 1 partition, and with replication factor 2
 # Submit a re-assignment to move all of the topic partition replicas to 
kafka-0 and kafka-1, and wait for it to complete
 # Submit a re-assignment to move all of the topic partition replicas on 
kafka-0 to kafka-2
 # While this re-assignment is taking place, shut down and restart kafka-0
 # Wait for the re-assignment to complete, and check that there are unexpected 
topic partition directories in kafka-0’s logs directory
 # Delete all 100 topics, and re-create 100 new topics with the same names and 
configuration as the deleted topics.

In this state kafka-1 and kafka-2 continually generate log messages similar to:
{{[2022-07-14 13:07:49,118] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
fetcherId=0] Received INCONSISTENT_TOPIC_ID from the leader for partition 
test-039-0. This error may be returned transiently when the partition is being 
created or deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread)}}

Topics that have had replicas created on kafka-0 are under-replicated, with 
kafka-0 missing from the ISR list. Performing a rolling restart of each broker 
in turn does not resolve the problem; in fact, more partitions are listed as 
under-replicated, with kafka-0 missing from their ISR list as before.

I also tried to reproduce this with Kafka running in KRaft mode, but was unable 
to do so. My test configuration was three brokers configured based on 
/config/kraft/server.properties. All three brokers were part of the controller 
quorum. Interestingly, I see log lines like the following when re-starting the 
broker that I stopped mid-reassignment:

{{[2022-07-14 13:44:42,705] INFO Found stray log dir 
Log(dir=/tmp/kraft-2/test-029-0, topicId=DMGA3zxyQqGUfeV6cmkcmg, 
topic=test-029, partition=0, highWatermark=0, lastStableOffset=0, 
logStartOffset=0, logEndOffset=0): the current replica assignment [I@530d4c70 
does not contain the local brokerId 2. 
(kafka.server.metadata.BrokerMetadataPublisher$)}}

With later log lines showing the topic 

Re: [DISCUSS] Apache Kafka 3.2.1 release

2022-07-14 Thread Luke Chen
+1, Thanks David!

On Thu, Jul 14, 2022 at 1:16 PM David Jacot  wrote:

> +1. Thanks David.
>
> On Wed, Jul 13, 2022 at 11:43 PM José Armando García Sancio
>  wrote:
>
> > +1. Thanks for volunteering David.
> >
> > --
> > -José
> >
>