Re: [VOTE] KIP-1037: Allow WriteTxnMarkers API with Alter Cluster Permission

2024-04-22 Thread Christo Lolov
Heya Nikhil,

Thanks for the proposal, as mentioned before it makes sense to me!

+1 (binding)

Best,
Christo

On Sat, 20 Apr 2024 at 00:25, Justine Olshan 
wrote:

> Hey Nikhil,
>
> I meant to comment on the discussion thread, but my draft took so long, you
> opened the vote.
>
> Regardless, I just wanted to say that it makes sense to me. +1 (binding)
>
> Justine
>
> On Fri, Apr 19, 2024 at 7:22 AM Nikhil Ramakrishnan <
> ramakrishnan.nik...@gmail.com> wrote:
>
> > Hi everyone,
> >
> > I would like to start a voting thread for KIP-1037: Allow
> > WriteTxnMarkers API with Alter Cluster Permission
> > (
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1037%3A+Allow+WriteTxnMarkers+API+with+Alter+Cluster+Permission
> > )
> > as there have been no objections on the discussion thread.
> >
> > For comments or feedback please check the discussion thread here:
> > https://lists.apache.org/thread/bbkyt8mrc8xp3jfyvhph7oqtjxl29xmn
> >
> > Thanks,
> > Nikhil
> >
>


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-04-22 Thread Bruno Cadonna

Hi Damien,

Thanks a lot for the updates!

I have the following comments:

(1)
Could you rename ProcessingMetadata to ErrorHandlerContext or 
ErrorHandlerMetadata (I prefer the former)? I think it makes it 
clearer what this context/metadata is for.



(2)
Is there any reason you did not use something like

Record sourceRecord()

in ProcessingMetadata instead of sourceRawKey() and sourceRawValue() and 
headers()? The headers() method refers to the record read from the input 
topic of the sub-topology, right? If yes, maybe that is also something 
to mention more explicitly.



(3)
Since you added the processor node ID to the ProcessingMetadata, you can 
remove it from the signature of method handle() in 
ProcessingExceptionHandler.



(4)
Where are the mentioned changes to the DeserializationExceptionHandler?


(5)
To be consistent, the order of the parameters in the 
ProductionExceptionHandler should be:

1. context
2. record
3. exception
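
For illustration, a minimal sketch of the handle() method with that parameter order; ErrorHandlerContext is the rename proposed in (1), and the wrapper interface, nested enum and generic parameters are placeholders rather than the KIP's final definitions:

import org.apache.kafka.clients.producer.ProducerRecord;

public interface ProductionExceptionHandlerSketch {

    // Placeholder for the error-handler context type discussed in (1).
    interface ErrorHandlerContext { }

    enum Response { CONTINUE, FAIL }

    // context first, then record, then exception
    Response handle(ErrorHandlerContext context,
                    ProducerRecord<byte[], byte[]> record,
                    Exception exception);
}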


(6)
I am wondering where the implementation of ProcessingMetadata gets the 
sourceRawKey/Value from. Do we need additional changes in 
ProcessingContext and implementations?



Best,
Bruno


On 4/21/24 2:23 PM, Damien Gasparina wrote:

Hi Everyone,

Following the discussions on KIP-1033 and KIP-1034, we did a few changes:
   - We introduced a new ProcessingMetadata class containing only the
ProcessorContext metadata: topic, partition, offset, headers[],
sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName (a sketch of
such a class is shown after this list)
   - To be consistent, we propose to deprecate the existing
DeserializationExceptionHandler and ProductionExceptionHandler methods
to rely on the new ProcessingMetadata
   - The creation of the ProcessingMetadata and the deprecation of the
old methods is owned by KIP-1033; KIP-1034 (DLQ) now focuses only on
the Dead Letter Queue implementation without touching any interfaces.
We introduced a hard dependency of KIP-1034 on KIP-1033, which we
think is the wisest approach implementation-wise.
- Instead of creating a new metric, KIP-1033 updates the
dropped-record metric.
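
For illustration, a minimal sketch of such a metadata container with accessor-style methods; the field names are the ones listed above, and the Java types are assumptions rather than the KIP's final definition:

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.TaskId;

public interface ProcessingMetadata {
    String topic();
    int partition();
    long offset();
    Headers headers();
    byte[] sourceRawKey();
    byte[] sourceRawValue();
    TaskId taskId();
    String processorNodeName();
}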

Let me know what you think; if everything's fine, I think we should be
good to start a VOTE?

Cheers,
Damien





On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman  wrote:


Fully agree about creating a new class for the bits of ProcessingContext
that are specific to metadata only. In fact, more or less this same point
just came up in the related KIP 1034 for DLQs, since the RecordMetadata
can't always be trusted to remain immutable. Maybe it's possible to solve
both issues at once, with the same class?

On another related note -- I had actually also just proposed that we
deprecate the existing DeserializationExceptionHandler method and replace
it with one using the new PAPI as part of KIP-1034. But now that I'm
reading this, I would say it probably makes more sense to do in this KIP.
We can also push that out into a smaller-scoped third KIP if you want, but
clearly, there is some overlap here and so however you guys (the authors)
want to organize this part of the work is fine with me. I do think it
should be done alongside/before this KIP and 1034 though, for all the
reasons already stated.

Everything else in the discussion so far I agree with! The
ProcessingContext thing is the only open question in my mind

On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina 
wrote:


Hi Matthias, Bruno,

1.a During my previous comment, by Processor Node ID, I meant
Processor name. This is important information to expose in the handler
as it allows users to identify the location of the exception in the
topology.
I assume this information could be useful in other places, that's why
I would lean toward adding this as an attribute in the
ProcessingContext.

1.b Looking at the ProcessingContext, I do think the following 3
methods should not be accessible in the exception handlers:
getStateStore(), schedule() and commit().
Having a separate interface would make a cleaner signature. It would
also be a great time to ensure that all exception handlers are
consistent; at the moment, the
DeserializationExceptionHandler.handle() relies on the old PAPI
ProcessorContext and the ProductionExceptionHandler.handle() has no context at all.
It could make sense to build the new interface in this KIP and track
the effort to migrate the existing handlers in a separate KIP, what do
you think?
Maybe I am overthinking this part and the ProcessingContext would be fine.

4. Good point regarding the dropped-record metric, as it is used by
the other handlers, I do think it makes sense to leverage it instead
of creating a new metric.
I will update the KIP to update the dropped-record-metric.

8. Regarding the DSL, I am aligned with Bruno, I think we could close
the gaps in a future KIP.

Cheers,
Damien


On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna  wrote:


Hi Matthias,


1.a
With processor node ID, I mean the ID that is exposed in the tags of
processor node metrics. That ID cannot be internal since it is exposed
in metrics. I think the processor nam

Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-22 Thread Andrew Schofield
Hi Jun,
Thanks for your comments.

120. Thanks. Fixed.

121. ShareUpdateValue.SnapshotEpoch indicates which snapshot
the update applies to. It should of course be the snapshot that precedes
it in the log. It’s just there to provide a consistency check.

I also noticed that ShareSnapshotValue was missing StateEpoch. It
isn’t any more.

122. In KIP-848, ConsumerGroupMemberMetadataValue includes
GroupEpoch, but in the code it does not. In fact, there is considerable
divergence between the KIP and the code for this record value schema
which I expect will be resolved when the migration code has been
completed.

123. The share group does not persist the target assignment.

124. Share groups have three kinds of record:
i) ShareGroupMetadata
  - this contains the group epoch and is written whenever the group
epoch changes.
ii) ShareGroupMemberMetadata
   - this does not contain the group epoch.
iii) ShareGroupPartitionMetadata
   - this currently contains the epoch, but I think that is unnecessary.
 For one thing, the ConsumerGroupPartitionMetadata definition
 contains the group epoch, but the value appears never to be set.
 David Jacot confirms that it’s not necessary and is removing it.

I have removed the Epoch from ShareGroupPartitionMetadata.
The only purpose of persisting the epoch for a share group is so that
when a group coordinator takes over the share group, it is able to
continue the sequence of epochs. ShareGroupMetadataValue.Epoch
is used for this.

125. The group epoch will be incremented in this case and
consequently a ShareGroupMetadata will be written. KIP updated.

126. Not directly. A share group can only be deleted when it has no
members, so the tombstones for ShareGroupMemberMetadata will
have been written when the members left. I have clarified this.

127. The share coordinator is ignorant of the group epoch. When the
group coordinator is initializing the share-group state the first time that
a share-partition is being added to an assignment in the group, the
group epoch is used as the state epoch. But as the group epoch
increases over time, the share coordinator is entirely unaware.

When the first consumer for a share-partition fetches records from a
share-partition leader, the SPL calls the share coordinator to
ReadShareGroupState. If the SPL has previously read the information
and again it’s going from 0 to 1 consumer, it confirms it's up to date by
calling ReadShareGroupOffsetsState.

Even if many consumers are joining at the same time, any share-partition
which is being initialized will not be included in their assignments. Once
the initialization is complete, the next rebalance will assign the partition
to some consumers which will discover this by ShareGroupHeartbeat
response. And then, the fetching begins.

If an SPL receives a ShareFetch request before it’s read the state
from the SC, it can make the ShareFetch request wait up to MaxWaitMs
and then it can return an empty set of records if it’s still not ready.

So, I don’t believe there will be too much load. If a topic with many
partitions is added to the subscribed topics for a share group, the fact
that the assignments will only start to include the partitions as their
initialization completes should soften the impact.

128, 129: The “proper” way to turn on this feature when it’s finished will
be using `group.coordinator.rebalance.protocols` and `group.version`.
While it’s in Early Access and for test cases, the `group.share.enable`
configuration will turn it on.

I have described `group.share.enable` as an internal configuration in
the KIP.

130. The config `group.share.record.lock.duration.ms` applies to groups
which do not specify a group-level configuration for lock duration. The
minimum and maximum for this configuration are intended to give it
sensible bounds.

If a group does specify its own `group.share.record.lock.duration.ms`,
the broker-level `group.share.max.record.lock.duration.ms` gives the
cluster administrator a way of setting a maximum value for all groups.

While editing, I renamed `group.share.record.lock.duration.max.ms` to
`group.share.max.record.lock.duration.ms` for consistency with the
rest of the min/max configurations.
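
For illustration, how these settings relate at the broker and group level (the values are made up for the example, not defaults from the KIP):

# broker configuration (illustrative values)
# default lock duration for groups that do not set their own:
group.share.record.lock.duration.ms=30000
# upper bound enforced on any group-level setting:
group.share.max.record.lock.duration.ms=60000

# group-level dynamic override, must not exceed the broker-level maximum:
group.share.record.lock.duration.ms=45000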

131. This is the limit per partition so you can go wider with multiple 
partitions.
I have set the initial value low for safety. I expect to be able to increase 
this
significantly when we have mature code which has been battle-tested.
Rather than try to guess how high it can safely go, I’ve erred on the side of
caution and expect to open it up in a future KIP.

132. Good catch. The problem is that I have missed two group configurations,
now added. These are group.share.session.timeout.ms and
group.share.heartbeat.timeout.ms . The configurations you mentioned
are the bounds for the group-level configurations.

133. The name `group.share.max.size` was chosen to mirror the existing
`group.consumer.max.size`.

134. It is intended to be a list of all of the valid assignors

[jira] [Resolved] (KAFKA-15736) KRaft support in PlaintextConsumerTest

2024-04-22 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez resolved KAFKA-15736.
--
Resolution: Done

> KRaft support in PlaintextConsumerTest
> --
>
> Key: KAFKA-15736
> URL: https://issues.apache.org/jira/browse/KAFKA-15736
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in PlaintextConsumerTest in 
> core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala need to 
> be updated to support KRaft
> 49 : def testHeaders(): Unit = {
> 136 : def testDeprecatedPollBlocksForAssignment(): Unit = {
> 144 : def testHeadersSerializerDeserializer(): Unit = {
> 153 : def testMaxPollRecords(): Unit = {
> 169 : def testMaxPollIntervalMs(): Unit = {
> 194 : def testMaxPollIntervalMsDelayInRevocation(): Unit = {
> 234 : def testMaxPollIntervalMsDelayInAssignment(): Unit = {
> 258 : def testAutoCommitOnClose(): Unit = {
> 281 : def testAutoCommitOnCloseAfterWakeup(): Unit = {
> 308 : def testAutoOffsetReset(): Unit = {
> 319 : def testGroupConsumption(): Unit = {
> 339 : def testPatternSubscription(): Unit = {
> 396 : def testSubsequentPatternSubscription(): Unit = {
> 447 : def testPatternUnsubscription(): Unit = {
> 473 : def testCommitMetadata(): Unit = {
> 494 : def testAsyncCommit(): Unit = {
> 513 : def testExpandingTopicSubscriptions(): Unit = {
> 527 : def testShrinkingTopicSubscriptions(): Unit = {
> 541 : def testPartitionsFor(): Unit = {
> 551 : def testPartitionsForAutoCreate(): Unit = {
> 560 : def testPartitionsForInvalidTopic(): Unit = {
> 566 : def testSeek(): Unit = {
> 621 : def testPositionAndCommit(): Unit = {
> 653 : def testPartitionPauseAndResume(): Unit = {
> 671 : def testFetchInvalidOffset(): Unit = {
> 696 : def testFetchOutOfRangeOffsetResetConfigEarliest(): Unit = {
> 717 : def testFetchOutOfRangeOffsetResetConfigLatest(): Unit = {
> 743 : def testFetchRecordLargerThanFetchMaxBytes(): Unit = {
> 772 : def testFetchHonoursFetchSizeIfLargeRecordNotFirst(): Unit = {
> 804 : def testFetchHonoursMaxPartitionFetchBytesIfLargeRecordNotFirst(): Unit 
> = {
> 811 : def testFetchRecordLargerThanMaxPartitionFetchBytes(): Unit = {
> 819 : def testLowMaxFetchSizeForRequestAndPartition(): Unit = {
> 867 : def testRoundRobinAssignment(): Unit = {
> 903 : def testMultiConsumerRoundRobinAssignor(): Unit = {
> 940 : def testMultiConsumerStickyAssignor(): Unit = {
> 986 : def testMultiConsumerDefaultAssignor(): Unit = {
> 1024 : def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
> 1109 : def testMultiConsumerDefaultAssignorAndVerifyAssignment(): Unit = {
> 1141 : def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
> 1146 : def testMultiConsumerSessionTimeoutOnClose(): Unit = {
> 1151 : def testInterceptors(): Unit = {
> 1210 : def testAutoCommitIntercept(): Unit = {
> 1260 : def testInterceptorsWithWrongKeyValue(): Unit = {
> 1286 : def testConsumeMessagesWithCreateTime(): Unit = {
> 1303 : def testConsumeMessagesWithLogAppendTime(): Unit = {
> 1331 : def testListTopics(): Unit = {
> 1351 : def testUnsubscribeTopic(): Unit = {
> 1367 : def testPauseStateNotPreservedByRebalance(): Unit = {
> 1388 : def testCommitSpecifiedOffsets(): Unit = {
> 1415 : def testAutoCommitOnRebalance(): Unit = {
> 1454 : def testPerPartitionLeadMetricsCleanUpWithSubscribe(): Unit = {
> 1493 : def testPerPartitionLagMetricsCleanUpWithSubscribe(): Unit = {
> 1533 : def testPerPartitionLeadMetricsCleanUpWithAssign(): Unit = {
> 1562 : def testPerPartitionLagMetricsCleanUpWithAssign(): Unit = {
> 1593 : def testPerPartitionLagMetricsWhenReadCommitted(): Unit = {
> 1616 : def testPerPartitionLeadWithMaxPollRecords(): Unit = {
> 1638 : def testPerPartitionLagWithMaxPollRecords(): Unit = {
> 1661 : def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
> 1809 : def testConsumingWithNullGroupId(): Unit = {
> 1874 : def testConsumingWithEmptyGroupId(): Unit = {
> 1923 : def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = 
> {
> Scanned 1951 lines. Found 0 KRaft tests out of 61 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16596) Flaky test – org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()

2024-04-22 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16596:
---

 Summary: Flaky test – 
org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()
 
 Key: KAFKA-16596
 URL: https://issues.apache.org/jira/browse/KAFKA-16596
 Project: Kafka
  Issue Type: Test
Reporter: Igor Soarez


org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()
 failed in the following way:

 
{code:java}
org.opentest4j.AssertionFailedError: Unexpected addresses [93.184.215.14, 
2606:2800:21f:cb07:6820:80da:af6b:8b2c] ==> expected: <true> but was: <false> 
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) 
   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at 
app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)at 
app//org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup(ClientUtilsTest.java:65)
 {code}
As a result of the following assertions:

 
{code:java}
// With lookup of example.com, either one or two addresses are expected depending on
// whether ipv4 and ipv6 are enabled
List<InetSocketAddress> validatedAddresses = checkWithLookup(asList("example.com:1"));
assertTrue(validatedAddresses.size() >= 1, "Unexpected addresses " + validatedAddresses);
List<String> validatedHostNames = validatedAddresses.stream().map(InetSocketAddress::getHostName)
        .collect(Collectors.toList());
List<String> expectedHostNames = asList("93.184.216.34", "2606:2800:220:1:248:1893:25c8:1946"); {code}
It seems that the DNS result has changed for example.com.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16597) Flaky test - org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads()

2024-04-22 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16597:
---

 Summary: Flaky test - 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads()
 Key: KAFKA-16597
 URL: https://issues.apache.org/jira/browse/KAFKA-16597
 Project: Kafka
  Issue Type: Test
  Components: streams
Reporter: Igor Soarez


org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads()
 failed with:
{code:java}
Error

org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The 
specified partition 1 for store source-table does not exist.

Stacktrace

org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The 
specified partition 1 for store source-table does not exist.   at 
app//org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:63)
at 
app//org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53)
 at 
app//org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads(StoreQueryIntegrationTest.java:411)
 {code}
 

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/2/tests/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16596) Flaky test – org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()

2024-04-22 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass resolved KAFKA-16596.
-
Fix Version/s: 3.8.0
 Assignee: Andras Katona
   Resolution: Fixed

> Flaky test – 
> org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()
>  
> ---
>
> Key: KAFKA-16596
> URL: https://issues.apache.org/jira/browse/KAFKA-16596
> Project: Kafka
>  Issue Type: Test
>Reporter: Igor Soarez
>Assignee: Andras Katona
>Priority: Major
>  Labels: GoodForNewContributors, good-first-issue
> Fix For: 3.8.0
>
>
> org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()
>  failed in the following way:
>  
> {code:java}
> org.opentest4j.AssertionFailedError: Unexpected addresses [93.184.215.14, 
> 2606:2800:21f:cb07:6820:80da:af6b:8b2c] ==> expected: <true> but was: <false> 
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)   
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) 
> at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)  
>   at 
> app//org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup(ClientUtilsTest.java:65)
>  {code}
> As a result of the following assertions:
>  
> {code:java}
> // With lookup of example.com, either one or two addresses are expected depending on
> // whether ipv4 and ipv6 are enabled
> List<InetSocketAddress> validatedAddresses = checkWithLookup(asList("example.com:1"));
> assertTrue(validatedAddresses.size() >= 1, "Unexpected addresses " + validatedAddresses);
> List<String> validatedHostNames = validatedAddresses.stream().map(InetSocketAddress::getHostName)
>         .collect(Collectors.toList());
> List<String> expectedHostNames = asList("93.184.216.34", "2606:2800:220:1:248:1893:25c8:1946"); {code}
> It seems that the DNS result has changed for example.com.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-1037: Allow WriteTxnMarkers API with Alter Cluster Permission

2024-04-22 Thread Andrew Schofield
Hi Nikhil,
Thanks for the KIP. Looks good to me.

+1 (non-binding)

Thanks,
Andrew

> On 22 Apr 2024, at 09:17, Christo Lolov  wrote:
>
> Heya Nikhil,
>
> Thanks for the proposal, as mentioned before it makes sense to me!
>
> +1 (binding)
>
> Best,
> Christo
>
> On Sat, 20 Apr 2024 at 00:25, Justine Olshan 
> wrote:
>
>> Hey Nikhil,
>>
>> I meant to comment on the discussion thread, but my draft took so long, you
>> opened the vote.
>>
>> Regardless, I just wanted to say that it makes sense to me. +1 (binding)
>>
>> Justine
>>
>> On Fri, Apr 19, 2024 at 7:22 AM Nikhil Ramakrishnan <
>> ramakrishnan.nik...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> I would like to start a voting thread for KIP-1037: Allow
>>> WriteTxnMarkers API with Alter Cluster Permission
>>> (
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1037%3A+Allow+WriteTxnMarkers+API+with+Alter+Cluster+Permission
>>> )
>>> as there have been no objections on the discussion thread.
>>>
>>> For comments or feedback please check the discussion thread here:
>>> https://lists.apache.org/thread/bbkyt8mrc8xp3jfyvhph7oqtjxl29xmn
>>>
>>> Thanks,
>>> Nikhil
>>>
>>



Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-22 Thread Lucas Brutschy
Hi!

Thanks for the KIP, great stuff.

L1. I was a bit confused that the default configuration (once you set
a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood
correctly. Is this something that will be a common use-case, and is it
a configuration that we want to encourage? I expected that you either
want to fail or skip-and-send-to-DLQ.

L2. Have you considered extending the `TimestampExtractor` interface
so that it can also produce to DLQ? AFAIK it's not covered by any of
the existing exception handlers, but it can cause similar failures
(potentially custom logic that depends on the validity of the input record). There
could also be a default implementation as a subclass of
`ExtractRecordMetadataTimestamp`.

L3. It would be nice to include an example of how to produce to
multiple topics in the KIP, as I can imagine that this will be a
common use-case. I wasn't sure how much code would be involved to make
it work. If a lot of code is required, we may want to consider
exposing some utils that make it easier.

Cheers,
Lucas

On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina  wrote:
>
> Hi everyone,
>
> Following all the discussion on this KIP and KIP-1033, we introduced a
> new container class containing only processing context metadata:
> ProcessingMetadata. This new container class is actually part of
> KIP-1033, thus, I added a hard dependency for this KIP on KIP-1033, I
> think it's the wisest implementation wise.
>
> I also clarified the interface of the enums:
> withDeadLetterQueueRecords(Iterable<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords). Very likely most users would just
> send one DLQ record, but there might be specific use-cases and what
> can do more can do less, so I added an Iterable.
>
> I took some time to think about the impact of storing the
> ProcessingMetadata on the ProductionExceptionHandler. I think storing
> the topic/offset/partition should be fine, but I am concerned about
> storing the rawSourceKey/Value. I think it could impact some specific
> use-cases, for example, a high-throughput Kafka Streams application
> "counting" messages could have huge source input messages, and very
> small sink messages; here, I assume storing the rawSourceKey/Value
> could require significantly more memory than the actual Kafka Producer
> buffer.
>
> I think the safest approach is actually to only store the fixed-size
> metadata for the ProductionExceptionHandler.handle:
> topic/partition/offset/processorNodeId/taskId, it might be confusing
> for the user, but 1) it is still better than today, where there is
> no context information at all, 2) it would be clearly stated in the
> javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the
> punctuate case).
>
> Do you think it would be a suitable design Sophie?
>
> Cheers,
> Damien
>
> On Sun, 14 Apr 2024 at 21:30, Loic Greffier  
> wrote:
> >
> > Hi Sophie,
> >
> > Thanks for your feedback.
> > Completing the Damien's comments here for points S1 and S5B.
> >
> > S1:
> > > I'm confused -- are you saying that we're introducing a new kind of 
> > > ProducerRecord class for this?
> >
> > I am wondering if it makes sense to alter the ProducerRecord from Clients 
> > API with a "deadLetterQueueTopicName" attribute dedicated to Kafka Streams 
> > DLQ.
> > Adding "deadLetterQueueTopicName" as an additional parameter to 
> > "withDeadLetterQueueRecord" is a good option, and may allow users to send 
> > records to different DLQ topics depending on conditions:
> > @Override
> > public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
> >                                                  ProducerRecord<byte[], byte[]> record,
> >                                                  Exception exception) {
> >     if (condition1) {
> >         return ProductionExceptionHandlerResponse.CONTINUE
> >             .withDeadLetterQueueRecord(record, "dlq-topic-a");
> >     }
> >     if (condition2) {
> >         return ProductionExceptionHandlerResponse.CONTINUE
> >             .withDeadLetterQueueRecord(record, "dlq-topic-b");
> >     }
> >     return ProductionExceptionHandlerResponse.CONTINUE
> >         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> > }
> >
> > S5B:
> > > I was having a bit of trouble understanding what the behavior would be if 
> > > someone configured a "errors.deadletterqueue.topic.name" but didn't 
> > > implement the handlers.
> >
> > The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler and 
> > DefaultProductionExceptionHandler should be able to tell if records should 
> > be sent to DLQ or not.
> > The "errors.deadletterqueue.topic.name" takes place to:
> >
> >   *   Specifying if the provided handlers should or should not send records 
> > to DLQ.
> >  *   If the value is empty, the handlers should not send records to DLQ.
> >  *   If the value is not empty, the handlers should send records to DLQ.
> >   *   Define the name of the DLQ topic that should be used by the provided 
> > 

[jira] [Resolved] (KAFKA-16549) suppress the warnings from RemoteLogManager

2024-04-22 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16549.

Fix Version/s: 3.8.0
   Resolution: Fixed

> suppress the warnings from RemoteLogManager
> ---
>
> Key: KAFKA-16549
> URL: https://issues.apache.org/jira/browse/KAFKA-16549
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: charliecheng
>Priority: Trivial
> Fix For: 3.8.0
>
>
> {quote}
> /home/chia7712/project/kafka/core/src/main/java/kafka/log/remote/RemoteLogManager.java:234:
>  warning: [removal] AccessController in java.security has been deprecated and 
> marked for removal
> return java.security.AccessController.doPrivileged(new 
> PrivilegedAction() {
> ^
> /home/chia7712/project/kafka/core/src/main/java/kafka/log/remote/RemoteLogManager.java:256:
>  warning: [removal] AccessController in java.security has been deprecated and 
> marked for removal
> return java.security.AccessController.doPrivileged(new 
> PrivilegedAction() {
> {quote}
> we should add @SuppressWarnings("removal") to those methods
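> 
> For illustration, the annotation sits on the affected methods roughly like this (a generic sketch, not the actual RemoteLogManager code):
> {code:java}
> import java.security.AccessController;
> import java.security.PrivilegedAction;
> 
> class Example {
>     // AccessController is deprecated for removal in newer JDKs; suppress the
>     // warning until the code is migrated off of it.
>     @SuppressWarnings("removal")
>     static ClassLoader privilegedContextClassLoader() {
>         return AccessController.doPrivileged(
>                 (PrivilegedAction<ClassLoader>) () -> Thread.currentThread().getContextClassLoader());
>     }
> }
> {code}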



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-22 Thread Alieh Saeedi
Thank you all for the feedback!

Addressing the main concern: The KIP is about giving the user the ability
to handle producer exceptions, but to be more conservative and avoid future
issues, we decided to limit it to a short list of exceptions. I included
*RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open to
suggestions for adding some more ;-)

KIP Updates:
- clarified the way that the user should configure the Producer to use the
custom handler. I think adding a producer config property is the cleanest
one.
- changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to be
closer to what we are changing.
- added the ProducerRecord as the input parameter of the handle() method as
well.
- increased the response types to 3 to have fail and two types of continue.
- The default behaviour is having no custom handler, having the
corresponding config parameter set to null. Therefore, the KIP provides no
default implementation of the interface.
- We follow the interface solution as described in the
Rejected Alternatives section.
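
To make the shape concrete, a minimal sketch of what the interface could look like based on the points above; the response names follow Matthias's RETRY/SWALLOW suggestion below, and the generic parameters and config wiring are assumptions rather than the final KIP design:

import org.apache.kafka.clients.producer.ProducerRecord;

public interface ProducerExceptionHandler {

    // Called only for the exceptions covered by the KIP, e.g.
    // RecordTooLargeException and UnknownTopicOrPartitionException.
    // The handler is enabled via a new producer config property (name TBD),
    // which defaults to null, i.e. no custom handler.
    Response handle(ProducerRecord<byte[], byte[]> record, Exception exception);

    enum Response {
        FAIL,     // surface the error to the application
        RETRY,    // keep retrying internally (only meaningful for retriable errors)
        SWALLOW   // drop the record and continue
    }
}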


Cheers,
Alieh


On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax  wrote:

> Thanks for the KIP Alieh! It addresses an important case for error
> handling.
>
> I agree that using this handler would be an expert API, as mentioned by
> a few people. But I don't think it would be a reason to not add it. It's
> always a tricky tradeoff what to expose to users and to avoid foot guns,
> but we added similar handlers to Kafka Streams, and have good experience
> with it. Hence, I understand, but don't share the concern raised.
>
> I also agree that there is some responsibility by the user to understand
> how such a handler should be implemented to not drop data by accident.
> But it seem unavoidable and acceptable.
>
> While I understand that a "simpler / reduced" API (eg via configs) might
> also work, I personally prefer a full handler. Configs have the same
> issue that they could be misused, potentially leading to incorrectly
> dropped data, but at the same time are less flexible (and thus maybe
> even harder to use correctly...?). Based on my experience, there are also
> often weird corner cases for which it makes sense to also drop records for
> other exceptions, and a full handler has the advantage of full
> flexibility and "absolute power!".
>
> To be fair: I don't know the exact code paths of the producer in
> details, so please keep me honest. But my understanding is, that the KIP
> aims to allow users to react to internal exception, and decide to keep
> retrying internally, swallow the error and drop the record, or raise the
> error?
>
> Maybe the KIP would need to be a little bit more precise about what errors we
> want to cover -- I don't think this list must be exhaustive, as we can
> always do follow up KIP to also apply the handler to other errors to
> expand the scope of the handler. The KIP does mention examples, but it
> might be good to explicitly state for what cases the handler gets applied?
>
> I am also not sure if CONTINUE and FAIL are enough options? Don't we
> need three options? Or would `CONTINUE` have different meaning depending
> on the type of error? Ie, for a retryable error `CONTINUE` would mean
> keep retrying internally, but for a non-retryable error `CONTINUE` means
> swallow the error and drop the record? This semantic overload seems
> tricky to reason about by users, so it might better to split `CONTINUE`
> into two cases -> `RETRY` and `SWALLOW` (or some better names).
>
> Additionally, should we just ship a `DefaultClientExceptionHandler`
> which would return `FAIL`, for backward compatibility. Or don't have any
> default handler to begin with and allow it to be `null`? I don't see the
> need for a specific `TransactionExceptionHandler`. To me, the goal
> should be to not modify the default behavior at all, but to just allow
> users to change the default behavior if there is a need.
>
> What is missing in the KIP, though, is how the handler is passed into the
> producer. Would we need a new config which allows setting a
> custom handler? And/or would we allow to pass in an instance via the
> constructor or add a new method to set a handler?
>
>
> -Matthias
>
> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> > Hi Alieh,
> > Thanks for the KIP.
> >
> > Exception handling in the Kafka producer and consumer is really not
> ideal.
> > It’s even harder working out what’s going on with the consumer.
> >
> > I’m a bit nervous about this KIP and I agree with Chris that it could do
> with additional
> > motivation. This would be an expert-level interface given how complicated
> > the exception handling for Kafka has become.
> >
> > 7. The application is not really aware of the batching being done on its
> behalf.
> > The ProduceResponse can actually return an array of records which failed
> > per batch. If you get RecordTooLargeException, and want to retry, you
> probably
> > need to remove the offending records from the batch and retry it. T

[jira] [Created] (KAFKA-16598) Migrate `ResetConsumerGroupOffsetTest` to new test infra

2024-04-22 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16598:
--

 Summary: Migrate `ResetConsumerGroupOffsetTest` to new test infra
 Key: KAFKA-16598
 URL: https://issues.apache.org/jira/browse/KAFKA-16598
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


as title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-04-22 Thread Damien Gasparina
Hi Bruno,

1) Good point, naming stuff is definitely hard, I renamed
ProcessingMetadata to ErrorHandlerContext

> Is there any reason you did not use something like
> Record sourceRecord()

2) The record class is used in many locations and, I assume, could be
expanded by new features.
As the error handler metadata could be stored in memory for a longer
duration due to the ProductionExceptionHandler, I think it is wiser to
have an independent class that is as lean as possible.
Maybe I am overthinking this, what do you think?

3) Oops, good point, I didn't notice that it was still there. I
removed the parameter.

4) To clarify the KIP, I specified the DeserializationExceptionHandler
interface in the KIP.

5) Good point, I updated the KIP with the order you mentioned:
context, record, exception

6) I was indeed thinking of extending the ProcessingContext to store
it there. Let me update the KIP to make it clear.

From past experience, deprecating an interface is always a bit
painful. In this KIP, I relied on a default implementation to ensure
backward compatibility while encouraging people to implement the new
method signature. If you know a better approach, I'll take it :-)
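
As an illustration of that pattern (hypothetical types, not the actual Kafka Streams interfaces): the new method gets a default implementation that forwards to the deprecated one, so existing implementations compile and behave unchanged while new implementations only need to override the new signature.

// Hypothetical types used only for this sketch.
interface OldContext { }
interface NewContext { OldContext asOldContext(); }

interface ExampleExceptionHandler {

    enum Response { CONTINUE, FAIL }

    // Old signature, deprecated but kept so existing implementations still compile.
    @Deprecated
    default Response handle(OldContext context, byte[] value, Exception exception) {
        return Response.FAIL;
    }

    // New signature; the framework always calls this one. Its default forwards to
    // the old method, so handlers written against the old interface keep working.
    default Response handle(NewContext context, byte[] value, Exception exception) {
        return handle(context.asOldContext(), value, exception);
    }
}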

Cheers,
Damien

On Mon, 22 Apr 2024 at 11:01, Bruno Cadonna  wrote:
>
> Hi Damien,
>
> Thanks a lot for the updates!
>
> I have the following comments:
>
> (1)
> Could you rename ProcessingMetadata to ErrorHandlerContext or
> ErrorHandlerMetadata (I am preferring the former)? I think it makes it
> clearer for what this context/metadata is for.
>
>
> (2)
> Is there any reason you did not use something like
>
> Record sourceRecord()
>
> in ProcessingMetadata instead of sourceRawKey() and sourceRawValue() and
> headers()? The headers() method refers to the record read from the input
> topic of the sub-topology, right? If yes, maybe that is also something
> to mention more explicitly.
>
>
> (3)
> Since you added the processor node ID to the ProcessingMetadata, you can
> remove it from the signature of method handle() in
> ProcessingExceptionHandler.
>
>
> (4)
> Where are the mentioned changes to the DeserializationExceptionHandler?
>
>
> (5)
> To be consistent, the order of the parameters in the
> ProductionExceptionHandler should be:
> 1. context
> 2. record
> 3. exception
>
>
> (6)
> I am wondering where the implementation of ProcessingMetadata gets the
> sourceRawKey/Value from. Do we need additional changes in
> ProcessingContext and implementations?
>
>
> Best,
> Bruno
>
>
> On 4/21/24 2:23 PM, Damien Gasparina wrote:
> > Hi Everyone,
> >
> > Following the discussions on KIP-1033 and KIP-1034, we did a few changes:
> >- We introduced a new ProcessingMetadata class containing only the
> > ProcessorContext metadata: topic, partition, offset, headers[],
> > sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName
> >- To be consistent, we propose to deprecate the existing
> > DeserializationExceptionHandler and ProductionExceptionHandler methods
> > to rely on the new ProcessingMetadata
> >- The creation of the ProcessingMetadata and the deprecation of the old
> > methods is owned by KIP-1033; KIP-1034 (DLQ) now focuses only on the
> > Dead Letter Queue implementation without touching any interfaces. We
> > introduced a hard dependency of KIP-1034 on KIP-1033, which we think
> > is the wisest approach implementation-wise.
> > - Instead of creating a new metric, KIP-1033 updates the
> > dropped-record metric.
> >
> > Let me know what you think, if everything's fine, I think we should be
> > good to start a VOTE?
> >
> > Cheers,
> > Damien
> >
> >
> >
> >
> >
> > On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman  
> > wrote:
> >>
> >> Fully agree about creating a new class for the bits of ProcessingContext
> >> that are specific to metadata only. In fact, more or less this same point
> >> just came up in the related KIP 1034 for DLQs, since the RecordMetadata
> >> can't always be trusted to remain immutable. Maybe it's possible to solve
> >> both issues at once, with the same class?
> >>
> >> On another related note -- I had actually also just proposed that we
> >> deprecate the existing DeserializationExceptionHandler method and replace
> >> it with one using the new PAPI as part of KIP-1034. But now that I'm
> >> reading this, I would say it probably makes more sense to do in this KIP.
> >> We can also push that out into a smaller-scoped third KIP if you want, but
> >> clearly, there is some overlap here and so however you guys (the authors)
> >> want to organize this part of the work is fine with me. I do think it
> >> should be done alongside/before this KIP and 1034 though, for all the
> >> reasons already stated.
> >>
> >> Everything else in the discussion so far I agree with! The
> >> ProcessingContext thing is the only open question in my mind
> >>
> >> On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina 
> >> wrote:
> >>
> >>> Hi Matthias, Bruno,
> >>>
> >>> 1.a During my previous comment, by Processor Node ID, I meant
> >>> Process

[jira] [Created] (KAFKA-16599) Always await async commit callbacks in commitSync and close

2024-04-22 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-16599:
--

 Summary: Always await async commit callbacks in commitSync and 
close
 Key: KAFKA-16599
 URL: https://issues.apache.org/jira/browse/KAFKA-16599
 Project: Kafka
  Issue Type: Task
Reporter: Lucas Brutschy






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16103) Review client logic for triggering offset commit callbacks

2024-04-22 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy resolved KAFKA-16103.

Resolution: Fixed

> Review client logic for triggering offset commit callbacks
> --
>
> Key: KAFKA-16103
> URL: https://issues.apache.org/jira/browse/KAFKA-16103
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: kip-848-client-support, offset
> Fix For: 3.8.0
>
>
> Review logic for triggering commit callbacks, ensuring that all callbacks are 
> triggered before returning from commitSync



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-853: KRaft Controller Membership Changes

2024-04-22 Thread Jason Gustafson
Thanks Jose. +1. Great KIP!

On Fri, Mar 29, 2024 at 11:16 AM Jun Rao  wrote:

> Hi, Jose,
>
> Thanks for the KIP. +1
>
> Jun
>
> On Fri, Mar 29, 2024 at 9:55 AM José Armando García Sancio
>  wrote:
>
> > Hi all,
> >
> > I would like to call a vote to adopt KIP-853.
> >
> > KIP: https://cwiki.apache.org/confluence/x/nyH1D
> > Discussion thread:
> > https://lists.apache.org/thread/6o3sjvcb8dx1ozqfpltb7p0w76b4nd46
> >
> > Thanks,
> > --
> > -José
> >
>


Re: [VOTE] KIP-853: KRaft Controller Membership Changes

2024-04-22 Thread José Armando García Sancio
+1 binding.

On Mon, Apr 22, 2024 at 9:28 AM Jason Gustafson
 wrote:
>
> Thanks Jose. +1. Great KIP!
>
> On Fri, Mar 29, 2024 at 11:16 AM Jun Rao  wrote:
>
> > Hi, Jose,
> >
> > Thanks for the KIP. +1
> >
> > Jun
> >
> > On Fri, Mar 29, 2024 at 9:55 AM José Armando García Sancio
> >  wrote:
> >
> > > Hi all,
> > >
> > > I would like to call a vote to adopt KIP-853.
> > >
> > > KIP: https://cwiki.apache.org/confluence/x/nyH1D
> > > Discussion thread:
> > > https://lists.apache.org/thread/6o3sjvcb8dx1ozqfpltb7p0w76b4nd46
> > >
> > > Thanks,
> > > --
> > > -José
> > >
> >



-- 
-José


Re: [DISCUSS] KIP-1028: Docker Official Image for Apache Kafka

2024-04-22 Thread Vedarth Sharma
Hey folks,

Thanks a lot for reviewing the KIP and providing feedback.
The discussion thread seems resolved and the KIP has been updated accordingly.
We will be starting the voting thread for this KIP in the next few days.
Please take a look at the KIP and let us know if any further discussion
is needed.

Thanks and regards,
Vedarth

On Fri, Apr 19, 2024 at 1:33 PM Manikumar  wrote:

> Thanks Krish. KIP looks good to me.
>
> On Wed, Apr 17, 2024 at 1:38 PM Krish Vora  wrote:
> >
> > Hi Manikumar,
> >
> > Thanks for the comments.
> >
> > Maybe as part of the release process, RM can create a JIRA for this
> > > task. This can be taken by RM or any comitter or any contributor (with
> > > some help from commiters to run "Docker Image Preparation via GitHub
> > > Actions:"
> >
> > This sounds like a good idea. This step would be beneficial. By creating
> a
> > JIRA ticket, it will also serve as a reminder to complete the
> post-release
> > steps for the Docker official images. Have updated the KIP with this
> step.
> >
> > Is this using GitHub Actions workflow? or manual testing?
> >
> > This will be done by a Github Actions workflow, which will test the
> static
> > Docker Official Image assets for a specific release version.
> >
> > Is it mandatory for RM/comitters to raise the PR to Docker Hub’s
> > > official images repository (or) can it be done by any contributor.
> >
> > I believe that it can be done by any contributor (ref: This link
> > 
> > quotes "*Anyone can provide feedback, contribute code, suggest process
> > changes, or even propose a new Official Image.*")
> >
> > Also I was thinking, once the KIP gets voted, we should try to release
> > > kafka:3.7.0 (or 3.7.1) Docker Official image. This will help us to
> > > validate the process and allow us to fix any changes suggested by
> > > Dockerhub before the 3.8.0 release.
> >
> > This sounds like a great idea. This KIP proposes release of DOI as a
> > post-release process, which can be done anytime post release. Since 3.7.0
> > is already released, we can perform these steps for that release too. By
> > the time the KIP gets implemented, if 3.7.1 is released, we could do
> these
> > steps for 3.7.1, instead of 3.7.0. This would allow us to make changes to
> > the Dockerfiles and other assets based on feedback from Docker Hub before
> > the release of version 3.8.0.
> >
> > Thanks,
> > Krish.
> >
> > On Mon, Apr 15, 2024 at 12:59 PM Manikumar 
> > wrote:
> >
> > > Hi Krish,
> > >
> > > Thanks for the updated KIP. a few comments below.
> > >
> > > > "These actions can be carried out by the RM or any contributor post
> the
> > > release process."
> > > Maybe as part of the release process, RM can create a JIRA for this
> > > task. This can be taken by RM or any comitter or any contributor (with
> > > some help from commiters to run "Docker Image Preparation via GitHub
> > > Actions:"
> > >
> > > > "Perform Docker build tests to ensure image integrity"
> > > Is this using GitHub Actions workflow? or manual testing?
> > >
> > > > "The RM will manually raise the final PR to Docker Hub’s official
> images
> > > repository using the contents of the generated file"
> > >  Is it mandatory for RM/comitters to raise the PR to Docker Hub’s
> > > official images repository (or) can it be done by any contributor.
> > >
> > > Also I was thinking, once the KIP gets voted, we should try to release
> > > kafka:3.7.0 (or 3.7.1) Docker Official image. This will help us to
> > > validate the process and allow us to fix any changes suggested by
> > > Dockerhub before the 3.8.0 release.
> > >
> > >
> > > Thanks,
> > >
> > > On Mon, Apr 8, 2024 at 2:33 PM Krish Vora 
> wrote:
> > > >
> > > > Hi Manikumar and Luke.
> > > > Thanks for the questions.
> > > >
> > > > 1. No, the Docker inventory files and configurations will not be the
> same
> > > > for Open Source Software (OSS) Images and Docker Official Images
> (DOI).
> > > >
> > > > For OSS images, the Dockerfile located in docker/jvm/dockerfile is
> > > > utilized. This process is integrated with the existing release
> pipeline
> > > as
> > > > outlined in KIP-975
> > > > <
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-Status
> > > >,
> > > > where the Kafka URL is provided as a build argument. This method
> allows
> > > for
> > > > building, testing, and releasing OSS images dynamically. The OSS
> images
> > > > will continue to be released under the standard release process .
> > > >
> > > > In contrast, the release process for DOIs requires providing the
> Docker
> > > Hub
> > > > team with a specific directory for each version release that
> contains a
> > > > standalone Dockerfile. These Dockerfiles are designed to be
> > > > self-sufficient, hence require hardcoded values instead of relying on
> > > build
> > > > arguments. To accommodate this, in our proposed appro

Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-22 Thread Abhijeet Kumar
Hi Luke,

Thanks for your comments. Please find my responses inline.

On Tue, Apr 9, 2024 at 2:08 PM Luke Chen  wrote:

> Hi Abhijeet,
>
> Thanks for the KIP to improve the tiered storage feature!
>
> Questions:
> 1. We could also get the "pending-upload-offset" and epoch via remote log
> metadata, instead of adding a new API to fetch from the leader. Could you
> explain why you choose the later approach, instead of the former?
>

The remote log metadata could be tracking overlapping log segments. The
maximum offset across the log segments it may be tracking may not be the
last-tiered-offset because of truncations during unclean leader election.
Remote log metadata alone is not sufficient to identify the
last-tiered-offset or pending-upload-offset.

Only the leader knows the correct lineage of offsets that is required to
identify the "pending-upload-offset".
That is the reason we chose to add a new API to fetch this information from
the leader.


2.
> > We plan to have a follow-up KIP that will address both the
> deprioritization
> of these brokers from leadership, as well as
> for consumption (when fetching from followers is allowed).
>
> I agree with Jun that we might need to make it clear all possible drawbacks
> that could have. So, could we add the drawbacks that Jun mentioned about
> the performance issue when consumer fetch from follower?
>
>
Updated the KIP to call this out.


> 3. Could we add "Rejected Alternatives" section to the end of the KIP to
> add some of them?
> Like the "ListOffsetRequest" approach VS "Earliest-Pending-Upload-Offset"
> approach, or getting the "Earliest-Pending-Upload-Offset" from remote log
> metadata... etc.
>
> Added the section on Rejected Alternatives


> Thanks.
> Luke
>
>
> On Tue, Apr 9, 2024 at 2:25 PM Abhijeet Kumar 
> wrote:
>
> > Hi Christo,
> >
> > Please find my comments inline.
> >
> > On Fri, Apr 5, 2024 at 12:36 PM Christo Lolov 
> > wrote:
> >
> > > Hello Abhijeet and Jun,
> > >
> > > I have been mulling this KIP over a bit more in recent days!
> > >
> > > re: Jun
> > >
> > > I wasn't aware we apply 2.1 and 2.2 for reserving new timestamps - in
> > > retrospect it should have been fairly obvious. I would need to go an
> > update
> > > KIP-1005 myself then, thank you for giving the useful reference!
> > >
> > > 4. I think Abhijeet wants to rebuild state from latest-tiered-offset
> and
> > > fetch from latest-tiered-offset + 1 only for new replicas (or replicas
> > > which experienced a disk failure) to decrease the time a partition
> spends
> > > in under-replicated state. In other words, a follower which has just
> > fallen
> > > out of ISR, but has local data will continue using today's Tiered
> Storage
> > > replication protocol (i.e. fetching from earliest-local). I further
> > believe
> > > he has taken this approach so that local state of replicas which have
> > just
> > > fallen out of ISR isn't forcefully wiped thus leading to situation 1.
> > > Abhijeet, have I understood (and summarised) what you are proposing
> > > correctly?
> > >
> > > Yes, your understanding is correct. We want to limit the behavior
> changes
> > only to new replicas.
> >
> >
> > > 5. I think in today's Tiered Storage we know the leader's
> > log-start-offset
> > > from the FetchResponse and we can learn its local-log-start-offset from
> > the
> > > ListOffsets by asking for earliest-local timestamp (-4). But granted,
> > this
> > > ought to be added as an additional API call in the KIP.
> > >
> > >
> > Yes, I clarified this in my reply to Jun. I will add this missing detail
> in
> > the KIP.
> >
> >
> > > re: Abhijeet
> > >
> > > 101. I am still a bit confused as to why you want to include a new
> offset
> > > (i.e. pending-upload-offset) when you yourself mention that you could
> use
> > > an already existing offset (i.e. last-tiered-offset + 1). In essence,
> you
> > > end your Motivation with "In this KIP, we will focus only on the
> follower
> > > fetch protocol using the *last-tiered-offset*" and then in the
> following
> > > sections you talk about pending-upload-offset. I understand this might
> be
> > > classified as an implementation detail, but if you introduce a new
> offset
> > > (i.e. pending-upload-offset) you have to make a change to the
> ListOffsets
> > > API (i.e. introduce -6) and thus document it in this KIP as such.
> > However,
> > > the last-tiered-offset ought to already be exposed as part of KIP-1005
> > > (under implementation). Am I misunderstanding something here?
> > >
> >
> > I have tried to clarify this in my reply to Jun.
> >
> > > The follower needs to build the local data starting from the offset
> > > Earliest-Pending-Upload-Offset. Hence it needs the offset and the
> > > corresponding leader-epoch.
> > > There are two ways to do this:
> > >1. We add support in ListOffsetRequest to be able to fetch this
> offset
> > > (and leader epoch) from the leader.
> > >2. Or, fetch the tiered-offset (which is already supported). From
> this
> >

Re: [VOTE] KIP-853: KRaft Controller Membership Changes

2024-04-22 Thread José Armando García Sancio
I am going to close the vote tomorrow morning (PST).

On Mon, Apr 22, 2024 at 10:06 AM José Armando García Sancio
 wrote:
>
> +1 binding.
>
> On Mon, Apr 22, 2024 at 9:28 AM Jason Gustafson
>  wrote:
> >
> > Thanks Jose. +1. Great KIP!
> >
> > On Fri, Mar 29, 2024 at 11:16 AM Jun Rao  wrote:
> >
> > > Hi, Jose,
> > >
> > > Thanks for the KIP. +1
> > >
> > > Jun
> > >
> > > On Fri, Mar 29, 2024 at 9:55 AM José Armando García Sancio
> > >  wrote:
> > >
> > > > Hi all,
> > > >
> > > > I would like to call a vote to adopt KIP-853.
> > > >
> > > > KIP: https://cwiki.apache.org/confluence/x/nyH1D
> > > > Discussion thread:
> > > > https://lists.apache.org/thread/6o3sjvcb8dx1ozqfpltb7p0w76b4nd46
> > > >
> > > > Thanks,
> > > > --
> > > > -José
> > > >
> > >
>
>
>
> --
> -José



-- 
-José


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-22 Thread Jun Rao
Hi, Andrew,

Thanks for the reply.

123. "The share group does not persist the target assignment."
  What's the impact of this? Every time the GC fails over, it needs to
recompute the assignment for every member. Do we expect the member
assignment to change on every GC failover?

125. Should the GC also write ShareGroupPartitionMetadata?

127. So, group epoch is only propagated to SC when
InitializeShareGroupState request is sent. This sounds good.

130. Should we have a group.share.min.record.lock.duration.ms to pair with
group.share.max.record.lock.duration.ms?

131. Sounds good. The name group.share.record.lock.partition.limit doesn't
seem very intuitive. How about something
like group.share.partition.max.records.pending.ack?

136. Could we describe the process of GC failover? I guess it needs to
compute member reassignment and check if there is any new topic/partition
matching the share subscription. Does it bump up the group epoch?

137. Metrics:
137.1 It would be useful to document who reports each metric. Is it any
broker, GC, SC or SPL?
137.2 partition-load-time: Is that the loading time at SPL or SC?
137.3 "The time taken in milliseconds to load the share-group state from
the share-group state partitions loaded in the last 30 seconds."
  The window depends on metrics.num.samples and metrics.sample.window.ms
and is not always 30 seconds, right?
137.4 Could you explain write/write-latency a bit more? Does it include the
time to write to the internal topic?

Jun

On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield 
wrote:

> Hi Jun,
> Thanks for your comments.
>
> 120. Thanks. Fixed.
>
> 121. ShareUpdateValue.SnapshotEpoch indicates which snapshot
> the update applies to. It should of course be the snapshot that precedes
> it in the log. It’s just there to provide a consistency check.
>
> I also noticed that ShareSnapshotValue was missing StateEpoch. It
> isn’t any more.
>
> 122. In KIP-848, ConsumerGroupMemberMetadataValue includes
> GroupEpoch, but in the code it does not. In fact, there is considerable
> divergence between the KIP and the code for this record value schema
> which I expect will be resolved when the migration code has been
> completed.
>
> 123. The share group does not persist the target assignment.
>
> 124. Share groups have three kinds of record:
> i) ShareGroupMetadata
>   - this contains the group epoch and is written whenever the group
> epoch changes.
> ii) ShareGroupMemberMetadata
>- this does not contain the group epoch.
> iii) ShareGroupPartitionMetadata
>- this currently contains the epoch, but I think that is unnecessary.
>  For one thing, the ConsumerGroupPartitionMetadata definition
>  contains the group epoch, but the value appears never to be set.
>  David Jacot confirms that it’s not necessary and is removing it.
>
> I have removed the Epoch from ShareGroupPartitionMetadata.
> The only purpose of the persisting the epoch for a share group is so that
> when a group coordinator takes over the share group, it is able to
> continue the sequence of epochs. ShareGroupMetadataValue.Epoch
> is used for this.
>
> 125. The group epoch will be incremented in this case and
> consequently a ShareGroupMetadata will be written. KIP updated.
>
> 126. Not directly. A share group can only be deleted when it has no
> members, so the tombstones for ShareGroupMemberMetadata will
> have been written when the members left. I have clarified this.
>
> 127. The share coordinator is ignorant of the group epoch. When the
> group coordinator is initializing the share-group state the first time that
> a share-partition is being added to an assignment in the group, the
> group epoch is used as the state epoch. But as the group epoch
> increases over time, the share coordinator is entirely unaware.
>
> When the first consumer for a share-partition fetches records from a
> share-partition leader, the SPL calls the share coordinator to
> ReadShareGroupState. If the SPL has previously read the information
> and again it’s going from 0 to 1 consumer, it confirms it's up to date by
> calling ReadShareGroupOffsetsState.
>
> Even if many consumers are joining at the same time, any share-partition
> which is being initialized will not be included in their assignments. Once
> the initialization is complete, the next rebalance will assign the
> partition
> to some consumers which will discover this by ShareGroupHeartbeat
> response. And then, the fetching begins.
>
> If an SPL receives a ShareFetch request before it’s read the state
> from the SC, it can make the ShareFetch request wait up to MaxWaitMs
> and then it can return an empty set of records if it’s still not ready.
>
> So, I don’t believe there will be too much load. If a topic with many
> partitions is added to the subscribed topics for a share group, the fact
> that the assignments will only start to include the partitions as their
> initialization completes should soften the impact.
>
> 128, 129: The “proper” way to turn on 

Re: [DISCUSS] KIP-1023: Follower fetch from tiered offset

2024-04-22 Thread Abhijeet Kumar
Hi Jun,

Please find my comments inline.


On Thu, Apr 18, 2024 at 3:26 AM Jun Rao  wrote:

> Hi, Abhijeet,
>
> Thanks for the reply.
>
> 1. I am wondering if we could achieve the same result by just lowering
> local.retention.ms and local.retention.bytes. This also allows the newly
> started follower to build up the local data before serving the consumer
> traffic.
>

I am not sure I fully followed this. Do you mean we could lower the
local.retention settings (by size and time)
so that there is little data on the leader's local storage, allowing the
follower to quickly catch up with the leader?

In that case, we would need to set a small local retention across all brokers in
the cluster. That has the undesired
effect of increasing remote log fetches when serving consume
requests, which can cause
degradations. Also, this behaviour (increased remote fetches) would
occur on all brokers at all times, whereas in
the KIP we restrict the behavior only to the newly bootstrapped
brokers, and only until they have fully built
their local logs as per the retention defined at the cluster level.
(Deprioritization of the broker could help reduce the impact
 even further.)


>
> 2. Have you updated the KIP?
>

The KIP has been updated now.


>
> Thanks,
>
> Jun
>
> On Tue, Apr 9, 2024 at 3:36 AM Satish Duggana 
> wrote:
>
> > +1 to Jun for adding the consumer fetching from a follower scenario
> > also to the existing section that talked about the drawback when a
> > node built with last-tiered-offset has become a leader. As Abhijeet
> > mentioned, we plan to have a follow-up KIP that will address these by
> > having a deprioritzation of these brokers. The deprioritization of
> > those brokers can be removed once they catchup until the local log
> > retention.
> >
> > Thanks,
> > Satish.
> >
> > On Tue, 9 Apr 2024 at 14:08, Luke Chen  wrote:
> > >
> > > Hi Abhijeet,
> > >
> > > Thanks for the KIP to improve the tiered storage feature!
> > >
> > > Questions:
> > > 1. We could also get the "pending-upload-offset" and epoch via remote
> log
> > > metadata, instead of adding a new API to fetch from the leader. Could
> you
> > > explain why you choose the later approach, instead of the former?
> > > 2.
> > > > We plan to have a follow-up KIP that will address both the
> > > deprioritization
> > > of these brokers from leadership, as well as
> > > for consumption (when fetching from followers is allowed).
> > >
> > > I agree with Jun that we might need to make it clear all possible
> > drawbacks
> > > that could have. So, could we add the drawbacks that Jun mentioned
> about
> > > the performance issue when consumer fetch from follower?
> > >
> > > 3. Could we add "Rejected Alternatives" section to the end of the KIP
> to
> > > add some of them?
> > > Like the "ListOffsetRequest" approach VS
> "Earliest-Pending-Upload-Offset"
> > > approach, or getting the "Earliest-Pending-Upload-Offset" from remote
> log
> > > metadata... etc.
> > >
> > > Thanks.
> > > Luke
> > >
> > >
> > > On Tue, Apr 9, 2024 at 2:25 PM Abhijeet Kumar <
> > abhijeet.cse@gmail.com>
> > > wrote:
> > >
> > > > Hi Christo,
> > > >
> > > > Please find my comments inline.
> > > >
> > > > On Fri, Apr 5, 2024 at 12:36 PM Christo Lolov <
> christolo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Abhijeet and Jun,
> > > > >
> > > > > I have been mulling this KIP over a bit more in recent days!
> > > > >
> > > > > re: Jun
> > > > >
> > > > > I wasn't aware we apply 2.1 and 2.2 for reserving new timestamps -
> in
> > > > > retrospect it should have been fairly obvious. I would need to go
> an
> > > > update
> > > > > KIP-1005 myself then, thank you for giving the useful reference!
> > > > >
> > > > > 4. I think Abhijeet wants to rebuild state from
> latest-tiered-offset
> > and
> > > > > fetch from latest-tiered-offset + 1 only for new replicas (or
> > replicas
> > > > > which experienced a disk failure) to decrease the time a partition
> > spends
> > > > > in under-replicated state. In other words, a follower which has
> just
> > > > fallen
> > > > > out of ISR, but has local data will continue using today's Tiered
> > Storage
> > > > > replication protocol (i.e. fetching from earliest-local). I further
> > > > believe
> > > > > he has taken this approach so that local state of replicas which
> have
> > > > just
> > > > > fallen out of ISR isn't forcefully wiped thus leading to situation
> 1.
> > > > > Abhijeet, have I understood (and summarised) what you are proposing
> > > > > correctly?
> > > > >
> > > > > Yes, your understanding is correct. We want to limit the behavior
> > changes
> > > > only to new replicas.
> > > >
> > > >
> > > > > 5. I think in today's Tiered Storage we know the leader's
> > > > log-start-offset
> > > > > from the FetchResponse and we can learn its local-log-start-offset
> > from
> > > > the
> > > > > ListOffsets by asking for earliest-local timestamp (-4). But
> granted,
> > > > this
> > > > > ought to be added as an add

[jira] [Created] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close

2024-04-22 Thread Alex Leung (Jira)
Alex Leung created KAFKA-16600:
--

 Summary: Periodically receive "Failed to transition to 
PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close
 Key: KAFKA-16600
 URL: https://issues.apache.org/jira/browse/KAFKA-16600
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.2
Reporter: Alex Leung


From time to time, we observe the following ERROR message during streams close:
{code:java}
2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client 
[testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING to 
PENDING_SHUTDOWN
2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client 
[testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to 
PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN
{code}
These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have 
not changed any code related to streams shutdown.



When the problem does not occur (most of the time), it looks like the following:
{code:java}
2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client 
[testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING to 
PENDING_SHUTDOWN
2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client 
[testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in 
PENDING_SHUTDOWN, all resources are being closed and the client will be 
stopped. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-22 Thread Sebastien Viale
Hi,

Thanks for your remarks

L1. I would say "who can do the most can do the least": even though most people 
will fail and stop, we found it interesting to offer the possibility to 
fail-and-send-to-DLQ.

L2: We did not consider extending the TimestampExtractor because we regard it as 
out of scope for this KIP. Perhaps it will be possible to include it in an 
ExceptionHandler later.

L3: we will include an example in the KIP, but as we mentioned earlier, the DLQ 
topic can be different in each custom Exception Handler:

When providing custom handlers, users would have the possibility to return:
 * FAIL
* CONTINUE
* FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
* CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
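
As a rough sketch, a custom handler routing to different DLQ topics could look like the
following (interface and response names are taken from the current KIP-1033/KIP-1034
drafts and may still change, so treat this as a placeholder rather than the final API):

public class RoutingExceptionHandler implements ProcessingExceptionHandler {
    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        if (exception instanceof NullPointerException) {
            // poison pill: skip it, but keep a copy in a dedicated topic for later analysis
            return ProcessingHandlerResponse.CONTINUE
                    .withDeadLetterQueueRecord(record, "dlq-poison-pills");
        }
        // anything unexpected: stop the application, but still park the record
        return ProcessingHandlerResponse.FAIL
                .withDeadLetterQueueRecord(record, "dlq-fatal");
    }
}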

cheers !
Sébastien



De : Lucas Brutschy 
Envoyé : lundi 22 avril 2024 14:36
À : dev@kafka.apache.org 
Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

Warning External sender Do not click on any links or open any attachments 
unless you trust the sender and know the content is safe.

Hi!

Thanks for the KIP, great stuff.

L1. I was a bit confused that the default configuration (once you set
a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood
correctly. Is this something that will be a common use-case, and is it
a configuration that we want to encourage? I'd expect that you either
want to fail or skip-and-send-to-DLQ.

L2. Have you considered extending the `TimestampExtractor` interface
so that it can also produce to DLQ? AFAIK it's not covered by any of
the existing exception handlers, but it can cause similar failures
(potentially custom logic that depends on the validity of the input record). There
could also be a default implementation as a subclass of
`ExtractRecordMetadataTimestamp`.

L3. It would be nice to include an example of how to produce to
multiple topics in the KIP, as I can imagine that this will be a
common use-case. I wasn't sure how much code would be involved to make
it work. If a lot of code is required, we may want to consider
exposing some utils that make it easier.

Cheers,
Lucas

This email was screened for spam and malicious content but exercise caution 
anyway.



On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina  wrote:
>
> Hi everyone,
>
> Following all the discussion on this KIP and KIP-1033, we introduced a
> new container class containing only processing context metadata:
> ProcessingMetadata. This new container class is actually part of
> KIP-1033, thus, I added a hard dependency for this KIP on KIP-1033, I
> think it's the wisest implementation wise.
>
> I also clarified the interface of the enums:
> withDeadLetterQueueRecords(Iterable<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords). Very likely most users would just
> send one DLQ record, but there might be specific use-cases and what
> can do more can do less, so I added an Iterable.
>
> I took some time to think about the impact of storing the
> ProcessingMetadata on the ProductionExceptionHandler. I think storing
> the topic/offset/partition should be fine, but I am concerned about
> storing the rawSourceKey/Value. I think it could impact some specific
> use-cases, for example, a high-throughput Kafka Streams application
> "counting" messages could have huge source input messages, and very
> small sink messages, here, I assume storing the rawSourceKey/Value
> could significantly require more memory than the actual Kafka Producer
> buffer.
>
> I think the safest approach is actually to only store the fixed-size
> metadata for the ProductionExceptionHandler.handle:
> topic/partition/offset/processorNodeId/taskId, it might be confusing
> for the user, but 1) it is still better than nowaday where there are
> no context information at all, 2) it would be clearly stated in the
> javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the
> punctuate case). .
>
> Do you think it would be a suitable design Sophie?
>
> Cheers,
> Damien
>
> On Sun, 14 Apr 2024 at 21:30, Loic Greffier  
> wrote:
> >
> > Hi Sophie,
> >
> > Thanks for your feedback.
> > Completing the Damien's comments here for points S1 and S5B.
> >
> > S1:
> > > I'm confused -- are you saying that we're introducing a new kind of 
> > > ProducerRecord class for this?
> >
> > I am wondering if it makes sense to alter the ProducerRecord from Clients 
> > API with a "deadLetterQueueTopicName" attribute dedicated to Kafka Streams 
> > DLQ.
> > Adding "deadLetterQueueTopicName" as an additional parameter to 
> > "withDeadLetterQueueRecord" is a good option, and may allow users to send 
> > records to different DLQ topics depending on conditions:
> > @Override
> > public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
> >                                                  ProducerRecord<byte[], byte[]> record,
> >                                                  Exception exception) {
> >     if (condition1) {
> >         return ProductionExceptionHandlerResponse.CONTINUE
> >                 .withDeadLetterQueueRecord(record, "dlq-topic-a");
> >     }
> >     if (condition2) {
> >         return ProductionExceptionHandlerRespo

Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.7 #142

2024-04-22 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16601) Flaky test – org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics()

2024-04-22 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16601:
---

 Summary: Flaky test – 
org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics()
 Key: KAFKA-16601
 URL: https://issues.apache.org/jira/browse/KAFKA-16601
 Project: Kafka
  Issue Type: Test
  Components: controller
Reporter: Igor Soarez


 
org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics()
 failed with:
h4. Error
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
h4. Stacktrace
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0  at 
app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808)  
at 
app//org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1270)
  at 
app//org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:547)
  at 
app//org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:881)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
  at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) Caused by: 
java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
 

Source:

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/3/tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16602) Flaky test – org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()

2024-04-22 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16602:
---

 Summary: Flaky test – 
org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
 Key: KAFKA-16602
 URL: https://issues.apache.org/jira/browse/KAFKA-16602
 Project: Kafka
  Issue Type: Test
  Components: controller
Reporter: Igor Soarez


 
org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
 failed with:

 
h4. Error
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
h4. Stacktrace
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0  at 
app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808)  
at 
app//org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1270)
  at 
app//org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:547)
  at 
app//org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:881)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
  at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) Caused by: 
java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
 

Source: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/3/tests/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-04-22 Thread Anil Dasari (Jira)
Anil Dasari created KAFKA-16603:
---

 Summary: Data loss when kafka connect sending data to Kafka
 Key: KAFKA-16603
 URL: https://issues.apache.org/jira/browse/KAFKA-16603
 Project: Kafka
  Issue Type: Bug
  Components: clients, producer 
Affects Versions: 3.3.1
Reporter: Anil Dasari


We are experiencing data loss when the Kafka source connector fails to send 
data to the Kafka topic and the offset topic. 

Kafka cluster and Kafka connect details:
 # Kafka version : Confluent community version 7.3.1 i.e Kafka 3.3.1
 # Cluster size : 3 brokers
 # Number of partitions in all topics = 3
 # Replication factor = 3
 # Min ISR set 2
 # Uses no transformations in Kafka connector
 # Use default error tolerance i.e None.

Our connector checkpoints the offset info received in SourceTask#commitRecord 
and resumes data processing from the persisted checkpoint.
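
For illustration, the checkpointing pattern is roughly the following (simplified sketch, not 
the actual connector code; persistCheckpoint stands in for the write to our external 
checkpoint store):
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public abstract class CheckpointingSourceTask extends SourceTask {

    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        // called only after the producer has acknowledged the record,
        // so its source offset is treated as safe to checkpoint
        persistCheckpoint(record.sourceOffset());
    }

    // placeholder: persists the offset map to the external checkpoint store
    protected abstract void persistCheckpoint(Map<String, ?> offset);
}
{code}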

The data loss was noticed when a broker was unresponsive for a few minutes due to high 
load and the Kafka connector was restarted. However, the Kafka connector's graceful 
shutdown failed.

Logs:

 
{code:java}
[Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] 
Discovered group coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
attempted.
Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
Apr 22, 2024 @ 15:56:16.708 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
disconnected.
Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 disconnected.
Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
attempted.
Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log **)
Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
leaderUrl='http://10.75.100.46:8083/', offset=4, 
connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
revokedTaskIds=[], delay=0} to avoid running tasks while not being a member the 
group
Apr 22, 2024 @ 15:56:19.866 Stopping connector d094a5d7bbb046b99d62398cb84d648c
Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0
Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for 
WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' failed 
to properly shut down, has become unresponsive, and may be consuming external 
resources. Correct the configuration for this connector or remove the 
connector. After fixing the connector, it may be necessary to restart this 
worker to release any consumed resources.
Apr 22, 2024 @ 15:56:24.110 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the 
Kafka producer with timeoutMillis = 0 ms.
Apr 22, 2024 @ 15:56:24.110 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Proceeding to 
force close the producer since pending requests could not be completed within 
timeout 0 ms.
Apr 22, 2024 @ 15:56:24.112 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Beginning 
shutdown of Kafka producer I/O thread, sending remaining records.
Apr 22, 2024 @ 15:56:24.112 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Aborting 
incomplete batches due to forced shutdown
Apr 22, 2024 @ 15:56:24.113 
WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} 
Committing offsets
Apr 22, 2024 @ 15:56:24.113 
WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} Either 
no records were produced by the task since the last offset commit, or every 
record has been filtered out by a transformation or dropped due to 
transformation or conversion errors.
Apr 22, 2024 @ 15:56:24.146 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Finished stopping tasks in 
p

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-22 Thread Matthias J. Sax

Thanks for all the updates. Great discussion.

Few follow up questions from my side:

99: I agree with Damien about Bruno's point (2). We should not return 
`Record` (cf point 103 below why).



100: Can we imagine a case, for which the `ProcessingExceptionHandler` 
would want/need to have access to a Processor's state stores? When the 
handler is executed, we are technically still in the right context, and 
could maybe allow accessing the state store. Just a wild idea; also 
don't want to enlarge the scope unnecessarily, and we might be able to 
do this in a follow up KIP, too, if we believe it would be useful. I 
thought I'd just throw out the idea for completeness.



101: Does the name `ErrorHandlerContext` align to what Sophie had in 
mind about using this interface somewhere else?



102 `ErrorHandlerContext`: Is it intentional to have both `partition()` 
and `taskId()` -- the `TaskId` encodes the partition implicitly, so it's 
kinda redundant to also have `partition()`. Don't feel strongly about 
it, but might be worth to call out in the KIP why both are added.



103 `ErrorHandlerContext#header`: the return type is `Headers` which 
does not ensure immutability. I believe we need to introduce new 
`ReadOnlyHeaders` (or maybe some better name) interface...
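
Just to illustrate the idea (a sketch only, not a concrete proposal; one possible shape 
would be a defensive wrapper over the existing `Headers` interface that rejects mutations):

import java.util.Iterator;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

public class ReadOnlyHeaders implements Headers {
    private final Headers delegate;

    public ReadOnlyHeaders(final Headers delegate) {
        this.delegate = delegate;
    }

    // mutating methods are rejected
    @Override
    public Headers add(final Header header) {
        throw new UnsupportedOperationException("headers are read-only in the error handler");
    }

    @Override
    public Headers add(final String key, final byte[] value) {
        throw new UnsupportedOperationException("headers are read-only in the error handler");
    }

    @Override
    public Headers remove(final String key) {
        throw new UnsupportedOperationException("headers are read-only in the error handler");
    }

    // read-only methods just delegate
    @Override
    public Header lastHeader(final String key) {
        return delegate.lastHeader(key);
    }

    @Override
    public Iterable<Header> headers(final String key) {
        return delegate.headers(key);
    }

    @Override
    public Header[] toArray() {
        return delegate.toArray();
    }

    @Override
    public Iterator<Header> iterator() {
        return delegate.iterator();
    }
}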



104 `ErrorHandlerContext#convertToProcessorContext()`: I understand why 
this method was added, but I don't think that's the right approach to 
handle this case. We should not add this leaky abstraction IMHO, but 
instead add this method to a `DefaultImpl` class, and add a cast into 
the implementation from the interface to the class to access it. (Not 
100% sure about the details how to setup the code, so it would be great 
to get a POC PR up to see how we can do this w/o the need to add this 
method to the interface.)



105 `ProductionExceptionHandler`: why does the existing method get a 
default implementation that throws an exception? Would be good to 
clarify in the KIP why this change is necessary in this way. -- Could we 
also let it `return FAIL` instead?



106 `ProductionExceptionHandler`: why do we add two new methods? IIRC, 
we added `handleSerializationException(...)` only because we could not 
re-use the existing `handle(...)` method (cf KIP-399: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions). 
I think it's sufficient to only add a single new method which 
"blends/unifies" both exiting ones:


  handle(final ErrorHandlerContext context,
         final ProducerRecord record, // no generic types
         final Exception exception)
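
Just for illustration, an implementation of that single method could still distinguish 
the cases by the exception type (a fragment only; `ErrorHandlerContext` is the interface 
proposed in this KIP, the rest is existing API):

  public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                   final ProducerRecord<?, ?> record,
                                                   final Exception exception) {
      if (exception instanceof SerializationException) {
          // what used to go through handleSerializationException(...)
          return ProductionExceptionHandlerResponse.FAIL;
      }
      if (exception instanceof RecordTooLargeException) {
          // drop oversized records and keep going
          return ProductionExceptionHandlerResponse.CONTINUE;
      }
      return ProductionExceptionHandlerResponse.FAIL;
  }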


107 `DeserializationExceptionHandler`: same question as above, about the 
default impl and letting it throw an exception.



108 `default.process.exception.handler`: we use the prefix `default.` 
for both existing handlers, because we allow to pass in topic specific 
handlers via `Consumed` and `Produced` overwrites, ie, the default can 
be overwritten. We don't want to allow to pass in a Processor specific 
handler as pointed out in "Rejected Alternatives" section, and thus the 
configured handler is not really a "default" as it cannot be 
overwritten. For this case, we should drop the `default.` prefix in the 
config name.



109: Lucas brought up the idea on the KIP-1034 discussion to also 
include the `TimestampExtractor` interface for DLQ, which I think makes a lot 
of sense. Not sure if we would need any extensions in this KIP to get 
this done? I would rather include the timestamp extraction issue in the DLQ 
KIP from day one on. The interface is quite different though, so we 
would need to think a little bit more about how to do 
this. Right now, the contract is that returning `-1` as extracted 
timestamp is an implicit "drop record" signal to the runtime, which is 
rather subtle. Can we do anything about this in a meaningful way?
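
For reference, this is roughly what such a custom extractor looks like today (the payload 
type and its accessor are made up):

  public class EventTimeExtractor implements TimestampExtractor {
      @Override
      public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
          final Object value = record.value();
          if (value instanceof MyEvent) {              // made-up payload type
              return ((MyEvent) value).eventTimeMs();  // made-up accessor
          }
          // today: a negative timestamp silently drops the record -- no handler, no DLQ
          return -1;
      }
  }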




-Matthias

On 4/22/24 8:20 AM, Damien Gasparina wrote:

Hi Bruno,

1) Good point, naming stuff is definitely hard, I renamed
ProcessingMetadata to ErrorHandlerContext


Is there any reason you did not use something like
Record sourceRecord()


2) The record class is used in many locations and, I assume, could be
expanded by new features.
As the error handler metadata could be stored in memory for a longer
duration due to the ProductionExceptionHandler, I think it is wiser to
have an independent class that is as lean as possible.
Maybe I am overthinking this, what do you think?

3) Oops, good point, I didn't notice that it was still there. I
removed the parameter.

4) To clarify the KIP, I specified the DeserializationExceptionHandler
interface in the KIP.

5) Good point, I updated the KIP with the order you mentioned:
context, record, exception

6) I was indeed thinking of extending the ProcessingContext to store
it there. Let me update the KIP to make it clear.

 From past experience, deprecating an interface is always a bit
painful, in this KIP, I relied on default implementation

Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #2837

2024-04-22 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-22 Thread Matthias J. Sax
Thanks for splitting out this KIP. The discussion shows that it is a 
complex beast by itself, so it is worth discussing on its own.



Couple of question / comment:


100 `StateStore#commit()`: The JavaDoc says "must not be called by 
users" -- I would propose to put a guard in place for this, by either 
throwing an exception (preferable) or adding a no-op implementation (at 
least for our own stores, by wrapping them -- we cannot enforce it for 
custom stores I assume), and document this contract explicitly.
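
To be concrete, I am thinking of something along these lines in the user-facing store 
wrapper (a fragment only; the parameter type is the one proposed in this KIP, and the 
runtime would call the inner store directly, bypassing the guard):

  @Override
  public void commit(final Map<TopicPartition, Long> changelogOffsets) {
      throw new UnsupportedOperationException(
          "StateStore#commit() is managed by the Streams runtime and must not be called by users");
  }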



101 adding `.position` to the store: Why do we actually need this? The 
KIP says "To ensure consistency with the committed data and changelog 
offsets" but I am not sure if I can follow? Can you elaborate why 
leaving the `.position` file as-is won't work?



If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I think
it is possible, and probably not too expensive, but the devil will be in
the detail.


This sounds like a significant overhead to me. We know that opening a 
single RocksDB takes about 500ms, and thus opening RocksDB to get this 
information might slow down rebalances significantly.



102: It's unclear to me how `.position` information is added. The KIP 
only says: "position offsets will be stored in RocksDB, in the same 
column family as the changelog offsets". Do you intend to add this 
information to the map passed via `commit(final Map<TopicPartition, Long> changelogOffsets)`? The KIP should describe this in more detail. 
Also, if my assumption is correct, we might want to rename the parameter 
and also have a better JavaDoc description?



103: Should we make it mandatory (long-term) that all stores (including 
custom stores) manage their offsets internally? Maintaining both options 
and thus both code paths puts a burden on everyone and makes the code 
messy. I would strongly prefer if we could have a mid-term path to get rid 
of supporting both.  -- For this case, we should deprecate the newly 
added `managesOffsets()` method right away, to point out that we intend 
to remove it. If it's mandatory to maintain offsets for stores, we won't 
need this method any longer. In-memory stores can just return null from 
#committedOffset().
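
In code, that could look roughly like this on the `StateStore` interface (a sketch; the 
parameter of `committedOffset()` is a guess on my side):

  @Deprecated // only needed while both code paths exist
  default boolean managesOffsets() {
      return false;
  }

  // in-memory stores can simply return null
  default Long committedOffset(final TopicPartition partition) {
      return null;
  }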



104 "downgrading": I think it might be worth to add support for 
downgrading w/o the need to wipe stores? Leveraging `upgrade.from` 
parameter, we could build a two rolling bounce downgrade: (1) the new 
code is started with `upgrade.from` set to a lower version, telling the 
runtime to do the cleanup on `close()` -- (ie, ensure that all data is 
written into `.checkpoint` and `.position` file, and the newly added CL 
is deleted). In a second, rolling bounce, the old code would be able to 
open RocksDB. -- I understand that this implies much more work, but 
downgrade seems to be common enough, that it might be worth it? Even if 
we did not always support this in the past, we have to face the fact 
that KS is getting more and more adopted and as a more mature product 
should support this?





-Matthias







On 4/21/24 11:58 PM, Bruno Cadonna wrote:

Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but locally 
existing task state


While I like option 2, I think option 1 is less risky and will give us 
the benefits of transactional state stores sooner. We should consider 
the interface approach afterwards, though.



Best,
Bruno



On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can 
read the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that it 
knows where to find the information that it needs to return from 
changelogOffsets().


In general, I think we should proceed with the plain .checkpoint file 
for now and iterate back to the state store solution later since it 
seems it is not that straightforward. Alternatively, Nick could 
timebox an effort to better understand what would be needed for the 
state store solution. Nick, let us know your decision.


Regarding your question about the state store instance. I am not too 
familiar with that part of the code, but I think the state store is 
build when the processor topology is build and the processor topology 
is build per stream task. So there is one instance of processor 
topology and state store per stream task. Try to follow the call in [1].


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153




On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole 
Topology
(in InternalTopologyBuilder), and pass that into 
Proces

Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-22 Thread Matthias J. Sax

Thanks Alieh!

A few nits:


1) The new config we add for the producer should be mentioned in the 
"Public Interfaces" section.


2) Why do we use `producer.` prefix for a *producer* config? Should it 
be `exception.handler` only?
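
Either way, the user-facing side would be something like this (a sketch; the config key 
is exactly what we are discussing, and the handler class is a placeholder):

  Properties props = new Properties();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  // "exception.handler" vs "producer.exception.handler" -- naming TBD
  props.put("exception.handler", MyProducerExceptionHandler.class.getName());
  Producer<String, String> producer = new KafkaProducer<>(props);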



-Matthias

On 4/22/24 7:38 AM, Alieh Saeedi wrote:

Thank you all for the feedback!

Addressing the main concern: The KIP is about giving the user the ability
to handle producer exceptions, but to be more conservative and avoid future
issues, we decided to be limited to a short list of exceptions. I included
*RecordTooLargeExceptin* and *UnknownTopicOrPartitionException. *Open to
suggestion for adding some more ;-)

KIP Updates:
- clarified the way that the user should configure the Producer to use the
custom handler. I think adding a producer config property is the cleanest
one.
- changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to be
closer to what we are changing.
- added the ProducerRecord as the input parameter of the handle() method as
well.
- increased the response types to 3 to have fail and two types of continue.
- The default behaviour is having no custom handler, having the
corresponding config parameter set to null. Therefore, the KIP provides no
default implementation of the interface.
- We follow the interface solution as described in the
Rejected Alternatives section.


Cheers,
Alieh


On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax  wrote:


Thanks for the KIP Alieh! It addresses an important case for error
handling.

I agree that using this handler would be an expert API, as mentioned by
a few people. But I don't think it would be a reason to not add it. It's
always a tricky tradeoff what to expose to users and to avoid foot guns,
but we added similar handlers to Kafka Streams, and have good experience
with it. Hence, I understand, but don't share the concern raised.

I also agree that there is some responsibility by the user to understand
how such a handler should be implemented to not drop data by accident.
But it seem unavoidable and acceptable.

While I understand that a "simpler / reduced" API (eg via configs) might
also work, I personally prefer a full handler. Configs have the same
issue that they could be misused, potentially leading to incorrectly
dropped data, but at the same time are less flexible (and thus maybe
even harder to use correctly...?). Based on my experience, there are also
often weird corner cases for which it makes sense to also drop records for
other exceptions, and a full handler has the advantage of full
flexibility and "absolute power!".

To be fair: I don't know the exact code paths of the producer in
detail, so please keep me honest. But my understanding is that the KIP
aims to allow users to react to internal exceptions, and decide to keep
retrying internally, swallow the error and drop the record, or raise the
error?

Maybe the KIP would need to be a little bit more precise about what errors we
want to cover -- I don't think this list must be exhaustive, as we can
always do a follow-up KIP to also apply the handler to other errors to
expand the scope of the handler. The KIP does mention examples, but it
might be good to explicitly state for what cases the handler gets applied?

I am also not sure if CONTINUE and FAIL are enough options? Don't we
need three options? Or would `CONTINUE` have different meaning depending
on the type of error? Ie, for a retryable error `CONTINUE` would mean
keep retrying internally, but for a non-retryable error `CONTINUE` means
swallow the error and drop the record? This semantic overload seems
tricky for users to reason about, so it might be better to split `CONTINUE`
into two cases -> `RETRY` and `SWALLOW` (or some better names).

Additionally, should we just ship a `DefaultClientExceptionHandler`
which would return `FAIL`, for backward compatibility. Or don't have any
default handler to begin with and allow it to be `null`? I don't see the
need for a specific `TransactionExceptionHandler`. To me, the goal
should be to not modify the default behavior at all, but to just allow
users to change the default behavior if there is a need.

What is missing in the KIP though is how the handler is passed into the
producer. Would we need a new config which allows to set a
custom handler? And/or would we allow to pass in an instance via the
constructor or add a new method to set a handler?


-Matthias

On 4/18/24 10:02 AM, Andrew Schofield wrote:

Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not ideal.
It's even harder working out what's going on with the consumer.

I'm a bit nervous about this KIP and I agree with Chris that it could do
with additional motivation. This would be an expert-level interface given
how complicated the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its
behalf.

The ProduceResponse can actually return an array of records which failed
per batch. If you

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-22 Thread Andrew Schofield
Hi Fred,
Just reviewing the KIP again now that the discussion has quietened down a 
little.
It will soon be ready for a vote I think. I have a few comments about details.

A1) The signature of the new constructor for RecordDeserializationException
needs to be updated according to the discussion. I see that you have a PR which
is much closer to what I expected. Personally, I think the arguments for the 
constructor
which represent the portions of the record should match the order for the 
constructor
of ConsumerRecord. We’ve already worked out the order of these things once so I
would go for consistency. I suggest:

public RecordDeserializationException(
TopicPartition partition,
long offset,
long timestamp,
TimestampType timestampType,
ByteBuffer key,
ByteBuffer value,
Headers headers,
String message,
Throwable cause);

A2) There are still references to the Record class in the KIP, but we decided 
not
to use it.

A3) There is also a reference to a getConsumerRecord() method which is now to
be replaced by individual methods for the portions of the record, such as:
byte[] value();

The KIP should have a complete and accurate description of the Java interface
changes so please fill in the details.

A4) Given that the key and value are provided to the constructor as ByteBuffer but
lazily converted into byte[] as required, I wonder whether the names of the methods
and the constructor arguments should be slightly different, just a keyBuffer for the
constructor and key() for the getter. Maybe you prefer to keep them the same and
I'm happy with that. Just offering a suggestion.
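
For what it's worth, the application-side usage I have in mind is along these lines 
(a sketch; value() is the accessor from A3, and process/dlqProducer are just placeholders 
for application code):

  try {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      process(records);
  } catch (RecordDeserializationException e) {
      byte[] rawValue = e.value();                                  // new accessor from this KIP
      dlqProducer.send(new ProducerRecord<>("my-dlq", rawValue));   // placeholder DLQ producer
      consumer.seek(e.topicPartition(), e.offset() + 1);            // skip the bad record and carry on
  }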


Thanks for the KIP. I think it’s a worthwhile improvement and I expect it’s 
almost there.

Thanks,
Andrew



> On 19 Apr 2024, at 18:59, Frédérik Rouleau  
> wrote:
>
> Hi everyone,
>
> Thanks for all that valuable feedback.
> So we have a consensus not to use Record.
>
> I have updated the PR by creating 2 child classes
> KeyDeserializationException and ValueDeserializationException. Those
> classes directly embed the required fields. I do not think a wrapper object
> would be useful in that case.
> I still had to update checkstyle as Headers class is not allowed for import
> in the Errors package. I do not think it's an issue to add that
> authorization as Headers is already used in consumerRecord, so already
> public.
>
> The proposed PR https://github.com/apache/kafka/pull/15691/files
>
> If it's ok I will update the KIP.
>
> Regards,
> Fred