Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but locally existing task state

While I like option 2, I think option 1 is less risky and will give us the benefits of transactional state stores sooner. We should consider the interface approach afterwards, though.

Best,
Bruno

On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can read the offsets of non-assigned tasks for lag computation during rebalancing. The state store also needs the state directory so that it knows where to find the information that it needs to return from changelogOffsets().

In general, I think we should proceed with the plain .checkpoint file for now and iterate back to the state store solution later since it seems it is not that straightforward. Alternatively, Nick could timebox an effort to better understand what would be needed for the state store solution. Nick, let us know your decision.

Regarding your question about the state store instance: I am not too familiar with that part of the code, but I think the state store is built when the processor topology is built, and the processor topology is built per stream task. So there is one instance of the processor topology and state store per stream task. Try to follow the call in [1].

Best,
Bruno

[1] https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153

On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense. The one thing I can't figure out is how per-Task StateStore instances are constructed. It looks like we construct one StateStore instance for the whole Topology (in InternalTopologyBuilder), and pass that into ProcessorStateManager (via StateManagerUtil) for each Task, which then initializes it.
This can't be the case though, otherwise multiple partitions of the same sub-topology (aka Tasks) would share the same StateStore instance, which they don't. What am I missing?

On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman wrote:

I don't think we need to *require* a constructor accept the TaskId, but we would definitely make sure that the RocksDB state store changes its constructor to one that accepts the TaskId (which we can do without deprecation since it's an internal API), and custom state stores can just decide for themselves whether they want to opt in to/use the TaskId param or not. I mean, custom state stores would have to opt in anyway by implementing the new StoreSupplier#get(TaskId) API, and the only reason to do that would be to have created a constructor that accepts a TaskId.

Just to be super clear about the proposal, this is what I had in mind. It's actually fairly simple and wouldn't add much to the scope of the KIP (I think -- if it turns out to be more complicated than I'm assuming, we should definitely do whatever has the smallest LOE to get this done).

Anyways, the (only) public API change would be to add this new method to the StoreSupplier API:

    default T get(final TaskId taskId) {
        return get();
    }

We can decide whether or not to deprecate the old #get, but it's not really necessary and might cause a lot of turmoil, so I'd personally say we just leave both APIs in place. And that's it for public API changes! Internally, we would just adapt each of the rocksdb StoreSupplier classes to implement this new API. So for example with the RocksDBKeyValueBytesStoreSupplier, we just add:

    @Override
    public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
        return returnTimestampedStore ?
            new RocksDBTimestampedStore(name, metricsScope(), taskId) :
            new RocksDBStore(name, metricsScope(), taskId);
    }

And of course add the TaskId parameter to each of the actual state store constructors returned here. Does that make sense?
It's entirely possible I'm missing something important here, but I think this would be a pretty small addition that would solve the problem you mentioned earlier while also being useful to anyone who uses custom state stores.

On Mon, Apr 15, 2024 at 10:21 AM Nick Telford wrote:

Hi Sophie,

Interesting idea! Although what would that mean for the StateStore interface? Obviously we can't require that the constructor take the TaskId. Is it enough to add the parameter to the StoreSupplier? Would doing this be in-scope for this KIP, or are we over-complicating it?

Nick

On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman wrote:

Somewhat minor point overall, but it actually drives me crazy that you can't get access to the taskId of a StateStore until #init is called. This has caused me a huge headache personally (since the same is true for processors and I was trying to do something that's probably too hacky to actually complain about here lol)

Can we just change the StateStoreSupplier to receive and pass along t
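The opt-in default-method pattern Sophie sketches above can be shown in isolation. The types below (SupplierSketch, StoreSketch, TaskIdSketch) are simplified stand-ins for Kafka's StoreSupplier, StateStore and TaskId, not the real interfaces:

```java
// Simplified stand-ins for Kafka's StoreSupplier/StateStore/TaskId,
// illustrating the backward-compatible default-method overload.
record TaskIdSketch(int subtopology, int partition) {}

interface StoreSketch {
    String name();
}

interface SupplierSketch<T extends StoreSketch> {
    T get(); // existing API, unchanged

    // New overload: defaults to the task-unaware path, so existing
    // suppliers keep working; task-aware suppliers override this.
    default T get(TaskIdSketch taskId) {
        return get();
    }
}

// A supplier that opts in and passes the task ID to its store.
class TaskAwareSupplier implements SupplierSketch<StoreSketch> {
    @Override
    public StoreSketch get() {
        return () -> "store";
    }

    @Override
    public StoreSketch get(TaskIdSketch taskId) {
        return () -> "store-" + taskId.subtopology() + "_" + taskId.partition();
    }
}
```

A legacy supplier that only implements get() still works when called through get(taskId), because the default simply delegates; that is why no deprecation is needed.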
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi everyone,

Following all the discussion on this KIP and KIP-1033, we introduced a new container class containing only processing context metadata: ProcessingMetadata. This new container class is actually part of KIP-1033, thus I added a hard dependency for this KIP on KIP-1033; I think it's the wisest choice implementation-wise.

I also clarified the interface of the enums: withDeadLetterQueueRecords(Iterable<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords). Very likely most users would just send one DLQ record, but there might be specific use-cases, and what can do more can do less, so I added an Iterable.

I took some time to think about the impact of storing the ProcessingMetadata on the ProductionExceptionHandler. I think storing the topic/offset/partition should be fine, but I am concerned about storing the rawSourceKey/Value. I think it could impact some specific use-cases; for example, a high-throughput Kafka Streams application "counting" messages could have huge source input messages and very small sink messages. Here, I assume storing the rawSourceKey/Value could require significantly more memory than the actual Kafka Producer buffer.

I think the safest approach is actually to only store the fixed-size metadata for ProductionExceptionHandler.handle: topic/partition/offset/processorNodeId/taskId. It might be confusing for the user, but 1) it is still better than today, where there is no context information at all, 2) it would be clearly stated in the javadoc, and 3) the rawSourceKey/Value are already nullable (e.g. the punctuate case).

Do you think it would be a suitable design, Sophie?

Cheers,
Damien

On Sun, 14 Apr 2024 at 21:30, Loic Greffier wrote:
>
> Hi Sophie,
>
> Thanks for your feedback.
> Completing Damien's comments here for points S1 and S5B.
>
> S1:
> > I'm confused -- are you saying that we're introducing a new kind of
> > ProducerRecord class for this?
>
> I am wondering if it makes sense to alter the ProducerRecord from the Clients API
> with a "deadLetterQueueTopicName" attribute dedicated to the Kafka Streams DLQ.
> Adding "deadLetterQueueTopicName" as an additional parameter to
> "withDeadLetterQueueRecord" is a good option, and may allow users to send
> records to different DLQ topics depending on conditions:
>
> @Override
> public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
>                                                  ProducerRecord<byte[], byte[]> record,
>                                                  Exception exception) {
>     if (condition1) {
>         return ProductionExceptionHandlerResponse.CONTINUE
>             .withDeadLetterQueueRecord(record, "dlq-topic-a");
>     }
>     if (condition2) {
>         return ProductionExceptionHandlerResponse.CONTINUE
>             .withDeadLetterQueueRecord(record, "dlq-topic-b");
>     }
>     return ProductionExceptionHandlerResponse.CONTINUE
>         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> }
>
> S5B:
> > I was having a bit of trouble understanding what the behavior would be if
> > someone configured a "errors.deadletterqueue.topic.name" but didn't
> > implement the handlers.
>
> The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler and
> DefaultProductionExceptionHandler should be able to tell if records should be
> sent to DLQ or not.
> The "errors.deadletterqueue.topic.name" option serves to:
>
> * Specify whether the provided handlers should or should not send records to DLQ:
>   * If the value is empty, the handlers should not send records to DLQ.
>   * If the value is not empty, the handlers should send records to DLQ.
> * Define the name of the DLQ topic that should be used by the provided handlers.
>
> Thus, if "errors.deadletterqueue.topic.name" is defined, the provided
> handlers should return either:
>
> * CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> * FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> If "errors.deadletterqueue.topic.name" is defined but neither
> DeserializationExceptionHandler nor ProductionExceptionHandler classes are
> defined in the configuration, then nothing should happen, as sending to DLQ is
> based on the handlers' response.
> When providing custom handlers, users would have the possibility to return:
>
> * FAIL
> * CONTINUE
> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
>
> A DLQ topic name is currently required when using the two last response types.
> I am wondering if it could benefit users to ease the use of the default DLQ
> topic "errors.deadletterqueue.topic.name" when implementing custom handlers,
> with this kind of implementation:
>
> * FAIL.withDefaultDeadLetterQueueRecord(record)
> * CONTINUE.withDefaultDeadLetterQueueRecord(record)
>
> Regards,
> Loïc
>
> From: Damien Gasparina
> Sent: Sunday, 14 April 2024 20:24
> To: dev@kafka.apache.org
> Subject: [EXT] Re: [DI
Re: [VOTE] KIP-1025: Optionally URL-encode clientID and clientSecret in authorization header
Hi all,

Just a kind reminder. I would really appreciate it if we could get two more binding +1 votes.

Thanks

On Mon, Apr 8, 2024, 2:08 PM Manikumar wrote:
> Thanks for the KIP.
>
> +1 (binding)
>
> On Mon, Apr 8, 2024 at 9:49 AM Kirk True wrote:
> >
> > +1 (non-binding)
> >
> > Apologies. I thought I’d already voted :(
> >
> > > On Apr 7, 2024, at 10:48 AM, Nelson B. wrote:
> > >
> > > Hi all,
> > >
> > > Just wanted to bump up this thread for visibility.
> > >
> > > Thanks!
> > >
> > > On Thu, Mar 28, 2024 at 3:40 AM Doğuşcan Namal <namal.dogus...@gmail.com> wrote:
> > >
> > >> Thanks for checking it out Nelson. Yeah I think it makes sense to leave it
> > >> for the users who want to use it for testing.
> > >>
> > >> On Mon, 25 Mar 2024 at 20:44, Nelson B. wrote:
> > >>
> > >>> Hi Doğuşcan,
> > >>>
> > >>> Thanks for your vote!
> > >>>
> > >>> Currently, the usage of TLS depends on the protocol used by the
> > >>> authorization server, which is configured through the
> > >>> "sasl.oauthbearer.token.endpoint.url" option. So, if the URL address
> > >>> uses plain http (not https), then secrets will be transmitted in
> > >>> plaintext. I think it's possible to enforce using only https, but I
> > >>> think any production-grade authorization server uses https anyway, and
> > >>> maybe users may want to test using http in the dev environment.
> > >>>
> > >>> Thanks,
> > >>>
> > >>> On Thu, Mar 21, 2024 at 3:56 PM Doğuşcan Namal <namal.dogus...@gmail.com> wrote:
> > >>>
> > >>>> Hi Nelson, thanks for the KIP.
> > >>>>
> > >>>> From the RFC:
> > >>>> ```
> > >>>> The authorization server MUST require the use of TLS as described in
> > >>>> Section 1.6 when sending requests using password authentication.
> > >>>> ```
> > >>>>
> > >>>> I believe we already have an enforcement for OAuth to be enabled only in
> > >>>> SSLChannel but would be good to double check. Sending secrets over
> > >>>> plaintext is a security bad practice :)
> > >>>>
> > >>>> +1 (non-binding) from me.
> > > > On Tue, 19 Mar 2024 at 16:00, Nelson B. wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I would like to start a vote on KIP-1025
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1025%3A+Optionally+URL-encode+clientID+and+clientSecret+in+authorization+header>,
> > > > > which would optionally URL-encode clientID and clientSecret in the
> > > > > authorization header.
> > > > >
> > > > > I feel like all possible issues have been addressed in the discussion
> > > > > thread.
> > > > >
> > > > > Thanks,
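For readers skimming the vote thread: the change being voted on boils down to optionally form-url-encoding the client credentials before they are combined and Base64-encoded into the HTTP Basic Authorization header, as RFC 6749 (section 2.3.1) prescribes for client_secret_basic. A rough sketch of the two modes; BasicAuthHeaderSketch is an illustrative name, not Kafka's actual class:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch of the optional URL-encoding step: when urlEncode is true,
// clientId and clientSecret are form-url-encoded before being joined
// with ':' and Base64-encoded into the Basic auth header value.
class BasicAuthHeaderSketch {
    static String build(String clientId, String clientSecret, boolean urlEncode) {
        String id = urlEncode
                ? URLEncoder.encode(clientId, StandardCharsets.UTF_8)
                : clientId;
        String secret = urlEncode
                ? URLEncoder.encode(clientSecret, StandardCharsets.UTF_8)
                : clientSecret;
        String token = Base64.getEncoder()
                .encodeToString((id + ":" + secret).getBytes(StandardCharsets.UTF_8));
        return "Basic " + token;
    }
}
```

The two modes only differ when the credentials contain characters that form-url-encoding rewrites (spaces, '+', '/', ':' and so on), which is exactly the interoperability gap the KIP addresses.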
[jira] [Created] (KAFKA-16595) Introduce template in ClusterTests
Kuan Po Tseng created KAFKA-16595:
-------------------------------------

             Summary: Introduce template in ClusterTests
                 Key: KAFKA-16595
                 URL: https://issues.apache.org/jira/browse/KAFKA-16595
             Project: Kafka
          Issue Type: Improvement
            Reporter: Kuan Po Tseng
            Assignee: Kuan Po Tseng

Discussed in https://github.com/apache/kafka/pull/15761#discussion_r1573850549

Currently we can't apply any template in ClusterTests, thus we have to write down all ClusterConfigProperty in each ClusterTest inside ClusterTests, which leaves a bunch of duplicate code. We need to find a way to reduce the duplication; introducing a template in ClusterTests could be a solution.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
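The duplication problem and the template idea can be illustrated with plain Java annotations. ConfigProperty, ClusterTemplate and TemplateResolver below are illustrative stand-ins, not Kafka's actual test annotations, and how the real test runner would resolve such a template is exactly the open design question in this ticket:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Illustrative stand-in for a per-test config property annotation...
@Retention(RetentionPolicy.RUNTIME)
@interface ConfigProperty {
    String key();
    String value();
}

// ...and a "template" annotation bundling the properties once, so each
// test can reference the template instead of repeating every property.
@Retention(RetentionPolicy.RUNTIME)
@interface ClusterTemplate {
    ConfigProperty[] properties();
}

// One place holds the shared configuration.
@ClusterTemplate(properties = {
    @ConfigProperty(key = "num.partitions", value = "1"),
    @ConfigProperty(key = "default.replication.factor", value = "1")
})
class SingleBrokerDefaults {}

// A test framework could resolve the bundled properties reflectively.
class TemplateResolver {
    static ConfigProperty[] resolve(Class<?> template) {
        return template.getAnnotation(ClusterTemplate.class).properties();
    }
}
```

With something like this, each ClusterTest would name the template class once instead of restating every ClusterConfigProperty.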
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
Hi Everyone,

Following the discussions on KIP-1033 and KIP-1034, we made a few changes:

- We introduced a new ProcessingMetadata class containing only the ProcessorContext metadata: topic, partition, offset, headers[], sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName.
- To be consistent, we propose to deprecate the existing DeserializationExceptionHandler and ProductionExceptionHandler methods to rely on the new ProcessingMetadata.
- The creation of the ProcessingMetadata and the deprecation of the old methods is owned by KIP-1033; KIP-1034 (DLQ) now focuses only on the Dead Letter Queue implementation without touching any interfaces. We introduced a hard dependency of KIP-1034 on KIP-1033; we think it's the wisest choice implementation-wise.
- Instead of creating a new metric, KIP-1033 updates the dropped-record metric.

Let me know what you think; if everything's fine, I think we should be good to start a VOTE.

Cheers,
Damien

On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman wrote:
>
> Fully agree about creating a new class for the bits of ProcessingContext
> that are specific to metadata only. In fact, more or less this same point
> just came up in the related KIP-1034 for DLQs, since the RecordMetadata
> can't always be trusted to remain immutable. Maybe it's possible to solve
> both issues at once, with the same class?
>
> On another related note -- I had actually also just proposed that we
> deprecate the existing DeserializationExceptionHandler method and replace
> it with one using the new PAPI as part of KIP-1034. But now that I'm
> reading this, I would say it probably makes more sense to do it in this KIP.
> We can also push that out into a smaller-scoped third KIP if you want, but
> clearly there is some overlap here, so however you guys (the authors)
> want to organize this part of the work is fine with me. I do think it
> should be done alongside/before this KIP and 1034 though, for all the
> reasons already stated.
>
> Everything else in the discussion so far I agree with! The
> ProcessingContext thing is the only open question in my mind.
>
> On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina wrote:
> >
> > Hi Matthias, Bruno,
> >
> > 1.a In my previous comment, by Processor Node ID, I meant the processor
> > name. This is important information to expose in the handler, as it
> > allows users to identify the location of the exception in the topology.
> > I assume this information could be useful in other places; that's why I
> > would lean toward adding this as an attribute in the ProcessingContext.
> >
> > 1.b Looking at the ProcessingContext, I do think the following 3 methods
> > should not be accessible in the exception handlers: getStateStore(),
> > schedule() and commit().
> > Having a separate interface would make a cleaner signature. It would
> > also be a great time to ensure that all exception handlers are
> > consistent; at the moment, DeserializationExceptionHandler.handle()
> > relies on the old PAPI ProcessorContext and
> > ProductionExceptionHandler.handle() has none.
> > It could make sense to build the new interface in this KIP and track the
> > effort to migrate the existing handlers in a separate KIP, what do you
> > think? Maybe I am overthinking this part and the ProcessingContext would
> > be fine.
> >
> > 4. Good point regarding the dropped-record metric; as it is used by the
> > other handlers, I do think it makes sense to leverage it instead of
> > creating a new metric. I will update the KIP to use the dropped-record
> > metric.
> >
> > 8. Regarding the DSL, I am aligned with Bruno; I think we could close
> > the gaps in a future KIP.
> >
> > Cheers,
> > Damien
> >
> > On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna wrote:
> > >
> > > Hi Matthias,
> > >
> > > 1.a
> > > With processor node ID, I mean the ID that is exposed in the tags of
> > > processor node metrics.
> > > That ID cannot be internal since it is exposed in metrics. I think the
> > > processor name and the processor node ID are the same thing. I followed
> > > how the processor node ID is set in metrics and I ended up in
> > > addProcessor(name, ...).
> > >
> > > 1.b
> > > Regarding ProcessingContext, I also thought about a separate class to
> > > pass in context information to the handler, but then I dismissed the
> > > idea because I thought I was overthinking it. Apparently, I was not
> > > overthinking it if you also had the same idea. So let's consider a
> > > separate class.
> > >
> > > 4.
> > > Regarding the metric, thanks for pointing to the dropped-record metric,
> > > Matthias. The dropped-record metric is used with the deserialization
> > > handler and the production handler. So, it would make sense to also use
> > > it for this handler. However, the dropped-record metric only records
> > > records that are skipped by the handler and not the number of calls to
> > > the handler. But that difference is probably irrelevant since in case of
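For readers following the thread, the metadata-only container discussed above might look roughly like the sketch below. The field set comes from the list earlier in this thread; the class name, the use of plain Strings for Kafka's TaskId type, and the omission of headers are illustrative assumptions, and notably the class exposes none of getStateStore()/schedule()/commit():

```java
// Hedged sketch of a metadata-only container for exception handlers.
// Strings stand in for Kafka's TaskId type, headers[] is omitted for
// brevity, and the overall shape is illustrative, not the KIP-1033 API.
final class ProcessingMetadataSketch {
    private final String topic;
    private final int partition;
    private final long offset;
    private final byte[] sourceRawKey;   // nullable, e.g. in the punctuate case
    private final byte[] sourceRawValue; // nullable
    private final String taskId;
    private final String processorNodeName;

    ProcessingMetadataSketch(String topic, int partition, long offset,
                             byte[] sourceRawKey, byte[] sourceRawValue,
                             String taskId, String processorNodeName) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.sourceRawKey = sourceRawKey;
        this.sourceRawValue = sourceRawValue;
        this.taskId = taskId;
        this.processorNodeName = processorNodeName;
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
    byte[] sourceRawKey() { return sourceRawKey; }
    byte[] sourceRawValue() { return sourceRawValue; }
    String taskId() { return taskId; }
    String processorNodeName() { return processorNodeName; }
}
```

Because the class is an immutable snapshot with only accessors, a handler cannot mutate task state or trigger commits through it, which is the concern raised about exposing the full ProcessingContext.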
[jira] [Created] (KAFKA-16594) Add a test to detect CDS errors
Vedarth Sharma created KAFKA-16594:
--------------------------------------

             Summary: Add a test to detect CDS errors
                 Key: KAFKA-16594
                 URL: https://issues.apache.org/jira/browse/KAFKA-16594
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Vedarth Sharma
            Assignee: Vedarth Sharma

Currently the pipeline cannot detect whether CDS is working as expected or not. A test for this will help.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)