Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2810

2024-04-12 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-12 Thread Sophie Blee-Goldman
Somewhat minor point overall, but it actually drives me crazy that you
can't get access to the taskId of a StateStore until #init is called. This
has caused me a huge headache personally (since the same is true for
processors and I was trying to do something that's probably too hacky to
actually complain about here lol)

Can we just change the StateStoreSupplier to receive and pass along the
taskId when creating a new store? Presumably by adding a new version of the
#get method that takes in a taskId parameter? We can have it default to
invoking the old one for compatibility reasons and it should be completely
safe to tack on.

Would also prefer the same for a ProcessorSupplier, but that's definitely
outside the scope of this KIP
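To make the idea concrete, here's a rough sketch of the kind of default method
I have in mind (purely illustrative, assuming a supplier interface shaped like
the current one, and not a concrete API proposal):

    public interface StateStoreSupplier<T extends StateStore> {

        // existing factory method, unchanged
        T get();

        // possible new overload that receives the TaskId; defaulting to the
        // old method keeps existing supplier implementations compatible
        default T get(final TaskId taskId) {
            return get();
        }
    }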

On Fri, Apr 12, 2024 at 3:31 AM Nick Telford  wrote:

> On further thought, it's clear that this can't work for one simple reason:
> StateStores don't know their associated TaskId (and hence, their
> StateDirectory) until the init() call. Therefore, committedOffset() can't
> be called before init(), unless we also added a StateStoreContext argument
> to committedOffset(), which I think might be trying to shoehorn too much
> into committedOffset().
>
> I still don't like the idea of the Streams engine maintaining the cache of
> changelog offsets independently of stores, mostly because of the
> maintenance burden of the code duplication, but it looks like we'll have to
> live with it.
>
> Unless you have any better ideas?
>
> Regards,
> Nick
>
> On Wed, 10 Apr 2024 at 14:12, Nick Telford  wrote:
>
> > Hi Bruno,
> >
> > Immediately after I sent my response, I looked at the codebase and came
> to
> > the same conclusion. If it's possible at all, it will need to be done by
> > creating temporary StateManagers and StateStores during rebalance. I
> think
> > it is possible, and probably not too expensive, but the devil will be in
> > the detail.
> >
> > I'll try to find some time to explore the idea to see if it's possible
> and
> > report back, because we'll need to determine this before we can vote on
> the
> > KIP.
> >
> > Regards,
> > Nick
> >
> > On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna  wrote:
> >
> >> Hi Nick,
> >>
> >> Thanks for reacting to my comments so quickly!
> >>
> >>
> >> 2.
> >> Some thoughts on your proposal.
> >> State managers (and state stores) are parts of tasks. If the task is not
> >> assigned locally, we do not create those tasks. To get the offsets with
> >> your approach, we would need to either create kind of inactive tasks
> >> besides active and standby tasks or store and manage state managers of
> >> non-assigned tasks differently than the state managers of assigned
> >> tasks. Additionally, the cleanup thread that removes unassigned task
> >> directories needs to concurrently delete those inactive tasks or
> >> task-less state managers of unassigned tasks. This seems all quite messy
> >> to me.
> >> Could we create those state managers (or state stores) for locally
> >> existing but unassigned tasks on demand when
> >> TaskManager#getTaskOffsetSums() is executed? Or have a different
> >> encapsulation for the unused task directories?
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >> On 4/10/24 11:31 AM, Nick Telford wrote:
> >> > Hi Bruno,
> >> >
> >> > Thanks for the review!
> >> >
> >> > 1, 4, 5.
> >> > Done
> >> >
> >> > 3.
> >> > You're right. I've removed the offending paragraph. I had originally
> >> > adapted this from the guarantees outlined in KIP-892. But it's
> >> difficult to
> >> > provide these guarantees without the KIP-892 transaction buffers.
> >> Instead,
> >> > we'll add the guarantees back into the JavaDoc when KIP-892 lands.
> >> >
> >> > 2.
> >> > Good point! This is the only part of the KIP that was (significantly)
> >> > changed when I extracted it from KIP-892. My prototype currently
> >> maintains
> >> > this "cache" of changelog offsets in .checkpoint, but doing so becomes
> >> very
> >> > messy. My intent with this change was to try to better encapsulate
> this
> >> > offset "caching", especially for StateStores that can cheaply provide
> >> the
> >> > offsets stored directly in them without needing to duplicate them in
> >> this
> >> > cache.
> >> >
> >> > It's clear some more work is needed here to better encapsulate this.
> My
> >> > immediate thought is: what if we construct *but don't initialize* the
> >> > StateManager and StateStores for every Task directory on-disk? That
> >> should
> >> > still be quite cheap to do, and would enable us to query the offsets
> for
> >> > all on-disk stores, even if they're not open. If the StateManager
> (aka.
> >> > ProcessorStateManager/GlobalStateManager) proves too expensive to hold
> >> open
> >> > for closed stores, we could always have a "StubStateManager" in its
> >> place,
> >> > that enables the querying of offsets, but nothing else?
> >> >
> >> > IDK, what do you think?
> >> >
> >> > Regards,
> >> >
> >> > Nick
> >> >
> >> > On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna 
> 

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-12 Thread Sophie Blee-Goldman
I think the bigger question here is: why is checkstyle complaining about
this import? Does anyone know?

On Thu, Apr 11, 2024 at 11:12 AM Frédérik Rouleau
 wrote:

> Hi everyone,
>
> I have made some changes to take the comments into account. I have replaced the
> ConsumerRecord with Record. As that import was not allowed by checkstyle, I have
> modified its configuration. I hope that's ok.
> I find this new version better. Thanks for your help.
>
> Regards,
> Fred
>


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-04-12 Thread Sophie Blee-Goldman
Fully agree about creating a new class for the bits of ProcessingContext
that are specific to metadata only. In fact, more or less this same point
just came up in the related KIP 1034 for DLQs, since the RecordMetadata
can't always be trusted to remain immutable. Maybe it's possible to solve
both issues at once, with the same class?
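To illustrate the kind of metadata-only view I mean (all names here are
hypothetical, just to show the shape: read access to record/task metadata, with
no forward/commit/schedule/state access):

    public interface RecordMetadataView {
        String topic();
        int partition();
        long offset();
        Headers headers();
        String processorNodeId();
        TaskId taskId();
    }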

On another related note -- I had actually also just proposed that we
deprecate the existing DeserializationExceptionHandler method and replace
it with one using the new PAPI as part of KIP-1034. But now that I'm
reading this, I would say it probably makes more sense to do in this KIP.
We can also push that out into a smaller-scoped third KIP if you want, but
clearly, there is some overlap here and so however you guys (the authors)
want to organize this part of the work is fine with me. I do think it
should be done alongside/before this KIP and 1034 though, for all the
reasons already stated.

Everything else in the discussion so far I agree with! The
ProcessingContext thing is the only open question in my mind

On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina 
wrote:

> Hi Matthias, Bruno,
>
> 1.a During my previous comment, by Processor Node ID, I meant
> Processor name. This is important information to expose in the handler
> as it allows users to identify the location of the exception in the
> topology.
> I assume this information could be useful in other places, that's why
> I would lean toward adding this as an attribute in the
> ProcessingContext.
>
> 1.b Looking at the ProcessingContext, I do think the following 3
> methods should not be accessible in the exception handlers:
> getStateStore(), schedule() and commit().
> Having a separate interface would make a cleaner signature. It would
> also be a great time to ensure that all exception handlers are
> consistent, at the moment, the
> DeserializationExceptionHandler.handle() relies on the old PAPI
> ProcessorContext and the ProductionExceptionHandler.handle() has none.
> It could make sense to build the new interface in this KIP and track
> the effort to migrate the existing handlers in a separate KIP, what do
> you think?
> Maybe I am overthinking this part and the ProcessingContext would be fine.
>
> 4. Good point regarding the dropped-record metric, as it is used by
> the other handlers, I do think it makes sense to leverage it instead
> of creating a new metric.
> I will update the KIP to use the dropped-record metric.
>
> 8. Regarding the DSL, I am aligned with Bruno, I think we could close
> the gaps in a future KIP.
>
> Cheers,
> Damien
>
>
> On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna  wrote:
> >
> > Hi Matthias,
> >
> >
> > 1.a
> > With processor node ID, I mean the ID that is exposed in the tags of
> > processor node metrics. That ID cannot be internal since it is exposed
> > in metrics. I think the processor name and the processor node ID is the
> > same thing. I followed how the processor node ID is set in metrics and I
> > ended up in addProcessor(name, ...).
> >
> >
> > 1.b
> > Regarding ProcessingContext, I also thought about a separate class to
> > pass-in context information into the handler, but then I dismissed the
> > idea because I thought I was overthinking it. Apparently, I was not
> > overthinking it if you also had the same idea. So let's consider a
> > separate class.
> >
> >
> > 4.
> > Regarding the metric, thanks for pointing to the dropped-record metric,
> > Matthias. The dropped-record metric is used with the deserialization
> > handler and the production handler. So, it would make sense to also use
> > it for this handler. However, the dropped-record metric only records
> > records that are skipped by the handler and not the number of calls to
> > the handler. But that difference is probably irrelevant since in case of
> > FAIL, the metric will be reset anyways since the stream thread will be
> > restarted. In conclusion, I think the dropped-record metric in
> > combination with a warn log message might be a better choice than
> > introducing a new metric.
> >
> >
> > 8.
> > Regarding the DSL, I think we should close possible gaps in a separate
> KIP.
> >
> >
> > Best,
> > Bruno
> >
> > On 4/11/24 12:06 AM, Matthias J. Sax wrote:
> > > Thanks for the KIP. Great discussion.
> > >
> > > I am not sure if I understand the proposal from Bruno to hand in the
> > > processor node id? Isn't this internal (could not even find it
> quickly).
> > > We do have a processor name, right? Or do I mix up something?
> > >
> > > Another question is about `ProcessingContext` -- it contains a lot of
> > > (potentially irrelevant?) metadata. We should think carefully about
> what
> > > we want to pass in and what not -- removing stuff is hard, but adding
> > > stuff is easy. It's always an option to create a new interface that
> only
> > > exposes stuff we find useful, and allows us to evolve this interface
> > > independent of others. Re-using an existing interface always has the
> > > danger to introduce 

[jira] [Created] (KAFKA-16545) Auto adjust the replica factor according to number of broker when using ClusterTestExtensions

2024-04-12 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16545:
--

 Summary: Auto adjust the replica factor according to number of 
broker when using ClusterTestExtensions
 Key: KAFKA-16545
 URL: https://issues.apache.org/jira/browse/KAFKA-16545
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


In most test cases, we start a single broker so as to save resources. However, it 
can cause errors when creating internal topics since they require 3 replicas 
by default. In order to reduce the duplicated configs across all tests, we can 
add a bit of sugar to auto adjust the replication factor (if it is not defined by 
the tests) when the number of brokers started by the tests is less than the default value.
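
A rough sketch of the proposed sugar (method name and config key are illustrative, not 
the actual ClusterTestExtensions code):

{code:java}
// Cap the replication factor at the number of started brokers unless the test
// explicitly configured one itself.
static void maybeAdjustReplicationFactor(Map<String, String> serverConfigs, int numBrokers) {
    String key = "offsets.topic.replication.factor";
    int defaultReplicationFactor = 3;
    if (!serverConfigs.containsKey(key) && numBrokers < defaultReplicationFactor) {
        serverConfigs.put(key, String.valueOf(numBrokers));
    }
}
{code}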



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Sophie Blee-Goldman
Thanks for the KIP, this will make a lot of people very happy.

Wanted to chime in on a few points that have been raised so far and add
some of my own (numbering with an S to distinguish my points from the
previous ones)

S1.

> 1.a I really meant ProducerRecord, that's the class used to forward to
> downstream processors in the PAPI. The only information missing in
> this class is the topic name. I also considered relying on the Kafka
> Producer ProducerRecord, but I assume it would not be consistent with
> the KafkaStreams API.

I'm confused -- are you saying that we're introducing a new kind of
ProducerRecord class for this? Why not just use the existing one, i.e. the
o.a.k.clients.producer.ProducerRecord class? This is what the
ProductionExceptionHandler uses, so it's definitely "consistent". In other
words, we can remove the "String deadLetterQueueTopicName"

S2.
I think this would be a good opportunity to also deprecate the existing
#handle method of the DeserializationExceptionHandler, and replace it with
one that uses a ProcessingContext instead of the  ProcessorContext. Partly
for the same reasons about guarding access to the #forward methods, partly
because this method needs to be  migrated to the new PAPI interface
anyways, and ProcessingContext is part of the new one.
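To sketch what that could look like (signatures illustrative only, not a
concrete proposal):

    public interface DeserializationExceptionHandler {

        // existing method, which would be deprecated
        @Deprecated
        DeserializationHandlerResponse handle(final ProcessorContext context,
                                              final ConsumerRecord<byte[], byte[]> record,
                                              final Exception exception);

        // possible replacement taking the new-PAPI ProcessingContext; in practice
        // it would need a default implementation for compatibility with existing
        // user handlers
        DeserializationHandlerResponse handle(final ProcessingContext context,
                                              final ConsumerRecord<byte[], byte[]> record,
                                              final Exception exception);
    }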

S3.
Regarding 2a. -- I'm inclined to agree that records which a Punctuator
failed to produce should also be sent to the DLQ via the
ProductionExceptionHandler.  Users will just need to be careful about
accessing certain fields of the ProcessingContext that aren't available in
the punctuator, and need to  check the Optional returned by the
ProcessingContext#recordMetadata API.
Also, from an implementation standpoint, it will be really hard to
distinguish between a record created by a punctuator vs a processor from
within the  RecordCollector, which is the class that actually handles
sending records to the Streams Producer and invoking the
ProductionExceptionHandler. This is because the RecordCollector is at the
"end" of the topology graph and doesn't have any context about which of the
upstream processors actually attempted to forward a record.

This in itself is at least theoretically solvable, but it leads into my
first major new point:

S4:
 I'm deeply worried about passing the ProcessingContext in as a means of
forwarding metadata. The problem is that the processing/processor context
is a mutable class and is inherently meaningless outside the context of a
specific task. And when I said earlier that the RecordCollector  sits at
the "end" of the topology, I meant that it's literally outside the task's
subtopology and is used/shared by all tasks on that StreamThread. So to
begin with, there's no guarantee what will actually be returned for
essential methods such as the new #rawSourceKey/Value or the existing
#recordMetadata

For serialization exceptions it'll probably be correct, but for general
send errors it almost definitely won't be. In short, this is because we
send records to the producer after the sink node, but don't check for send
errors right away since obviously it takes some time for the producer to
actually send. In other words, sending/producing records is actually done
asynchronously with processing, and we simply check for errors on any
previously-sent records
during the send on a new record in a sink node. This means the context we
would be passing in to a (non-serialization) exception would pretty much
always correspond not to the record that experienced the error, but to the
random record that happened to be being sent when we checked and saw the
error for the failed record.

This discrepancy, in addition to the whole "sourceRawKey/Value and
recordMetadata are null for punctuators" issue, seems like an
insurmountable inconsistency that is more likely to cause users confusion
or problems than be  helpful.
We could create a new metadata object and copy over the relevant info from
the ProcessingContext, but I worry that has the potential to explode memory
since we'd need to hold on to it for all in-flight records up until they
are either successfully sent or failed and passed in to the
ProductionExceptionHandler. But if the metadata is relatively small, it's
probably fine. Especially if it's just the raw source key/value. Are
there any other parts of the ProcessingContext you think should be made
available?

Note that this only applies to the ProductionExceptionHandler, as the
DeserializationExceptionHandler (and the newly proposed
ProcessingExceptionHandler) would both be invoked immediately and therefore
with the failed record's context. However, I'm also a bit uncomfortable
with adding the rawSourceKey/rawSourceValue to the ProcessingContext. So
I'd propose to just wrap those (and any other metadata you might want) in a
container class and pass that in instead of the ProcessingContext, to all
of the exception handlers.
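For example, something along these lines (a minimal sketch with made-up names,
just to show the shape of the container being proposed):

    public final class FailedRecordContext {
        private final byte[] rawSourceKey;
        private final byte[] rawSourceValue;
        private final String sourceTopic;
        private final int sourcePartition;
        private final long sourceOffset;

        public FailedRecordContext(final byte[] rawSourceKey, final byte[] rawSourceValue,
                                   final String sourceTopic, final int sourcePartition,
                                   final long sourceOffset) {
            this.rawSourceKey = rawSourceKey;
            this.rawSourceValue = rawSourceValue;
            this.sourceTopic = sourceTopic;
            this.sourcePartition = sourcePartition;
            this.sourceOffset = sourceOffset;
        }

        public byte[] rawSourceKey() { return rawSourceKey; }
        public byte[] rawSourceValue() { return rawSourceValue; }
        public String sourceTopic() { return sourceTopic; }
        public int sourcePartition() { return sourcePartition; }
        public long sourceOffset() { return sourceOffset; }
    }

Being small and immutable, one of these could be captured per in-flight record
without sharing any task-scoped mutable state.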

S5:
For some reason I'm finding the proposed API a little bit awkward, although
it's entirely 

Re: [DISCUSS] KIP-899: Allow clients to rebootstrap

2024-04-12 Thread Chris Egerton
Thanks Ivan! LGTM

On Fri, Apr 12, 2024, 13:38 Ivan Yurchenko  wrote:

> Hi Chris and all,
>
> Thank you for your feedback. Your proposals seem good to me. I made these
> changes to the KIP; please have a look at the change [1]
>
> Best,
> Ivan
>
> [1]
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=240881396=14=12
>
> On Thu, Apr 11, 2024, at 10:49, Chris Egerton wrote:
> > Hi Ivan,
> >
> > I agree with Andrew that we can save cluster ID checking for later. This
> > feature is opt-in and if necessary we can add a note to users about only
> > enabling it if they can be certain that the same cluster will always be
> > resolved by the bootstrap servers. This would apply regardless of whether
> > we did cluster ID checking anyways.
> >
> > Thanks for exploring a variety of options and ironing out the details on
> > this KIP. I think this is acceptable as-is but have a couple of final
> > suggestions we might consider:
> >
> > 1. Although the definition of an unavailable broker is useful ("A broker
> is
> > unavailable when the client doesn't have an established connection with
> it
> > and cannot establish a connection (e.g. due to the reconnect backoff)"),
> I
> > think this is a little too restrictive. It's useful to note this as an
> > example of what we may consider an unavailable broker, but if we leave
> some
> > more wiggle room, it could save us the trouble of a follow-up KIP when
> > tweaking behavior in the future. For example, to reduce discovery time
> for
> > a migrated Kafka cluster, it could be nice to re-bootstrap after a
> > connection attempt has failed for every currently-known broker with no
> > successful attempts in between, instead of waiting for the reconnection
> > backoff interval to kick in. Again, I don't think this needs to happen
> with
> > the initial implementation of the KIP, I just want us to be able to
> explore
> > options like this in the future.
> >
> > 2. In a similar vein, I think we can leave more room in our definition of
> > re-bootstrapping. Instead of "During the rebootstrap process, the client
> > forgets the brokers it knows about and falls back on the bootstrap
> brokers
> > (i.e. provided by bootstrap.servers provided by the client configuration)
> > as if it had just been initialized.", we could say something like "During
> > the rebootstrap process, the client attempts to re-contact the bootstrap
> > servers (i.e. ...) that it contacted during initialization." This could
> be
> > useful if we want to add the bootstrap servers to the previously-known
> list
> > of brokers instead of completely discarding the previously-known set.
> This
> > too can be left out of the initial implementation and just give us a bit
> > more room for future options.
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Apr 9, 2024 at 11:51 AM Andrew Schofield <
> andrew_schofi...@live.com>
> > wrote:
> >
> > > Hi Ivan,
> > > I think you have to go one way or the other with the cluster ID, so I
> > > think removing that from this KIP might
> > > be the best. I think there’s another KIP waiting to be written for
> > > ensuring consistency of clusters, but
> > > I think that wouldn’t conflict at all with this one.
> > >
> > > Thanks,
> > > Andrew
> > >
> > > > On 9 Apr 2024, at 19:11, Ivan Yurchenko  wrote:
> > > >
> > > > Hi Andrew and all,
> > > >
> > > > I looked deeper into the code [1] and it seems the Metadata class is
> OK
> > > with cluster ID changing. So I'm thinking that the rebootstrapping
> > > shouldn't introduce a new failure mode here. And I should remove the
> > > mention of this cluster ID checks from the KIP.
> > > >
> > > > Best,
> > > > Ivan
> > > >
> > > > [1]
> > >
> https://github.com/apache/kafka/blob/ff90f78c700c582f9800013faad827c36b45ceb7/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L355
> > > >
> > > > On Tue, Apr 9, 2024, at 09:28, Andrew Schofield wrote:
> > > >> Hi Ivan,
> > > >> Thanks for the KIP. I can see situations in which this would be
> > > helpful. I have one question.
> > > >>
> > > >> The KIP says the client checks the cluster ID when it re-bootstraps
> and
> > > that it will fail if the
> > > >> cluster ID doesn’t match the previously known one. How does it fail?
> > > Which exception does
> > > >> it throw and when?
> > > >>
> > > >> In a similar vein, now that we are checking cluster IDs, I wonder
> if it
> > > could be extended to
> > > >> cover all situations in which there are cluster ID mismatches, such
> as
> > > the bootstrap server
> > > >> list erroneously pointing at brokers from different clusters and the
> > > problem only being
> > > >> detectable later on.
> > > >>
> > > >> Thanks,
> > > >> Andrew
> > > >>
> > > >>> On 8 Apr 2024, at 18:24, Ivan Yurchenko  wrote:
> > > >>>
> > > >>> Hello!
> > > >>>
> > > >>> I changed the KIP a bit, specifying that the certain benefit goes
> to
> > > consumers not participating in a group, but that other clients can
> benefit
> > > as well in certain 

[jira] [Created] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE

2024-04-12 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16544:
--

 Summary: DescribeTopicsResult#allTopicIds and 
DescribeTopicsResult#allTopicNames should return null instead of throwing NPE
 Key: KAFKA-16544
 URL: https://issues.apache.org/jira/browse/KAFKA-16544
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai


{code:java}
 * @return A future map from topic names to descriptions which can be used 
to check
 * the status of individual description if the describe topic 
request used
 * topic names, otherwise return null, this request succeeds only 
if all the
 * topic descriptions succeed
{code}

According to the docs, it should return null if we try to get a result that does not 
match the request. For example, calling `allTopicNames` when passing a 
`TopicIdCollection`. However, the current implementation throws an NPE directly.
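
For example (a hedged sketch; props and topicId are assumed to be defined elsewhere):

{code:java}
Admin admin = Admin.create(props);
DescribeTopicsResult result =
    admin.describeTopics(TopicCollection.ofTopicIds(List.of(topicId)));

// The request used topic IDs, so per the javadoc this should return null,
// but the current implementation throws a NullPointerException instead.
KafkaFuture<Map<String, TopicDescription>> byName = result.allTopicNames();
{code}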



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-1022 Formatting and Updating Features

2024-04-12 Thread Jun Rao
Hi, Justine,

Thanks for the KIP. +1

Jun

On Wed, Apr 10, 2024 at 9:13 AM José Armando García Sancio
 wrote:

> Hi Justine,
>
> +1 (binding)
>
> Thanks for the improvement.
> --
> -José
>


Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Damien Gasparina
Hi Andrew,

Thanks a lot for your review, plenty of good points!

11. Typo fixed, good catch.

12. I do agree with you and Nick also mentioned it, I updated the KIP
to mention that context headers should be forwarded.

13. Good catch, to be consistent with KIP-298, and without a strong
opinion from my side, I updated the KIP with your prefix proposal.

14. I am not sure about this point, a big difference between KIP-298
and this one is that the handlers can easily be overridden, something
that is not doable in Kafka Connect.
If someone would like a different behavior, e.g. to mask the payload
or include further headers, I think we should encourage them to write
their own exception handlers to build the DLQ Record the way they
expect.

15. Yeah, that's a good point, I was not fully convinced about putting
a String in it, I do assume that "null" is also a valid value. I do
assume that the Stacktrace and the Exception in this case are the key
metadata for the user to troubleshoot the problem.
I updated the KIP to mention that the value should be null if
triggered in a punctuate.

16. I added a section to mention that Kafka Streams would not try to
automatically create the topic and the topic should either be
automatically created, or pre-created.

17. If a DLQ record can not be sent, the exception should go to the
uncaughtExceptionHandler. Let me clearly state it in the KIP.

On Fri, 12 Apr 2024 at 17:25, Damien Gasparina  wrote:
>
> Hi Nick,
>
> 1. Good point, that's less impactful than a custom interface, I just
> updated the KIP with the new signature.
>
> 1.a I really meant ProducerRecord, that's the class used to forward to
> downstream processors in the PAPI. The only information missing in
> this class is the topic name. I also considered relying on the Kafka
> Producer ProducerRecord, but I assume it would not be consistent with
> the KafkaStreams API.
>
> 2. Agreed
>
> 2.a I do think exceptions occurring during punctuate should be
> included in the DLQ.
> Even if building a suitable payload is almost impossible, even with
> custom code; those exceptions are still fatal for Kafka Streams by
> default and are something that can not be ignored safely.
> I do assume that most users would want to be informed if an error
> happened during a punctuate, even if only the metadata (e.g.
> stacktrace, exception) is provided.
> I am only concerned about flooding the DLQ topic as, if a scheduled
> operation fails, very likely it will fail again during the next
> invocation, but
>
> 4. Good point, I clarified the wording in the KIP to make it explicit.
>
> 5. Good point, I will clearly mention that it is out of scope as part
> of the KIP and might not be as trivial as people could expect. I will
> update the KIP once I do have some spare time.
>
> 6. Oh yeah, I didn't think about it, but forwarding input headers
> would definitely make sense. The Confluent Schema Registry ID is actually
> part of the payload, but many correlation IDs and technical metadata
> are passed through headers, so it makes sense to forward them, especially
> as it is the default behavior of Kafka Streams.
>
>
>
> On Fri, 12 Apr 2024 at 15:25, Nick Telford  wrote:
> >
> > Hi Damien and Sebastien,
> >
> > 1.
> > I think you can just add a `String topic` argument to the existing
> > `withDeadLetterQueueRecord(ProducerRecord
> > deadLetterQueueRecord)` method, and then the implementation of the
> > exception handler could choose the topic to send records to using whatever
> > logic the user desires. You could perhaps provide a built-in implementation
> > that leverages your new config to send all records to an untyped DLQ topic?
> >
> > 1a.
> > BTW you have a typo: in your DeserializationExceptionHandler, the type of
> > your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should
> > probably be `ConsumerRecord`.
> >
> > 2.
> > Agreed. I think it's a good idea to provide an implementation that sends to
> > a single DLQ by default, but it's important to enable users to customize
> > this with their own exception handlers.
> >
> > 2a.
> > I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a
> > DLQ topic like it's a bad record. To me, a DLQ should only contain records
> > that failed to process. I'm not even sure how a user would
> > re-process/action one of these other errors; it seems like the purview of
> > error logging to me?
> >
> > 4.
> > My point here was that I think it would be useful for the KIP to contain an
> > explanation of the behavior both with KIP-1033 and without it. i.e. clarify
> > if/how records that throw an exception in a processor are handled. At the
> > moment, I'm assuming that without KIP-1033, processing exceptions would not
> > cause records to be sent to the DLQ, but with KIP-1033, they would. If this
> > assumption is correct, I think it should be made explicit in the KIP.
> >
> > 5.
> > Understood. You may want to make this explicit in the documentation for
> > users, so they understand the 

Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #2809

2024-04-12 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-1022 Formatting and Updating Features

2024-04-12 Thread Justine Olshan
Hi Jun,

Ok sounds good.

Justine

On Fri, Apr 12, 2024 at 10:17 AM Jun Rao  wrote:

> Hi, Justine,
>
> unstable.metadata.versions.enable is an internal configuration. So, we
> could probably just remove it instead of deprecating it. Also, it would be
> useful to make it clear that unstable.feature.versions.enable is an
> internal configuration.
>
> Thanks,
>
> Jun
>
> On Thu, Apr 11, 2024 at 11:16 AM Justine Olshan
>  wrote:
>
> > Updated. :)
> > Thanks for the reviews
> >
> > Justine
> >
> > On Thu, Apr 11, 2024 at 11:01 AM Jun Rao 
> wrote:
> >
> > > Hi, Justine,
> > >
> > > Thanks for the updated KIP.
> > >
> > > Perhaps it's better to name the new config
> > unstable.feature.versions.enable
> > > since there could be multiple unstable versions.
> > >
> > > Other than that, the KIP looks good to me.
> > >
> > > Jun
> > >
> > > On Thu, Apr 11, 2024 at 9:06 AM Justine Olshan
> > > 
> > > wrote:
> > >
> > > > The original config was never actually approved in any KIP. But we
> can
> > > say
> > > > it is deprecated.
> > > > I can change the config name.
> > > >
> > > > Justine
> > > >
> > > > On Thu, Apr 11, 2024 at 8:52 AM Jun Rao 
> > > wrote:
> > > >
> > > > > Hi, Justine,
> > > > >
> > > > > Thanks for the updated KIP.
> > > > >
> > > > > Would unstable.feature.version.enable be a clearer name? Also,
> should
> > > we
> > > > > remove/deprecate unstable.metadata.versions.enable in this KIP?
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, Apr 9, 2024 at 9:09 AM Justine Olshan
> > > >  > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > Makes sense to me. It seems like KIP-1014 has been inactive
> > > recently. I
> > > > > can
> > > > > > update my KIP and mention this change on that discussion thread.
> > > > > >
> > > > > > Justine
> > > > > >
> > > > > > On Tue, Apr 9, 2024 at 9:01 AM Jun Rao  >
> > > > wrote:
> > > > > >
> > > > > > > Hi, Justine,
> > > > > > >
> > > > > > > A single config makes sense to me too. We just need to reach
> > > > consensus
> > > > > > with
> > > > > > > KIP-1014.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Mon, Apr 8, 2024 at 5:06 PM Justine Olshan
> > > > > >  > > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Jun,
> > > > > > > >
> > > > > > > > That's a good question. I think maybe for simplicity, we can
> > > have a
> > > > > > > single
> > > > > > > > config?
> > > > > > > > If that makes sense, I will update the KIP.
> > > > > > > >
> > > > > > > > Justine
> > > > > > > >
> > > > > > > > On Mon, Apr 8, 2024 at 3:20 PM Jun Rao
> >  > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi, Justine,
> > > > > > > > >
> > > > > > > > > Thanks for the updated KIP.
> > > > > > > > >
> > > > > > > > > One more question related to KIP-1014. It introduced a new
> > > > > > > > > config unstable.metadata.versions.enable. Does each new
> > feature
> > > > > need
> > > > > > to
> > > > > > > > > have a corresponding config to enable the testing of
> unstable
> > > > > > features
> > > > > > > or
> > > > > > > > > should we have a generic config enabling the testing of all
> > > > > unstable
> > > > > > > > > features?
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Thu, Apr 4, 2024 at 8:24 PM Justine Olshan
> > > > > > > >  > > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > I'm hoping this covers the majority of comments. I will
> go
> > > > ahead
> > > > > > and
> > > > > > > > open
> > > > > > > > > > the vote in the next day or so.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Justine
> > > > > > > > > >
> > > > > > > > > > On Wed, Apr 3, 2024 at 3:31 PM Justine Olshan <
> > > > > > jols...@confluent.io>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Find and replace has failed me :(
> > > > > > > > > > >
> > > > > > > > > > > Group version seems a little vague, but we can update
> it.
> > > > > > Hopefully
> > > > > > > > > find
> > > > > > > > > > > and replace won't fail me again, otherwise I will get
> > > another
> > > > > > email
> > > > > > > > on
> > > > > > > > > > this.
> > > > > > > > > > >
> > > > > > > > > > > Justine
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Apr 3, 2024 at 12:15 PM David Jacot
> > > > > > > > >  > > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > >> Thanks, Justine.
> > > > > > > > > > >>
> > > > > > > > > > >> * Should we also use `group.version` (GV) as I
> suggested
> > > in
> > > > my
> > > > > > > > > previous
> > > > > > > > > > >> message in order to be consistent?
> > > > > > > > > > >> * Should we add both names to the `Public Interfaces`
> > > > section?
> > > > > > > > > > >> * There is still at least one usage of
> > > > > > > > `transaction.protocol.verison`
> > > > > > > > > in
> > > > > > > > > > >> the KIP too.
> > > > > > > > > > >>
> > > > > > > > > > >> Best,
> > > 

Re: [DISCUSS] KIP-899: Allow clients to rebootstrap

2024-04-12 Thread Ivan Yurchenko
Hi Chris and all,

Thank you for your feedback. Your proposals seem good to me. I made these 
changes to the KIP; please have a look at the change [1]

Best,
Ivan

[1] 
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=240881396=14=12

On Thu, Apr 11, 2024, at 10:49, Chris Egerton wrote:
> Hi Ivan,
> 
> I agree with Andrew that we can save cluster ID checking for later. This
> feature is opt-in and if necessary we can add a note to users about only
> enabling it if they can be certain that the same cluster will always be
> resolved by the bootstrap servers. This would apply regardless of whether
> we did cluster ID checking anyways.
> 
> Thanks for exploring a variety of options and ironing out the details on
> this KIP. I think this is acceptable as-is but have a couple of final
> suggestions we might consider:
> 
> 1. Although the definition of an unavailable broker is useful ("A broker is
> unavailable when the client doesn't have an established connection with it
> and cannot establish a connection (e.g. due to the reconnect backoff)"), I
> think this is a little too restrictive. It's useful to note this as an
> example of what we may consider an unavailable broker, but if we leave some
> more wiggle room, it could save us the trouble of a follow-up KIP when
> tweaking behavior in the future. For example, to reduce discovery time for
> a migrated Kafka cluster, it could be nice to re-bootstrap after a
> connection attempt has failed for every currently-known broker with no
> successful attempts in between, instead of waiting for the reconnection
> backoff interval to kick in. Again, I don't think this needs to happen with
> the initial implementation of the KIP, I just want us to be able to explore
> options like this in the future.
> 
> 2. In a similar vein, I think we can leave more room in our definition of
> re-bootstrapping. Instead of "During the rebootstrap process, the client
> forgets the brokers it knows about and falls back on the bootstrap brokers
> (i.e. provided by bootstrap.servers provided by the client configuration)
> as if it had just been initialized.", we could say something like "During
> the rebootstrap process, the client attempts to re-contact the bootstrap
> servers (i.e. ...) that it contacted during initialization." This could be
> useful if we want to add the bootstrap servers to the previously-known list
> of brokers instead of completely discarding the previously-known set. This
> too can be left out of the initial implementation and just give us a bit
> more room for future options.
> 
> Cheers,
> 
> Chris
> 
> On Tue, Apr 9, 2024 at 11:51 AM Andrew Schofield 
> wrote:
> 
> > Hi Ivan,
> > I think you have to go one way or the other with the cluster ID, so I
> > think removing that from this KIP might
> > be the best. I think there’s another KIP waiting to be written for
> > ensuring consistency of clusters, but
> > I think that wouldn’t conflict at all with this one.
> >
> > Thanks,
> > Andrew
> >
> > > On 9 Apr 2024, at 19:11, Ivan Yurchenko  wrote:
> > >
> > > Hi Andrew and all,
> > >
> > > I looked deeper into the code [1] and it seems the Metadata class is OK
> > with cluster ID changing. So I'm thinking that the rebootstrapping
> > shouldn't introduce a new failure mode here. And I should remove the
> > mention of this cluster ID checks from the KIP.
> > >
> > > Best,
> > > Ivan
> > >
> > > [1]
> > https://github.com/apache/kafka/blob/ff90f78c700c582f9800013faad827c36b45ceb7/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L355
> > >
> > > On Tue, Apr 9, 2024, at 09:28, Andrew Schofield wrote:
> > >> Hi Ivan,
> > >> Thanks for the KIP. I can see situations in which this would be
> > helpful. I have one question.
> > >>
> > >> The KIP says the client checks the cluster ID when it re-bootstraps and
> > that it will fail if the
> > >> cluster ID doesn’t match the previously known one. How does it fail?
> > Which exception does
> > >> it throw and when?
> > >>
> > >> In a similar vein, now that we are checking cluster IDs, I wonder if it
> > could be extended to
> > >> cover all situations in which there are cluster ID mismatches, such as
> > the bootstrap server
> > >> list erroneously pointing at brokers from different clusters and the
> > problem only being
> > >> detectable later on.
> > >>
> > >> Thanks,
> > >> Andrew
> > >>
> > >>> On 8 Apr 2024, at 18:24, Ivan Yurchenko  wrote:
> > >>>
> > >>> Hello!
> > >>>
> > >>> I changed the KIP a bit, specifying that the certain benefit goes to
> > consumers not participating in a group, but that other clients can benefit
> > as well in certain situations.
> > >>>
> > >>> You can see the changes in the history [1]
> > >>>
> > >>> Thank you!
> > >>>
> > >>> Ivan
> > >>>
> > >>> [1]
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=240881396=10=11
> > >>>
> > >>> On 2023/07/15 16:37:52 Ivan Yurchenko wrote:
> >  Hello!
> > 
> >  I've made several 

Re: [DISCUSS] KIP-1022 Formatting and Updating Features

2024-04-12 Thread Jun Rao
Hi, Justine,

unstable.metadata.versions.enable is an internal configuration. So, we
could probably just remove it instead of deprecating it. Also, it would be
useful to make it clear that unstable.feature.versions.enable is an
internal configuration.

Thanks,

Jun

On Thu, Apr 11, 2024 at 11:16 AM Justine Olshan
 wrote:

> Updated. :)
> Thanks for the reviews
>
> Justine
>
> On Thu, Apr 11, 2024 at 11:01 AM Jun Rao  wrote:
>
> > Hi, Justine,
> >
> > Thanks for the updated KIP.
> >
> > Perhaps it's better to name the new config
> unstable.feature.versions.enable
> > since there could be multiple unstable versions.
> >
> > Other than that, the KIP looks good to me.
> >
> > Jun
> >
> > On Thu, Apr 11, 2024 at 9:06 AM Justine Olshan
> > 
> > wrote:
> >
> > > The original config was never actually approved in any KIP. But we can
> > say
> > > it is deprecated.
> > > I can change the config name.
> > >
> > > Justine
> > >
> > > On Thu, Apr 11, 2024 at 8:52 AM Jun Rao 
> > wrote:
> > >
> > > > Hi, Justine,
> > > >
> > > > Thanks for the updated KIP.
> > > >
> > > > Would unstable.feature.version.enable be a clearer name? Also, should
> > we
> > > > remove/deprecate unstable.metadata.versions.enable in this KIP?
> > > >
> > > > Jun
> > > >
> > > > On Tue, Apr 9, 2024 at 9:09 AM Justine Olshan
> > >  > > > >
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Makes sense to me. It seems like KIP-1014 has been inactive
> > recently. I
> > > > can
> > > > > update my KIP and mention this change on that discussion thread.
> > > > >
> > > > > Justine
> > > > >
> > > > > On Tue, Apr 9, 2024 at 9:01 AM Jun Rao 
> > > wrote:
> > > > >
> > > > > > Hi, Justine,
> > > > > >
> > > > > > A single config makes sense to me too. We just need to reach
> > > consensus
> > > > > with
> > > > > > KIP-1014.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Mon, Apr 8, 2024 at 5:06 PM Justine Olshan
> > > > >  > > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Jun,
> > > > > > >
> > > > > > > That's a good question. I think maybe for simplicity, we can
> > have a
> > > > > > single
> > > > > > > config?
> > > > > > > If that makes sense, I will update the KIP.
> > > > > > >
> > > > > > > Justine
> > > > > > >
> > > > > > > On Mon, Apr 8, 2024 at 3:20 PM Jun Rao
>  > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi, Justine,
> > > > > > > >
> > > > > > > > Thanks for the updated KIP.
> > > > > > > >
> > > > > > > > One more question related to KIP-1014. It introduced a new
> > > > > > > > config unstable.metadata.versions.enable. Does each new
> feature
> > > > need
> > > > > to
> > > > > > > > have a corresponding config to enable the testing of unstable
> > > > > features
> > > > > > or
> > > > > > > > should we have a generic config enabling the testing of all
> > > > unstable
> > > > > > > > features?
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Thu, Apr 4, 2024 at 8:24 PM Justine Olshan
> > > > > > >  > > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > I'm hoping this covers the majority of comments. I will go
> > > ahead
> > > > > and
> > > > > > > open
> > > > > > > > > the vote in the next day or so.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Justine
> > > > > > > > >
> > > > > > > > > On Wed, Apr 3, 2024 at 3:31 PM Justine Olshan <
> > > > > jols...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Find and replace has failed me :(
> > > > > > > > > >
> > > > > > > > > > Group version seems a little vague, but we can update it.
> > > > > Hopefully
> > > > > > > > find
> > > > > > > > > > and replace won't fail me again, otherwise I will get
> > another
> > > > > email
> > > > > > > on
> > > > > > > > > this.
> > > > > > > > > >
> > > > > > > > > > Justine
> > > > > > > > > >
> > > > > > > > > > On Wed, Apr 3, 2024 at 12:15 PM David Jacot
> > > > > > > >  > > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> Thanks, Justine.
> > > > > > > > > >>
> > > > > > > > > >> * Should we also use `group.version` (GV) as I suggested
> > in
> > > my
> > > > > > > > previous
> > > > > > > > > >> message in order to be consistent?
> > > > > > > > > >> * Should we add both names to the `Public Interfaces`
> > > section?
> > > > > > > > > >> * There is still at least one usage of
> > > > > > > `transaction.protocol.verison`
> > > > > > > > in
> > > > > > > > > >> the KIP too.
> > > > > > > > > >>
> > > > > > > > > >> Best,
> > > > > > > > > >> David
> > > > > > > > > >>
> > > > > > > > > >> On Wed, Apr 3, 2024 at 6:29 PM Justine Olshan
> > > > > > > > > >> 
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > I had missed the David's message yesterday about the
> > > naming
> > > > > for
> > > > > > > > > >> transaction
> > > > > > > > > >> > version vs transaction protocol version.
> > > > > > > > > >> >
> > > > > > > > > >> > After some 

Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Damien Gasparina
Hi Nick,

1. Good point, that's less impactful than a custom interface, I just
updated the KIP with the new signature.

1.a I really meant ProducerRecord, that's the class used to forward to
downstream processors in the PAPI. The only information missing in
this class is the topic name. I also considered relying on the Kafka
Producer ProducerRecord, but I assume it would not be consistent with
the KafkaStreams API.

2. Agreed

2.a I do think exceptions occurring during punctuate should be
included in the DLQ.
Even if building a suitable payload is almost impossible, even with
custom code; those exceptions are still fatal for Kafka Streams by
default and are something that can not be ignored safely.
I do assume that most users would want to be informed if an error
happened during a punctuate, even if only the metadata (e.g.
stacktrace, exception) is provided.
I am only concerned about flooding the DLQ topic as, if a scheduled
operation fails, very likely it will fail again during the next
invocation, but

4. Good point, I clarified the wording in the KIP to make it explicit.

5. Good point, I will clearly mention that it is out of scope as part
of the KIP and might not be as trivial as people could expect. I will
update the KIP once I do have some spare time.

6. Oh yeah, I didn't think about it, but forwarding input headers
would definitely make sense. The Confluent Schema Registry ID is actually
part of the payload, but many correlation IDs and technical metadata
are passed through headers, so it makes sense to forward them, especially
as it is the default behavior of Kafka Streams.



On Fri, 12 Apr 2024 at 15:25, Nick Telford  wrote:
>
> Hi Damien and Sebastien,
>
> 1.
> I think you can just add a `String topic` argument to the existing
> `withDeadLetterQueueRecord(ProducerRecord
> deadLetterQueueRecord)` method, and then the implementation of the
> exception handler could choose the topic to send records to using whatever
> logic the user desires. You could perhaps provide a built-in implementation
> that leverages your new config to send all records to an untyped DLQ topic?
>
> 1a.
> BTW you have a typo: in your DeserializationExceptionHandler, the type of
> your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should
> probably be `ConsumerRecord`.
>
> 2.
> Agreed. I think it's a good idea to provide an implementation that sends to
> a single DLQ by default, but it's important to enable users to customize
> this with their own exception handlers.
>
> 2a.
> I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a
> DLQ topic like it's a bad record. To me, a DLQ should only contain records
> that failed to process. I'm not even sure how a user would
> re-process/action one of these other errors; it seems like the purview of
> error logging to me?
>
> 4.
> My point here was that I think it would be useful for the KIP to contain an
> explanation of the behavior both with KIP-1033 and without it. i.e. clarify
> if/how records that throw an exception in a processor are handled. At the
> moment, I'm assuming that without KIP-1033, processing exceptions would not
> cause records to be sent to the DLQ, but with KIP-1033, they would. If this
> assumption is correct, I think it should be made explicit in the KIP.
>
> 5.
> Understood. You may want to make this explicit in the documentation for
> users, so they understand the consequences of re-processing data sent to
> their DLQ. The main reason I raised this point is it's something that's
> tripped me up in numerous KIPs and that committers frequently remind me
> of; so I wanted to get ahead of it for once! :D
>
> And one new point:
> 6.
> The DLQ record schema appears to discard all custom headers set on the
> source record. Is there a way these can be included? In particular, I'm
> concerned with "schema pointer" headers (like those set by Schema
> Registry), that may need to be propagated, especially if the records are
> fed back into the source topics for re-processing by the user.
>
> Regards,
> Nick
>
>
> On Fri, 12 Apr 2024 at 13:20, Damien Gasparina 
> wrote:
>
> > Hi Nick,
> >
> > Thanks a lot for your review and your useful comments!
> >
> > 1. It is a good point, as you mentioned, I think it would make sense
> > in some use cases to have potentially multiple DLQ topics, so we
> > should provide an API to let users do it.
> > Thinking out-loud here, maybe it is a better approach to create a new
> > Record class containing the topic name, e.g. DeadLetterQueueRecord and
> > changing the signature to
> > withDeadLetterQueueRecords(Iterable
> > deadLetterQueueRecords) instead of
> > withDeadLetterQueueRecord(ProducerRecord
> > deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would
> > be something like "class DeadLetterQueueRecord extends
> > org.apache.kafka.streams.processor.api.ProducerRecords { String
> > topic; /* + getter/setter + */ } "
> >
> > 2. I think the root question here is: should we have one DLQ topic or

Re: [DISCUSS] Transition Required from RemoteLogSegment to LocalLogSegment during Fetch Request

2024-04-12 Thread Arpit Goyal
@Divij Vaidya, Satish, @Kamal Chandraprakash, Luke: as you are more aware of the code
flow path, your suggestions would help me to advance further on this.

On Thu, Apr 11, 2024, 21:20 Arpit Goyal  wrote:

> Hi All,
> When tackling the issue outlined in
> https://issues.apache.org/jira/browse/KAFKA-16088, which pertains to
> transitioning from RemoteLogSegment to LocalLogSegment during log
> compaction in a fetch request, I'm seeking some suggestions or guidance
> from the community to advance
>
> *Current Behaviour *
>
> At a very high level, the fetch request behaves as follows for topics with
> tiered storage enabled:
>
> [diagram: fetch request flow for tiered-storage topics, originally attached as a screenshot]
>
> 1. When a consumer client requests an offset that is not available in the
> local log for the given partition, the broker throws an OffsetOutOfRange
> error.
> 2. If the required offset falls within the range of the Remote Segment
> log, we schedule a DelayedRemoteFetch request along with the successful
> results of other partitions from the log, and the responseCallback.
>
> *Current Issue*
> In this scenario, the functionality is compromised when RemoteLogSegments
> are log compacted. As we initiate offset reading from RemoteLogManager,
> there's a possibility that we cannot find the required remote log segment
> within the requested offset range because of log compaction. Ideally, the
> system should then search for higher log segments for additional retrieval.
> However, these higher segments are stored locally instead of remote.
> Presently, there's no mechanism in place to transition the fetch request
> from remote log segments to local log segments, resulting in empty records
> being returned (Check the diagram above). Consequently, the consumer client
> remains unable to increment the offset, erroneously perceiving that there
> is no more data available.
>
> The scenario has been discussed here in detail.
>
> *Possible Approaches and downside*
>
> *Implement Mechanism of RemoteLogSegment to Local LogSegment*
>
> 1. Suppose we implement a mechanism to advance the fetch request from
> Remote LogSegment to LocalLogSegment. However, during this process, there's
> a possibility that the local segments are moved to Remote Storage. This
> situation hints at a potential dead spiral loop, where we continuously
> switch between local and remote segments and vice versa.
>
> 2. Handling the advancement of Fetch Request from remote segment to log
> segment code-wise is complex, mainly because the flow is independent, and
> there is no existing mechanism to manage this transition seamlessly.
>
> *Use endoffset of the last RemoteLogSegmentMetadata *
>
> Once we've determined that no RemoteLogSegment satisfies the fetch offset,
> we can utilize the information regarding the next segment to inspect, which
> is based on the end offset of the lastRemoteLogSegment iterated. This
> information can be passed to the client along with an error message
> resembling "OffsetOutOfRangeBecauseOfLogCompaction". Similar to the
> advanced strategy options like "latest" or "earliest", we can advance the
> next fetch request for the given partition to the required value passed to
> the client.
>
> 1. In our current implementation, the subscription position is only
> incremented under two conditions: when there are records in the response or
> when the resetStrategy is configured.
> 2. The proposed design requires sending the next fetch offset in the
> response to the client if this scenario occurs.
>
>
> Please let me know if the community has any suggestions or directions to
> offer.
>
>
> Thanks and Regards
> Arpit Goyal
> 8861094754
>


[jira] [Created] (KAFKA-16543) There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0

2024-04-12 Thread hudeqi (Jira)
hudeqi created KAFKA-16543:
--

 Summary: There may be ambiguous deletions in the 
`cleanupGroupMetadata` when the generation of the group is less than or equal 
to 0
 Key: KAFKA-16543
 URL: https://issues.apache.org/jira/browse/KAFKA-16543
 Project: Kafka
  Issue Type: Bug
  Components: group-coordinator
Affects Versions: 3.6.2
Reporter: hudeqi
Assignee: hudeqi


In the `cleanupGroupMetadata` method, a tombstone message is written to delete 
the group's MetadataKey only when the group is in the Dead state and the 
generation is greater than 0. The comment indicates: 'We avoid writing the 
tombstone when the generationId is 0, since this group is only using Kafka for 
offset storage.' This means that groups that only use Kafka for offset storage 
should not be deleted. However, there is a situation where, for example, Flink 
commits offsets with a generationId equal to -1. If ApiKeys.DELETE_GROUPS is 
called to delete this group, Flink's group metadata will never be deleted. Yet, 
the logic above has already cleaned up commitKey by writing tombstone messages 
with removedOffsets. Therefore, the actual manifestation is: the group no 
longer exists (since the offsets have been cleaned up, there is no possibility 
of adding the group back to the `groupMetadataCache` unless offsets are 
committed again with the same group name), but the corresponding group metadata 
information still exists in __consumer_offsets. This leads to the problem that 
deleting the group does not completely clean up its related information.

The group's state is set to Dead only in the following three situations:
1. The group information is unloaded
2. The group is deleted by ApiKeys.DELETE_GROUPS
3. All offsets of the group have expired or been removed.

Therefore, since the group is already in the Dead state and has been removed 
from the `groupMetadataCache`, why not directly clean up all the information of 
the group? Even if it is only used for storing offsets.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Andrew Schofield
Hi Damien, Sebastien and Loic,
Thanks for the KIP. The DLQ pattern is well established and bringing this to
Kafka Streams is a good improvement. I do plan to add DLQ support to share
groups in the future, once KIP-932 is complete. Having broad support in Kafka
for DLQs is great.

I have a few comments.

11. Tiny typo “Deal letter queue”.

12. Copying across the raw key and value, then adding headers is a good
model and is shared with DLQs in Kafka Connect. It is important that 
pre-existing
headers are also propagated, not just the key and value.

13. In terms of naming headers, I suggest that you follow KIP-298. It uses a 
common
prefix for all added headers which is unlikely to clash with pre-existing 
headers.

For example, you could use:

__streams.errors.topic
__streams.errors.partition
__streams.errors.offset
__streams.errors.exception.class.name
__streams.errors.exception.stacktrace
__streams.errors.exception.message
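
For illustration, a rough sketch of how a handler could attach these headers when
building the DLQ record (assuming failedRecord is the raw ConsumerRecord<byte[], byte[]>
and exception is the thrown exception; none of these names are part of the proposed API):

    ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
        dlqTopic, null, failedRecord.key(), failedRecord.value(), failedRecord.headers());
    dlqRecord.headers()
        .add("__streams.errors.topic", failedRecord.topic().getBytes(StandardCharsets.UTF_8))
        .add("__streams.errors.partition", String.valueOf(failedRecord.partition()).getBytes(StandardCharsets.UTF_8))
        .add("__streams.errors.offset", String.valueOf(failedRecord.offset()).getBytes(StandardCharsets.UTF_8))
        .add("__streams.errors.exception.class.name", exception.getClass().getName().getBytes(StandardCharsets.UTF_8))
        .add("__streams.errors.exception.message", String.valueOf(exception.getMessage()).getBytes(StandardCharsets.UTF_8));

Copying failedRecord.headers() into the constructor keeps the pre-existing headers,
and the added entries all share the common prefix so they cannot clash with them.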

14. Also in common with KIP-298, I think you should be able to opt out
of the additional headers and just copy the raw record onto the DLQ topic.

15. I would not make up the record value for situations in which you do not
have a record value. You can use an exception message header if an exception
is the reason why there’s no record value. You could use an additional header
for the punctuate case if required, or perhaps even create an exception class
specifically for this case.

16. What happens if the DLQ topic does not exist? If topic auto-create is
enabled, the broker would automatically create it with default options.
The KIP probably ought to say what happens when the topic doesn’t exist.

17. What happens if the record cannot be put onto the DLQ? I suppose
KafkaStreams stops.

Thanks,
Andrew


> On 12 Apr 2024, at 14:24, Nick Telford  wrote:
>
> Hi Damien and Sebastien,
>
> 1.
> I think you can just add a `String topic` argument to the existing
> `withDeadLetterQueueRecord(ProducerRecord
> deadLetterQueueRecord)` method, and then the implementation of the
> exception handler could choose the topic to send records to using whatever
> logic the user desires. You could perhaps provide a built-in implementation
> that leverages your new config to send all records to an untyped DLQ topic?
>
> 1a.
> BTW you have a typo: in your DeserializationExceptionHandler, the type of
> your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should
> probably be `ConsumerRecord`.
>
> 2.
> Agreed. I think it's a good idea to provide an implementation that sends to
> a single DLQ by default, but it's important to enable users to customize
> this with their own exception handlers.
>
> 2a.
> I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a
> DLQ topic like it's a bad record. To me, a DLQ should only contain records
> that failed to process. I'm not even sure how a user would
> re-process/action one of these other errors; it seems like the purview of
> error logging to me?
>
> 4.
> My point here was that I think it would be useful for the KIP to contain an
> explanation of the behavior both with KIP-1033 and without it. i.e. clarify
> if/how records that throw an exception in a processor are handled. At the
> moment, I'm assuming that without KIP-1033, processing exceptions would not
> cause records to be sent to the DLQ, but with KIP-1033, they would. If this
> assumption is correct, I think it should be made explicit in the KIP.
>
> 5.
> Understood. You may want to make this explicit in the documentation for
> users, so they understand the consequences of re-processing data sent to
> their DLQ. The main reason I raised this point is it's something that's
> tripped me up in numerous KIPs that committers frequently remind me
> of; so I wanted to get ahead of it for once! :D
>
> And one new point:
> 6.
> The DLQ record schema appears to discard all custom headers set on the
> source record. Is there a way these can be included? In particular, I'm
> concerned with "schema pointer" headers (like those set by Schema
> Registry), that may need to be propagated, especially if the records are
> fed back into the source topics for re-processing by the user.
>
> Regards,
> Nick
>
>
> On Fri, 12 Apr 2024 at 13:20, Damien Gasparina 
> wrote:
>
>> Hi Nick,
>>
>> Thanks a lot for your review and your useful comments!
>>
>> 1. It is a good point, as you mentioned, I think it would make sense
>> in some use cases to have potentially multiple DLQ topics, so we
>> should provide an API to let users do it.
>> Thinking out-loud here, maybe it is a better approach to create a new
>> Record class containing the topic name, e.g. DeadLetterQueueRecord and
>> changing the signature to
>> withDeadLetterQueueRecords(Iterable
>> deadLetterQueueRecords) instead of
>> withDeadLetterQueueRecord(ProducerRecord
>> deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would
>> be something like "class DeadLetterQueueRecord extends
>> 

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2808

2024-04-12 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 341788 lines...]
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldUnassignTaskWhenRequired() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldUnassignTaskWhenRequired() PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldClearTaskReleaseFutureOnShutdown() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldClearTaskReleaseFutureOnShutdown() PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldProcessTasks() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldProcessTasks() PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldPunctuateStreamTime() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldPunctuateStreamTime() PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldShutdownTaskExecutor() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldShutdownTaskExecutor() PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldAwaitProcessableTasksIfNoneAssignable() 
STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldAwaitProcessableTasksIfNoneAssignable() 
PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > 
shouldRespectPunctuationDisabledByTaskExecutionMetadata() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > 
shouldRespectPunctuationDisabledByTaskExecutionMetadata() PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldSetTaskTimeoutOnTimeoutException() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldSetTaskTimeoutOnTimeoutException() PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldPunctuateSystemTime() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldPunctuateSystemTime() PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldUnassignTaskWhenNotProgressing() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldUnassignTaskWhenNotProgressing() PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldNotFlushOnException() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > shouldNotFlushOnException() PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > 
shouldRespectProcessingDisabledByTaskExecutionMetadata() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskExecutorTest > 
shouldRespectProcessingDisabledByTaskExecutionMetadata() PASSED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskManagerTest > shouldLockAnEmptySetOfTasks() STARTED
[2024-04-12T10:26:10.208Z] 
[2024-04-12T10:26:10.208Z] Gradle Test Run :streams:test > Gradle Test Executor 
94 > DefaultTaskManagerTest > shouldLockAnEmptySetOfTasks() PASSED
[2024-04-12T10:26:10.208Z] 

Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Nick Telford
Hi Damien and Sebastien,

1.
I think you can just add a `String topic` argument to the existing
`withDeadLetterQueueRecord(ProducerRecord
deadLetterQueueRecord)` method, and then the implementation of the
exception handler could choose the topic to send records to using whatever
logic the user desires. You could perhaps provide a built-in implementation
that leverages your new config to send all records to an untyped DLQ topic?

1a.
BTW you have a typo: in your DeserializationExceptionHandler, the type of
your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should
probably be `ConsumerRecord`.

2.
Agreed. I think it's a good idea to provide an implementation that sends to
a single DLQ by default, but it's important to enable users to customize
this with their own exception handlers.

2a.
I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a
DLQ topic like it's a bad record. To me, a DLQ should only contain records
that failed to process. I'm not even sure how a user would
re-process/action one of these other errors; it seems like the purview of
error logging to me?

4.
My point here was that I think it would be useful for the KIP to contain an
explanation of the behavior both with KIP-1033 and without it. i.e. clarify
if/how records that throw an exception in a processor are handled. At the
moment, I'm assuming that without KIP-1033, processing exceptions would not
cause records to be sent to the DLQ, but with KIP-1033, they would. If this
assumption is correct, I think it should be made explicit in the KIP.

5.
Understood. You may want to make this explicit in the documentation for
users, so they understand the consequences of re-processing data sent to
their DLQ. The main reason I raised this point is it's something that's
tripped me up in numerous KIPs that committers frequently remind me
of; so I wanted to get ahead of it for once! :D

And one new point:
6.
The DLQ record schema appears to discard all custom headers set on the
source record. Is there a way these can be included? In particular, I'm
concerned with "schema pointer" headers (like those set by Schema
Registry), that may need to be propagated, especially if the records are
fed back into the source topics for re-processing by the user.

Regards,
Nick


On Fri, 12 Apr 2024 at 13:20, Damien Gasparina 
wrote:

> Hi Nick,
>
> Thanks a lot for your review and your useful comments!
>
> 1. It is a good point, as you mentioned, I think it would make sense
> in some use cases to have potentially multiple DLQ topics, so we
> should provide an API to let users do it.
> Thinking out-loud here, maybe it is a better approach to create a new
> Record class containing the topic name, e.g. DeadLetterQueueRecord and
> changing the signature to
> withDeadLetterQueueRecords(Iterable
> deadLetterQueueRecords) instead of
> withDeadLetterQueueRecord(ProducerRecord
> deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would
> be something like "class DeadLetterQueueRecord extends
> org.apache.kafka.streams.processor.api.ProducerRecords { String
> topic; /*  + getter/setter + */ } "
>
> 2. I think the root question here is: should we have one DLQ topic or
> multiple DLQ topics by default. This question highly depends on the
> context, but implementing a default implementation to handle multiple
> DLQ topics would be opinionated, e.g. how to manage errors in a
> punctuate?
> I think it makes sense to have the default implementation writing all
> faulty records to a single DLQ, that's at least the approach I used in
> past applications: one DLQ per Kafka Streams application. Of course
> the message format could change in the DLQ e.g. due to the source
> topic, but those DLQ records will very likely be troubleshot, and
> maybe replayed, manually anyway.
> If a user needs to have multiple DLQ topics or want to enforce a
> specific schema, it's still possible, but they would need to implement
> custom Exception Handlers.
> Coming back to 1. I do agree that it would make sense to have the user
> set the DLQ topic name in the handlers for more flexibility.
>
> 3. Good point, sorry it was a typo, the ProcessingContext makes much
> more sense here indeed.
>
> 4. I do assume that we could implement KIP-1033 (Processing exception
> handler) independently from KIP-1034. I do hope that KIP-1033 would be
> adopted and implemented before KIP-1034, but if that's not the case,
> we could implement KIP-1034 independently and update KIP-1033 to include
> the DLQ record afterward (in the same KIP or in a new one if not
> possible).
>
> 5. I think we should be clear that this KIP only covers the DLQ record
> produced.
> Everything related to replay messages or recovery plan should be
> considered out-of-scope as it is use-case and error specific.
>
> Let me know if that's not clear, there are definitely points that are
> highly debatable.
>
> Cheers,
> Damien
>
> On Fri, 12 Apr 2024 at 13:00, Nick Telford  wrote:
> >
> > Oh, and one 

Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-04-12 Thread Claude Warren
I think there is an issue in the KIP.

Basically the KIP says: if the PID is found in either of the Bloom filters,
then no action is taken.
If the PID is not found, then it is added and the quota rating metrics are
incremented.

In this case long running PIDs will be counted multiple times.

Let's assume a 30 minute window with 2 15-minute frames.  So for the first
15 minutes all PIDs are placed in the first Bloom filter and for the 2nd 15
minutes all new PIDs are placed in the second bloom filter.  At the 3rd 15
minutes the first filter is removed and a new empty one created.

Let's denote Bloom filters as BFn{} and indicate the contained pids between
the braces.


So at t0 let's insert PID0 and increment the rating metrics.  Thus we have
BF0{PID0}
at t0+5 let's insert PID1 and increment the rating metrics.  Thus we have
BF0{PID0, PID1}
at t0+10 we see PID0 again but no changes occur.
at t0+15 we start t1  and we have BF0{PID0, PID1}, BF1{}
at t1+5 we see PID2, increment the rating metrics, and we have BF0{PID0,
PID1}, BF1{PID2}
at t1+6 we see PID0 again and no changes occur
at t1+7 we see PID1 again and no changes occur
at t1+15 we start a new window and dispose of BF0.  Thus we have BF1{PID2},
BF2{}
at t2+1 we see PID3, increment the rating metrics,  and we have we have
BF1{PID2}, BF2{PID3}
at t2+6 we see PID0 again but now it is not in the list so we increment the
rating metrics and add it BF1{PID2}, BF2{PID3, PID0}

But we just saw PID0 15 minutes ago, well within the 30-minute window we
are trying to track.  Or am I missing something?  It seems like we need to
add each PID to the last Bloom filter.
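
A minimal, runnable sketch of the bookkeeping being suggested (plain HashSets 
stand in for the per-frame Bloom filters and every name here is made up; the 
point is only that each observed PID is re-added to the newest frame, so a 
long-running PID is not re-counted after its original frame expires):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.Set;

    public class PidWindowSketch {
        private final Deque<Set<Long>> frames = new ArrayDeque<>();
        private long throttleMetric = 0;

        PidWindowSketch(int frameCount) {
            for (int i = 0; i < frameCount; i++) frames.addLast(new HashSet<>());
        }

        // Called every window.size.seconds / window.count: drop the oldest
        // frame and start a new, empty one.
        void rotate() {
            frames.removeFirst();
            frames.addLast(new HashSet<>());
        }

        // Only brand-new PIDs bump the quota metric, but every observed PID is
        // (re-)added to the newest frame so it stays tracked across rotations.
        void observe(long pid) {
            boolean seen = frames.stream().anyMatch(f -> f.contains(pid));
            frames.getLast().add(pid);
            if (!seen) throttleMetric++;
        }

        public static void main(String[] args) {
            PidWindowSketch w = new PidWindowSketch(2);
            w.observe(0); // t0: PID0 is new, metric becomes 1
            w.rotate();   // t0+15
            w.observe(0); // PID0 again: not counted, but re-added to the newest frame
            w.rotate();   // t0+30: oldest frame dropped, PID0 still tracked
            w.observe(0); // still not counted
            System.out.println(w.throttleMetric); // prints 1
        }
    }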

On Fri, Apr 12, 2024 at 2:45 PM Claude Warren  wrote:

> Initial code is available at
> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>
> On Tue, Apr 9, 2024 at 2:37 PM Claude Warren  wrote:
>
>> I should also note that the probability of false positives does not fall
>> below shape.P because as it approaches shape.P a new layer is created and
>> filters are added to that.  So no layer in the LayeredBloomFilter exceeds
>> shape.P thus the entire filter does not exceed shape.P.
>>
>> Claude
>>
>> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren  wrote:
>>
>>> The overall design for KIP-936 seems sound to me.  I would make the
>>> following changes:
>>>
>>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
>>> commons-collections v4.5
>>>
>>> Define the producer.id.quota.window.size.seconds to be the length of
>>> time that a Bloom filter of PIDs will exist.
>>> Define a new configuration option "producer.id.quota.window.count" as
>>> the number of windows active in window.size.seconds.
>>>
>>> Define the "Shape" (See commons-collections bloomfilters v4.5) of the
>>> bloom filter from the average number of PIDs expected in
>>> window.size.seconds/window.count (call this N) and the probability of false
>>> positives (call this P).  Due to the way the LayeredBloomFilter works the
>>> number of items can be a lower number than the max.  I'll explain that in a
>>> minute.
>>>
>>> The LayeredBloomFilter implements the standard BloomFilter interface but
>>> internally keeps an ordered list of filters (called layers) from oldest
>>> created to newest.  It adds new layers when a specified Predicate
>>> (checkExtend) returns true.  It will remove filters as defined by a
>>> specified Consumer (filterCleanup).
>>>
>>> Every time a BloomFilter is merged into the LayeredBloomFilter, the filter
>>> checks the "checkExtend" predicate.  If it fires, the "filterCleanup" is
>>> called to remove any layers that should be removed and a new layer is added.
>>>
>>> Define the layers of the LayeredBloomFilter to comprise a standard
>>> BloomFilter and an associated expiration timestamp.
>>>
>>> We can thus define
>>>
>>>- "checkExtend" to require a new layer window.size.seconds /
>>>window.count seconds or when the current layer contains shape.N items.
>>>- "filterCleanup" to start at the head of the list of layers and
>>>remove any expired filters, usually 0, every window.size.seconds 1,
>>>infrequently more than 1.
>>>
>>> This system will correctly handle bursty loads.  There are 3 cases to
>>> consider:
>>>
>>>1. If the producer is producing fewer than shape.N PIDs the layer
>>>will not fill up before the next layer is added.
>>>2. If the producer is producing shape.N PIDs the layer will be
>>>processed as either a 1 or a 3 depending on system timings.
>>>3. If the producer is producing more than shape.N PIDs the layer
>>>will fill up and a new layer will be created with an expiration timestamp
>>>window.size.seconds from when it was created.  This is the case that 
>>> leads
>>>to the filterCleanup infrequently having more than 1 layer to remove.
>>>
>>> The last case to consider is if a producer stops generating PIDs, in
>>> this case we should walk the map of producers,  call 

Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-04-12 Thread Claude Warren
Initial code is available at
https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java

On Tue, Apr 9, 2024 at 2:37 PM Claude Warren  wrote:

> I should also note that the probability of false positives does not fall
> below shape.P because as it approaches shape.P a new layer is created and
> filters are added to that.  So no layer in the LayeredBloomFilter exceeds
> shape.P thus the entire filter does not exceed shape.P.
>
> Claude
>
> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren  wrote:
>
>> The overall design for KIP-936 seems sound to me.  I would make the
>> following changes:
>>
>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
>> commons-collections v4.5
>>
>> Define the producer.id.quota.window.size.seconds to be the length of time
>> that a Bloom filter of PIDs will exist.
>> Define a new configuration option "producer.id.quota.window.count" as the
>> number of windows active in window.size.seconds.
>>
>> Define the "Shape" (See commons-collections bloomfilters v4.5) of the
>> bloom filter from the average number of PIDs expected in
>> window.size.seconds/window.count (call this N) and the probability of false
>> positives (call this P).  Due to the way the LayeredBloomFilter works the
>> number of items can be a lower number than the max.  I'll explain that in a
>> minute.
>>
>> The LayeredBloomFilter implements the standard BloomFilter interface but
>> internally keeps an ordered list of filters (called layers) from oldest
>> created to newest.  It adds new layers when a specified Predicate
>> (checkExtend) returns true.  It will remove filters as defined by a
>> specified Consumer (filterCleanup).
>>
>> Every time a BloomFilter is merged into the LayeredBloomFilter, the filter
>> checks the "checkExtend" predicate.  If it fires, the "filterCleanup" is
>> called to remove any layers that should be removed and a new layer is added.
>>
>> Define the layers of the LayeredBloomFilter to comprise a standard
>> BloomFilter and an associated expiration timestamp.
>>
>> We can thus define
>>
>>- "checkExtend" to require a new layer window.size.seconds /
>>window.count seconds or when the current layer contains shape.N items.
>>- "filterCleanup" to start at the head of the list of layers and
>>remove any expired filters, usually 0, every window.size.seconds 1,
>>infrequently more than 1.
>>
>> This system will correctly handle bursty loads.  There are 3 cases to
>> consider:
>>
>>1. If the producer is producing fewer than shape.N PIDs the layer
>>will not fill up before the next layer is added.
>>2. If the producer is producing shape.N PIDs the layer will be
>>processed as either a 1 or a 3 depending on system timings.
>>3. If the producer is producing more than shape.N PIDs the layer will
>>fill up and a new layer will be created with an expiration timestamp
>>window.size.seconds from when it was created.  This is the case that leads
>>to the filterCleanup infrequently having more than 1 layer to remove.
>>
>> The last case to consider is if a producer stops generating PIDs, in this
>> case we should walk the map of producers,  call "filterCleanup", and then
>> check to see if the LayeredBloomFilter is empty.  If so, remove it from the
>> map.  It will be empty if the producer has not produced a PID for
>> window.size.seconds.
>>
>> I have this solution mostly coded, though I must admit I do not know
>> where to plugin the ProducerIdQuotaManager defined in the KIP
>>
>> Claude
>>
>


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-12 Thread Andrew Schofield
Hi,
77. I’ve updated the KIP to use log retention rather than log compaction.
The basic ideas of what to persist are unchanged. It makes a few changes:

* It changes the record names: ShareCheckpoint -> ShareSnapshot and
  ShareDelta -> ShareUpdate. They’re equivalent, but renaming makes it
  simple to check I did an atomic change to the new proposal.
* It uses log retention and explicit pruning of elderly records using
  ReplicaManager.deleteRecords
* It gets rid of the nasty DeltaIndex scheme because we don’t need to worry
  about the log compactor and key uniqueness.

I have also changed the ambiguous “State” to “DeliveryState” in RPCs
and records.

And I added a clarification about how the “group.type” configuration should
be used.

Thanks,
Andrew

> On 10 Apr 2024, at 15:33, Andrew Schofield  
> wrote:
> 
> Hi Jun,
> Thanks for your questions.
> 
> 41.
> 41.1. The partition leader obtains the state epoch in the response from
> ReadShareGroupState. When it becomes a share-partition leader,
> it reads the share-group state and one of the things it learns is the
> current state epoch. Then it uses the state epoch in all subsequent
> calls to WriteShareGroupState. The fencing is to prevent writes for
> a previous state epoch, which are very unlikely but which would mean
> that a leader was using an out-of-date epoch and was likely no longer
> the current leader at all, perhaps due to a long pause for some reason.
> 
> 41.2. If the group coordinator were to set the SPSO, wouldn’t it need
> to discover the initial offset? I’m trying to avoid yet another inter-broker
> hop.
> 
> 42.
> 42.1. I think I’ve confused things. When the share group offset is altered
> using AdminClient.alterShareGroupOffsets, the group coordinator WILL
> update the state epoch. I don’t think it needs to update the group epoch
> at the same time (although it could) because the group epoch will have
> been bumped when the group became empty. If the share group offset
> is altered multiple times when the group remains empty, it would be
> harmless if the same state epoch was reused to initialize the state.
> 
> When the share-partition leader updates the SPSO as a result of
> the usual flow of record delivery, it does not update the state epoch.
> 
> 42.2. The share-partition leader will notice the alteration because,
> when it issues WriteShareGroupState, the response will contain the
> error code FENCED_STATE_EPOCH. This is supposed to be the
> last-resort way of catching this.
> 
> When the share-partition leader handles its first ShareFetch request,
> it learns the state epoch from the response to ReadShareGroupState.
> 
> In normal running, the state epoch will remain constant, but, when there
> are no consumers and the group is empty, it might change. As a result,
> I think it would be sensible when the set of share sessions transitions
> from 0 to 1, which is a reasonable proxy for the share group transitioning
> from empty to non-empty, for the share-partition leader to issue
> ReadShareGroupOffsetsState to validate the state epoch. If its state
> epoch is out of date, it can then ReadShareGroupState to re-initialize.
> 
> I’ve changed the KIP accordingly.
> 
> 47, 56. If I am to change BaseOffset to FirstOffset, we need to have
> a clear view of which is the correct term. Having reviewed all of the
> instances, my view is that BaseOffset should become FirstOffset in
> ALL schemas defined in the KIP. Then, BaseOffset is just used in
> record batches, which is already a known concept.
> 
> Please let me know if you agree.
> 
> 60. I’ve added FindCoordinator to the top level index for protocol changes.
> 
> 61. OK. I expect you are correct about how users will be using the
> console share consumer. When I use the console consumer, I always get
> a new consumer group. I have changed the default group ID for console
> share consumer to “console-share-consumer” to match the console consumer
> better and give more of an idea where this mysterious group has come from.
> 
> 77. I will work on a proposal that does not use compaction and we can
> make a judgement about whether it’s a better course for KIP-932. Personally,
> until I’ve written it down and lived with the ideas for a few days, I won’t be
> able to choose which I prefer.
> 
> I should be able to get the proposal written by the end of this week.
> 
> 100. ShareGroupHeartbeatRequest.RebalanceTimeoutMs matches
> ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs from KIP-848.
> I prefer to maintain the consistency.
> 
> 101. Thanks for catching this. The ShareGroupHeartbeatResponse was originally
> created from KIP-848. This part of the schema does not apply and I have 
> removed
> it. I have also renamed AssignedTopicPartitions to simply TopicPartitions 
> which
> aligns with the actual definition of ConsumerGroupHeartbeatResponse.
> 
> 102. No, I don’t think we do. Removed.
> 
> 103. I’ve changed the description for the error codes for ShareFetchResponse.
> 
> 104. 

[DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Sebastien Viale
Hello Nick,

Thanks for your remark.


5. The Record posted in the DLQ topic is not intended to be reprocessed by the 
Kafka Streams application.

If users want to reprocess the records, it is up to them to take into 
consideration the stream time and/or whether the window has closed.

regards
Sébastien


De : Nick Telford 
Envoyé : vendredi 12 avril 2024 12:57
À : dev@kafka.apache.org 
Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams


Oh, and one more thing:

5.
Whenever you take a record out of the stream, and then potentially
re-introduce it at a later date, you introduce the potential for record
ordering issues. For example, that record could have been destined for a
Window that has been closed by the time it's re-processed. I'd like to see
a section that considers these consequences, and perhaps make those risks
clear to users. For the record, this is exactly what sunk KIP-990, which
was an alternative approach to error handling that introduced the same
issues.

Cheers,

Nick




On Fri, 12 Apr 2024 at 11:54, Nick Telford  wrote:

> Hi Damien,
>
> Thanks for the KIP! Dead-letter queues are something that I think a lot of
> users would like.
>
> I think there are a few points with this KIP that concern me:
>
> 1.
> It looks like you can only define a single, global DLQ for the entire
> Kafka Streams application? What about applications that would like to
> define different DLQs for different data flows? This is especially
> important when dealing with multiple source topics that have different
> record schemas.
>
> 2.
> Your DLQ payload value can either be the record value that failed, or an
> error string (such as "error during punctuate"). This is likely to cause
> problems when users try to process the records from the DLQ, as they can't
> guarantee the format of every record value will be the same. This is very
> loosely related to point 1. above.
>
> 3.
> You provide a ProcessorContext to both exception handlers, but state they
> cannot be used to forward records. In that case, I believe you should use
> ProcessingContext instead, which statically guarantees that it can't be
> used to forward records.
>
> 4.
> You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan
> if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
>
> Regards,
>
> Nick
>
> On Fri, 12 Apr 2024 at 11:38, Damien Gasparina 
> wrote:
>
>> In a general way, if the user does not configure the right ACL, that
>> would be a security issue, but that's true for any topic.
>>
>> This KIP allows users to configure a Dead Letter Queue without writing
>> custom Java code in Kafka Streams, not at the topic level.
>> A lot of applications are already implementing this pattern, but the
>> required code to do it is quite painful and error prone, for example
>> most apps I have seen created a new KafkaProducer to send records to
>> their DLQ.
>>
>> As it would be disabled by default for backward compatibility, I doubt
>> it would generate any security concern.
>> If a user explicitly configures a Dead Letter Queue, it would be up to
>> him to configure the relevant ACLs to ensure that the right principal
>> can access it.
>> It is already the case for all internal, input and output Kafka
>> Streams topics (e.g. repartition, changelog topics) that also could
>> contain confidential data, so I do not think we should implement a
>> different behavior for this one.
>>
>> In this KIP, we configured the default DLQ record to have the initial
>> record key/value as we assume that it is the expected and wanted
>> behavior for most applications.
>> If a user does not want to have the key/value in the DLQ record for
>> any reason, they could still implement exception handlers to build
>> their own DLQ record.
>>
>> Regarding ACL, maybe something smarter could be done in Kafka Streams,
>> but this is out of scope for this KIP.
>>
>> On Fri, 12 Apr 2024 at 11:58, Claude Warren  wrote:
>> >
>> > My concern is that someone would create a dead letter queue on a
>> sensitive
>> > topic and not get the ACL correct from the start. Thus causing
>> potential
>> > confidential data leak. Is there anything in the proposal that would
>> > prevent that from happening? If so I did not recognize it as such.
>> >
>> > On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina > >
>> > wrote:
>> >
>> > > Hi Claude,
>> > >
>> > > In this KIP, the Dead Letter Queue is materialized by a standard and
>> > > independent topic, thus normal ACL applies to it like any other topic.
>> > > This should not introduce any security issues, obviously, the right
>> > > ACL would need to be provided to write to the DLQ if configured.
>> > >
>> > > Cheers,
>> > > Damien
>> > >
>> > > On Fri, 12 Apr 2024 at 

Re: [VOTE] KIP-1031: Control offset translation in MirrorSourceConnector

2024-04-12 Thread Chris Egerton
+1 (binding), thanks Omnia!

On Fri, Apr 12, 2024, 03:46 Mickael Maison  wrote:

> Hi Omnia,
>
> +1 (binding), thanks for the KIP!
>
> Mickael
>
> On Fri, Apr 12, 2024 at 9:01 AM Omnia Ibrahim 
> wrote:
> >
> > Hi everyone, I would like to start a voting thread for KIP-1031: Control
> offset translation in MirrorSourceConnector
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector
> >
> > For comments or feedback please check the discussion thread here
> https://lists.apache.org/thread/ym6zr0wrhglft5c000x9c8ych098s7h6
> >
> > Thanks
> > Omnia
> >
>


Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Damien Gasparina
Hi Nick,

Thanks a lot for your review and your useful comments!

1. It is a good point, as you mentioned, I think it would make sense
in some use cases to have potentially multiple DLQ topics, so we
should provide an API to let users do it.
Thinking out-loud here, maybe it is a better approach to create a new
Record class containing the topic name, e.g. DeadLetterQueueRecord and
changing the signature to
withDeadLetterQueueRecords(Iterable
deadLetterQueueRecords) instead of
withDeadLetterQueueRecord(ProducerRecord
deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would
be something like "class DeadLetterQueueRecord extends
org.apache.kafka.streams.processor.api.ProducerRecords { String
topic; /*  + getter/setter + */ } "
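
(Purely as an illustration of the shape being discussed, and not the API 
proposed by the KIP; byte[]/byte[] generics are assumed because the mailing 
list stripped them, and whether the class extends ProducerRecord, as sketched 
above, or simply wraps it is an open detail.)

    import org.apache.kafka.clients.producer.ProducerRecord;

    // Pairs a DLQ payload with an explicit target topic, so a handler can route
    // different failures to different DLQ topics.
    public class DeadLetterQueueRecord {
        private final String topic;
        private final ProducerRecord<byte[], byte[]> record;

        public DeadLetterQueueRecord(String topic, ProducerRecord<byte[], byte[]> record) {
            this.topic = topic;
            this.record = record;
        }

        public String topic() { return topic; }
        public ProducerRecord<byte[], byte[]> record() { return record; }
    }

    // A handler response could then carry several of these, e.g.:
    //   response.withDeadLetterQueueRecords(List.of(
    //       new DeadLetterQueueRecord("my-app-dlq", dlqPayload)));
    // where withDeadLetterQueueRecords is the hypothetical signature above.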

2. I think the root question here is: should we have one DLQ topic or
multiple DLQ topics by default. This question highly depends on the
context, but implementing a default implementation to handle multiple
DLQ topics would be opinionated, e.g. how to manage errors in a
punctuate?
I think it makes sense to have the default implementation writing all
faulty records to a single DLQ, that's at least the approach I used in
past applications: one DLQ per Kafka Streams application. Of course
the message format could change in the DLQ e.g. due to the source
topic, but those DLQ records will very likely be troubleshot, and
maybe replayed, manually anyway.
If a user needs to have multiple DLQ topics or want to enforce a
specific schema, it's still possible, but they would need to implement
custom Exception Handlers.
Coming back to 1. I do agree that it would make sense to have the user
set the DLQ topic name in the handlers for more flexibility.

3. Good point, sorry it was a typo, the ProcessingContext makes much
more sense here indeed.

4. I do assume that we could implement KIP-1033 (Processing exception
handler) independently from KIP-1034. I do hope that KIP-1033 would be
adopted and implemented before KIP-1034, but if that's not the case,
we could implement KIP-1034 independently and update KIP-1033 to include
the DLQ record afterward (in the same KIP or in a new one if not
possible).

5. I think we should be clear that this KIP only covers the DLQ record produced.
Everything related to replay messages or recovery plan should be
considered out-of-scope as it is use-case and error specific.

Let me know if that's not clear, there are definitely points that are
highly debatable.

Cheers,
Damien

On Fri, 12 Apr 2024 at 13:00, Nick Telford  wrote:
>
> Oh, and one more thing:
>
> 5.
> Whenever you take a record out of the stream, and then potentially
> re-introduce it at a later date, you introduce the potential for record
> ordering issues. For example, that record could have been destined for a
> Window that has been closed by the time it's re-processed. I'd like to see
> a section that considers these consequences, and perhaps make those risks
> clear to users. For the record, this is exactly what sunk KIP-990, which
> was an alternative approach to error handling that introduced the same
> issues.
>
> Cheers,
>
> Nick
>
> On Fri, 12 Apr 2024 at 11:54, Nick Telford  wrote:
>
> > Hi Damien,
> >
> > Thanks for the KIP! Dead-letter queues are something that I think a lot of
> > users would like.
> >
> > I think there are a few points with this KIP that concern me:
> >
> > 1.
> > It looks like you can only define a single, global DLQ for the entire
> > Kafka Streams application? What about applications that would like to
> > define different DLQs for different data flows? This is especially
> > important when dealing with multiple source topics that have different
> > record schemas.
> >
> > 2.
> > Your DLQ payload value can either be the record value that failed, or an
> > error string (such as "error during punctuate"). This is likely to cause
> > problems when users try to process the records from the DLQ, as they can't
> > guarantee the format of every record value will be the same. This is very
> > loosely related to point 1. above.
> >
> > 3.
> > You provide a ProcessorContext to both exception handlers, but state they
> > cannot be used to forward records. In that case, I believe you should use
> > ProcessingContext instead, which statically guarantees that it can't be
> > used to forward records.
> >
> > 4.
> > You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan
> > if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
> >
> > Regards,
> >
> > Nick
> >
> > On Fri, 12 Apr 2024 at 11:38, Damien Gasparina 
> > wrote:
> >
> >> In a general way, if the user does not configure the right ACL, that
> >> would be a security issue, but that's true for any topic.
> >>
> >> This KIP allows users to configure a Dead Letter Queue without writing
> >> custom Java code in Kafka Streams, not at the topic level.
> >> A lot of applications are already implementing this pattern, but the
> >> required code to do it is quite painful and error prone, for example
> >> 

Re: [VOTE] KIP-477: Add PATCH method for connector config in Connect REST API

2024-04-12 Thread Yash Mayya
Hi Ivan,

Thanks for reviving this KIP, I think it will be a useful addition to
Connect!

+1 (binding)

Cheers,
Yash

On Tue, Apr 9, 2024 at 4:23 AM Knowles Atchison Jr 
wrote:

> +1 (non binding)
>
> On Mon, Apr 8, 2024, 3:30 PM Chris Egerton 
> wrote:
>
> > Thanks Ivan! +1 (binding) from me.
> >
> > On Mon, Apr 8, 2024, 06:59 Ivan Yurchenko  wrote:
> >
> > > Hello!
> > >
> > > I'd like to put the subj KIP[1] to a vote. Thank you.
> > >
> > > Best regards,
> > > Ivan
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-477%3A+Add+PATCH+method+for+connector+config+in+Connect+REST+API
> > >
> >
>


Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Nick Telford
Oh, and one more thing:

5.
Whenever you take a record out of the stream, and then potentially
re-introduce it at a later date, you introduce the potential for record
ordering issues. For example, that record could have been destined for a
Window that has been closed by the time it's re-processed. I'd like to see
a section that considers these consequences, and perhaps make those risks
clear to users. For the record, this is exactly what sunk KIP-990, which
was an alternative approach to error handling that introduced the same
issues.

Cheers,

Nick

On Fri, 12 Apr 2024 at 11:54, Nick Telford  wrote:

> Hi Damien,
>
> Thanks for the KIP! Dead-letter queues are something that I think a lot of
> users would like.
>
> I think there are a few points with this KIP that concern me:
>
> 1.
> It looks like you can only define a single, global DLQ for the entire
> Kafka Streams application? What about applications that would like to
> define different DLQs for different data flows? This is especially
> important when dealing with multiple source topics that have different
> record schemas.
>
> 2.
> Your DLQ payload value can either be the record value that failed, or an
> error string (such as "error during punctuate"). This is likely to cause
> problems when users try to process the records from the DLQ, as they can't
> guarantee the format of every record value will be the same. This is very
> loosely related to point 1. above.
>
> 3.
> You provide a ProcessorContext to both exception handlers, but state they
> cannot be used to forward records. In that case, I believe you should use
> ProcessingContext instead, which statically guarantees that it can't be
> used to forward records.
>
> 4.
> You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan
> if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
>
> Regards,
>
> Nick
>
> On Fri, 12 Apr 2024 at 11:38, Damien Gasparina 
> wrote:
>
>> In a general way, if the user does not configure the right ACL, that
>> would be a security issue, but that's true for any topic.
>>
>> This KIP allows users to configure a Dead Letter Queue without writing
>> custom Java code in Kafka Streams, not at the topic level.
>> A lot of applications are already implementing this pattern, but the
>> required code to do it is quite painful and error prone, for example
>> most apps I have seen created a new KafkaProducer to send records to
>> their DLQ.
>>
>> As it would be disabled by default for backward compatibility, I doubt
>> it would generate any security concern.
>> If a user explicitly configures a Dead Letter Queue, it would be up to
>> him to configure the relevant ACLs to ensure that the right principal
>> can access it.
>> It is already the case for all internal, input and output Kafka
>> Streams topics (e.g. repartition, changelog topics) that also could
>> contain confidential data, so I do not think we should implement a
>> different behavior for this one.
>>
>> In this KIP, we configured the default DLQ record to have the initial
>> record key/value as we assume that it is the expected and wanted
>> behavior for most applications.
>> If a user does not want to have the key/value in the DLQ record for
>> any reason, they could still implement exception handlers to build
>> their own DLQ record.
>>
>> Regarding ACL, maybe something smarter could be done in Kafka Streams,
>> but this is out of scope for this KIP.
>>
>> On Fri, 12 Apr 2024 at 11:58, Claude Warren  wrote:
>> >
>> > My concern is that someone would create a dead letter queue on a
>> sensitive
>> > topic and not get the ACL correct from the start.  Thus causing
>> potential
>> > confidential data leak.  Is there anything in the proposal that would
>> > prevent that from happening?  If so I did not recognize it as such.
>> >
>> > On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina > >
>> > wrote:
>> >
>> > > Hi Claude,
>> > >
>> > > In  this KIP, the Dead Letter Queue is materialized by a standard and
>> > > independent topic, thus normal ACL applies to it like any other topic.
>> > > This should not introduce any security issues, obviously, the right
>> > > ACL would need to be provided to write to the DLQ if configured.
>> > >
>> > > Cheers,
>> > > Damien
>> > >
>> > > On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
>> > >  wrote:
>> > > >
>> > > > I am new to the Kafka codebase so please excuse any ignorance on my
>> part.
>> > > >
>> > > > When a dead letter queue is established is there a process to
>> ensure that
>> > > > it at least is defined with the same ACL as the original queue?
>> Without
>> > > > such a guarantee at the start it seems that managing dead letter
>> queues
>> > > > will be fraught with security issues.
>> > > >
>> > > >
>> > > > On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina <
>> d.gaspar...@gmail.com
>> > > >
>> > > > wrote:
>> > > >
>> > > > > Hi everyone,
>> > > > >
>> > > > > To continue on our effort to improve Kafka Streams error
>> handling, we
>> > > > 

Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Nick Telford
Hi Damien,

Thanks for the KIP! Dead-letter queues are something that I think a lot of
users would like.

I think there are a few points with this KIP that concern me:

1.
It looks like you can only define a single, global DLQ for the entire Kafka
Streams application? What about applications that would like to define
different DLQs for different data flows? This is especially important when
dealing with multiple source topics that have different record schemas.

2.
Your DLQ payload value can either be the record value that failed, or an
error string (such as "error during punctuate"). This is likely to cause
problems when users try to process the records from the DLQ, as they can't
guarantee the format of every record value will be the same. This is very
loosely related to point 1. above.

3.
You provide a ProcessorContext to both exception handlers, but state they
cannot be used to forward records. In that case, I believe you should use
ProcessingContext instead, which statically guarantees that it can't be
used to forward records.

4.
You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan if
KIP-1033 is not adopted, or if KIP-1034 lands before 1033?

Regards,

Nick

On Fri, 12 Apr 2024 at 11:38, Damien Gasparina 
wrote:

> In a general way, if the user does not configure the right ACL, that
> would be a security issue, but that's true for any topic.
>
> This KIP allows users to configure a Dead Letter Queue without writing
> custom Java code in Kafka Streams, not at the topic level.
> A lot of applications are already implementing this pattern, but the
> required code to do it is quite painful and error prone, for example
> most apps I have seen created a new KafkaProducer to send records to
> their DLQ.
>
> As it would be disabled by default for backward compatibility, I doubt
> it would generate any security concern.
> If a user explicitly configures a Dead Letter Queue, it would be up to
> him to configure the relevant ACLs to ensure that the right principal
> can access it.
> It is already the case for all internal, input and output Kafka
> Streams topics (e.g. repartition, changelog topics) that also could
> contain confidential data, so I do not think we should implement a
> different behavior for this one.
>
> In this KIP, we configured the default DLQ record to have the initial
> record key/value as we assume that it is the expected and wanted
> behavior for most applications.
> If a user does not want to have the key/value in the DLQ record for
> any reason, they could still implement exception handlers to build
> their own DLQ record.
>
> Regarding ACL, maybe something smarter could be done in Kafka Streams,
> but this is out of scope for this KIP.
>
> On Fri, 12 Apr 2024 at 11:58, Claude Warren  wrote:
> >
> > My concern is that someone would create a dead letter queue on a
> sensitive
> > topic and not get the ACL correct from the start.  Thus causing potential
> > confidential data leak.  Is there anything in the proposal that would
> > prevent that from happening?  If so I did not recognize it as such.
> >
> > On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina 
> > wrote:
> >
> > > Hi Claude,
> > >
> > > In  this KIP, the Dead Letter Queue is materialized by a standard and
> > > independent topic, thus normal ACL applies to it like any other topic.
> > > This should not introduce any security issues, obviously, the right
> > > ACL would need to be provided to write to the DLQ if configured.
> > >
> > > Cheers,
> > > Damien
> > >
> > > On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
> > >  wrote:
> > > >
> > > > I am new to the Kafka codebase so please excuse any ignorance on my
> part.
> > > >
> > > > When a dead letter queue is established is there a process to ensure
> that
> > > > it at least is defined with the same ACL as the original queue?
> Without
> > > > such a guarantee at the start it seems that managing dead letter
> queues
> > > > will be fraught with security issues.
> > > >
> > > >
> > > > On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina <
> d.gaspar...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > To continue on our effort to improve Kafka Streams error handling,
> we
> > > > > propose a new KIP to add out of the box support for Dead Letter
> Queue.
> > > > > The goal of this KIP is to provide a default implementation that
> > > > > should be suitable for most applications and allow users to
> override
> > > > > it if they have specific requirements.
> > > > >
> > > > > In order to build a suitable payload, some additional changes are
> > > > > included in this KIP:
> > > > >   1. extend the ProcessingContext to hold, when available, the
> source
> > > > > node raw key/value byte[]
> > > > >   2. expose the ProcessingContext to the
> ProductionExceptionHandler,
> > > > > it is currently not available in the handle parameters.
> > > > >
> > > > > Regarding point 2.,  to expose the ProcessingContext to the
> > > > > 

Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Damien Gasparina
In a general way, if the user does not configure the right ACL, that
would be a security issue, but that's true for any topic.

This KIP allows users to configure a Dead Letter Queue without writing
custom Java code in Kafka Streams, not at the topic level.
A lot of applications are already implementing this pattern, but the
required code to do it is quite painful and error prone, for example
most apps I have seen created a new KafkaProducer to send records to
their DLQ.

As it would be disabled by default for backward compatibility, I doubt
it would generate any security concern.
If a user explicitly configures a Dead Letter Queue, it would be up to
him to configure the relevant ACLs to ensure that the right principal
can access it.
It is already the case for all internal, input and output Kafka
Streams topics (e.g. repartition, changelog topics) that also could
contain confidential data, so I do not think we should implement a
different behavior for this one.

In this KIP, we configured the default DLQ record to have the initial
record key/value as we assume that it is the expected and wanted
behavior for most applications.
If a user does not want to have the key/value in the DLQ record for
any reason, they could still implement exception handlers to build
their own DLQ record.

Regarding ACL, maybe something smarter could be done in Kafka Streams,
but this is out of scope for this KIP.

On Fri, 12 Apr 2024 at 11:58, Claude Warren  wrote:
>
> My concern is that someone would create a dead letter queue on a sensitive
> topic and not get the ACL correct from the start.  Thus causing potential
> confidential data leak.  Is there anything in the proposal that would
> prevent that from happening?  If so I did not recognize it as such.
>
> On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina 
> wrote:
>
> > Hi Claude,
> >
> > In  this KIP, the Dead Letter Queue is materialized by a standard and
> > independent topic, thus normal ACL applies to it like any other topic.
> > This should not introduce any security issues, obviously, the right
> > ACL would need to be provided to write to the DLQ if configured.
> >
> > Cheers,
> > Damien
> >
> > On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
> >  wrote:
> > >
> > > I am new to the Kafka codebase so please excuse any ignorance on my part.
> > >
> > > When a dead letter queue is established is there a process to ensure that
> > > it at least is defined with the same ACL as the original queue?  Without
> > > such a guarantee at the start it seems that managing dead letter queues
> > > will be fraught with security issues.
> > >
> > >
> > > On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina  > >
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > To continue on our effort to improve Kafka Streams error handling, we
> > > > propose a new KIP to add out of the box support for Dead Letter Queue.
> > > > The goal of this KIP is to provide a default implementation that
> > > > should be suitable for most applications and allow users to override
> > > > it if they have specific requirements.
> > > >
> > > > In order to build a suitable payload, some additional changes are
> > > > included in this KIP:
> > > >   1. extend the ProcessingContext to hold, when available, the source
> > > > node raw key/value byte[]
> > > >   2. expose the ProcessingContext to the ProductionExceptionHandler,
> > > > it is currently not available in the handle parameters.
> > > >
> > > > Regarding point 2.,  to expose the ProcessingContext to the
> > > > ProductionExceptionHandler, we considered two choices:
> > > >   1. exposing the ProcessingContext as a parameter in the handle()
> > > > method. That's the cleanest way IMHO, but we would need to deprecate
> > > > the old method.
> > > >   2. exposing the ProcessingContext as an attribute in the interface.
> > > > This way, no method is deprecated, but we would not be consistent with
> > > > the other ExceptionHandler.
> > > >
> > > > In the KIP, we chose the 1. solution (new handle signature with old
> > > > one deprecated), but we could use other opinions on this part.
> > > > More information is available directly on the KIP.
> > > >
> > > > KIP link:
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> > > >
> > > > Feedbacks and suggestions are welcome,
> > > >
> > > > Cheers,
> > > > Damien, Sebastien and Loic
> > > >
> >
>
>
> --
> LinkedIn: http://www.linkedin.com/in/claudewarren


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-12 Thread Nick Telford
On further thought, it's clear that this can't work for one simple reason:
StateStores don't know their associated TaskId (and hence, their
StateDirectory) until the init() call. Therefore, committedOffset() can't
be called before init(), unless we also added a StateStoreContext argument
to committedOffset(), which I think might be trying to shoehorn too much
into committedOffset().
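
To make the trade-off concrete, a hypothetical sketch of the two shapes (neither 
is the actual StateStore API, and the names are made up):

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    public interface OffsetAwareStateStore {
        // Shape A: a no-argument accessor. Only usable after init(), because the
        // store does not know its TaskId (and hence its StateDirectory) before then.
        Map<TopicPartition, Long> committedOffsets();

        // Shape B: pass the context explicitly so the method could be called
        // before init(), at the cost of widening the signature, e.g.
        //   Map<TopicPartition, Long> committedOffsets(StateStoreContext context);
    }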

I still don't like the idea of the Streams engine maintaining the cache of
changelog offsets independently of stores, mostly because of the
maintenance burden of the code duplication, but it looks like we'll have to
live with it.

Unless you have any better ideas?

Regards,
Nick

On Wed, 10 Apr 2024 at 14:12, Nick Telford  wrote:

> Hi Bruno,
>
> Immediately after I sent my response, I looked at the codebase and came to
> the same conclusion. If it's possible at all, it will need to be done by
> creating temporary StateManagers and StateStores during rebalance. I think
> it is possible, and probably not too expensive, but the devil will be in
> the detail.
>
> I'll try to find some time to explore the idea to see if it's possible and
> report back, because we'll need to determine this before we can vote on the
> KIP.
>
> Regards,
> Nick
>
> On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna  wrote:
>
>> Hi Nick,
>>
>> Thanks for reacting on my comments so quickly!
>>
>>
>> 2.
>> Some thoughts on your proposal.
>> State managers (and state stores) are parts of tasks. If the task is not
>> assigned locally, we do not create those tasks. To get the offsets with
>> your approach, we would need to either create kind of inactive tasks
>> besides active and standby tasks or store and manage state managers of
>> non-assigned tasks differently than the state managers of assigned
>> tasks. Additionally, the cleanup thread that removes unassigned task
>> directories needs to concurrently delete those inactive tasks or
>> task-less state managers of unassigned tasks. This seems all quite messy
>> to me.
>> Could we create those state managers (or state stores) for locally
>> existing but unassigned tasks on demand when
>> TaskManager#getTaskOffsetSums() is executed? Or have a different
>> encapsulation for the unused task directories?
>>
>>
>> Best,
>> Bruno
>>
>>
>>
>> On 4/10/24 11:31 AM, Nick Telford wrote:
>> > Hi Bruno,
>> >
>> > Thanks for the review!
>> >
>> > 1, 4, 5.
>> > Done
>> >
>> > 3.
>> > You're right. I've removed the offending paragraph. I had originally
>> > adapted this from the guarantees outlined in KIP-892. But it's
>> difficult to
>> > provide these guarantees without the KIP-892 transaction buffers.
>> Instead,
>> > we'll add the guarantees back into the JavaDoc when KIP-892 lands.
>> >
>> > 2.
>> > Good point! This is the only part of the KIP that was (significantly)
>> > changed when I extracted it from KIP-892. My prototype currently
>> maintains
>> > this "cache" of changelog offsets in .checkpoint, but doing so becomes
>> very
>> > messy. My intent with this change was to try to better encapsulate this
>> > offset "caching", especially for StateStores that can cheaply provide
>> the
>> > offsets stored directly in them without needing to duplicate them in
>> this
>> > cache.
>> >
>> > It's clear some more work is needed here to better encapsulate this. My
>> > immediate thought is: what if we construct *but don't initialize* the
>> > StateManager and StateStores for every Task directory on-disk? That
>> > should still be quite cheap to do, and would enable us to query the
>> > offsets for all on-disk stores, even if they're not open. If the
>> > StateManager (aka. ProcessorStateManager/GlobalStateManager) proves too
>> > expensive to hold open for closed stores, we could always have a
>> > "StubStateManager" in its place, that enables the querying of offsets,
>> > but nothing else?
>> >
>> > IDK, what do you think?
>> >
>> > Regards,
>> >
>> > Nick
>> >
>> > On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna  wrote:
>> >
>> >> Hi Nick,
>> >>
>> >> Thanks for breaking out the KIP from KIP-892!
>> >>
>> >> Here a couple of comments/questions:
>> >>
>> >> 1.
>> >> In Kafka Streams, we have a design guideline which says to not use the
>> >> "get"-prefix for getters on the public API. Could you please change
>> >> getCommittedOffsets() to committedOffsets()?
>> >>
>> >>
>> >> 2.
>> >> It is not clear to me how TaskManager#getTaskOffsetSums() should read
>> >> offsets of tasks the stream thread does not own but that have a state
>> >> directory on the Streams client by calling
>> >> StateStore#getCommittedOffsets(). If the thread does not own a task, it
>> >> also does not create any state stores for the task, which means there is
>> >> no state store on which to call getCommittedOffsets().
>> >> I would have rather expected that a checkpoint file is written for all
>> >> state stores on close -- not only for the RocksDBStore -- and that this
>> >> checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks

Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.7 #133

2024-04-12 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Claude Warren
My concern is that someone would create a dead letter queue on a sensitive
topic and not get the ACLs correct from the start, causing a potential leak
of confidential data. Is there anything in the proposal that would prevent
that from happening? If so, I did not recognize it as such.

On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina 
wrote:

> Hi Claude,
>
> In this KIP, the Dead Letter Queue is materialized by a standard,
> independent topic, so normal ACLs apply to it like any other topic.
> This should not introduce any security issues; obviously, the right
> ACLs would need to be granted to write to the DLQ if it is configured.
>
> Cheers,
> Damien
>
> On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
>  wrote:
> >
> > I am new to the Kafka codebase so please excuse any ignorance on my part.
> >
> > When a dead letter queue is established, is there a process to ensure that
> > it is at least defined with the same ACLs as the original topic? Without
> > such a guarantee from the start, it seems that managing dead letter queues
> > will be fraught with security issues.
> >
> >
> > On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina  >
> > wrote:
> >
> > > Hi everyone,
> > >
> > > To continue our efforts to improve Kafka Streams error handling, we
> > > propose a new KIP to add out-of-the-box support for a Dead Letter Queue.
> > > The goal of this KIP is to provide a default implementation that
> > > should be suitable for most applications and allow users to override
> > > it if they have specific requirements.
> > >
> > > In order to build a suitable payload, some additional changes are
> > > included in this KIP:
> > >   1. extend the ProcessingContext to hold, when available, the source
> > > node raw key/value byte[]
> > >   2. expose the ProcessingContext to the ProductionExceptionHandler,
> > > it is currently not available in the handle parameters.
> > >
> > > Regarding point 2, to expose the ProcessingContext to the
> > > ProductionExceptionHandler, we considered two choices:
> > >   1. exposing the ProcessingContext as a parameter in the handle()
> > > method. That's the cleanest way IMHO, but we would need to deprecate
> > > the old method.
> > >   2. exposing the ProcessingContext as an attribute in the interface.
> > > This way, no method is deprecated, but we would not be consistent with
> > > the other ExceptionHandler.
> > >
> > > In the KIP, we chose solution 1 (a new handle() signature with the old
> > > one deprecated), but we could use other opinions on this part.
> > > More information is available directly on the KIP.
> > >
> > > KIP link:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> > >
> > > Feedback and suggestions are welcome,
> > >
> > > Cheers,
> > > Damien, Sebastien and Loic
> > >
>


-- 
LinkedIn: http://www.linkedin.com/in/claudewarren


Re: [VOTE] KIP-1031: Control offset translation in MirrorSourceConnector

2024-04-12 Thread Mickael Maison
Hi Omnia,

+1 (binding), thanks for the KIP!

Mickael

On Fri, Apr 12, 2024 at 9:01 AM Omnia Ibrahim  wrote:
>
> Hi everyone, I would like to start a voting thread for KIP-1031: Control 
> offset translation in MirrorSourceConnector 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector
>
> For comments or feedback please check the discussion thread here 
> https://lists.apache.org/thread/ym6zr0wrhglft5c000x9c8ych098s7h6
>
> Thanks
> Omnia
>


Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Damien Gasparina
Hi Claude,

In this KIP, the Dead Letter Queue is materialized by a standard,
independent topic, so normal ACLs apply to it like any other topic.
This should not introduce any security issues; obviously, the right
ACLs would need to be granted to write to the DLQ if it is configured.
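
Purely as an illustration (not part of the KIP itself), granting a producer
principal write access to a DLQ topic with the Admin client could look like
the sketch below; the topic name, principal and bootstrap address are
placeholders:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

// Sketch only: grant a hypothetical Streams application's principal write
// access to a hypothetical DLQ topic.
public final class DlqAclExample {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Allow the application's principal to produce to the DLQ topic.
            final AclBinding binding = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "my-app-dlq", PatternType.LITERAL),
                new AccessControlEntry("User:streams-app", "*",
                                       AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(binding)).all().get();
        }
    }
}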

Cheers,
Damien

On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
 wrote:
>
> I am new to the Kafka codebase so please excuse any ignorance on my part.
>
> When a dead letter queue is established, is there a process to ensure that
> it is at least defined with the same ACLs as the original topic? Without
> such a guarantee from the start, it seems that managing dead letter queues
> will be fraught with security issues.
>
>
> On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina 
> wrote:
>
> > Hi everyone,
> >
> > To continue our efforts to improve Kafka Streams error handling, we
> > propose a new KIP to add out-of-the-box support for a Dead Letter Queue.
> > The goal of this KIP is to provide a default implementation that
> > should be suitable for most applications and allow users to override
> > it if they have specific requirements.
> >
> > In order to build a suitable payload, some additional changes are
> > included in this KIP:
> >   1. extend the ProcessingContext to hold, when available, the source
> > node raw key/value byte[]
> >   2. expose the ProcessingContext to the ProductionExceptionHandler,
> > it is currently not available in the handle parameters.
> >
> > Regarding point 2, to expose the ProcessingContext to the
> > ProductionExceptionHandler, we considered two choices:
> >   1. exposing the ProcessingContext as a parameter in the handle()
> > method. That's the cleanest way IMHO, but we would need to deprecate
> > the old method.
> >   2. exposing the ProcessingContext as an attribute in the interface.
> > This way, no method is deprecated, but we would not be consistent with
> > the other ExceptionHandler.
> >
> > In the KIP, we chose solution 1 (a new handle() signature with the old
> > one deprecated), but we could use other opinions on this part.
> > More information is available directly on the KIP.
> >
> > KIP link:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> >
> > Feedback and suggestions are welcome,
> >
> > Cheers,
> > Damien, Sebastien and Loic
> >
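
To make the two choices quoted above a bit more concrete, option 1 (a new
handle() signature that receives the context) might look roughly like the
sketch below; every name here is an illustrative assumption, not the KIP's
final API:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;

// Sketch only: a possible shape for "option 1". The context type and its
// accessors (including the raw source key/value described in the KIP) are
// hypothetical names used purely for illustration.
public interface ContextAwareProductionExceptionHandler {

    // Hypothetical context: processor metadata plus, when available, the raw
    // serialized key/value of the source record.
    interface ErrorContext {
        String topic();
        int partition();
        long offset();
        byte[] sourceRawKey();
        byte[] sourceRawValue();
    }

    // New-style handle(): same decision type as the existing handler, with
    // the context passed in explicitly; the old two-argument handle() would
    // be deprecated but still supported.
    ProductionExceptionHandlerResponse handle(ErrorContext context,
                                              ProducerRecord<byte[], byte[]> record,
                                              Exception exception);
}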


[VOTE] KIP-1031: Control offset translation in MirrorSourceConnector

2024-04-12 Thread Omnia Ibrahim
Hi everyone, I would like to start a voting thread for KIP-1031: Control offset 
translation in MirrorSourceConnector 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector

For comments or feedback please check the discussion thread here 
https://lists.apache.org/thread/ym6zr0wrhglft5c000x9c8ych098s7h6

Thanks
Omnia



Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Claude Warren, Jr
I am new to the Kafka codebase so please excuse any ignorance on my part.

When a dead letter queue is established, is there a process to ensure that
it is at least defined with the same ACLs as the original topic? Without
such a guarantee from the start, it seems that managing dead letter queues
will be fraught with security issues.


On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina 
wrote:

> Hi everyone,
>
> To continue our efforts to improve Kafka Streams error handling, we
> propose a new KIP to add out-of-the-box support for a Dead Letter Queue.
> The goal of this KIP is to provide a default implementation that
> should be suitable for most applications and allow users to override
> it if they have specific requirements.
>
> In order to build a suitable payload, some additional changes are
> included in this KIP:
>   1. extend the ProcessingContext to hold, when available, the source
> node raw key/value byte[]
>   2. expose the ProcessingContext to the ProductionExceptionHandler,
> it is currently not available in the handle parameters.
>
> Regarding point 2, to expose the ProcessingContext to the
> ProductionExceptionHandler, we considered two choices:
>   1. exposing the ProcessingContext as a parameter in the handle()
> method. That's the cleanest way IMHO, but we would need to deprecate
> the old method.
>   2. exposing the ProcessingContext as an attribute in the interface.
> This way, no method is deprecated, but we would not be consistent with
> the other ExceptionHandler.
>
> In the KIP, we chose solution 1 (a new handle() signature with the old
> one deprecated), but we could use other opinions on this part.
> More information is available directly on the KIP.
>
> KIP link:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
>
> Feedback and suggestions are welcome,
>
> Cheers,
> Damien, Sebastien and Loic
>


[jira] [Created] (KAFKA-16542) The remote node is not a BROKER that supports the METADATA api. (org.apache.kafka.clients.admin.internals.AdminMetadataManager)

2024-04-12 Thread Hiro (Jira)
Hiro created KAFKA-16542:


 Summary: The remote node is not a BROKER that supports the 
METADATA api. (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
 Key: KAFKA-16542
 URL: https://issues.apache.org/jira/browse/KAFKA-16542
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 3.7.0
Reporter: Hiro


I got this error with this CLI, but when I executed the same command against a
broker, it was successful. Is kafka-metadata-quorum available only to brokers?

Confluent (cp-ansible) was running that command against the controller:
https://github.com/confluentinc/cp-ansible/blob/master/roles/kafka_controller/tasks/health_check.yml#L5-L6

{code}
sudo /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server :9093 --command-config controller-admin.properties  describe --replication
[2024-04-12 06:36:38,400] WARN [AdminClient clientId=adminclient-1] The remote 
node is not a BROKER that supports the METADATA api. 
(org.apache.kafka.clients.admin.internals.AdminMetadataManager)
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh

2024-04-12 Thread Hiro (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hiro resolved KAFKA-16510.
--
Resolution: Fixed

> java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
> --
>
> Key: KAFKA-16510
> URL: https://issues.apache.org/jira/browse/KAFKA-16510
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.4.1
>Reporter: Hiro
>Priority: Major
>
> kafka-metadata-quorum is not available with SASL_PLAIN.
> I got this error; I only use SASL_PLAIN, not SSL.
> I found a person with a similar situation, but he is using mTLS:
> https://issues.apache.org/jira/browse/KAFKA-16006
> {code:java}
> sh-4.2$ /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server <controller ip>:9093 --command-config controller-admin.properties  describe --replication
> [2024-04-11 04:12:54,128] ERROR Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1': 
> (org.apache.kafka.common.utils.KafkaThread)
> java.lang.OutOfMemoryError: Java heap space
> at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
> at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
> at 
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:476)
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:573)
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:251)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435)
> at java.base/java.lang.Thread.run(Thread.java:840)
> org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has 
> exited. Call: describeMetadataQuorum
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has 
> exited. Call: describeMetadataQuorum
> at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
> at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
> at 
> org.apache.kafka.tools.MetadataQuorumCommand.handleDescribeReplication(MetadataQuorumCommand.java:158)
> at 
> org.apache.kafka.tools.MetadataQuorumCommand.execute(MetadataQuorumCommand.java:106)
> at 
> org.apache.kafka.tools.MetadataQuorumCommand.mainNoExit(MetadataQuorumCommand.java:62)
> at 
> org.apache.kafka.tools.MetadataQuorumCommand.main(MetadataQuorumCommand.java:57)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient 
> thread has exited. Call: describeMetadataQuorum {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16473) KafkaDockerWrapper uses wrong cluster ID when formatting log dir

2024-04-12 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-16473.
---
Fix Version/s: 3.8.0
   3.7.1
   Resolution: Fixed

> KafkaDockerWrapper uses wrong cluster ID when formatting log dir
> 
>
> Key: KAFKA-16473
> URL: https://issues.apache.org/jira/browse/KAFKA-16473
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Sebastian Marsching
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> There is a bug in {{KafkaDockerWrapper}} that causes {{Some()}},
> rather than the plain cluster ID, to be used when formatting the log dir
> when Kafka is started for the first time inside a Docker container.
> More specifically, the problem is in {{formatStorageCmd}}: the code uses
> {{env.get("CLUSTER_ID")}}, but this returns an {{Option}}, not a
> {{String}}.
> The code should instead check whether the environment variable is set, 
> raising an exception if it is not set.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2807

2024-04-12 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 465869 lines...]
[2024-04-12T05:22:43.131Z] 
[2024-04-12T05:22:43.131Z] Gradle Test Run :streams:test > Gradle Test Executor 
106 > EosIntegrationTest > [exactly_once_v2, processing threads = true] > 
shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing 
threads = true] PASSED
[2024-04-12T05:22:43.131Z] 
[2024-04-12T05:22:43.131Z] Gradle Test Run :streams:test > Gradle Test Executor 
106 > EosIntegrationTest > [exactly_once_v2, processing threads = true] > 
shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_v2, processing threads 
= true] STARTED
[2024-04-12T05:22:43.131Z] 
[2024-04-12T05:22:43.131Z] Gradle Test Run :streams:test > Gradle Test Executor 
106 > EosIntegrationTest > [exactly_once_v2, processing threads = true] > 
shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_v2, processing threads 
= true] PASSED
[2024-04-12T05:22:43.131Z] 
[2024-04-12T05:22:43.131Z] 7718 tests completed, 7 failed, 25 skipped
[2024-04-12T05:22:43.942Z] There were failing tests. See the report at: 
file:///home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka_trunk/streams/build/reports/tests/test/index.html
[2024-04-12T05:22:46.423Z] 
[2024-04-12T05:22:46.423Z] FAILURE: Build completed with 2 failures.
[2024-04-12T05:22:46.423Z] 
[2024-04-12T05:22:46.423Z] 1: Task failed with an exception.
[2024-04-12T05:22:46.423Z] ---
[2024-04-12T05:22:46.423Z] * What went wrong:
[2024-04-12T05:22:46.423Z] Execution failed for task ':core:test'.
[2024-04-12T05:22:46.423Z] > Process 'Gradle Test Executor 11' finished with 
non-zero exit value 137
[2024-04-12T05:22:46.423Z]   This problem might be caused by incorrect test 
process configuration.
[2024-04-12T05:22:46.423Z]   For more on test execution, please refer to 
https://docs.gradle.org/8.6/userguide/java_testing.html#sec:test_execution in 
the Gradle documentation.
[2024-04-12T05:22:46.423Z] 
[2024-04-12T05:22:46.423Z] * Try:
[2024-04-12T05:22:46.423Z] > Run with --stacktrace option to get the stack 
trace.
[2024-04-12T05:22:46.423Z] > Run with --info or --debug option to get more log 
output.
[2024-04-12T05:22:46.423Z] > Get more help at https://help.gradle.org.
[2024-04-12T05:22:46.423Z] 
==
[2024-04-12T05:22:46.423Z] 
[2024-04-12T05:22:46.423Z] 2: Task failed with an exception.
[2024-04-12T05:22:46.423Z] ---
[2024-04-12T05:22:46.423Z] * What went wrong:
[2024-04-12T05:22:46.423Z] Execution failed for task ':connect:mirror:test'.
[2024-04-12T05:22:46.423Z] > Process 'Gradle Test Executor 39' finished with 
non-zero exit value 137
[2024-04-12T05:22:46.423Z]   This problem might be caused by incorrect test 
process configuration.
[2024-04-12T05:22:46.423Z]   For more on test execution, please refer to 
https://docs.gradle.org/8.6/userguide/java_testing.html#sec:test_execution in 
the Gradle documentation.
[2024-04-12T05:22:46.423Z] 
[2024-04-12T05:22:46.423Z] * Try:
[2024-04-12T05:22:46.423Z] > Run with --stacktrace option to get the stack 
trace.
[2024-04-12T05:22:46.423Z] > Run with --info or --debug option to get more log 
output.
[2024-04-12T05:22:46.423Z] > Get more help at https://help.gradle.org.
[2024-04-12T05:22:46.423Z] 
==
[2024-04-12T05:22:46.423Z] 
[2024-04-12T05:22:46.423Z] Deprecated Gradle features were used in this build, 
making it incompatible with Gradle 9.0.
[2024-04-12T05:22:46.423Z] 
[2024-04-12T05:22:46.423Z] You can use '--warning-mode all' to show the 
individual deprecation warnings and determine if they come from your own 
scripts or plugins.
[2024-04-12T05:22:46.423Z] 
[2024-04-12T05:22:46.423Z] For more on this, please refer to 
https://docs.gradle.org/8.6/userguide/command_line_interface.html#sec:command_line_warnings
 in the Gradle documentation.
[2024-04-12T05:22:46.423Z] 
[2024-04-12T05:22:46.423Z] BUILD FAILED in 5h 30m 33s
[2024-04-12T05:22:46.423Z] 321 actionable tasks: 119 executed, 202 up-to-date
[2024-04-12T05:22:46.423Z] 
[2024-04-12T05:22:46.423Z] Publishing build scan...
[2024-04-12T05:22:47.983Z] https://ge.apache.org/s/5mhyq5ez7r6f6
[2024-04-12T05:22:47.983Z] 
[2024-04-12T05:22:47.983Z] 
[2024-04-12T05:22:47.983Z] See the profiling report at: 
file:///home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka_trunk/build/reports/profile/profile-2024-04-11-23-52-20.html
[2024-04-12T05:22:47.983Z] A fine-grained performance profile is available: use 
the --scan option.
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // node
[Pipeline] }
[Pipeline] // timestamps
[Pipeline] }
[Pipeline] // timeout
[Pipeline] }
[Pipeline] // stage
[Pipeline] }
Failed in branch JDK 11 and Scala 2.13
[2024-04-12T05:24:07.105Z]