Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.5 #25

2023-06-22 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1949

2023-06-22 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-15098) KRaft migration does not proceed and broker dies if authorizer.class.name is set

2023-06-22 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-15098.
--
Resolution: Fixed

> KRaft migration does not proceed and broker dies if authorizer.class.name is 
> set
> 
>
> Key: KAFKA-15098
> URL: https://issues.apache.org/jira/browse/KAFKA-15098
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Ron Dagostino
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.6.0, 3.5.1
>
>
> [ERROR] 2023-06-16 20:14:14,298 [main] kafka.Kafka$ - Exiting Kafka due to 
> fatal exception
> java.lang.IllegalArgumentException: requirement failed: ZooKeeper migration 
> does not yet support authorizers. Remove authorizer.class.name before 
> performing a migration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15109) ISR shrink/expand issues on ZK brokers during migration

2023-06-22 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-15109.
--
Resolution: Fixed

> ISR shrink/expand issues on ZK brokers during migration
> ---
>
> Key: KAFKA-15109
> URL: https://issues.apache.org/jira/browse/KAFKA-15109
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, replication
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.6.0
>
>
> KAFKA-15021 introduced a new controller behavior that avoids increasing the 
> leader epoch during the controlled shutdown scenario. This prevents some 
> unnecessary thrashing of metadata and threads on the brokers and clients. 
> While a cluster is in a KIP-866 migration and has a KRaft controller with ZK 
> brokers, we cannot employ this leader epoch bump avoidance. The ZK brokers 
> must have the leader epoch bump in order for ReplicaManager to react to the 
> LeaderAndIsrRequest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1948

2023-06-22 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-941: Range queries to accept null lower and upper bounds

2023-06-22 Thread Sophie Blee-Goldman
Hey Lucia, thanks for the KIP! Just some minor notes:

I'm in favor of the proposal overall, at least I think so -- for someone
not intimately familiar with the new IQ API and *RangeQuery* class, the KIP
was a bit difficult to follow, and I had to read between the lines
to figure out what the old behavior was and what the new and improved logic
would do.

It would be good to state clearly in the beginning what happens when null
is passed in right now, and what will happen after this KIP is implemented.
For example in the "Public Interfaces" section, I couldn't tell if the
middle sentence was describing what was changing, or what it was changing
*to.*
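
To make the before/after concrete, here is a minimal sketch as I understand
the proposal (the RangeQuery factory methods shown are the existing IQv2 API;
the helper methods and the "after" behavior are illustrative only and assume
the KIP's relaxed null handling):

import org.apache.kafka.streams.query.RangeQuery;

public final class RangeQuerySketch {

    // Today: callers coming from e.g. web requests have to branch on
    // null bounds themselves before building the query.
    static RangeQuery<String, Integer> buildQuery(String lower, String upper) {
        if (lower == null && upper == null) {
            return RangeQuery.withNoBounds();
        } else if (lower == null) {
            return RangeQuery.withUpperBound(upper);
        } else if (upper == null) {
            return RangeQuery.withLowerBound(lower);
        } else {
            return RangeQuery.withRange(lower, upper);
        }
    }

    // After KIP-941 (as proposed): a null bound means "unbounded", so this is enough.
    static RangeQuery<String, Integer> buildQueryAfterKip941(String lower, String upper) {
        return RangeQuery.withRange(lower, upper);
    }
}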

One last little thing: can you link to the Jira ticket at the top? And
please create one if it doesn't already exist -- it helps people figure out
when a KIP has been implemented and in which versions, as well as navigate
from the KIP to the actual code that was merged. Things can change during
implementation and the KIP document is how most people read up on new
features, but almost all of us are probably guilty of forgetting to update
the KIP document. So it's important to be able to find the code when in
doubt.

Otherwise nice KIP!

On Thu, Jun 22, 2023 at 8:19 AM Lucia Cerchie 
wrote:

> Thanks Kirk and John for the valuable feedback!
>
> John, I'll update the KIP to reflect that nuance you mention -- yes it is
> just about making the withRange method more permissive. Thanks for the
> testing file as well, I'll be sure to write my test cases there.
>
> On Wed, Jun 21, 2023 at 10:50 AM Kirk True  wrote:
>
> > Hi John/Lucia,
> >
> > Thanks for the feedback!
> >
> > Of course I only noticed the private-ness of the RangeQuery constructor
> > moments after sending my email ¯\_(ツ)_/¯
> >
> > Just to be clear, I’m happy with the proposed change as it conforms to
> > Postel’s Law ;) Apologies that it was worded tersely.
> >
> > Thanks,
> > Kirk
> >
> > > On Jun 21, 2023, at 10:20 AM, John Roesler 
> wrote:
> > >
> > > Hi all,
> > >
> > > Thanks for the KIP, Lucia! This is a nice change.
> > >
> > > To Kirk's question (1), the example is a bit misleading. The typical
> > case that would ease user pain is specifically using "null" to indicate
> an
> > open-ended range, especially since null is not a valid key.
> > >
> > > I could additionally see an empty string as being nice, but the actual
> > API is generic, not String, so there's no meaningful concept of
> > empty/blank/whitespace that we could check for, just null or not.
> > >
> > > Regarding (2), there's no public factory that takes Optional
> parameters.
> > I think you're looking at the private constructor. An alternative Lucia
> > could consider is to instead propose adding a new factory like
> > `withRange(Optional lower, Optional upper)`.
> > >
> > > FWIW, I'd be in favor of this KIP as proposed.
> > >
> > > A couple of smaller notes:
> > >
> > > 3. In the compatibility notes, I wasn't sure what "web request" was
> > referring to. I think you just mean that all existing valid API calls
> will
> > continue to work the same, and we're only making the withRange method
> more
> > permissive with its arguments.
> > >
> > > 4. For the Test Plan, I wrote some tests that validate these queries
> > against every kind and configuration of store possible. Please add your
> new
> > test cases to that one to make absolutely sure it'll work for every
> store.
> > Obviously, you may also want to add some specific unit tests in addition.
> > >
> > > See
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
> > >
> > > Thanks again!
> > > -John
> > >
> > > On 6/21/23 12:00, Kirk True wrote:
> > >> Hi Lucia,
> > >> One question:
> > >> 1. Since the proposed implementation change for withRange() method
> uses
> > Optional.ofNullable() (which only catches nulls and not blank/whitespace
> > strings), wouldn’t users still need to have code like that in the
> example?
> > >> 2. Why don't users create RangeQuery objects that use Optional
> > directly? What’s the benefit of introducing what appears to be a very
> thin
> > utility facade?
> > >> Thanks,
> > >> Kirk
> > >>> On Jun 21, 2023, at 9:51 AM, Kirk True  wrote:
> > >>>
> > >>> Hi Lucia,
> > >>>
> > >>> Thanks for the KIP!
> > >>>
> > >>> The KIP wasn’t in the email and I didn’t see it on the main KIP
> > directory. Here it is:
> > >>>
> > >>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds
> > >>>
> > >>> Can the KIP be added to the main KIP page (
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> )?
> > That will help with discoverability and encourage discussion.
> > >>>
> > >>> Thanks,
> > >>> Kirk
> > >>>
> >  On Jun 15, 2023, at 2:13 PM, Lucia Cerchie
> >  wrote:
> > 
> >  Hi everyone,
> > 
> >  I'd like to discuss KIP-941, which will change the 

[jira] [Created] (KAFKA-15115) Implement resetPositions functionality in ListOffsetRequestManager

2023-06-22 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15115:
--

 Summary: Implement resetPositions functionality in 
ListOffsetRequestManager
 Key: KAFKA-15115
 URL: https://issues.apache.org/jira/browse/KAFKA-15115
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


Introduce support for resetting positions in the new ListOffsetsRequestManager. 
This task will include a new event for the resetPositions calls performed from 
the new consumer, and the logic for handling such events in the 
ListOffsetRequestManager.

The reset positions implementation will keep the same behaviour as the one in 
the old consumer, but adapted to the new threading model: it is based on a 
RESET_POSITIONS event that is submitted to the background thread and then 
processed by the ApplicationEventProcessor. The processing itself is done by 
the ListOffsetRequestManager, given that it requires a LIST_OFFSETS 
request for the partitions awaiting reset.
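
A hypothetical, heavily simplified sketch of the hand-off described above (the
real class names and signatures in the consumer internals differ; this only
illustrates the "submit from the application thread, process in the background
thread" pattern):

import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.common.TopicPartition;

final class ResetPositionsSketch {

    static final class ResetPositionsEvent {
        final Set<TopicPartition> partitionsAwaitingReset;
        ResetPositionsEvent(Set<TopicPartition> partitions) {
            this.partitionsAwaitingReset = partitions;
        }
    }

    private final BlockingQueue<ResetPositionsEvent> eventQueue = new LinkedBlockingQueue<>();

    // Application thread: submit the event and return without blocking.
    void resetPositions(Set<TopicPartition> partitions) {
        eventQueue.add(new ResetPositionsEvent(partitions));
    }

    // Background thread: the event processor drains the queue and hands the
    // event to the request manager, which builds a LIST_OFFSETS request for
    // the partitions awaiting reset.
    void pollBackgroundThread() throws InterruptedException {
        ResetPositionsEvent event = eventQueue.take();
        System.out.println("Building LIST_OFFSETS request for " + event.partitionsAwaitingReset);
    }
}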



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14694) RPCProducerIdManager should not wait for a new block

2023-06-22 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-14694.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

> RPCProducerIdManager should not wait for a new block
> 
>
> Key: KAFKA-14694
> URL: https://issues.apache.org/jira/browse/KAFKA-14694
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
> Fix For: 3.6.0
>
>
> RPCProducerIdManager initiates an async request to the controller to grab a 
> block of producer IDs and then blocks waiting for a response from the 
> controller.
> This is done in the request handler threads while holding a global lock. This 
> means that if many producers are requesting producer IDs and the controller 
> is slow to respond, many threads can get stuck waiting for the lock.
> This may also be a deadlock concern under the following scenario:
> if the controller has 1 request handler thread (1 chosen for simplicity) and 
> receives an InitProducerId request, it may deadlock.
> Basically, any time the controller has N InitProducerId requests, where N >= 
> the number of request handler threads, there is the potential to deadlock.
> Consider this:
> 1. the request handler thread tries to handle an InitProducerId request to 
> the controller by forwarding an AllocateProducerIds request.
> 2. the request handler thread then waits on the controller response (timed 
> poll on nextProducerIdBlock)
> 3. the controller's request handler threads need to pick this request up, and 
> handle it, but the controller's request handler threads are blocked waiting 
> for the forwarded AllocateProducerIds response.
>  
> We should not block while waiting for a new block and instead return 
> immediately to free the request handler threads.
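
A hypothetical sketch of the non-blocking pattern described above (this is not
the actual RPCProducerIdManager code, and the exception is just a stand-in for
whatever retriable error the real change surfaces to the client):

import java.util.concurrent.CompletableFuture;

final class ProducerIdManagerSketch {

    private volatile CompletableFuture<long[]> nextBlock; // {firstId, blockSize}

    synchronized long generateProducerId() {
        if (nextBlock == null || !nextBlock.isDone()) {
            // Old behavior: a timed wait on the block future here, while holding
            // the lock, tying up a request handler thread.
            // New behavior: kick off the async fetch and return immediately.
            requestNextBlockFromController();
            throw new RuntimeException("Producer ID block not yet available; client should retry");
        }
        long[] block = nextBlock.join();
        return block[0]; // hand out IDs from the block; refresh when exhausted
    }

    private void requestNextBlockFromController() {
        if (nextBlock == null) {
            nextBlock = new CompletableFuture<>();
            // fire-and-forget AllocateProducerIds request; completes nextBlock later
        }
    }
}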



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1947

2023-06-22 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-941: Range queries to accept null lower and upper bounds

2023-06-22 Thread Lucia Cerchie
Thanks Kirk and John for the valuable feedback!

John, I'll update the KIP to reflect that nuance you mention -- yes it is
just about making the withRange method more permissive. Thanks for the
testing file as well, I'll be sure to write my test cases there.

On Wed, Jun 21, 2023 at 10:50 AM Kirk True  wrote:

> Hi John/Lucia,
>
> Thanks for the feedback!
>
> Of course I only noticed the private-ness of the RangeQuery constructor
> moments after sending my email ¯\_(ツ)_/¯
>
> Just to be clear, I’m happy with the proposed change as it conforms to
> Postel’s Law ;) Apologies that it was worded tersely.
>
> Thanks,
> Kirk
>
> > On Jun 21, 2023, at 10:20 AM, John Roesler  wrote:
> >
> > Hi all,
> >
> > Thanks for the KIP, Lucia! This is a nice change.
> >
> > To Kirk's question (1), the example is a bit misleading. The typical
> case that would ease user pain is specifically using "null" to indicate an
> open-ended range, especially since null is not a valid key.
> >
> > I could additionally see an empty string as being nice, but the actual
> API is generic, not String, so there's no meaningful concept of
> empty/blank/whitespace that we could check for, just null or not.
> >
> > Regarding (2), there's no public factory that takes Optional parameters.
> I think you're looking at the private constructor. An alternative Lucia
> could consider is to instead propose adding a new factory like
> `withRange(Optional lower, Optional upper)`.
> >
> > FWIW, I'd be in favor of this KIP as proposed.
> >
> > A couple of smaller notes:
> >
> > 3. In the compatibility notes, I wasn't sure what "web request" was
> referring to. I think you just mean that all existing valid API calls will
> continue to work the same, and we're only making the withRange method more
> permissive with its arguments.
> >
> > 4. For the Test Plan, I wrote some tests that validate these queries
> against every kind and configuration of store possible. Please add your new
> test cases to that one to make absolutely sure it'll work for every store.
> Obviously, you may also want to add some specific unit tests in addition.
> >
> > See
> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
> >
> > Thanks again!
> > -John
> >
> > On 6/21/23 12:00, Kirk True wrote:
> >> Hi Lucia,
> >> One question:
> >> 1. Since the proposed implementation change for withRange() method uses
> Optional.ofNullable() (which only catches nulls and not blank/whitespace
> strings), wouldn’t users still need to have code like that in the example?
> >> 2. Why don't users create RangeQuery objects that use Optional
> directly? What’s the benefit of introducing what appears to be a very thin
> utility facade?
> >> Thanks,
> >> Kirk
> >>> On Jun 21, 2023, at 9:51 AM, Kirk True  wrote:
> >>>
> >>> Hi Lucia,
> >>>
> >>> Thanks for the KIP!
> >>>
> >>> The KIP wasn’t in the email and I didn’t see it on the main KIP
> directory. Here it is:
> >>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds
> >>>
> >>> Can the KIP be added to the main KIP page (
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals)?
> That will help with discoverability and encourage discussion.
> >>>
> >>> Thanks,
> >>> Kirk
> >>>
>  On Jun 15, 2023, at 2:13 PM, Lucia Cerchie
>  wrote:
> 
>  Hi everyone,
> 
>  I'd like to discuss KIP-941, which will change the behavior of range
>  queries to make it easier for users to execute full range scans when
> using
>  interactive queries with upper and lower bounds from query parameters
> in
>  web client requests.
> 
>  I much appreciate your input!
> 
>  Lucia Cerchie
>  --
> 
> >>>
>
>

-- 
Lucia Cerchie
Developer Advocate


[jira] [Created] (KAFKA-15114) StorageTool help specifies user as parameter not name

2023-06-22 Thread Proven Provenzano (Jira)
Proven Provenzano created KAFKA-15114:
-

 Summary: StorageTool help specifies user as parameter not name
 Key: KAFKA-15114
 URL: https://issues.apache.org/jira/browse/KAFKA-15114
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.5.0
Reporter: Proven Provenzano
Assignee: Proven Provenzano
 Fix For: 3.5.1


The StorageTool help message currently specifies setting a {{user}} parameter when 
creating a SCRAM record for bootstrap.

The StorageTool only parses and accepts the parameter as {{name}}, so the 
help message is wrong.

The choice of {{name}} vs. {{user}} as the parameter is because internally 
the record uses name, all tests using the StorageTool use name as a parameter, 
KafkaPrincipals are created with {{name}}, and creating SCRAM 
credentials is done with {{--entity-name}}.

I will change the help to specify {{name}} instead of {{user}}.
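
For reference, the kind of invocation affected would look something like the
following (a sketch based on the 3.5 storage tool; the exact option syntax may
differ slightly, the point is the {{name=}} key that the parser accepts versus
the {{user=}} key shown in the help):

bin/kafka-storage.sh format -t <cluster-id> -c config/kraft/server.properties \
  --add-scram 'SCRAM-SHA-256=[name=alice,password=alice-secret]'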


 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-22 Thread Nick Telford
Hi Bruno!

3.
By "less predictable for users", I meant in terms of understanding the
performance profile under various circumstances. The more complex the
solution, the more difficult it would be for users to understand the
performance they see. For example, spilling records to disk when the
transaction buffer reaches a threshold would, I expect, reduce write
throughput. This reduction in write throughput could be unexpected, and
potentially difficult to diagnose/understand for users.
At the moment, I think the "early commit" concept is relatively
straightforward; it's easy to document, and conceptually fairly obvious to
users. We could probably add a metric to make it easier to understand when
it happens though.

3. (the second one)
The IsolationLevel is *essentially* an indirect way of telling StateStores
whether they should be transactional. READ_COMMITTED essentially requires
transactions, because it dictates that two threads calling
`newTransaction()` should not see writes from the other transaction until
they have been committed. With READ_UNCOMMITTED, all bets are off, and
stores can allow threads to observe written records at any time, which is
essentially "no transactions". That said, StateStores are free to implement
these guarantees however they can, which is a bit more relaxed than
dictating "you must use transactions". For example, with RocksDB we would
implement these as READ_COMMITTED == WBWI-based "transactions",
READ_UNCOMMITTED == direct writes to the database. But with other storage
engines, it might be preferable to *always* use transactions, even when
unnecessary; or there may be storage engines that don't provide
transactions, but the isolation guarantees can be met using a different
technique.
My idea was to try to keep the StateStore interface as loosely coupled from
the Streams engine as possible, to give implementers more freedom, and
reduce the amount of internal knowledge required.
That said, I understand that "IsolationLevel" might not be the right
abstraction, and we can always make it much more explicit if required, e.g.
boolean transactional()
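
To illustrate the mapping described above, a minimal sketch (the isolation
level would come from the store's context under the KIP, an accessor that does
not exist today; everything else here is illustrative, not the actual
RocksDBStore code):

import org.apache.kafka.common.IsolationLevel;

final class TransactionalityMappingSketch {

    private final boolean transactional;

    TransactionalityMappingSketch(IsolationLevel isolationLevel) {
        // READ_COMMITTED: other threads must not see writes until commit, so the
        // store buffers them (e.g. in a WriteBatchWithIndex) and flushes on commit.
        // READ_UNCOMMITTED: no such guarantee is needed, so write straight through.
        this.transactional = isolationLevel == IsolationLevel.READ_COMMITTED;
    }

    void put(byte[] key, byte[] value) {
        if (transactional) {
            // stage the write in the transaction buffer
        } else {
            // write directly to the underlying database
        }
    }
}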

7-8.
I can make these changes either later today or tomorrow.

Small update:
I've rebased my branch on trunk and fixed a bunch of issues that needed
addressing. Currently, all the tests pass, which is promising, but it will
need to undergo some performance testing. I haven't (yet) worked on
removing the `newTransaction()` stuff, but I would expect that,
behaviourally, it should make no difference. The branch is available at
https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is interested
in taking an early look.

Regards,
Nick

On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna  wrote:

> Hi Nick,
>
> 1.
> Yeah, I agree with you. That was actually also my point. I understood
> that John was proposing the ingestion path as a way to avoid the early
> commits. Probably, I misinterpreted the intent.
>
> 2.
> I agree with John here, that actually it is public API. My question is
> how this usage pattern affects normal processing.
>
> 3.
> My concern is that checking for the size of the transaction buffer and
> maybe triggering an early commit affects the whole processing of Kafka
> Streams. The transactionality of a state store is not confined to the
> state store itself, but spills over and changes the behavior of other
> parts of the system. I agree with you that it is a decent compromise. I
> just wanted to analyse the downsides and list the options to overcome
> them. I also agree with you that all options seem quite heavy compared
> with your KIP. I do not understand what you mean by "less predictable 
> for users", though.
>
>
> I found the discussions about the alternatives really interesting. But I
> also think that your plan sounds good and we should continue with it!
>
>
> Some comments on your reply to my e-mail on June 20th:
>
> 3.
> Ah, now, I understand the reasoning behind putting isolation level in
> the state store context. Thanks! Should that also be a way to give 
> the state store the opportunity to decide whether to turn on 
> transactions or not?
> With my comment, I was more concerned about how you would know whether a 
> checkpoint file needs to be written under EOS if you do not have a way 
> to know whether the state store is transactional or not. If a state store is 
> transactional, the checkpoint file can be written during normal
> processing under EOS. If the state store is not transactional, the
> checkpoint file must not be written under EOS.
>
> 7.
> My point was about not only considering the bytes in memory in config
> statestore.uncommitted.max.bytes, but also bytes that might be spilled
> on disk. Basically, I was wondering whether you should remove the
> "memory" in "Maximum number of memory bytes to be used to
> buffer uncommitted state-store records." My thinking was that even if a
> state store spills uncommitted bytes to disk, limiting the overall bytes
> might make sense. Thinking about it again and 

Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #1946

2023-06-22 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-06-22 Thread Bruno Cadonna

Hi Nick,

1.
Yeah, I agree with you. That was actually also my point. I understood 
that John was proposing the ingestion path as a way to avoid the early 
commits. Probably, I misinterpreted the intent.


2.
I agree with John here, that actually it is public API. My question is 
how this usage pattern affects normal processing.


3.
My concern is that checking for the size of the transaction buffer and 
maybe triggering an early commit affects the whole processing of Kafka 
Streams. The transactionality of a state store is not confined to the 
state store itself, but spills over and changes the behavior of other 
parts of the system. I agree with you that it is a decent compromise. I 
just wanted to analyse the downsides and list the options to overcome 
them. I also agree with you that all options seem quite heavy compared 
with your KIP. I do not understand what you mean by "less predictable 
for users", though.



I found the discussions about the alternatives really interesting. But I 
also think that your plan sounds good and we should continue with it!



Some comments on your reply to my e-mail on June 20th:

3.
Ah, now, I understand the reasoning behind putting isolation level in 
the state store context. Thanks! Should that also be a way to give 
the state store the opportunity to decide whether to turn on 
transactions or not?
With my comment, I was more concerned about how you would know whether a 
checkpoint file needs to be written under EOS if you do not have a way 
to know whether the state store is transactional or not. If a state store is 
transactional, the checkpoint file can be written during normal 
processing under EOS. If the state store is not transactional, the 
checkpoint file must not be written under EOS.
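
A minimal sketch of the rule described above (hypothetical names, not the
actual Streams internals): under EOS, writing the checkpoint during normal
processing is only safe when the on-disk state can never contain uncommitted
writes, i.e. when the store is transactional.

final class CheckpointRuleSketch {
    static boolean mayWriteCheckpointDuringProcessing(boolean eosEnabled,
                                                      boolean storeIsTransactional) {
        if (!eosEnabled) {
            return true; // at-least-once: checkpointing during processing is always allowed
        }
        return storeIsTransactional;
    }
}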


7.
My point was about not only considering the bytes in memory in config 
statestore.uncommitted.max.bytes, but also bytes that might be spilled 
on disk. Basically, I was wondering whether you should remove the 
"memory" in "Maximum number of memory bytes to be used to
buffer uncommitted state-store records." My thinking was that even if a 
state store spills uncommitted bytes to disk, limiting the overall bytes 
might make sense. Thinking about it again and considering the recent 
discussions, it does not make too much sense anymore.

I like the name statestore.transaction.buffer.max.bytes that you proposed.

8.
A high-level description (without implementation details) of how Kafka 
Streams will manage the commit of changelog transactions, state store 
transactions and checkpointing would be great. It would also be great if you 
could add some sentences about the behavior in case of a failure. 
For instance, how does a transactional state store recover after a 
failure, and what happens to the transaction buffer, etc.? (That is what 
I meant by "fail-over" in point 9.)


Best,
Bruno

On 21.06.23 18:50, Nick Telford wrote:

Hi Bruno,

1.
Isn't this exactly the same issue that WriteBatchWithIndex transactions
have, whereby exceeding (or likely to exceed) configured memory needs to
trigger an early commit?

2.
This is one of my big concerns. Ultimately, any approach based on cracking
open RocksDB internals and using it in ways it's not really designed for is
likely to have some unforeseen performance or consistency issues.

3.
What's your motivation for removing these early commits? While not ideal, I
think they're a decent compromise to ensure consistency whilst maintaining
good and predictable performance.
All 3 of your suggested ideas seem *very* complicated, and might actually
make behaviour less predictable for users as a consequence.

I'm a bit concerned that the scope of this KIP is growing a bit out of
control. While it's good to discuss ideas for future improvements, I think
it's important to narrow the scope down to a design that achieves the most
pressing objectives (constant sized restorations during dirty
close/unexpected errors). Any design that this KIP produces can ultimately
be changed in the future, especially if the bulk of it is internal
behaviour.

I'm going to spend some time next week trying to re-work the original
WriteBatchWithIndex design to remove the newTransaction() method, such that
it's just an implementation detail of RocksDBStore. That way, if we want to
replace WBWI with something in the future, like the SST file management
outlined by John, then we can do so with little/no API changes.

Regards,

Nick



Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-22 Thread Tom Bentley
Hi Edorado and Adrian,

Thanks for the KIP.

I think it would be good to elaborate on exactly how validate() gets
called, because I think there are a number of potential problems, or at
least things to consider.

From the broker's point of view, validate() can do arbitrary things. It
might never return due to blocking or an infinite loop. It might cause an
OOME, or throw a StackOverflowException. These are not entirely unlikely
and the risks cannot always be easily avoided by a careful policy
implementation. For example, a plugin which performs schema validation
would typically be fetching schema information from a remote registry (and
caching it for future use), and so could block on the networking (there are
further questions about retry in the case of network error). Then, when
deserializing a format like JSON deserializers might be prone to SOE or
OOME (e.g. consider a naive recursive JSON parser with JSON input starting
"..."). More generally, incorrect
deserialization of untrusted input is a common kind of CVE. Similarly
validation might involve regular expression matching (for example
JSONSchema supports pattern constraints). The matcher implementation really
matters and common matchers, including Java's Pattern, expose you to the
possibility of nasty exponential time behaviour.
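
To make the regular-expression point concrete, a self-contained illustration
(not tied to any particular policy plugin); each extra 'a' roughly doubles the
time the failing match takes:

import java.util.regex.Pattern;

public final class BacktrackingDemo {
    public static void main(String[] args) {
        Pattern pathological = Pattern.compile("(a+)+$");
        // Fails to match, but only after on the order of 2^26 backtracking steps.
        String input = "a".repeat(26) + "b";
        long start = System.nanoTime();
        boolean matched = pathological.matcher(input).matches();
        System.out.printf("matched=%b in %d ms%n", matched, (System.nanoTime() - start) / 1_000_000);
    }
}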

You mentioned LogValidator in the KIP. This executes on an IO thread and
gets called with the log lock held. So the consequences of the validation
blocking could actually be a real problem from a broker availability PoV if
this validation happens in the same place. In the worst case all the IO
threads get stuck because of bad input (perhaps from a single producer), or
network problems between the broker and the registry. I don't think simply
moving the validation to before the acquisition of the lock is an easy
solution either, because of the dependency on the compression validation.

Kind regards,

Tom

On Thu, 22 Jun 2023 at 04:16, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Hi Eduardo, Adrian.
>
> Thanks for the KIP. I agree that allowing custom validations on the
> broker-side addresses a real gap as you clearly stated on the motivation.
>
> Some initial thoughts from my side:
>
> 1. Similar to Kirk's first point, I'm also concerned about how plugin
> developers / operators would be able to apply multiple policies and how
> configurations would be passed to each policy.
>
> Some approaches from other plugins we can get some inspiration from:
>
> - AlterConfig, CreateTopic policies are designed to be 1 policy
> implementing the different APIs. It's up to the plugin developer to pull
> policies together and configure them on the broker side. I guess for Record
> Validation this may be cumbersome considering some Schema Registry
> providers may want to offer implementations for their own backend.
>
> - Connect Transforms: here there's a named set of plugins to apply per
> connector, and each transform has its own configuration defined by prefix.
> Personally, I'd consider this one an interesting approach if we decide to
> allow multiple record validations to be configured.
>
> - Tiered Storage (probably Connectors as well) have class-loader aware
> implementations with class path specific to the plugin. Not sure if this is
> something to discuss on the KIP or later on the PR, but we could mention
> something on how this plugin would deal with dependency conflicts (e.g.
> different jackson version between broker, plugin(s)).
>
> Also, by potentially supporting multiple plugins for record validation, it
> would be important to consider whether it's an all-or-nothing relation, or
> possible to choose that only _some_ policies apply per topic.
> I see there's some preference for setting the validation policy name on the
> topic, though this could be cumbersome to operate: topic creation users may
> not be aware of the record validation (similar to CreateTopic/AlterConfig
> policies) and it would impose additional coordination.
> Maybe a flag to whether apply policies or not would be a better approach?
>
> 2. Have you considered adding the record metadata to the API? It may be
> useful for logging purposes (e.g. if record validation fails, to log the
> topic-partition), or some policies may be interested in record metadata (e.g.
> compression, timestamp type, etc.)
>
> 3. A minor comment for consistency regarding the APIs. As far as I have
> seen, we tend to use the name of the object returned directly instead of
> getters notation, see `AlterConfigPolicy.RecordMetadata` [1]. We may rename
> some of the proposed APIs accordingly:
>
> - `RecordProxy#headers()|key()|value()`
> - `TopicMetadata#topicPartition()`
>
> 4. For the `RecordIntrospectionHints`, I'm struggling to see how this may
> be used by the policy developers. Would you mind adding some examples on
> how the policy in general may be used?
> Specifically, `long needKeyBytes|needKeyValue` are difficult for me to
> interpret.
> nit: maybe replace 

[jira] [Created] (KAFKA-15113) Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters

2023-06-22 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15113:
--

 Summary: Gracefully handle cases where a sink connector's admin 
and consumer client config overrides target different Kafka clusters
 Key: KAFKA-15113
 URL: https://issues.apache.org/jira/browse/KAFKA-15113
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Yash Mayya


Background reading -
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
 
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 

 

From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -
{quote}Currently, admin clients are only instantiated for sink connectors to 
create the DLQ topic if required. So it seems like it could be technically 
possible for a sink connector's consumer client overrides to target a different 
Kafka cluster from its producer and admin client overrides. Such a setup won't 
work with this implementation of the get offsets API as it is using an admin 
client to get a sink connector's consumer group offsets. However, I'm not sure 
we want to use a consumer client to retrieve the offsets either as we shouldn't 
be disrupting the existing sink tasks' consumer group just to fetch offsets. 
Leveraging a sink task's consumer also isn't an option because fetching offsets 
for a stopped sink connector (where all the tasks will be stopped) should be 
allowed. I'm wondering if we should document that a connector's various client 
config override policies shouldn't target different Kafka clusters (side note - 
looks like we don't [currently 
document|https://kafka.apache.org/documentation/#connect] client config 
overrides for Connect beyond just the worker property 
{{{}connector.client.config.override.policy{}}}).
{quote}
 
{quote}I don't think we need to worry too much about this. I cannot imagine a 
sane use case that involves overriding a connector's Kafka clients with 
different Kafka clusters (not just bootstrap servers, but actually different 
clusters) for producer/consumer/admin. I'd be fine with adding a note to our 
docs that that kind of setup isn't supported but I really, really hope that 
it's not necessary and nobody's trying to do that in the first place. I also 
suspect that there are other places where this might cause issues, like with 
exactly-once source support or automatic topic creation for source connectors.

That said, there is a different case we may want to consider: someone may have 
configured consumer overrides for a sink connector, but not admin overrides. 
This may happen if they don't use a DLQ topic. I don't know if we absolutely 
need to handle this now and we may consider filing a follow-up ticket to look 
into this, but one quick-and-dirty thought I've had is to configure the admin 
client used here with a combination of the configurations for the connector's 
admin client and its consumer, giving precedence to the latter.
{quote}
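
For concreteness, a sketch (as a plain Java config map) of the kind of setup
being discussed; the KIP-458 per-connector override prefixes are real, while
the connector class and host names are placeholders:

import java.util.Map;

final class MismatchedOverridesSketch {
    static final Map<String, String> SINK_CONNECTOR_CONFIG = Map.of(
            "name", "my-sink",
            "connector.class", "com.example.MySinkConnector",
            "topics", "orders",
            "consumer.override.bootstrap.servers", "cluster-a:9092",
            // the offsets APIs use an admin client, which here talks to a different cluster
            "admin.override.bootstrap.servers", "cluster-b:9092");
}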
 

Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] -
{quote}We will have undesirable behavior if the connector is targeting a Kafka 
cluster different from the Connect cluster's backing Kafka cluster and the user 
has configured the consumer overrides appropriately for their connector, but 
not the admin overrides (something we also discussed previously 
[here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]).

In the above case, if a user attempts to reset their sink connector's offsets 
via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following 
will occur:
 # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}} 
which returns an empty partition offsets map for the sink connector's consumer 
group ID (it exists on a different Kafka cluster to the one that the admin 
client is connecting to).
 # We call {{SinkConnector::alterOffsets}} with an empty offsets map which 
could cause the sink connector to propagate the offsets reset related changes 
to the sink system.
 # We attempt to delete the consumer group via {{Admin::deleteConsumerGroups}} 
which returns {{GroupIdNotFoundException}} which we essentially swallow in 
order to keep offsets reset operations idempotent and return a success message 
to the user (even though the real consumer group for the sink connector on the 
other Kafka cluster hasn't been deleted).

This will occur if the connector's admin overrides are missing OR the admin 
overrides are deliberately configured to target a Kafka cluster different from 
the consumer overrides (although like you pointed out in the other linked 
thread, this doesn't seem like a valid use case that we'd even want to support).

I guess we'd want to pursue the approach you suggested where we'd configure the 
admin client with a combination of the connector's admin overrides and