Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-09-05 Thread Edoardo Comar
Hi all,

we opened the VOTE thread a few weeks ago, as we hoped that this
DISCUSS thread exchange had been exhaustive.
If so, we would like any interested party to cast a vote there.
Of course we're happy to further progress the KIP discussion if needed.

Thanks
Edo & Adrian

On Wed, 5 Jul 2023 at 16:55, Edoardo Comar  wrote:
>
> Hi Jorge!
>
> On Fri, 30 Jun 2023 at 15:47, Jorge Esteban Quilcate Otoya
>  wrote:
> >
> > Thank you both for the replies! A couple more comments:
> >
> > The current proposal is to have ‘record.validation.policy’ per topic
> > (default null). A flag such as
> > ‘record.validation.policy.enable’ (default=false) may be simpler to
> > configure from the user’s perspective.
> >
> > Also, at the moment, it is a bit unclear to me what value the topic config
> > ‘record.validation.policy’ should contain: is the policy class name? How is
> > the policy expected to use the name received?
> >
>
> The 'record.validation.policy' will typically contain a value that is
> meaningful to the policy implementation.
> For example, a schema registry might support different strategies to
> associate a schema with a topic.
> The policy could use this property to determine which strategy is in
> use and then evaluate whether the record is valid.
> We decided to reserve the 'null' value to mean “disable validation for
> this topic”, avoiding the need to introduce a second, inter-dependent
> boolean property.
>
> >
> > Thanks! I think adding a simple example of a Policy implementation and how
> > a plugin developer may use these hints (and the metadata as well) may
> > bring some clarity to the proposal.
> >
>
> We've added a sample to the KIP, hope this helps.
>
> We expect RecordIntrospectionHints to be a declaration the policy makes,
> which the implementation of the KIP may use to optimise record
> iteration, avoiding full decompression when a message arrives with a
> compression type matching the topic’s compression config.
> Currently Kafka optimises that case by supplying an iterator that does
> not provide access to the record data and only answers hasKey/hasValue
> checks.
>
> HTH,
> best
> Edo & Adrian


Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-07-05 Thread Edoardo Comar
Hi Jorge!

On Fri, 30 Jun 2023 at 15:47, Jorge Esteban Quilcate Otoya
 wrote:
>
> Thank you both for the replies! A couple more comments:
>
> The current proposal is to have ‘record.validation.policy’ per topic
> (default null). A flag such as
> ‘record.validation.policy.enable’ (default=false) may be simpler to
> configure from the user’s perspective.
>
> Also, at the moment, it is a bit unclear to me what value the topic config
> ‘record.validation.policy’ should contain: is the policy class name? How is
> the policy expected to use the name received?
>

The 'record.validation.policy' will typically contain a value that is
meaningful to the policy implementation.
For example, a schema registry might support different strategies to
associate a schema with a topic.
The policy could use this property to determine which strategy is in
use and then evaluate whether the record is valid.
We decided to reserve the 'null' value to mean “disable validation for
this topic”, avoiding the need to introduce a second, inter-dependent
boolean property.
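
To make that concrete, here is a minimal sketch of a policy that
interprets the property value as a subject-name strategy. The
RecordValidationPolicy / TopicMetadata / RecordProxy shapes follow this
discussion (not the KIP text verbatim), and the strategy names and the
registry helpers are invented for illustration:

import java.nio.ByteBuffer;
import org.apache.kafka.common.InvalidRecordException;

// Hypothetical sketch: the interface shapes follow this thread's discussion
// of the KIP; strategy names and the registry helpers are invented.
public class RegistryValidationPolicy implements RecordValidationPolicy {

    @Override
    public void validate(TopicMetadata topic, RecordProxy record) {
        // The per-topic 'record.validation.policy' value tells this policy
        // which subject-name strategy the topic uses.
        String strategy = topic.validationPolicy();
        String subject;
        if ("topic-name".equals(strategy)) {
            subject = topic.topicIdPartition().topic() + "-value";
        } else if ("record-name".equals(strategy)) {
            subject = subjectFromPayload(record.value());   // hypothetical helper
        } else {
            throw new InvalidRecordException("Unknown validation policy: " + strategy);
        }
        if (!conformsTo(subject, record.value())) {         // hypothetical lookup
            throw new InvalidRecordException("Record does not conform to " + subject);
        }
    }

    private String subjectFromPayload(ByteBuffer value) {
        // Hypothetical: derive the subject from the serialized record name.
        throw new UnsupportedOperationException("registry-specific");
    }

    private boolean conformsTo(String subject, ByteBuffer value) {
        // Hypothetical: ask the schema registry whether the payload matches.
        throw new UnsupportedOperationException("registry-specific");
    }
}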

>
> Thanks! I think adding a simple example of a Policy implementation and how
> a plugin developer may use these hints (and the metadata as well) may
> bring some clarity to the proposal.
>

We've added a sample to the KIP, hope this helps.

We expect RecordIntrospectionHints to be a declaration the policy makes,
which the implementation of the KIP may use to optimise record
iteration, avoiding full decompression when a message arrives with a
compression type matching the topic’s compression config.
Currently Kafka optimises that case by supplying an iterator that does
not provide access to the record data and only answers hasKey/hasValue
checks.
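
As a concrete (hypothetical) declaration, a policy that only inspects a
short value prefix might return hints like the following; the accessor
names, and our reading of the long values as “leading bytes needed,
0 = none”, are assumptions based on this thread rather than the KIP
verbatim:

// Sketch: declaring minimal access needs lets the broker keep its
// no-decompression iterator. Names and semantics are assumptions.
@Override
public RecordIntrospectionHints introspectionHints() {
    return new RecordIntrospectionHints() {
        @Override
        public long needKeyBytes() {
            return 0L;   // the key is never inspected
        }

        @Override
        public long needValueBytes() {
            return 5L;   // only a 5-byte schema-id prefix of the value
        }
    };
}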

HTH,
best
Edo & Adrian


Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-30 Thread Jorge Esteban Quilcate Otoya
Thank you both for the replies! A couple more comments:

On Tue, 27 Jun 2023 at 14:57, Edoardo Comar  wrote:

> Hi Jorge
> thanks for the feedback. Comments inline below
>
> > 1. Similar to Kirk's first point, I'm also concerned about how the
> > plugin developers / operators would be able to apply multiple policies
> > and how configurations would be passed to each policy.
>
> We’ve only attempted to tackle the “one plugin per broker” model with
> this KIP, as that’s the use-case we most clearly understand. Although,
> as noted in the rejected alternatives section, it would be possible to
> use a facade-like pattern to delegate from one plugin implementation
> to others. The reason we’ve avoided tackling multiple plugins is that
> it introduces further complexity (which takes precedence? Is
> configuration needed to say which plugin applies to which topic?
> Etc.), and we are concerned that without a clear use-case we might
> make decisions we later come to regret. Hopefully by offering minimal
> configuration options, we don’t hinder a future “support multiple
> record validation policies” KIP.
>

Got it. Thanks!


>
> > Some approaches from other plugins we can get some inspiration from:
> >
> > - AlterConfig, CreateTopic policies are designed to be 1 policy
> > implementing the different APIs. Up to the plugin developer to pull
> > policies together and configure it on the broker side. I guess for Record
> > Validation this may be cumbersome considering some Schema Registry
> > providers may want to offer implementations for their own backend.
> >
> > - Connect Transforms: here there's a named set of plugins to apply per
> > connector, and each transform has its own configuration defined by prefix.
> > Personally, I'd consider this one an interesting approach if we decide to
> > allow multiple record validations to be configured.
> >
> > - Tiered Storage (probably Connectors as well) have class-loader aware
> > implementations with class path specific to the plugin. Not sure if this
> > is something to discuss on the KIP or later on the PR, but we could mention
> > something on how this plugin would deal with dependency conflicts (e.g.
> > different jackson version between broker, plugin(s)).
>
>
> Thanks for highlighting all of these places where we can draw
> inspiration. We’ve updated the KIP with an additional classloader
> property to match the tiered storage implementation. It seems likely
> that record validation policy implementations will live in the
> codebase of their corresponding schema registry (as is the case,
> today, for the client serdes used to integrate with a schema registry)
> - so it makes sense to insulate their implementation from specific
> .jar versions that may (or may not) be present in a particular version
> of the broker.
>
> > Also, by potentially supporting multiple plugins for record validation,
> > it would be important to consider if it's an all-or-nothing relation, or
> > possible to choose that only _some_ policies apply per topic.
> > I see there's some preference for setting the validation policy name on
> > the topic, though this could be cumbersome to operate: topic creation
> > users may not be aware of the record validation (similar to
> > CreateTopic/AlterConfig policies), and this would impose additional
> > coordination.
> > Maybe a flag for whether to apply policies or not would be a better
> > approach?
>
> Could you elaborate more on your comment about “maybe a flag for
> whether to apply policies or not would be a better approach”? We
> thought that setting the ‘record.validation.policy’ property on a
> topic to a value supported by the plugin was such a flag - but it
> sounds like you might have a different approach in mind?
>
>
The current proposal is to have ‘record.validation.policy’ per topic
(default null). A flag such as
‘record.validation.policy.enable’ (default=false) may be simpler to
configure from the user’s perspective.

Also, at the moment, it is a bit unclear to me what value the topic config
‘record.validation.policy’ should contain: is the policy class name? How is
the policy expected to use the name received?
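
For concreteness, this is how the per-topic property would be set with
the Admin client, assuming the proposed name is adopted (the topic name
"payments" and the value "topic-name" are made-up examples):

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class EnableRecordValidation {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "payments");
            // Per the KIP discussion, a null/absent value disables validation;
            // any non-null value is passed through to the policy.
            AlterConfigOp set = new AlterConfigOp(
                new ConfigEntry("record.validation.policy", "topic-name"),
                AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Map.of(topic, List.of(set));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}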


> > 2. Have you considered adding the record metadata to the API? It may be
> > useful for logging purposes (e.g. if record validation fails, to log the
> > topic-partition), or some policies are interested in record metadata
> > (e.g. compression, timestamp type, etc.)
>
> The topic/partition is available to the plugin via the TopicMetadata
> interface. Additional record properties could be added to the
> ‘RecordProxy’ interface, however the topic of how rich to make the
> interface was a sticking point for KIP-729. The intent behind the
> ‘TopicMetadata’ and ‘RecordProxy’ classes is that they can be extended
> in the future without breaking existing plugin implementations - so
> we’re not precluding further properties from being added if we’ve been
> too austere.
>

I see, agree.


>
> > 3. A minor comment for consistency regarding the APIs. As far as I have
> > seen, we tend to use the name of the object returned directly instead of
> > getters notation, see `AlterConfigPolicy.RecordMetadata` [1].

Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-28 Thread Edoardo Comar
Hi Andrew,

thanks for your comments! Please see replies inline below.

On Mon, 26 Jun 2023 at 16:51, Andrew Schofield
 wrote:
> 4) For a new interface, I wonder whether it would be better to use 
> TopicIdPartition rather
> than TopicPartition. Topic IDs are gradually spreading across the public 
> interfaces for Kafka.

Thanks for the suggestion, we’ve updated the KIP.

> 5) The new topic config is called `record.validation.policy`. The javadoc for 
> the validationPolicy()
> method says `validation.policy`.

oops, fixed in the KIP, thx.

> 6) I’m surprised that you need a `HeaderProxy` interface when `Headers` and 
> `Header` are
> already interfaces. I would have expected it was possible to create proxy 
> instances of the
> headers using the existing interfaces with a little cunning.

A header’s value() returns a byte[], which can naturally be modified.
We introduced the HeaderProxy interface to make it clear that the
returned values are read-only,
without being forced to make deep copies of byte[].
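
As an illustration of the idea (not the KIP's code), a read-only header
view can share the backing array without a deep copy:

import java.nio.ByteBuffer;
import org.apache.kafka.common.header.Header;

// Illustration only: wrap a Header's byte[] value in a read-only ByteBuffer
// so a policy can read it but cannot mutate the record.
final class ReadOnlyHeaderView {
    private final Header header;

    ReadOnlyHeaderView(Header header) {
        this.header = header;
    }

    String key() {
        return header.key();
    }

    ByteBuffer value() {
        byte[] bytes = header.value();
        // asReadOnlyBuffer() shares the backing array but rejects writes.
        return bytes == null ? null : ByteBuffer.wrap(bytes).asReadOnlyBuffer();
    }
}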

Thanks,
Edo & Adrian


Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-28 Thread Edoardo Comar
Hi Tom,

thanks for your comments; replies inline below.

On Thu, 22 Jun 2023 at 10:58, Tom Bentley  wrote:
>
> Hi Edoardo and Adrian,
>
> Thanks for the KIP.
>
> I think it would be good to elaborate on exactly how validate() gets
> called, because I think there are a number of potential problems, or at
> least things to consider.
>
> From the broker's point of view, validate() can do arbitrary things. It
> might never return due to blocking or an infinite loop. It might cause an
> OOME, or throw a StackOverflowException. These are not entirely unlikely
> and the risks cannot always be easily avoided by a careful policy
> implementation. For example, a plugin which performs schema validation
> would typically be fetching schema information from a remote registry (and
> caching it for future use), and so could block on the networking (there are
> further questions about retry in the case of network error). Then, when
> deserializing a format like JSON, deserializers might be prone to SOE or
> OOME (e.g. consider a naive recursive JSON parser with JSON input starting
> "..."). More generally, incorrect
> deserialization of untrusted input is a common kind of CVE. Similarly
> validation might involve regular expression matching (for example
> JSONSchema supports pattern constraints). The matcher implementation really
> matters and common matchers, including Java's Pattern, expose you to the
> possibility of nasty exponential time behaviour.

We agree with your observations: running 3rd-party code inside the
broker exposes it to these problems.
The Authorizer, for example, is existing plugin code invoked from the
IO threads, and implementations might access external systems,
although it is not typically involved in user-input deserialization
and is not invoked while holding a lock.
Server-side input validation carries a tradeoff between functionality
and risk; if that tradeoff is not acceptable in a certain deployment,
then validation should not be enabled.

An implementation could use its own thread pool and bound the call
coming from the IO thread with a timeout (see the sketch below).
We do not think such a solution should be mandated as part of the
plugin interface.
We envision that the record validation plugin implementations used in
a production system will be production-quality code, likely developed
and tested by the schema registry provider, as are the serdes.
In fact, there is a natural semantic coupling between the serdes and
the validator.
We do not expect Kafka cluster administrators to just run any code
within their brokers.
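
For illustration, the thread-pool idea could look roughly like this
inside a policy (pool size and timeout are illustrative values, not KIP
requirements):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.InvalidRecordException;

// Sketch: the policy runs its potentially blocking work (e.g. a remote
// registry fetch) on its own pool, so the broker IO thread waits at most
// 'timeoutMs' before the record is rejected.
final class TimeoutBoundedValidation {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final long timeoutMs = 500;

    void runBounded(Runnable validation) {
        Future<?> outcome = pool.submit(validation);
        try {
            outcome.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            outcome.cancel(true);   // interrupt the stuck validation
            throw new InvalidRecordException("record validation timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new InvalidRecordException("record validation interrupted");
        } catch (ExecutionException e) {
            throw new InvalidRecordException("record validation failed: " + e.getCause());
        }
    }
}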

Furthermore, not all validation requires parsing of the message
payload to provide value.
For example, a policy that checks records carry a valid schema ID
would prevent common misconfigurations, such as running a client
without a registry’s serdes; a sketch of such a check follows.
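
The sketch below uses the 5-byte prefix convention (magic byte 0 plus a
4-byte schema ID) popularised by common registry serdes; the framing is
that convention, not something the KIP mandates:

import java.nio.ByteBuffer;

// Illustration of the schema-ID check: inspect only the framing bytes,
// without decoding the payload itself.
final class SchemaIdCheck {
    static boolean hasPlausibleSchemaId(ByteBuffer value) {
        if (value == null || value.remaining() < 5) {
            return false;                          // too short for the prefix
        }
        if (value.get(value.position()) != 0) {
            return false;                          // no magic byte
        }
        int schemaId = value.getInt(value.position() + 1);
        return schemaId > 0;                       // registry IDs are positive
    }
}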

> You mentioned LogValidator in the KIP. This executes on an IO thread and
> gets called with the log lock held. So the consequences of the validation
> blocking could actually be a real problem from a broker availability PoV if
> this validation happens in the same place. In the worst case all the IO
> threads get stuck because of bad input (perhaps from a single producer), or
> network problems between the broker and the registry. I don't think simply
> moving the validation to before the acquisition of the lock is an easy
> solution either, because of the dependency on the compression validation.

The existing LogValidator seems a natural point at which to perform an
optional, deeper validation than the one performed today.
Again, an implementation that uses a timeout-bounded call seems a possibility.

Thanks to your observation, we think some metrics should be introduced
to monitor the plugin’s behaviour.
We could enhance the KIP by introducing metrics similar to the existing
ones for message conversions and invalid messages, e.g.

kafka.network:type=RequestMetrics,name=MessageValidationTimeMs

kafka.server:type=BrokerTopicMetrics,name=ProduceMessageValidationsPerSec
kafka.server:type=BrokerTopicMetrics,name=ProduceMessageValidationsPerSec,topic=

kafka.server:type=BrokerTopicMetrics,name=InvalidMessageRecordsPerSec
kafka.server:type=BrokerTopicMetrics,name=InvalidMessageRecordsPerSec,topic=

What do you think?

Thanks,
Edo & Adrian


Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-27 Thread Edoardo Comar
Hi Jorge
thanks for the feedback. Comments inline below

> 1. Similar to Kirk's first point, I'm also concerned about how the
> plugin developers / operators would be able to apply multiple policies
> and how configurations would be passed to each policy.

We’ve only attempted to tackle the “one plugin per broker” model with
this KIP, as that’s the use-case we most clearly understand. Although,
as noted in the rejected alternatives section, it would be possible to
use a facade-like pattern to delegate from one plugin implementation
to others. The reason we’ve avoided tackling multiple plugins is that
it introduces further complexity (which takes precedence? Is
configuration needed to say which plugin applies to which topic?
Etc.), and we are concerned that without a clear use-case we might
make decisions we later come to regret. Hopefully by offering minimal
configuration options, we don’t hinder a future “support multiple
record validation policies” KIP.
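
To sketch the facade-like pattern mentioned above (interface shapes as
discussed in this thread; the delegate classes and the routing-by-value
rule are hypothetical):

import java.util.Map;
import org.apache.kafka.common.InvalidRecordException;

// Rough sketch of the facade from the Rejected Alternatives section: one
// configured policy class that routes to others, keyed here by the
// per-topic 'record.validation.policy' value.
public class DispatchingValidationPolicy implements RecordValidationPolicy {

    private final Map<String, RecordValidationPolicy> delegates = Map.of(
            "avro-registry", new AvroRegistryPolicy(),   // hypothetical delegate
            "json-schema", new JsonSchemaPolicy());      // hypothetical delegate

    @Override
    public void validate(TopicMetadata topic, RecordProxy record) {
        RecordValidationPolicy delegate = delegates.get(topic.validationPolicy());
        if (delegate == null) {
            throw new InvalidRecordException(
                    "No validator registered for policy " + topic.validationPolicy());
        }
        delegate.validate(topic, record);
    }
}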

> Some approaches from other plugins we can get some inspiration from:
>
> - AlterConfig, CreateTopic policies are designed to be 1 policy
> implementing the different APIs. Up to the plugin developer to pull
> policies together and configure it on the broker side. I guess for Record
> Validation this may be cumbersome considering some Schema Registry
> providers may want to offer implementations for their own backend.
>
> - Connect Transforms: here there's a named set of plugins to apply per
> connector, and each transform has its own configuration defined by prefix.
> Personally, I'd consider this one an interesting approach if we decide to
> allow multiple record validations to be configured.
>
> - Tiered Storage (probably Connectors as well) have class-loader aware
> implementations with class path specific to the plugin. Not sure if this is
> something to discuss on the KIP or later on the PR, but we could mention
> something on how this plugin would deal with dependency conflicts (e.g.
> different jackson version between broker, plugin(s)).


Thanks for highlighting all of these places where we can draw
inspiration. We’ve updated the KIP with an additional classloader
property to match the tiered storage implementation. It seems likely
that record validation policy implementations will live in the
codebase of their corresponding schema registry (as is the case,
today, for the client serdes used to integrate with a schema registry)
- so it makes sense to insulate their implementation from specific
.jar versions that may (or may not) be present in a particular version
of the broker.

> Also, by potentially supporting multiple plugins for record validation, it
> would be important to consider if it's an all-or-nothing relation, or
> possible to choose that only _some_ policies apply per topic.
> I see there's some preference for setting the validation policy name on the
> topic, though this could be cumbersome to operate: topic creation users may
> not be aware of the record validation (similar to CreateTopic/AlterConfig
> policies), and this would impose additional coordination.
> Maybe a flag for whether to apply policies or not would be a better approach?

Could you elaborate more on your comment about “maybe a flag for
whether to apply policies or not would be a better approach”? We
thought that setting the ‘record.validation.policy’ property on a
topic to a value supported by the plugin was such a flag - but it
sounds like you might have a different approach in mind?

> 2. Have you considered adding the record metadata to the API? It may be
> useful for logging purposes (e.g. if record validation fails, to log the
> topic-partition), or some policies are interested in record metadata (e.g.
> compression, timestamp type, etc.)

The topic/partition is available to the plugin via the TopicMetadata
interface. Additional record properties could be added to the
‘RecordProxy’ interface, however the topic of how rich to make the
interface was a sticking point for KIP-729. The intent behind the
‘TopicMetadata’ and ‘RecordProxy’ classes is that they can be extended
in the future without breaking existing plugin implementations - so
we’re not precluding further properties from being added if we’ve been
too austere.

> 3. A minor comment for consistency regarding the APIs. As far as I have
> seen, we tend to use the name of the object returned directly instead of
> getters notation, see `AlterConfigPolicy.RecordMetadata` [1]. We may rename
> some of the proposed APIs accordingly:
>
> - `RecordProxy#headers()|key()|value()`
> - `TopicMetadata#topicPartition()`

Good point, and thanks for taking the time to find those examples.
We’ve fixed the KIP to be consistent with this use of returned object
name, and dropped the use of getter notation.
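
For illustration, the accessor shape after those renames reads roughly
as follows (this mirrors the discussion in this thread, including
Andrew's point 4 on TopicIdPartition, rather than quoting the KIP; the
headers container type is an assumption):

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.kafka.common.TopicIdPartition;

// Returned-object naming, no get- prefix; HeaderProxy is the KIP's
// read-only header view.
interface TopicMetadata {
    TopicIdPartition topicIdPartition();
    String validationPolicy();    // the per-topic 'record.validation.policy' value
}

interface RecordProxy {
    List<HeaderProxy> headers();
    ByteBuffer key();             // read-only view of the record key
    ByteBuffer value();           // read-only view of the record value
}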

> 4. For the `RecordIntrospectionHints`, I'm struggling to see how this may
> be used by the policy developers. Would you mind adding some examples on
> how the policy in general may be used?
> Specifically, `long needKeyBytes|needKeyValue` are difficult to interpret
> to me.

Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-26 Thread Andrew Schofield
Hi Edo,
Thanks for the KIP. Looks like a useful improvement. I have some 
comments/questions.

4) For a new interface, I wonder whether it would be better to use 
TopicIdPartition rather
than TopicPartition. Topic IDs are gradually spreading across the public 
interfaces for Kafka.

5) The new topic config is called `record.validation.policy`. The javadoc for 
the validationPolicy()
method says `validation.policy`.

6) I’m surprised that you need a `HeaderProxy` interface when `Headers` and 
`Header` are
already interfaces. I would have expected it was possible to create proxy 
instances of the
headers using the existing interfaces with a little cunning.

Thanks,
Andrew

> On 26 Jun 2023, at 16:05, Edoardo Comar  wrote:
>
> Hi Kirk,
> thanks for your comments.
>
>> 1. Does record.validation.policy.class.name support multiple classes, or 
>> just one? I’m probably not wrapping my head around it,
>> but I’d imagine different policies for different families or groupings of 
>> topics, thus the need for supporting multiple policies.
>> But if there are multiple then you run the risk of conflicts of ownership of 
>> validation, so ¯\_(ツ)_/¯
>
> We have only identified the use case for a single policy, but as
> mentioned in the Rejected Alternatives section of the KIP, we think a
> Facade-like policy could be written by a Kafka admin to dispatch
> validation to different policies. Allowing multiple policies to be
> specified would introduce complexities (as you noted), and we wanted
> to avoid making too many assumptions without having a strong use-case
> for this support.
>
>> 2. Is there any concern that a validation class may alter the contents of 
>> the ByteBuffer of the key and/or value?
>> Perhaps that’s already handled and/or outside the scope of this KIP?
>
> Good point. This behaviour isn’t defined, and what happens could
> change between Kafka versions (or depending on compression settings).
> We have modified the Javadoc in the KIP to indicate that the
> ByteBuffer’s are read-only wrapper, as we don’t intend this plug-point
> to be used for modifying message data. We also spotted that this was a
> problem for returning headers (as the common Header interface returns
> a byte array from one of its methods), and have updated the Javadoc
> accordingly.
>
>
>> 3. What is the benefit to introducing the inner TopicMetadata and 
>> RecordProxy interfaces vs.
>> passing the TopicPartition, String (validation policy), and Record into the 
>> validate() method directly?
>
> We wanted to avoid Record as the package
> org.apache.kafka.common.record is documented as *not* being a
> published API, and doesn’t necessarily maintain compatibility between
> versions. This was highlighted as a potential problem during the
> discussion of KIP-729.
>
> We designed the API using interfaces as arguments to the methods, so
> that further properties can be added in the future without breaking
> existing implementations.
>
>
> On Wed, 21 Jun 2023 at 17:08, Kirk True  wrote:
>>
>> Hi Edo/Adrian!
>>
>> Thanks for the KIP.
>>
>> I have some questions, and apologies that they may fall under the “stupid”
>> column because I’m not that familiar with this area :)
>>
>> 1. Does record.validation.policy.class.name support multiple classes, or 
>> just one? I’m probably not wrapping my head around it, but I’d imagine 
>> different policies for different families or groupings of topics, thus the 
>> need for supporting multiple policies. But if there are multiple then you 
>> run the risk of conflicts of ownership of validation, so ¯\_(ツ)_/¯
>>
>> 2. Is there any concern that a validation class may alter the contents of 
>> the ByteBuffer of the key and/or value? Perhaps that’s already handled 
>> and/or outside the scope of this KIP?
>>
>> 3. What is the benefit to introducing the inner TopicMetadata and 
>> RecordProxy interfaces vs. passing the TopicPartition, String (validation 
>> policy), and Record into the validate() method directly?
>>
>> Thanks,
>> Kirk
>>
>>> On Jun 20, 2023, at 2:28 AM, Edoardo Comar  wrote:
>>>
>>> Thanks Николай,
>>> We’d like to open a vote next week.
>>> Hopefully getting some more feedback before then.
>>>
>>> Edo
>>>
>>>
>>> On Wed, 7 Jun 2023 at 19:15, Николай Ижиков  wrote:
>>>
>>>> Hello.
>>>>
>>>> As the author of one of the related KIPs I’m +1 for this change.
>>>> Long-awaited feature.
>>>>
>>>>> On 7 Jun 2023, at 19:02, Edoardo Comar wrote:
>>>>>
>>>>> Dear all,
>>>>> Adrian and I would like to start a discussion thread on
>>>>>
>>>>> KIP-940: Broker extension point for validating record contents at
>>>>> produce time
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-940%3A+Broker+extension+point+for+validating+record+contents+at+produce+time
>>>>>
>>>>> This KIP proposes a new broker-side extension point (a “record
>>>>> validation policy”) that can be used to reject records published by a
>>>>> misconfigured client.
>>>>> Though general, it is aimed at the common, best-practice use case of
>>>>> defining Kafka message formats with schemas maintained in a schema
>>>>> registry.

Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-26 Thread Edoardo Comar
Hi Kirk,
thanks for your comments.

> 1. Does record.validation.policy.class.name support multiple classes, or just 
> one? I’m probably not wrapping my head around it,
> but I’d imagine different policies for different families or groupings of 
> topics, thus the need for supporting multiple policies.
> But if there are multiple then you run the risk of conflicts of ownership of 
> validation, so ¯\_(ツ)_/¯

We have only identified the use case for a single policy, but as
mentioned in the Rejected Alternatives section of the KIP, we think a
Facade-like policy could be written by a Kafka admin to dispatch
validation to different policies. Allowing multiple policies to be
specified would introduce complexities (as you noted), and we wanted
to avoid making too many assumptions without having a strong use-case
for this support.

> 2. Is there any concern that a validation class may alter the contents of the 
> ByteBuffer of the key and/or value?
> Perhaps that’s already handled and/or outside the scope of this KIP?

Good point. This behaviour isn’t defined, and what happens could
change between Kafka versions (or depending on compression settings).
We have modified the Javadoc in the KIP to indicate that the
ByteBuffers are read-only wrappers, as we don’t intend this plug-point
to be used for modifying message data. We also spotted that this was a
problem for returning headers (as the common Header interface returns
a byte array from one of its methods), and have updated the Javadoc
accordingly.
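
A tiny self-contained demonstration of the read-only guarantee we mean:

import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

// The policy sees the same bytes without a copy, but writes are rejected.
public class ReadOnlyViewDemo {
    public static void main(String[] args) {
        byte[] value = {1, 2, 3};
        ByteBuffer view = ByteBuffer.wrap(value).asReadOnlyBuffer();
        System.out.println(view.get(0));    // reading works: prints 1
        try {
            view.put(0, (byte) 9);          // mutation attempt
        } catch (ReadOnlyBufferException e) {
            System.out.println("read-only, as intended");
        }
    }
}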


> 3. What is the benefit to introducing the inner TopicMetadata and RecordProxy 
> interfaces vs.
> passing the TopicPartition, String (validation policy), and Record into the 
> validate() method directly?

We wanted to avoid Record as the package
org.apache.kafka.common.record is documented as *not* being a
published API, and doesn’t necessarily maintain compatibility between
versions. This was highlighted as a potential problem during the
discussion of KIP-729.

We designed the API using interfaces as arguments to the methods, so
that further properties can be added in the future without breaking
existing implementations.


On Wed, 21 Jun 2023 at 17:08, Kirk True  wrote:
>
> Hi Edo/Adrian!
>
> Thanks for the KIP.
>
> I have some questions, and apologies that they may fall under the “stupid”
> column because I’m not that familiar with this area :)
>
> 1. Does record.validation.policy.class.name support multiple classes, or just 
> one? I’m probably not wrapping my head around it, but I’d imagine different 
> policies for different families or groupings of topics, thus the need for 
> supporting multiple policies. But if there are multiple then you run the risk 
> of conflicts of ownership of validation, so ¯\_(ツ)_/¯
>
> 2. Is there any concern that a validation class may alter the contents of the 
> ByteBuffer of the key and/or value? Perhaps that’s already handled and/or 
> outside the scope of this KIP?
>
> 3. What is the benefit to introducing the inner TopicMetadata and RecordProxy 
> interfaces vs. passing the TopicPartition, String (validation policy), and 
> Record into the validate() method directly?
>
> Thanks,
> Kirk
>
> > On Jun 20, 2023, at 2:28 AM, Edoardo Comar  wrote:
> >
> > Thanks Николай,
> > We’d like to open a vote next week.
> > Hopefully getting some more feedback before then.
> >
> > Edo
> >
> >
> > On Wed, 7 Jun 2023 at 19:15, Николай Ижиков  wrote:
> >
> >> Hello.
> >>
> >> As the author of one of the related KIPs I’m +1 for this change.
> >> Long-awaited feature.
> >>
> >>> On 7 Jun 2023, at 19:02, Edoardo Comar wrote:
> >>>
> >>> Dear all,
> >>> Adrian and I would like to start a discussion thread on
> >>>
> >>> KIP-940: Broker extension point for validating record contents at
> >>> produce time
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-940%3A+Broker+extension+point+for+validating+record+contents+at+produce+time
> >>>
> >>> This KIP proposes a new broker-side extension point (a “record
> >>> validation policy”) that can be used to reject records published by a
> >>> misconfigured client.
> >>> Though general, it is aimed at the common, best-practice use case of
> >>> defining Kafka message formats with schemas maintained in a schema
> >>> registry.
> >>>
> >>> Please post your feedback, thanks !
> >>>
> >>> Edoardo & Adrian
> >>>
> >>
> >>
>


Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-22 Thread Tom Bentley
Hi Edoardo and Adrian,

Thanks for the KIP.

I think it would be good to elaborate on exactly how validate() gets
called, because I think there are a number of potential problems, or at
least things to consider.

From the broker's point of view, validate() can do arbitrary things. It
might never return due to blocking or an infinite loop. It might cause an
OOME, or throw a StackOverflowException. These are not entirely unlikely
and the risks cannot always be easily avoided by a careful policy
implementation. For example, a plugin which performs schema validation
would typically be fetching schema information from a remote registry (and
caching it for future use), and so could block on the networking (there are
further questions about retry in the case of network error). Then, when
deserializing a format like JSON, deserializers might be prone to SOE or
OOME (e.g. consider a naive recursive JSON parser with JSON input starting
"..."). More generally, incorrect
deserialization of untrusted input is a common kind of CVE. Similarly
validation might involve regular expression matching (for example
JSONSchema supports pattern constraints). The matcher implementation really
matters and common matchers, including Java's Pattern, expose you to the
possibility of nasty exponential time behaviour.
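
A short self-contained demonstration of that last point with
java.util.regex (the pattern and input are the classic
catastrophic-backtracking example):

import java.util.regex.Pattern;

// "(a+)+" against "aaa...ab" is the classic catastrophic-backtracking case:
// each extra 'a' roughly doubles the time Java's backtracking matcher spends
// failing to match, so a short payload can stall a validating thread.
public class RedosDemo {
    public static void main(String[] args) {
        Pattern risky = Pattern.compile("(a+)+");
        String input = "a".repeat(26) + "b";   // 26 a's, then a mismatch
        long start = System.nanoTime();
        boolean matched = risky.matcher(input).matches();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("matched=" + matched + " after " + elapsedMs + " ms");
    }
}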

You mentioned LogValidator in the KIP. This executes on an IO thread and
gets called with the log lock held. So the consequences of the validation
blocking could actually be a real problem from a broker availability PoV if
this validation happens in the same place. In the worst case all the IO
threads get stuck because of bad input (perhaps from a single producer), or
network problems between the broker and the registry. I don't think simply
moving the validation to before the acquisition of the lock is an easy
solution either, because of the dependency on the compression validation.

Kind regards,

Tom

On Thu, 22 Jun 2023 at 04:16, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Hi Eduardo, Adrian.
>
> Thanks for the KIP. I agree that allowing custom validations on the
> broker-side addresses a real gap as you clearly stated on the motivation.
>
> Some initial thoughts from my side:
>
> 1. Similar to Kirk's first point, I'm also concerned about how the
> plugin developers / operators would be able to apply multiple policies
> and how configurations would be passed to each policy.
>
> Some approaches from other plugins we can get some inspiration from:
>
> - AlterConfig, CreateTopic policies are designed to be 1 policy
> implementing the different APIs. Up to the plugin developer to pull
> policies together and configure it on the broker side. I guess for Record
> Validation this may be cumbersome considering some Schema Registry
> providers may want to offer implementations for their own backend.
>
> - Connect Transforms: here there's a named set of plugins to apply per
> connector, and each transform has its own configuration defined by prefix.
> Personally, I'd consider this one an interesting approach if we decide to
> allow multiple record validations to be configured.
>
> - Tiered Storage (probably Connectors as well) have class-loader aware
> implementations with class path specific to the plugin. Not sure if this is
> something to discuss on the KIP or later on the PR, but we could mention
> something on how this plugin would deal with dependency conflicts (e.g.
> different jackson version between broker, plugin(s)).
>
> Also, by potentially supporting multiple plugins for record validation, it
> would be important to consider if it's an all-or-nothing relation, or
> possible to choose that only _some_ policies apply per topic.
> I see there's some preference for setting the validation policy name on the
> topic, though this could be cumbersome to operate: topic creation users may
> not be aware of the record validation (similar to CreateTopic/AlterConfig
> policies), and this would impose additional coordination.
> Maybe a flag for whether to apply policies or not would be a better approach?
>
> 2. Have you considered adding the record metadata to the API? It may be
> useful for logging purposes (e.g. if record validation fails, to log the
> topic-partition), or some policies are interested in record metadata (e.g.
> compression, timestamp type, etc.)
>
> 3. A minor comment for consistency regarding the APIs. As far as I have
> seen, we tend to use the name of the object returned directly instead of
> getters notation, see `AlterConfigPolicy.RecordMetadata` [1]. We may rename
> some of the proposed APIs accordingly:
>
> - `RecordProxy#headers()|key()|value()`
> - `TopicMetadata#topicPartition()`
>
> 4. For the `RecordIntrospectionHints`, I'm struggling to see how this may
> be used by the policy developers. Would you mind adding some examples on
> how the policy in general may be used?
> Specifically, `long needKeyBytes|needKeyValue` are difficult to interpret
> to me.
> nit: maybe replace `need*` with `requiresAccess*` or simply `access*`, or
> similar.

Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-21 Thread Jorge Esteban Quilcate Otoya
Hi Eduardo, Adrian.

Thanks for the KIP. I agree that allowing custom validations on the
broker-side addresses a real gap as you clearly stated on the motivation.

Some initial thoughts from my side:

1. Similar to Kirk's first point, I'm also concerned about how the
plugin developers / operators would be able to apply multiple policies
and how configurations would be passed to each policy.

Some approaches from other plugins we can get some inspiration from:

- AlterConfig, CreateTopic policies are designed to be 1 policy
implementing the different APIs. Up to the plugin developer to pull
policies together and configure it on the broker side. I guess for Record
Validation this may be cumbersome considering some Schema Registry
providers may want to offer implementations for their own backend.

- Connect Transforms: here there's a named set of plugins to apply per
connector, and each transform has its own configuration defined by prefix.
Personally, I'd consider this one an interesting approach if we decide to
allow multiple record validations to be configured.

- Tiered Storage (probably Connectors as well) have class-loader aware
implementations with class path specific to the plugin. Not sure if this is
something to discuss on the KIP or later on the PR, but we could mention
something on how this plugin would deal with dependency conflicts (e.g.
different jackson version between broker, plugin(s)).

Also, by potentially supporting multiple plugins for record validation, it
would be important to consider if it's an all-or-nothing relation, or
possible to choose that only _some_ policies apply per topic.
I see there's some preference for setting the validation policy name on the
topic, though this could be cumbersome to operate: topic creation users may
not be aware of the record validation (similar to CreateTopic/AlterConfig
policies), and this would impose additional coordination.
Maybe a flag for whether to apply policies or not would be a better approach?

2. Have you considered adding the record metadata to the API? It may be
useful for logging purposes (e.g. if record validation fails, to log the
topic-partition), or some policies are interested in record metadata (e.g.
compression, timestamp type, etc.)

3. A minor comment for consistency regarding the APIs. As far as I have
seen, we tend to use the name of the object returned directly instead of
getters notation, see `AlterConfigPolicy.RecordMetadata` [1]. We may rename
some of the proposed APIs accordingly:

- `RecordProxy#headers()|key()|value()`
- `TopicMetadata#topicPartition()`

4. For the `RecordIntrospectionHints`, I'm struggling to see how this may
be used by the policy developers. Would you mind adding some examples on
how the policy in general may be used?
Specifically, `long needKeyBytes|needKeyValue` are difficult to interpret
to me.
nit: maybe replace `need*` with `requiresAccess*` or simply `access*`, or
similar.

Many thanks,

Jorge.

[1]
https://github.com/apache/kafka/blob/3c059133d3008d87f018f2efa4af27027fd5d18e/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java#L41

On Wed, 21 Jun 2023 at 19:08, Kirk True  wrote:

> Hi Edo/Adrian!
>
> Thanks for the KIP.
>
> I have some questions, and apologies that they may fall under the “stupid”
> column because I’m not that familiar with this area :)
>
> 1. Does record.validation.policy.class.name support multiple classes, or
> just one? I’m probably not wrapping my head around it, but I’d imagine
> different policies for different families or groupings of topics, thus the
> need for supporting multiple policies. But if there are multiple then you
> run the risk of conflicts of ownership of validation, so ¯\_(ツ)_/¯
>
> 2. Is there any concern that a validation class may alter the contents of
> the ByteBuffer of the key and/or value? Perhaps that’s already handled
> and/or outside the scope of this KIP?
>
> 3. What is the benefit to introducing the inner TopicMetadata and
> RecordProxy interfaces vs. passing the TopicPartition, String (validation
> policy), and Record into the validate() method directly?
>
> Thanks,
> Kirk
>
> > On Jun 20, 2023, at 2:28 AM, Edoardo Comar 
> wrote:
> >
> > Thanks Николай,
> > We’d like to open a vote next week.
> > Hopefully getting some more feedback before then.
> >
> > Edo
> >
> >
> > On Wed, 7 Jun 2023 at 19:15, Николай Ижиков  wrote:
> >
> >> Hello.
> >>
> >> As the author of one of the related KIPs I’m +1 for this change.
> >> Long-awaited feature.
> >>
> >>> On 7 Jun 2023, at 19:02, Edoardo Comar wrote:
> >>>
> >>> Dear all,
> >>> Adrian and I would like to start a discussion thread on
> >>>
> >>> KIP-940: Broker extension point for validating record contents at
> >>> produce time
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-940%3A+Broker+extension+point+for+validating+record+contents+at+produce+time
> >>>
> >>> This KIP proposes a new broker-side extension point (a “record
> >>> validation policy”) that can be used to reject records published by a
> >>> misconfigured client.

Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-21 Thread Kirk True
Hi Edo/Adrian!

Thanks for the KIP.

I have some questions, and apologies that they may fall under the “stupid”
column because I’m not that familiar with this area :)

1. Does record.validation.policy.class.name support multiple classes, or just 
one? I’m probably not wrapping my head around it, but I’d imagine different 
policies for different families or groupings of topics, thus the need for 
supporting multiple policies. But if there are multiple then you run the risk 
of conflicts of ownership of validation, so ¯\_(ツ)_/¯

2. Is there any concern that a validation class may alter the contents of the 
ByteBuffer of the key and/or value? Perhaps that’s already handled and/or 
outside the scope of this KIP?

3. What is the benefit to introducing the inner TopicMetadata and RecordProxy 
interfaces vs. passing the TopicPartition, String (validation policy), and 
Record into the validate() method directly? 

Thanks,
Kirk

> On Jun 20, 2023, at 2:28 AM, Edoardo Comar  wrote:
> 
> Thanks Николай,
> We’d like to open a vote next week.
> Hopefully getting some more feedback before then.
> 
> Edo
> 
> 
> On Wed, 7 Jun 2023 at 19:15, Николай Ижиков  wrote:
> 
>> Hello.
>> 
>> As the author of one of the related KIPs I’m +1 for this change.
>> Long-awaited feature.
>> 
>>> On 7 Jun 2023, at 19:02, Edoardo Comar wrote:
>>> 
>>> Dear all,
>>> Adrian and I would like to start a discussion thread on
>>> 
>>> KIP-940: Broker extension point for validating record contents at
>>> produce time
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-940%3A+Broker+extension+point+for+validating+record+contents+at+produce+time
>>>
>>> This KIP proposes a new broker-side extension point (a “record
>>> validation policy”) that can be used to reject records published by a
>>> misconfigured client.
>>> Though general, it is aimed at the common, best-practice use case of
>>> defining Kafka message formats with schemas maintained in a schema registry.
>>> 
>>> Please post your feedback, thanks !
>>> 
>>> Edoardo & Adrian
>>> 
>> 
>> 



Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-20 Thread Edoardo Comar
Thanks Николай,
We’d like to open a vote next week.
Hopefully getting some more feedback before then.

Edo


On Wed, 7 Jun 2023 at 19:15, Николай Ижиков  wrote:

> Hello.
>
> As the author of one of the related KIPs I’m +1 for this change.
> Long-awaited feature.
>
> > On 7 Jun 2023, at 19:02, Edoardo Comar wrote:
> >
> > Dear all,
> > Adrian and I would like to start a discussion thread on
> >
> > KIP-940: Broker extension point for validating record contents at
> > produce time
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-940%3A+Broker+extension+point+for+validating+record+contents+at+produce+time
> >
> > This KIP proposes a new broker-side extension point (a “record
> > validation policy”) that can be used to reject records published by a
> > misconfigured client.
> > Though general, it is aimed at the common, best-practice use case of
> > defining Kafka message formats with schemas maintained in a schema registry.
> >
> > Please post your feedback, thanks !
> >
> > Edoardo & Adrian
> >
>
>


Re: [DISCUSS] KIP-940: Broker extension point for validating record contents at produce time

2023-06-07 Thread Николай Ижиков
Hello.

As the author of one of the related KIPs I’m +1 for this change.
Long-awaited feature.

> On 7 Jun 2023, at 19:02, Edoardo Comar wrote:
> 
> Dear all,
> Adrian and I would like to start a discussion thread on
> 
> KIP-940: Broker extension point for validating record contents at produce time
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-940%3A+Broker+extension+point+for+validating+record+contents+at+produce+time
> 
> This KIP proposes a new broker-side extension point (a “record validation 
> policy”) that can be used to reject records published by a misconfigured 
> client.
> Though general, it is aimed at the common, best-practice use case of defining 
> Kafka message formats with schemas maintained in a schema registry.
> 
> Please post your feedback, thanks !
> 
> Edoardo & Adrian
> 