Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-21 Thread Bruno Cadonna

Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but locally 
existing task state


While I like option 2, I think option 1 is less risky and will give us 
the benefits of transactional state stores sooner. We should consider 
the interface approach afterwards, though.



Best,
Bruno



On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can read 
the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that it 
knows where to find the information that it needs to return from 
changelogOffsets().


In general, I think we should proceed with the plain .checkpoint file 
for now and iterate back to the state store solution later since it 
seems it is not that straightforward. Alternatively, Nick could timebox 
an effort to better understand what would be needed for the state store 
solution. Nick, let us know your decision.


Regarding your question about the state store instance: I am not too 
familiar with that part of the code, but I think the state store is 
built when the processor topology is built, and the processor topology is 
built per stream task. So there is one instance of the processor topology 
and of the state store per stream task. Try to follow the call in [1].
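
To sketch the idea (simplified; the real wiring is in ActiveTaskCreator and 
ProcessorStateManager referenced in [1], and the names below other than 
StoreBuilder, StateStore and TaskId are hypothetical): because every stream 
task builds its own processor topology, the StoreBuilder (and therefore the 
StoreSupplier) is invoked once per task, so each task owns its own store 
instance.

// Simplified illustration; not the actual Kafka Streams internals.
void createTasks(final Collection<TaskId> assignedTasks,
                 final StoreBuilder<?> storeBuilder) {
    for (final TaskId taskId : assignedTasks) {        // e.g. 0_0, 0_1, ...
        // Building the task's topology invokes the StoreBuilder (and thus the
        // StoreSupplier) again, so each task gets a distinct StateStore instance,
        // which the task's ProcessorStateManager then registers and initializes.
        final StateStore storeForThisTask = storeBuilder.build();
    }
}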


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153




On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole Topology
(in InternalTopologyBuilder), and pass that into ProcessorStateManager (via
StateManagerUtil) for each Task, which then initializes it.

This can't be the case though, otherwise multiple partitions of the same
sub-topology (aka Tasks) would share the same StateStore instance, which
they don't.

What am I missing?

On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman 
wrote:

I don't think we need to *require* a constructor that accepts the TaskId,
but we would definitely make sure that the RocksDB state store changes its
constructor to one that accepts the TaskId (which we can do without
deprecation since it's an internal API), and custom state stores can just
decide for themselves whether they want to opt-in/use the TaskId param
or not. I mean, custom state stores would have to opt in anyway by
implementing the new StoreSupplier#get(TaskId) API, and the only
reason to do that would be to have created a constructor that accepts
a TaskId.

Just to be super clear about the proposal, this is what I had in mind.
It's actually fairly simple and wouldn't add much to the scope of the
KIP (I think -- if it turns out to be more complicated than I'm assuming,
we should definitely do whatever has the smallest LOE to get this done).

Anyways, the (only) public API changes would be to add this new
method to the StoreSupplier API:

default T get(final TaskId taskId) {
    return get();
}

We can decide whether or not to deprecate the old #get but it's not
really necessary and might cause a lot of turmoil, so I'd personally
say we just leave both APIs in place.

And that's it for public API changes! Internally, we would just adapt
each of the rocksdb StoreSupplier classes to implement this new
API. So for example with the RocksDBKeyValueBytesStoreSupplier,
we just add

@Override
public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
    return returnTimestampedStore ?
        new RocksDBTimestampedStore(name, metricsScope(), taskId) :
        new RocksDBStore(name, metricsScope(), taskId);
}

And of course add the TaskId parameter to each of the actual
state store constructors returned here.
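
For a custom store, opting in would look roughly like this (a sketch under
the proposal above; MyStore and MyStoreSupplier are hypothetical names, and
the only real change is overriding the new default method with a
TaskId-aware constructor):

// Hypothetical custom supplier opting into the proposed StoreSupplier#get(TaskId).
// MyStore / MyStoreSupplier are illustrative names, not part of Kafka Streams.
public class MyStoreSupplier implements KeyValueBytesStoreSupplier {

    private final String name;

    public MyStoreSupplier(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public KeyValueStore<Bytes, byte[]> get() {
        return new MyStore(name, null);       // existing path, no TaskId available
    }

    @Override
    public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
        return new MyStore(name, taskId);     // opt-in: TaskId passed to the constructor
    }

    @Override
    public String metricsScope() {
        return "my-store";
    }
}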

Does that make sense? It's entirely possible I'm missing something
important here, but I think this would be a pretty small addition that
would solve the problem you mentioned earlier while also being
useful to anyone who uses custom state stores.

On Mon, Apr 15, 2024 at 10:21 AM Nick Telford 
wrote:


Hi Sophie,

Interesting idea! Although what would that mean for the StateStore
interface? Obviously we can't require that the constructor take the
TaskId.

Is it enough to add the parameter to the StoreSupplier?

Would doing this be in-scope for this KIP, or are we over-complicating it?


Nick

On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman 


wrote:


Somewhat minor point overall, but it actually drives me crazy that you
can't get access to the taskId of a StateStore until #init is called. This
has caused me a huge headache personally (since the same is true for
processors and I was trying to do something that's probably too hacky to
actually complain about here lol)

Can we just change the StateStoreSupplier to receive and pass along t

Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-21 Thread Damien Gasparina
Hi everyone,

Following all the discussion on this KIP and KIP-1033, we introduced a
new container class containing only processing context metadata:
ProcessingMetadata. This new container class is actually part of
KIP-1033, thus I added a hard dependency for this KIP on KIP-1033; I
think that's the wisest choice implementation-wise.

I also clarified the interface of the enums:
withDeadLetterQueueRecords(Iterable<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords).
Most users would very likely send just one DLQ record, but there might
be specific use-cases, and what can do more can do less, so I added an
Iterable.
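
A rough sketch of what this enables (the response and method names are the
ones proposed in this thread, not a released API; the fan-out to two DLQ
topics is just for illustration):

@Override
public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
                                                 final ProducerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
    // a single failed record can map to several DLQ records thanks to the Iterable
    final List<ProducerRecord<byte[], byte[]>> dlqRecords = List.of(
        new ProducerRecord<>("dlq-topic", record.key(), record.value()),
        new ProducerRecord<>("dlq-audit-topic", record.key(), record.value()));
    return ProductionExceptionHandlerResponse.CONTINUE
        .withDeadLetterQueueRecords(dlqRecords);
}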

I took some time to think about the impact of storing the
ProcessingMetadata on the ProductionExceptionHandler. I think storing
the topic/offset/partition should be fine, but I am concerned about
storing the rawSourceKey/Value. It could impact some specific
use-cases: for example, a high-throughput Kafka Streams application
"counting" messages could have huge source input messages and very
small sink messages; there, I assume storing the rawSourceKey/Value
could require significantly more memory than the actual Kafka Producer
buffer.

I think the safest approach is actually to only store the fixed-size
metadata for ProductionExceptionHandler.handle:
topic/partition/offset/processorNodeId/taskId. It might be confusing
for the user, but 1) it is still better than today, where there is no
context information at all, 2) it would be clearly stated in the
javadoc, and 3) the rawSourceKey/Value are already nullable (e.g. in
the punctuate case).

Do you think it would be a suitable design Sophie?

Cheers,
Damien

On Sun, 14 Apr 2024 at 21:30, Loic Greffier  wrote:
>
> Hi Sophie,
>
> Thanks for your feedback.
> Completing Damien's comments here for points S1 and S5B.
>
> S1:
> > I'm confused -- are you saying that we're introducing a new kind of 
> > ProducerRecord class for this?
>
> I am wondering if it makes sense to alter the ProducerRecord from Clients API 
> with a "deadLetterQueueTopicName" attribute dedicated to Kafka Streams DLQ.
> Adding "deadLetterQueueTopicName" as an additional parameter to 
> "withDeadLetterQueueRecord" is a good option, and may allow users to send 
> records to different DLQ topics depending on conditions:
> @Override
> public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
>                                                  ProducerRecord<byte[], byte[]> record,
>                                                  Exception exception) {
>     if (condition1) {
>         return ProductionExceptionHandlerResponse.CONTINUE
>             .withDeadLetterQueueRecord(record, "dlq-topic-a");
>     }
>     if (condition2) {
>         return ProductionExceptionHandlerResponse.CONTINUE
>             .withDeadLetterQueueRecord(record, "dlq-topic-b");
>     }
>     return ProductionExceptionHandlerResponse.CONTINUE
>         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> }
>
> S5B:
> > I was having a bit of trouble understanding what the behavior would be if 
> > someone configured a "errors.deadletterqueue.topic.name" but didn't 
> > implement the handlers.
>
> The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler and 
> DefaultProductionExceptionHandler should be able to tell if records should be 
> sent to DLQ or not.
> The "errors.deadletterqueue.topic.name" takes place to:
>
>   *   Specifying if the provided handlers should or should not send records 
> to DLQ.
>  *   If the value is empty, the handlers should not send records to DLQ.
>  *   If the value is not empty, the handlers should send records to DLQ.
>   *   Define the name of the DLQ topic that should be used by the provided 
> handlers.
>
> Thus, if "errors.deadletterqueue.topic.name" is defined, the provided 
> handlers should return either:
>
>   *   CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
>   *   FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue).
> If "errors.deadletterqueue.topic.name" is defined but neither 
> DeserializationExceptionHandler nor ProductionExceptionHandler classes are 
> defined in the configuration, then nothing should happen as sending to DLQ is 
> based on handlers’ response.
> When providing custom handlers, users would have the possibility to return:
>
>   *   FAIL
>   *   CONTINUE
>   *   FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
>   *   CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
>
> A DLQ topic name is currently required when using the last two response types.
> I am wondering if it could benefit users to ease the use of the default DLQ 
> topic "errors.deadletterqueue.topic.name" when implementing custom handlers, 
> with this kind of implementation:
>
>   *   FAIL.withDefaultDeadLetterQueueRecord(record)
>   *   CONTINUE.withDefaultDeadLetterQueueRecord(record)
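>
> For illustration, this is roughly what a custom handler has to do today to
> reuse the default DLQ topic, which the hypothetical
> withDefaultDeadLetterQueueRecord shortcut above would simplify (the
> configure() hook comes from Configurable; the handle() signature and the
> withDeadLetterQueueRecord response are the ones proposed in this thread,
> not a released API):
>
> public class MyProductionExceptionHandler implements ProductionExceptionHandler {
>
>     private String defaultDlqTopic;
>
>     @Override
>     public void configure(final Map<String, ?> configs) {
>         // read the default DLQ topic from the Streams configuration
>         this.defaultDlqTopic = (String) configs.get("errors.deadletterqueue.topic.name");
>     }
>
>     @Override
>     public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
>                                                      final ProducerRecord<byte[], byte[]> record,
>                                                      final Exception exception) {
>         // send the failed record to the configured default DLQ topic and continue
>         return ProductionExceptionHandlerResponse.CONTINUE
>             .withDeadLetterQueueRecord(record, defaultDlqTopic);
>     }
> }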
>
> Regards,
> Loïc
>
> From: Damien Gasparina
> Sent: Sunday, 14 April 2024 20:24
> To: dev@kafka.apache.org
> Subject: [EXT] Re: [DI

Re: [VOTE] KIP-1025: Optionally URL-encode clientID and clientSecret in authorization header

2024-04-21 Thread Nelson B.
Hi all,

Just a kind reminder. I would really appreciate it if we could get two more
binding +1 votes.

Thanks

On Mon, Apr 8, 2024, 2:08 PM Manikumar  wrote:

> Thanks for the KIP.
>
> +1 (binding)
>
>
>
>
> On Mon, Apr 8, 2024 at 9:49 AM Kirk True  wrote:
> >
> > +1 (non-binding)
> >
> > Apologies. I thought I’d already voted :(
> >
> > > On Apr 7, 2024, at 10:48 AM, Nelson B. 
> wrote:
> > >
> > > Hi all,
> > >
> > > Just wanted to bump up this thread for visibility.
> > >
> > > Thanks!
> > >
> > > On Thu, Mar 28, 2024 at 3:40 AM Doğuşcan Namal <
> namal.dogus...@gmail.com>
> > > wrote:
> > >
> > >> Thanks for checking it out Nelson. Yeah, I think it makes sense to
> > >> leave it for the users who want to use it for testing.
> > >>
> > >> On Mon, 25 Mar 2024 at 20:44, Nelson B. 
> wrote:
> > >>
> > >>> Hi Doğuşcan,
> > >>>
> > >>> Thanks for your vote!
> > >>>
> > >>> Currently, the usage of TLS depends on the protocol used by the
> > >>> authorization server which is configured
> > >>> through the "sasl.oauthbearer.token.endpoint.url" option. So, if the
> > >>> URL address uses simple http (not https)
> > >>> then secrets will be transmitted in plaintext. I think it's possible
> > >>> to enforce using only https, but I think any production-grade
> > >>> authorization server uses https anyway, and users may want to test
> > >>> using http in the dev environment.
> > >>>
> > >>> Thanks,
> > >>>
> > >>> On Thu, Mar 21, 2024 at 3:56 PM Doğuşcan Namal <
> namal.dogus...@gmail.com
> > >>>
> > >>> wrote:
> > >>>
> >  Hi Nelson, thanks for the KIP.
> > 
> >  From the RFC:
> >  ```
> >  The authorization server MUST require the use of TLS as described in
> >    Section 1.6 when sending requests using password authentication.
> >  ```
> > 
> >  I believe we already have an enforcement for OAuth to be enabled only
> >  in SSLChannel, but it would be good to double-check. Sending secrets
> >  over plaintext is a security bad practice :)
> > 
> >  +1 (non-binding) from me.
> > 
> >  On Tue, 19 Mar 2024 at 16:00, Nelson B. 
> > >> wrote:
> > 
> > > Hi all,
> > >
> > > I would like to start a vote on KIP-1025
> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1025%3A+Optionally+URL-encode+clientID+and+clientSecret+in+authorization+header>,
> > > which would optionally URL-encode clientID and clientSecret in the
> > > authorization header.
> > >
> > > I feel like all possible issues have been addressed in the discussion
> > > thread.
> > >
> > > Thanks,
> > >
> > 
> > >>>
> > >>
> >
>
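
For context, here is a minimal sketch of what URL-encoding the client
credentials in the authorization header means (assuming the standard HTTP
Basic scheme described in RFC 6749 section 2.3.1; this is purely
illustrative and not the KIP's actual implementation):

// Build an HTTP Basic authorization header with URL-encoded client credentials.
// Illustrative sketch only.
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class AuthHeaderSketch {
    static String basicAuthHeader(final String clientId, final String clientSecret) {
        // URL-encode each credential before combining them, as RFC 6749 requires
        final String encodedId = URLEncoder.encode(clientId, StandardCharsets.UTF_8);
        final String encodedSecret = URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
        final String token = Base64.getEncoder().encodeToString(
            (encodedId + ":" + encodedSecret).getBytes(StandardCharsets.UTF_8));
        return "Basic " + token;
    }
}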


[jira] [Created] (KAFKA-16595) Introduce template in ClusterTests

2024-04-21 Thread Kuan Po Tseng (Jira)
Kuan Po Tseng created KAFKA-16595:
-

 Summary: Introduce template in ClusterTests
 Key: KAFKA-16595
 URL: https://issues.apache.org/jira/browse/KAFKA-16595
 Project: Kafka
  Issue Type: Improvement
Reporter: Kuan Po Tseng
Assignee: Kuan Po Tseng


discussed in https://github.com/apache/kafka/pull/15761#discussion_r1573850549

Currently we can't apply any template in ClusterTests, so we have to write 
out every ClusterConfigProperty in each ClusterTest inside ClusterTests, 
which can leave a bunch of duplicate code. We need to find a way to reduce 
this duplication; introducing a template in ClusterTests could be a solution.
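
For illustration, this is roughly what the duplication looks like today (a 
sketch only; the annotation attribute names and enum values used here are 
assumptions and may not match the test framework exactly):

// Sketch of the duplication: each @ClusterTest inside @ClusterTests repeats
// the same @ClusterConfigProperty entries. Attribute names (types,
// serverProperties) and Type values are assumptions, not the exact framework API.
@ClusterTests({
    @ClusterTest(types = {Type.KRAFT}, serverProperties = {
        @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
        @ClusterConfigProperty(key = "num.partitions", value = "3")
    }),
    @ClusterTest(types = {Type.CO_KRAFT}, serverProperties = {
        @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
        @ClusterConfigProperty(key = "num.partitions", value = "3")
    })
})
void testSomething(final ClusterInstance cluster) {
    // a template would let both entries share these properties instead
}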



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-21 Thread Damien Gasparina
Hi Everyone,

Following the discussions on KIP-1033 and KIP-1034, we made a few changes:
  - We introduced a new ProcessingMetadata class containing only the
ProcessorContext metadata: topic, partition, offset, headers[],
sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName (see the sketch
below)
  - To be consistent, we propose to deprecate the existing
DeserializationExceptionHandler and ProductionExceptionHandler methods
and rely on the new ProcessingMetadata
  - The creation of the ProcessingMetadata and the deprecation of the old
methods are owned by KIP-1033; KIP-1034 (DLQ) now focuses only on the
Dead Letter Queue implementation without touching any interfaces. We
introduced a hard dependency of KIP-1034 on KIP-1033, which we think is
the wisest choice implementation-wise.
  - Instead of creating a new metric, KIP-1033 updates the
dropped-record metric.
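
A minimal sketch of what such a container could look like (field names are
taken from the list above; the constructor and accessor shapes are
assumptions, not the final KIP-1033 API):

// Sketch of the proposed ProcessingMetadata container: immutable, metadata only.
public class ProcessingMetadata {

    private final String topic;
    private final int partition;
    private final long offset;
    private final Headers headers;
    private final byte[] sourceRawKey;    // nullable, e.g. in the punctuate case
    private final byte[] sourceRawValue;  // nullable
    private final TaskId taskId;
    private final String processorNodeName;

    public ProcessingMetadata(final String topic, final int partition, final long offset,
                              final Headers headers, final byte[] sourceRawKey,
                              final byte[] sourceRawValue, final TaskId taskId,
                              final String processorNodeName) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.headers = headers;
        this.sourceRawKey = sourceRawKey;
        this.sourceRawValue = sourceRawValue;
        this.taskId = taskId;
        this.processorNodeName = processorNodeName;
    }

    public String topic() { return topic; }
    public int partition() { return partition; }
    public long offset() { return offset; }
    public Headers headers() { return headers; }
    public byte[] sourceRawKey() { return sourceRawKey; }
    public byte[] sourceRawValue() { return sourceRawValue; }
    public TaskId taskId() { return taskId; }
    public String processorNodeName() { return processorNodeName; }
}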

Let me know what you think, if everything's fine, I think we should be
good to start a VOTE?

Cheers,
Damien





On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman  wrote:
>
> Fully agree about creating a new class for the bits of ProcessingContext
> that are specific to metadata only. In fact, more or less this same point
> just came up in the related KIP 1034 for DLQs, since the RecordMetadata
> can't always be trusted to remain immutable. Maybe it's possible to solve
> both issues at once, with the same class?
>
> On another related note -- I had actually also just proposed that we
> deprecate the existing DeserializationExceptionHandler method and replace
> it with one using the new PAPI as part of KIP-1034. But now that I'm
> reading this, I would say it probably makes more sense to do in this KIP.
> We can also push that out into a smaller-scoped third KIP if you want, but
> clearly, there is some overlap here and so however you guys (the authors)
> want to organize this part of the work is fine with me. I do think it
> should be done alongside/before this KIP and 1034 though, for all the
> reasons already stated.
>
> Everything else in the discussion so far I agree with! The
> ProcessingContext thing is the only open question in my mind
>
> On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina 
> wrote:
>
> > Hi Matthias, Bruno,
> >
> > 1.a During my previous comment, by Processor Node ID, I meant
> > Processor name. This is important information to expose in the handler
> > as it allows users to identify the location of the exception in the
> > topology.
> > I assume this information could be useful in other places, that's why
> > I would lean toward adding this as an attribute in the
> > ProcessingContext.
> >
> > 1.b Looking at the ProcessingContext, I do think the following 3
> > methods should not be accessible in the exception handlers:
> > getStateStore(), schedule() and commit().
> > Having a separate interface would make a cleaner signature. It would
> > also be a great time to ensure that all exception handlers are
> > consistent, at the moment, the
> > DeserializationExceptionHandler.handle() relies on the old PAPI
> > ProcessorContext and the ProductionExceptionHandler.handle() has none.
> > It could make sense to build the new interface in this KIP and track
> > the effort to migrate the existing handlers in a separate KIP. What do
> > you think?
> > Maybe I am overthinking this part and the ProcessingContext would be fine.
> >
> > 4. Good point regarding the dropped-record metric, as it is used by
> > the other handlers, I do think it makes sense to leverage it instead
> > of creating a new metric.
> > I will update the KIP to update the dropped-record-metric.
> >
> > 8. Regarding the DSL, I am aligned with Bruno, I think we could close
> > the gaps in a future KIP.
> >
> > Cheers,
> > Damien
> >
> >
> > On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna  wrote:
> > >
> > > Hi Matthias,
> > >
> > >
> > > 1.a
> > > With processor node ID, I mean the ID that is exposed in the tags of
> > > processor node metrics. That ID cannot be internal since it is exposed
> > > in metrics. I think the processor name and the processor node ID is the
> > > same thing. I followed how the processor node ID is set in metrics and I
> > > ended up in addProcessor(name, ...).
> > >
> > >
> > > 1.b
> > > Regarding ProcessingContext, I also thought about a separate class to
> > > pass-in context information into the handler, but then I dismissed the
> > > idea because I thought I was overthinking it. Apparently, I was not
> > > overthinking it if you also had the same idea. So let's consider a
> > > separate class.
> > >
> > >
> > > 4.
> > > Regarding the metric, thanks for pointing to the dropped-record metric,
> > > Matthias. The dropped-record metric is used with the deserialization
> > > handler and the production handler. So, it would make sense to also use
> > > it for this handler. However, the dropped-record metric only records
> > > records that are skipped by the handler and not the number of calls to
> > > the handler. But that difference is probably irrelevant since in case of
> >

[jira] [Created] (KAFKA-16594) Add a test to detect CDS errors

2024-04-21 Thread Vedarth Sharma (Jira)
Vedarth Sharma created KAFKA-16594:
--

 Summary: Add a test to detect CDS errors
 Key: KAFKA-16594
 URL: https://issues.apache.org/jira/browse/KAFKA-16594
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vedarth Sharma
Assignee: Vedarth Sharma


Currently, the pipeline cannot detect whether CDS is working as expected or 
not. A test for this will help.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)