Re: [DISCUSS] KIP-1031: Control offset translation in MirrorSourceConnector

2024-05-21 Thread Omnia Ibrahim
> I have added comments to your PR (
> https://github.com/apache/kafka/pull/15999#pullrequestreview-2066823538)
> 
> in short, `sourcePartition` and `sourceOffset` are unused if
> emit.offset-syncs.enabled=false
I’ll have a look into the PR.
Also regarding my previous comment on `sync.group.offsets.interval.seconds` we 
don’t need to check this if it is -1 as the only way for 
`sync.group.offsets.interval.seconds` or `emit.checkpoints.interval.seconds` to 
be -1 is if emit.checkpoints.enabled and `sync.group.offsets.enabled are both 
false which we check in MirrorCheckpointConnector

Anyway we can continue discussing this on the PR

> BTW, I'm +1 to this KIP, and I noticed my previous comments are related to
> code. Hence, please feel free to open votes. We can have discussion about
> the code later.

It was voted in few weeks ago.

Omnia

> On 21 May 2024, at 14:25, Chia-Ping Tsai  wrote:
> 
>> Which SourceRecord are you referring to here?
> 
> I have added comments to your PR (
> https://github.com/apache/kafka/pull/15999#pullrequestreview-2066823538)
> 
> in short, `sourcePartition` and `sourceOffset` are unused if
> emit.offset-syncs.enabled=false
> 
> BTW, I'm +1 to this KIP, and I noticed my previous comments are related to
> code. Hence, please feel free to open votes. We can have discussion about
> the code later.
> 
> 
> 
> Omnia Ibrahim  於 2024年5月21日 週二 下午9:10寫道:
> 
>> Hi Chia-Ping
>>> It seems we can disable the sync of idle consumers by setting
>> `sync.group.offsets.interval.seconds` to -1, so the fail-fast should
>> include sync.group.offsets.interval.seconds too. For another, maybe we
>> should do fail-fast for MirrorCheckpointConnector even though we don't have
>> this KIP
>> I don’t think we need to fail fast with
>> `sync.group.offsets.interval.seconds` to -1, as `MirrorCheckpointConnector`
>> runs two functionality based on offset-syncs topic that can run separately
>> 1. Write group offset to checkpoints internal topic can be disabled with
>> `emit.checkpoints.interval.seconds` -1
>> 2. Schedule syncing the group offset to __consumer_offsets later can be
>> disabled with  `sync.group.offsets.interval.seconds` to -1
>> 
>> So technically `MirrorCheckpointConnector` can run if only one of these
>> intervals is set to -1 however, if we want to fail fast we should check
>> both `sync.group.offsets.interval.seconds`  and
>> `emit.checkpoints.interval.seconds` not set to -1 as this would be useless.
>> 
>> 
>>> 2) Should we do similar fail-fast for MirrorSourceConnector if user set
>> custom producer configs with emit.offset-syncs.enabled=false? I assume the
>> producer which sending records to offset-syncs topic won't be created if
>> emit.offset-syncs.enabled=false
>> This is a good point I’ll update MirrorSourceConnector’s validate method
>> to address this. I think we should also address
>> `offset-syncs.topic.location` and `offset-syncs.topic.replication.factor`
>> as well as custom consumer, and admin client configs.
>> 
>> 
>>> 3) Should we simplify the SourceRecord if
>> emit.offset-syncs.enabled=false? Maybe that can get a bit performance
>> improvement.
>> Which SourceRecord are you referring to here?
>> 
>> Omnia
>>> On 20 May 2024, at 16:58, Chia-Ping Tsai  wrote:
>>> 
>>> Nice KIP. some minor comments/questions are listed below.
>>> 
>>> 1) It seems we can disable the sync of idle consumers by setting
>> `sync.group.offsets.interval.seconds` to -1, so the fail-fast should
>> include sync.group.offsets.interval.seconds too. For another, maybe we
>> should do fail-fast for MirrorCheckpointConnector even though we don't have
>> this KIP
>>> 
>>> 2) Should we do similar fail-fast for MirrorSourceConnector if user set
>> custom producer configs with emit.offset-syncs.enabled=false? I assume the
>> producer which sending records to offset-syncs topic won't be created if
>> emit.offset-syncs.enabled=false
>>> 
>>> 3) Should we simplify the SourceRecord if
>> emit.offset-syncs.enabled=false? Maybe that can get a bit performance
>> improvement.
>>> 
>>> Best,
>>> Chia-Ping
>>> 
>>> On 2024/04/08 10:03:50 Omnia Ibrahim wrote:
>>>> Hi Chris,
>>>> Validation method is a good call. I updated the KIP to state that the
>> checkpoint connector will fail if the configs aren’t correct. And updated
>> the description of the new config to explain the impact of it on checkpoint
>> connector as well.
>>>> 
>>>> If there is no any other feedback from anyone I

Re: [DISCUSS] KIP-1031: Control offset translation in MirrorSourceConnector

2024-05-21 Thread Omnia Ibrahim
Hi Chia-Ping 
>  It seems we can disable the sync of idle consumers by setting 
> `sync.group.offsets.interval.seconds` to -1, so the fail-fast should include 
> sync.group.offsets.interval.seconds too. For another, maybe we should do 
> fail-fast for MirrorCheckpointConnector even though we don't have this KIP
I don’t think we need to fail fast with `sync.group.offsets.interval.seconds` 
to -1, as `MirrorCheckpointConnector` runs two functionality based on 
offset-syncs topic that can run separately 
1. Write group offset to checkpoints internal topic can be disabled with  
`emit.checkpoints.interval.seconds` -1
2. Schedule syncing the group offset to __consumer_offsets later can be 
disabled with  `sync.group.offsets.interval.seconds` to -1

So technically `MirrorCheckpointConnector` can run if only one of these 
intervals is set to -1 however, if we want to fail fast we should check both 
`sync.group.offsets.interval.seconds`  and `emit.checkpoints.interval.seconds` 
not set to -1 as this would be useless. 

 
> 2) Should we do similar fail-fast for MirrorSourceConnector if user set 
> custom producer configs with emit.offset-syncs.enabled=false? I assume the 
> producer which sending records to offset-syncs topic won't be created if 
> emit.offset-syncs.enabled=false
This is a good point I’ll update MirrorSourceConnector’s validate method to 
address this. I think we should also address `offset-syncs.topic.location` and 
`offset-syncs.topic.replication.factor` as well as custom consumer, and admin 
client configs.


> 3) Should we simplify the SourceRecord if emit.offset-syncs.enabled=false? 
> Maybe that can get a bit performance improvement.
Which SourceRecord are you referring to here? 

Omnia
> On 20 May 2024, at 16:58, Chia-Ping Tsai  wrote:
> 
> Nice KIP. some minor comments/questions are listed below.
> 
> 1) It seems we can disable the sync of idle consumers by setting 
> `sync.group.offsets.interval.seconds` to -1, so the fail-fast should include 
> sync.group.offsets.interval.seconds too. For another, maybe we should do 
> fail-fast for MirrorCheckpointConnector even though we don't have this KIP
> 
> 2) Should we do similar fail-fast for MirrorSourceConnector if user set 
> custom producer configs with emit.offset-syncs.enabled=false? I assume the 
> producer which sending records to offset-syncs topic won't be created if 
> emit.offset-syncs.enabled=false
> 
> 3) Should we simplify the SourceRecord if emit.offset-syncs.enabled=false? 
> Maybe that can get a bit performance improvement.
> 
> Best,
> Chia-Ping
> 
> On 2024/04/08 10:03:50 Omnia Ibrahim wrote:
>> Hi Chris, 
>> Validation method is a good call. I updated the KIP to state that the 
>> checkpoint connector will fail if the configs aren’t correct. And updated 
>> the description of the new config to explain the impact of it on checkpoint 
>> connector as well. 
>> 
>> If there is no any other feedback from anyone I would like to start the 
>> voting thread in few days. 
>> Thanks 
>> Omnia
>> 
>>> On 8 Apr 2024, at 06:31, Chris Egerton  wrote:
>>> 
>>> Hi Omnia,
>>> 
>>> Ah, good catch. I think failing to start the checkpoint connector if offset
>>> syncs are disabled is fine. We'd probably want to do that via the
>>> Connector::validate [1] method in order to be able to catch invalid configs
>>> during preflight validation, but it's not necessary to get that specific in
>>> the KIP (especially since we may add other checks as well).
>>> 
>>> [1] -
>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/Connector.html#validate(java.util.Map)
>>> 
>>> Cheers,
>>> 
>>> Chris
>>> 
>>> On Thu, Apr 4, 2024 at 8:07 PM Omnia Ibrahim 
>>> wrote:
>>> 
>>>> Thanks Chris for the feedback
>>>>> 1. It'd be nice to mention that increasing the max offset lag to INT_MAX
>>>>> could work as a partial workaround for users on existing versions (though
>>>>> of course this wouldn't prevent creation of the syncs topic).
>>>> I updated the KIP
>>>> 
>>>>> 2. Will it be illegal to disable offset syncs if other features that rely
>>>>> on offset syncs are explicitly enabled in the connector config? If
>>>> they're
>>>>> not explicitly enabled then it should probably be fine to silently
>>>> disable
>>>>> them, but I'd be interested in your thoughts.
>>>> The rest of the features that relays on this is controlled by
>>>> emit.checkpoints.enabled (enabled by default) and
>>>> sync.group.offsets.enabled (disa

Re: [DISCUSS] KIP-1044: A proposal to change idempotent producer -- server implementation

2024-05-20 Thread Omnia Ibrahim
ine is only a view on its transactions.
>>> This
>>>>   is the classic stream / table dichotomy.
>>>>   - What the "cache" is trying to do is create that view.
>>>>   - In some cases the size of the state exceeds the storage of the
>> cache
>>>>   and the systems fail.
>>>>   - The current solutions have attempted to place limits on the size
>> of
>>>>   the state.
>>>>   - Errors in implementation and or configuration will eventually lead
>>> to
>>>>   "problem producers"
>>>>   - Under the adopted fixes and current slate of proposals, the
>> "problem
>>>>   producers" solutions have cascading side effects on properly behaved
>>>>   producers. (e.g. dropping long running, slow producing producers)
>>>> 
>>>> For decades (at least since the 1980's and anecdotally since the
>> 1960's)
>>>> there has been a solution to processing state where the size of the
>> state
>>>> exceeded the memory available.  It is the solution that drove the idea
>>> that
>>>> you could have tables in Kafka.  The idea that we can store the hot
>> PIDs
>>> in
>>>> memory using an LRU and write data to storage so that we can quickly
>> find
>>>> things not in the cache is not new.  It has been proven.
>>>> 
>>>> I am arguing that we should not throw away state data because we are
>>>> running out of memory.  We should persist that data to disk and
>> consider
>>>> the disk as the source of truth for state.
>>>> 
>>>> Claude
>>>> 
>>>> 
>>>> On Wed, May 15, 2024 at 7:42 PM Justine Olshan
>>>> 
>>>> wrote:
>>>> 
>>>>> +1 to the comment.
>>>>> 
>>>>>> I still feel we are doing all of this only because of a few
>>>> anti-pattern
>>>>> or misconfigured producers and not because we have “too many
>> Producer”.
>>>> I
>>>>> believe that implementing Producer heartbeat and remove short-lived
>>> PIDs
>>>>> from the cache if we didn’t receive heartbeat will be more simpler
>> and
>>>> step
>>>>> on right direction  to improve idempotent logic and maybe try to make
>>> PID
>>>>> get reused between session which will implement a real idempotent
>>>> producer
>>>>> instead of idempotent session.  I admit this wouldn’t help with old
>>>> clients
>>>>> but it will put us on the right path.
>>>>> 
>>>>> This issue is very complicated and I appreciate the attention on it.
>>>>> Hopefully we can find a good solution working together :)
>>>>> 
>>>>> Justine
>>>>> 
>>>>> On Wed, May 15, 2024 at 8:36 AM Omnia Ibrahim <
>> o.g.h.ibra...@gmail.com
>>>> 
>>>>> wrote:
>>>>> 
>>>>>> Also in the rejection alternatives you listed an approved KIP which
>>> is
>>>> a
>>>>>> bit confusing can you move this to motivations instead
>>>>>> 
>>>>>>> On 15 May 2024, at 14:35, Claude Warren 
>> wrote:
>>>>>>> 
>>>>>>> This is a proposal that should solve the OOM problem on the
>> servers
>>>>>> without
>>>>>>> some of the other proposed KIPs being active.
>>>>>>> 
>>>>>>> Full details in
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1044%3A+A+proposal+to+change+idempotent+producer+--+server+implementation
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> --
>>>> LinkedIn: http://www.linkedin.com/in/claudewarren
>>>> 
>>> 
>> 
>> 
>> --
>> LinkedIn: http://www.linkedin.com/in/claudewarren
>> 



Re: [DISCUSS] KIP-1044: A proposal to change idempotent producer -- server implementation

2024-05-20 Thread Omnia Ibrahim
024 at 7:42 PM Justine Olshan
>> 
>> wrote:
>> 
>>> +1 to the comment.
>>> 
>>>> I still feel we are doing all of this only because of a few
>> anti-pattern
>>> or misconfigured producers and not because we have “too many Producer”.
>> I
>>> believe that implementing Producer heartbeat and remove short-lived PIDs
>>> from the cache if we didn’t receive heartbeat will be more simpler and
>> step
>>> on right direction  to improve idempotent logic and maybe try to make PID
>>> get reused between session which will implement a real idempotent
>> producer
>>> instead of idempotent session.  I admit this wouldn’t help with old
>> clients
>>> but it will put us on the right path.
>>> 
>>> This issue is very complicated and I appreciate the attention on it.
>>> Hopefully we can find a good solution working together :)
>>> 
>>> Justine
>>> 
>>> On Wed, May 15, 2024 at 8:36 AM Omnia Ibrahim 
>>> wrote:
>>> 
>>>> Also in the rejection alternatives you listed an approved KIP which is
>> a
>>>> bit confusing can you move this to motivations instead
>>>> 
>>>>> On 15 May 2024, at 14:35, Claude Warren  wrote:
>>>>> 
>>>>> This is a proposal that should solve the OOM problem on the servers
>>>> without
>>>>> some of the other proposed KIPs being active.
>>>>> 
>>>>> Full details in
>>>>> 
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1044%3A+A+proposal+to+change+idempotent+producer+--+server+implementation
>>>> 
>>>> 
>>> 
>> 
>> 
>> --
>> LinkedIn: http://www.linkedin.com/in/claudewarren
>> 



Re: [VOTE] KIP-932: Queues for Kafka

2024-05-16 Thread Omnia Ibrahim
Thanks for the KIP Andrew +1 none binding from me

> On 16 May 2024, at 14:23, David Jacot  wrote:
> 
> Hi Andrew,
> 
> Thanks for the KIP! This is really exciting! +1 (binding) from me.
> 
> One note regarding the partition assignor interface changes that you
> proposed, it would be great to get the changes in 3.8 in order to not break
> the API of KIP-848 after the preview.
> 
> Best,
> David
> 
> On Wed, May 15, 2024 at 10:37 PM Jun Rao  wrote:
> 
>> Hi, Andrew,
>> 
>> Thanks for the update. Should we mark whether those metrics are
>> standard/required for KIP-714?
>> 
>> Jun
>> 
>> On Tue, May 14, 2024 at 7:31 AM Andrew Schofield <
>> andrew_schofi...@live.com>
>> wrote:
>> 
>>> Hi,
>>> I have made a small update to the KIP as a result of testing the new
>>> share consumer with client telemetry (KIP-714).
>>> 
>>> I’ve added telemetry metric names to the table of client metrics and
>>> also updated the metric group names so that the resulting client metrics
>>> sent to the broker have consistent names.
>>> 
>>> Thanks,
>>> Andrew
>>> 
 On 8 May 2024, at 12:51, Manikumar  wrote:
 
 Hi Andrew,
 
 Thanks for the KIP.  Great write-up!
 
 +1 (binding)
 
 Thanks,
 
 On Wed, May 8, 2024 at 12:17 PM Satish Duggana <
>> satish.dugg...@gmail.com>
>>> wrote:
> 
> Hi Andrew,
> Thanks for the nice KIP, it will allow other messaging use cases to be
> onboarded to Kafka.
> 
> +1 from me.
> 
> Satish.
> 
> On Tue, 7 May 2024 at 03:41, Jun Rao 
>> wrote:
>> 
>> Hi, Andrew,
>> 
>> Thanks for the KIP. +1
>> 
>> Jun
>> 
>> On Mon, Mar 18, 2024 at 11:00 AM Edoardo Comar <
>> edoardli...@gmail.com>
>> wrote:
>> 
>>> Thanks Andrew,
>>> 
>>> +1 (binding)
>>> 
>>> Edo
>>> 
>>> On Mon, 18 Mar 2024 at 16:32, Kenneth Eversole
>>>  wrote:
 
 Hi Andrew
 
 + 1 (Non-Binding)
 
 This will be great addition to Kafka
 
 On Mon, Mar 18, 2024 at 8:27 AM Apoorv Mittal <
>>> apoorvmitta...@gmail.com>
 wrote:
 
> Hi Andrew,
> Thanks for writing the KIP. This is indeed going to be a valuable
>>> addition
> to the Kafka, excited to see the KIP.
> 
> + 1 (Non-Binding)
> 
> Regards,
> Apoorv Mittal
> +44 7721681581
> 
> 
> On Sun, Mar 17, 2024 at 11:16 PM Andrew Schofield <
> andrew_schofield_j...@outlook.com> wrote:
> 
>> Hi,
>> I’ve been working to complete KIP-932 over the past few months
>> and
>> discussions have quietened down.
>> 
>> I’d like to open the voting for KIP-932:
>> 
>> 
>> 
> 
>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>> 
>> Thanks,
>> Andrew
> 
>>> 
>>> 
>>> 
>> 



Re: [DISCUSS] KIP-1044: A proposal to change idempotent producer -- server implementation

2024-05-15 Thread Omnia Ibrahim
Also in the rejection alternatives you listed an approved KIP which is a bit 
confusing can you move this to motivations instead 

> On 15 May 2024, at 14:35, Claude Warren  wrote:
> 
> This is a proposal that should solve the OOM problem on the servers without
> some of the other proposed KIPs being active.
> 
> Full details in
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1044%3A+A+proposal+to+change+idempotent+producer+--+server+implementation



Re: [DISCUSS] KIP-1044: A proposal to change idempotent producer -- server implementation

2024-05-15 Thread Omnia Ibrahim
Hi Claude
Thanks for raising this KIP. It is an interesting idea. I had a quick review 
for the KIP and I have few notes 
10. 
> The issue is that the number of PIDs that need to be tracked has exploded and 
> has resulted in OOM failures that cause the entire cluster to crash.  There 
> are multiple efforts underway to mitigate the OOM problem through cache 
> cleanup and throttling of clients.

I think we should clarify here that this only happened  when the cluster has an 
abusive/misconfigured client that initialises too many PIDs. For example I saw 
this issue 3 times 
1. First one was because of an application that kept re-initalizing producer on 
every single error message they received from Kafka instead of retrying or 
skipping the records. This one took longer to fill the memory ~ 24hr (this was 
before the 24hr expiration) but eventually it did. 
2. one producer deployment stuck in crashing loop which created  >500,000 PID 
in few hours due to some misconfiguration that led the application to crash 
after sending the first batch 
3. another encounter was a producer initialising PID on each record which led 
to creation of 1M PID in few hours which is an anti-pattern.
So technically this OOM only happened when we get small number of misconfigured 
producers or anti-pattern design. 
11. Another thing is maybe worth pointing out here is KIP-936 as throttling is 
the other option we are weighting against in this KIP. 
12. I feel the motivation isn’t clear enough for people who aren’t familiar 
with this OOM issue. Especially that not a lot of people experienced this issue.
13. I still feel we are doing all of this only because of a few anti-pattern or 
misconfigured producers and not because we have “too many Producer”.  I believe 
that implementing Producer heartbeat and remove short-lived PIDs from the cache 
if we didn’t receive heartbeat will be more simpler and step on right direction 
 to improve idempotent logic and maybe try to make PID get reused between 
session which will implement a real idempotent producer instead of idempotent 
session.  I admit this wouldn’t help with old clients but it will put us on the 
right path.


Omnia



> On 15 May 2024, at 14:35, Claude Warren  wrote:
> 
> This is a proposal that should solve the OOM problem on the servers without
> some of the other proposed KIPs being active.
> 
> Full details in
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1044%3A+A+proposal+to+change+idempotent+producer+--+server+implementation



Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-05-07 Thread Omnia Ibrahim
Hi Justine Thanks for the feedback 

> So consider a case where there is a storm for a given principal. We could
> have a large mass of short lived producers in addition to some
> "well-behaved" ones. My understanding is that if the "well-behaved" one
> doesn't produce as frequently ie less than once per hour, it will also get
> throttled when a storm of short-lived producers leads the principal to hit
> the given rate necessary for throttling. The idea of the filter is that we
> don't throttle existing producers, but in this case, we will.
I believe this would be part of the limitation of the KIP (which is similar to 
some extend with the rest of Kafka Quotas) If KafkaPrincipal of ClientId is 
shared between different use-cases where we have few well behaving and some 
misbehaving we will punish all as these are the identifiers we have about them. 

If I understand the scenario you described correctly I think I can break it to 
the following uses cases
1. KafkaPrincipal is shared between multiple applications with different 
ClientIds then It is not fair to throttle one application because the other one 
that have been configured with same KafkaPrincipal is misbehaving. This can be 
solved by breaking the throttling to be based on the combination of 
KafkaPrincipal-ClientId, it will increase the number of entries in the cache 
but will at least isolate applications. And I still believe it might be tricky 
for whoever is manning the cluster to list and throttle most client ids.
2. A new version of the application has been misconfigured and during rolling 
the upgrades only half the instances of this app has the misconfigured version 
and it keep creating short-lived PIDs while the other half has the well 
behaving but it produce on slower base so it will produce every two hours. This 
one will be impacted and it bit tricky to track these.

In both cases I don’t believe we need to wait for the 24hr expiration of PID to 
hit however, I believe we need to follow one of these solutions 
1. Break these uses cases to use different KafkaPrincipal (and or ClientId if 
we opt-in for this) 
2. If both cases are using the same KafkaPrincipal then they are most likely 
the same owner of both apps and they will need to shutdown or fix the 
misbehaving/misconfigured application that create all these instances and we 
need to wait for the throttle time to pass before the well behaved client 
proceed with the unseen PID. 

> Note -- one thing that wasn't totally clear from the KIP was whether we
> throttle all new produce requests from the client or just the ones with
> unseen IDs. If we throttle them all, perhaps this point isn't a huge deal.
I’ll clarify this. But the KIP is aiming to throttle all unseen PID for X 
amount of time which is the throttle time. 

> The other concern that I brought up is that when we throttle, we will
> likely continue to throttle until the storm stops. This is because we will
> have to wait 1 day or so for IDs to expire, and we will likely replace them
> at a pretty fast rate. This can be acceptable if we believe that it is
> helpful to getting the behavior to stop, but I just wanted to call out that
> the user will likely not be able to start clients in the meantime.
Am not sure the producer need to wait for 1 day (unless the PID quota is set 
too high) as we are throttling unseen PID per user any PID above the quota will 
not be registered at the leader side and we don’t store anything for idempotent 
at initialising so am not sure I see need to wait for 1 day unless am missing 
something. 
If User-A has `producer_ids_rate` 100 and the broker can store 2,000,000 before 
hit out of memory. Then the leader will only store a 100 PIDs in 1 hour and 
throttle any unseen PID as these will be considered new PIDs. If we ht the 
scenario that you described within  1 hr window then this client should be 
alerted that it didn’t produce and got throttled. Then we can apply one of the 
solutions I mentioned in first point. They either split the use cases to 
different KafkaPrincipals or shutdown the misconfigured app and wait the 
throttle time pass. The throttle doesn’t get controlled by the 24hr expiration 
of PID and we don’t even check if PID has expired or not. 

I think I might be missing something regarding the need to wait for 24hr to 
resume! 

Omnia


> On 6 May 2024, at 20:46, Justine Olshan  wrote:
> 
> Hi Claude,
> 
> I can clarify my comments.
> 
> Just to clarify -- my understanding is that we don't intend to throttle any
> new producer IDs at the beginning. I believe this amount is specified by
> `producer_ids_rate`, but you can see this as a number of producer IDs per
> hour.
> 
> So consider a case where there is a storm for a given principal. We could
> have a large mass of short lived producers in addition to some
> "well-behaved" ones. My understanding is that if the "well-behaved" one
> doesn't produce as frequently ie less than once per hour, it will also get
> throttled when a 

Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-05-07 Thread Omnia Ibrahim
Hi Igor, thanks for the feedback and sorry for the late response. 

> 10 Given the goal is to prevent OOMs, do we also need to
> limit the number of KafkaPrincipals in use?

None of the Kafka quotas ever limited number of KafkaPrincipals and I don’t 
really think this is the issue as you just need one misconfigured application 
to hit the OOM issue. 
Also limiting number of KafkaPrincipals will cause problems for on-boarding new 
cases to use the cluster. What is the question behind this question? Is there a 
use case you have in mind that need this?

> 11. How would an operator know or decide to change the configuration
> for the number layers – producer.id.quota.cache.layer.count –
> e.g. increasing from 4 to 5; and why?
> Do we need a new metric to indicate that change could be useful?
> 
> 12. Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
> guaranteed interval, or rather simply a delay between cleanups?
> How did you decide on the default value of 10ms?

I am leaning toward dropping all the caching layer configs and keeping them as 
constant and we can later make them a config if there is a request for this. 
The 10min doesn’t really have a reason behind. It was a random number. 

> 16. LayeredBloomFilter will have a fixed size (right?), but some
> users (KafkaPrincipal) might only use a small number of PIDs.
> It it worth having a dual strategy, where we simply keep a Set of
> PIDs until we reach certain size where it pays off to use
> the LayeredBloomFilter?
The size is driven from the `producer_id_rate` for each KafkaPrincipal. We 
might hit this issue for sure. I believe Claude covered this point. 
> 
>  a. INIT_PRODUCER_ID for idempotent producer request PIDs from
>  random controller every time so if a client got throttled on
>  one controller doesn't guarantee it will not go through on next
>  controller causing OOM at the leader later.
> 
> Is the INIT_PRODUCER_ID request really sent to a "random controller"?
> From a quick look at Sender.maybeSendAndPollTransactionalRequest,
> for an idempotent producer, targetNode is set to the broker with
> fewest outstanding requests. Am I looking at the wrong place?
No you are not looking at the wrong place however this still a random comaring 
with TXN_ID for example or GROUP_ID and not dedicated by any mean as it can 
change at any point depend on which one has less outstanding requests. 

However if we want to focus on where the problem happened this will be the 
leader. We can throttle INIT_PID but this will not limit how many PID can 
produce to one 1 broker 
Let’s image this scenario where we have a cluster where each broker can max 
have 200 PID without hitting OOM. And each KafkaPrincipal can INIT_PID for X 
number (let say 10) and we have like 20 active KafkaPrincipal. Which in total 
we can have 200 INIT_PID request. Now on the leadership side of producing we 
have 
Broker-1 has 100 PID in memory 
Broker-2 has 150 PID in memory 
Broker-3 has 100 PID in memory 
Broker-4 has 120 PID in memory 
Broker-5 has 100 PID in memory 
Broker-6 has 90 PID in memory 
Now Broker-1 is down and for the sake of this example not all 100 PID will move 
to one broker but 60 PID will connect to Broker-2 bringing its total PID to 
210. Now these PIDs aren’t going to be throttle by INIT_PID as they already has 
been initialised which will cause OOM now Broker-2 is down. If Broker-1/2 down 
now the problem is propagated and the cluster will be down as each broker try 
to take leadership will get the initialised PIDs and hit OOM until someone 
manually increase the memory of the brokers or if we hit the PID cleanup at 
some point of all of this mess. 

Hope this explain my concerns with throttling `INIT_PRODUCER_ID`. 

Thanks
Omnia

> On 1 May 2024, at 15:42, Igor Soarez  wrote:
> 
> Hi Omnia, Hi Claude,
> 
> Thanks for putting this KIP together.
> This is an important unresolved issue in Kafka,
> which I have witnessed several times in production.
> 
> Please see my questions below:
> 
> 10 Given the goal is to prevent OOMs, do we also need to
> limit the number of KafkaPrincipals in use?
> 
> 11. How would an operator know or decide to change the configuration
> for the number layers – producer.id.quota.cache.layer.count –
> e.g. increasing from 4 to 5; and why?
> Do we need a new metric to indicate that change could be useful?
> 
> 12. Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
> guaranteed interval, or rather simply a delay between cleanups?
> How did you decide on the default value of 10ms?
> 
> 13. Under "New ProducerIdQuotaManagerCache", the documentation for
> the constructor params for ProducerIDQuotaManagerCache does not
> match the constructor signature.
> 
> 14. Under "New ProducerIdQuotaManagerCache":
>  public boolean track(KafkaPrincipal principal, int producerIdRate, long pid)
> How is producerIdRate used? The reference implementation Claude shared
> does not use it.
> 

Re: [DISCUSS] solutions for broker OOM caused by many producer IDs

2024-05-01 Thread Omnia Ibrahim
Hi Luke and Justine. There are few updates on KIP-936  
https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
 to introduce throttling on PIDs per User and would love to hear your feedback 
in the discussion thread  
https://lists.apache.org/thread/nxp395zmvc0s8r4ohg91kdb19dxsbxlt if you have 
time. 

Thanks 
Omnia

> On 6 Jun 2023, at 15:04, Omnia Ibrahim  wrote:
> 
> Thanks, Luke for the feedback
> 
>> 1. how do we store value in bloom filter? It's unclear from this KIP that 
>> what we store inside bloom filter, and how we throttle them.
>> My understanding is, we have a map with key = kafkaPrinciple, and value = 
>> PID for each bloom filter.
>> And when new PID created for a userA, we update the map to add PID into the 
>> cache value (i.e. the bloom filter)
>> When the window passed half of the time, we created another bloom filter, 
>> and this time, when new PID comes, we check if this new PID existed in 
>> previous bloom filter, if not, we add into the new bloom filter. And in the 
>> meantime, we track the "new created" count (filtered by previous bloom 
>> filter) for throttling the users.
>> Is my understanding correct?
> 
> Not quite what am proposing. I'm proposing 
> a cache layer to be used only for checking if we encountered the PID before 
> or not for a given KafkaPrincipal. Not as a counter.
> if the cache layer doesn't contain the PIDs I'll increment the metric sensor 
> using Sensor::record (the sensor will be created during the initial 
> interaction with this KafkaPrincipal). Sensor::record fails with 
> QuotaViolationException when we reach the max of the sensor.
> If incrementing the sensor didn't fail with QuotaViolationException then I'll 
> add the PID to the cache for the next time
> To achieve this I'm proposing this the cache layer will be represented as a 
> "cachedMap = Map>". I 
> wrapped "Map>" into 
> "TimedControlledBloomFilter" where we decide which bloom filter to write to, 
> which bloomfilter to delete, etc.
> 
> When we encounter the producer for the first time,
> the new quota manager will create the sensor and update its value.
> Then it will update the cache with this PID for the next time.
> The cache will create an entry for the user in the cachedMap
> The cache will be like this
> Map { "UserA" -> TimedBloomFilter {
>bloom_filter_1_create_timestamp -> 
> bloom_filter_1
>  }
> }
> the PID will be added to bloom_filter_1
> 2. If the producer tries to produce with the same PID the next time; the 
> quota manager will not update the sensor or the cache. And will not throttle 
> the user.
> 3. However, if the producer tries to produce with a new PID, it will be added 
> to bloom_filter_1 as long as we are within the first half of 
> producer.id.quota.window.size.seconds
> 4. If the producer sends any new PIDs after the first half of 
> producer.id.quota.window.size.seconds, we will create a new bloom filter to 
> store the PIDs from the next half of the window
> The cache will be like this
> Map { "UserA" -> TimedBloomFilter {
>bloom_filter_1_create_timestamp -> 
> bloom_filter_1
>bloom_filter_2_create_timestamp -> 
> bloom_filter_2
>  }
> }
> All PIDs from this point until the end of this window will be added to 
> bloom_filter_2.
> And both bloom_filter_1 and bloom_filter_2 will be used for checking if we 
> came across PID before or not.
> 5. a scheduler will run in the background to delete any bloom filter with 
> create_timestamp >= producer.id.quota.window.size.seconds from 
> TimedBloomFilter automatically
> 6. If the user stopped producing and all its bloom filters got deleted by the 
> scheduler the user entry will be removed from the cachedMap.
> 
> I updated the KIP to add more clarification to this caching layer.
> 
>> 2. what will user get when they can't allocate new PID due to throttling?
> 
> 
> The client will get `QuotaViolationException` similar to  ClientQuotaManager 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936:+Throttle+number+of+active+PIDs#KIP936:ThrottlenumberofactivePIDs-ClientErrors
> 
>> 3. This config: producer.id.quota.window.num is unclear to me?
> 
> `quota.window.num` is a standard config between all Kafka quota types. I am 
> re-using the same description and default value in the existing Kafka 
> codebase (am not a fan of the description as it's not clear). The sample here 
> is referring to the sliding time window. Kafka Quotas keeps 

Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-04-30 Thread Omnia Ibrahim
Hi, 
Just bringing some offline discussion and recent updated to the KIP here to the 
mailing list
Claude updated the KIP to use LayeredBloomFilter from Apache-commons 
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayeredBloomFilter.html
 for the caching layer instead of implementing TimeBasedBloomFilter from 
scratch. 
Claude and I had some discussion regarding some configurations that impact the 
bloom filter. 
The KIP originally proposed to split each window into half however this now has 
been updated to use `producer.id <http://producer.id/>.quota.cache.layer.count` 
which control how many layer each window will have. Default is 4 layers. I 
updated the Rejected Alternatives section to reflect why 2 layered bloom filter 
has been rejected
The KIP originally proposed to start a new bloom layer ever producer.id 
<http://producer.id/>.quota.window.size.seconds/2 now this will be triggered 
ever producer.id <http://producer.id/>.quota.window.size.seconds/producer.id 
<http://producer.id/>.quota.cache.layer.count. I updated the diagram in the KIP 
to reflect this. 
The producer_ids_rate per user will be driving the maximum number of PIDs in 
Shape 
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/Shape.html
 instead of introducing a global config for max size of Shape as it goes 
against the other rejected alternatives. Now Shape can be 
`Shape.fromNP(producer_ids_rate, producer.id 
<http://producer.id/>.quota.cache.false.positive.rate)
The KIP now propose another config to control the bloom's false postive rate 
using producer.id <http://producer.id/>.quota.cache.false.positive.rate by 
default this is set to 0.001 instead of being hardcoded. 

I think the KIP is now in better shape to ask others for a review. 

Omnia

> On 24 Apr 2024, at 14:48, Omnia Ibrahim  wrote:
> 
> Hi Glaude sorry that it took me a while to respond. I finally had time to 
> look into your implementation here 
> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java#L121
>  and so far it make sense. 
> 
>> So an early PID is added to the first filter and the associated metric is
>> updated.
>> that PID is seen multiple times over the next 60 minutes, but is not added
>> to the Bloom filters again.
>> once the 60 minutes elapses the first filter is cleared, or removed and a
>> new one started.  In any case the PID is no longer recorded in any extant
>> Bloom filter.
>> the PID is seen again and is added to the newest bloom filter and the
>> associated metric is updated.
>> 
>> I believe at this point the metric is incorrect, the PID has been counted
>> 2x, when it has been in use for the entire time.
> 
> You are right! My original design with slides window bloom carries this risk 
> of duplication. I had a look into your track method and it makes sense.
>> 1. p.i.q.window.size.seconds the length of time that a window will
>>   exist.  This is also the maximum time between PID uses where the PID is
>>   considered to be the same.  Reuse of the PID after p.iq.window.size.seconds
>>   triggers recording the PID as a new PID.
>>   Define a new configuration option "producer.id.quota.window.count" as
>>   the number of windows active in window.size.seconds.
> This is correct. 
> 
>>   3. p.i.q.window.num, specified as 11 in the KIP.  I thought this was how
>>   many PIDs were expected in each window.  In the original KIP this means
>>   that we expect to see the PID 22 times (2 windows x 11).  In the Layered
>>   Bloom filter this would be the N value for the Shape.
> producer.id.quota.window.num is how many metrics samples we retain but not 
> how many PID we store in memory. 
>>   2. p.i.q.window.count (the new one), how many sections to break
>>   p.i.q.window.size.seconds into.  In the initial KIP this is 2.  This value
>>   gives us the timing for creating a new layer in the layered bloom filter.
>>   So every (p.i.q.window.size.seconds / p.i.q.window.count) seconds a new
>>   layer will be created and an old layer or layers will be removed.  The
>>   layered bloom filter will add layers to keep the probability of false
>>   positives in range.
>>   4. p.i.q.window.fpr (needs to be specified) the expected false positive
>>   rate.  Not sure how to express this in the config in a way that makes sense
>>   but something like 0.06 or the like.  This is the P value for the
>>   Shape.  See https://hur.st/bloomfilter for a Bloom filter 
> 
> For these two we might need to introduce them. How would they interact with 
> each other i

[jira] [Created] (KAFKA-16638) Align the naming convention for config and default variables in *Config classes

2024-04-28 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-16638:
-

 Summary: Align the naming convention for config and default 
variables in *Config classes
 Key: KAFKA-16638
 URL: https://issues.apache.org/jira/browse/KAFKA-16638
 Project: Kafka
  Issue Type: Task
Reporter: Omnia Ibrahim


Some classes in the code is violating the naming naming convention for config, 
doc, and default variables which is:
 * `_CONFIG` suffix for defining the configuration
 * `_DEFAULT` suffix for default value 
 * `_DOC` suffix for doc

The following classes need to be updated 
 * `CleanerConfig` and `RemoteLogManagerConfig` to use  `_CONFIG` suffix 
instead of `_PROP`.
 * Others like `LogConfig` and `QuorumConfig` to use `_DEFAULT` suffix instead 
of `DEFAULT_` prefix .
 * same goes with `CommonClientConfigs`, `StreamsConfig` however these are 
public interfaces and will need a KIP to rename the default value variables and 
mark the old one as deprecated. This might need to be broken to different Jira.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-1023: Follower fetch from tiered offset

2024-04-26 Thread Omnia Ibrahim
Thanks for the KIP. +1 non-binding from me

> On 26 Apr 2024, at 06:29, Abhijeet Kumar  wrote:
> 
> Hi All,
> 
> I would like to start the vote for KIP-1023 - Follower fetch from tiered
> offset
> 
> The KIP is here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset
> 
> Regards.
> Abhijeet.



Re: [VOTE] KIP-1031: Control offset translation in MirrorSourceConnector

2024-04-24 Thread Omnia Ibrahim
Thanks Igor, I’ll conclude the vote with 3 binding votes from Mickael Maison, 
Chris Egerton and Igor Soarez.
Thanks everyone 

> On 24 Apr 2024, at 15:11, Igor Soarez  wrote:
> 
> Hi Omnia,
> 
> Thanks for your answers, and I see you've updated the KIP so thanks for the 
> changes too.
> 
> +1 (binding), thanks for the KIP
> 
> --
> Igor



Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-04-24 Thread Omnia Ibrahim
umber of layers will expand and then shrink back as
> the PIDs expire.  While running there is always at least 1 layer.
> 
> Some calculated points:
> 
>   - No layer will span more than p.i.q.window.size.seconds /
>   p.i.q.window.count seconds.
>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>   p.i.q.window.count =2 and a rate of 22 PIDs per minute there will be 2
>   layers.
>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>   p.i.q.window.count =2 and a rate of 10 PIDs per minute there will be 2
>   layers.
>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>   p.i.q.window.count =2 and that no PIDS have been seen in the last 30
>   seconds there will be 2 layers.
>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>   p.i.q.window.count =2 and that no PIDS have been seen in the last 60
>   seconds there will be 1 layer.
>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>   p.i.q.window.count =2 and a rate of 23 to 44 PIDs per minute there will be
>   4 layers.
>   - the false positive rate across the layers remains at or below Shape.P
>   - Assuming Shape.N = 11 and Shape.P = 0.06 the Bloom filter at each
>   layer will consume 35 bytes. https://hur.st/bloomfilter provides a quick
>   calculator for other values.
> 
> Claude
> 
> 
> 
> 
> 
> 
> On Tue, Apr 16, 2024 at 8:06 AM Claude Warren  wrote:
> 
>> Let's put aside the CPC datasketch idea and just discuss the Bloom filter
>> approach.
>> 
>> I thinkthe problem with the way the KIP is worded is that PIDs are only
>> added if they are not seen in either of the Bloom filters.
>> 
>> So an early PID is added to the first filter and the associated metric is
>> updated.
>> that PID is seen multiple times over the next 60 minutes, but is not added
>> to the Bloom filters again.
>> once the 60 minutes elapses the first filter is cleared, or removed and a
>> new one started.  In any case the PID is no longer recorded in any extant
>> Bloom filter.
>> the PID is seen again and is added to the newest bloom filter and the
>> associated metric is updated.
>> 
>> I believe at this point the metric is incorrect, the PID has been counted
>> 2x, when it has been in use for the entire time.
>> 
>> The "track" method that I added solves this problem by ensuring that the
>> PID is always seen in the latter half of the set of Bloom filters.  In the
>> case of 2 filters that is always the second one, but remember that the
>> number of layers will grow as the filters become saturated.  So if your
>> filter is intended to hold 500 PIDs and the 501st PID is registered before
>> the expiration a new layer (Bloom filter) is added for new PIDS to be added
>> into.
>> 
>> On Mon, Apr 15, 2024 at 5:00 PM Omnia Ibrahim 
>> wrote:
>> 
>>> Hi Claude,
>>> Thanks for the implementation of the LayeredBloomFilter in apache
>>> commons.
>>> 
>>>> Define a new configuration option "producer.id.quota.window.count" as
>>>> the number of windows active in window.size.seconds.
>>> What is the different between “producer.id.quota.window.count” and
>>> producer.id.quota.window.num
>>> 
>>>> Basically the kip says, if the PID is found in either of the Bloom
>>> filters
>>>> then no action is taken
>>>> If the PID is not found then it is added and the quota rating metrics
>>> are
>>>> incremented.
>>>> In this case long running PIDs will be counted multiple times.
>>> 
>>> The PID is considered not encountered if both frames of the window don’t
>>> have it. If you checked the diagram of for `Caching layer to track active
>>> PIDs per KafkaPrincipal` you will see that each window will have 2 bloom
>>> layers and the first created one will be disposed only when we start the
>>> next window. Which means window2 is starting from the 2nd bloom. Basically
>>> the bloom filter in the KIP is trying to implement a sliding window
>>> pattern.
>>> 
>>>> think the question is not whether or not we have seen a given PID
>>> before
>>>> but rather how many unique PIDs did the principal create in the last
>>> hour.
>>>> Perhaps more exactly it is: did the Principal create more than X PIDS in
>>>> the last Y time units?
>>> We don’t really care about the count of unique PIDs per user. The KIP is
>>> trying to follow and build on top of ClientQuotaManager which already have
>>> a patter for throttling that the

Re: [VOTE] KIP-1031: Control offset translation in MirrorSourceConnector

2024-04-24 Thread Omnia Ibrahim
I updated the KIP as well to briefly explain how offset.lag.max would help 
latency. Please let me know if the KIP now looks better? 

> On 24 Apr 2024, at 11:49, Omnia Ibrahim  wrote:
> 
> Thanks Igor for the suggestions. I updated the KIP with some of them. 
> 
>> s/recored's offset/recorded offsets
> I actually mean record’s offset and not recorded offset. 
> MirrorSourceConnector store the offset of the committed record on the 
> destination cluster in an internal topic. 
> 
>> 12. The use of the word "customers" seems a bit odd to me in this context.
>> Did you perhaps mean one of "use-cases", "users" or "operators”?
> Changed this to use-cases 
> 
>> 13. "They still left with part#1 of this feature which add cost to
>> the progress of their replication."
>> I'm unsure what this means. Do you mean to say that
>> MirrorCheckpointConnector is disabled but MirrorSourceConnector is not?
>> Could you also clarify where the additional cost comes from?
> Yes currently MM2 can disable step 2 of this feature by not running 
> MirrorCheckpointConnector or by disabling emit.checkpoints.enabled  and 
> sync.group.offsets.enabled  however as long as MirrorSourceConnector is 
> running this 1st step of the feature will never be disabled and this feature 
> is costing 1. Create internal topic 2. Latency as we need to produce/queue 
> translated offsets 
>> 14. This is probably more ignorance of mine: it doesn't seem obvious in
>> the KIP how increasing offset.lag.max  to INT_MAX helps reduce latency.
> 
> Based on my knowledge and understanding, offset.lag.max controls the size of 
> queued offsets to be synced which are stored in `pendingOffsetSyncs`. Then
> 1. MM2 will try to send these periodically in `commitRecord` using 
> "MirrorSourceTask ::firePendingOffsetSyncs”. However any as `commitRecord` 
> isn’t blocker then any failed offset sync will be skipped.
> 2. To fix this then in `commit` the connector which is a blocker method will 
> retry publish any skipped offsets.   
> 
> Now If we sit  offset.lag.max  to high value then translated queued offset 
> wouldn’t be considered stale and as a result wouldn’t be added to 
> `pendingOffsetSyncs` which what `firePendingOffsetSyncs` will try to commit. 
> Hope this clarify it.
> 
>> On 20 Apr 2024, at 09:41, Igor Soarez  wrote:
>> 
>> Hi Omnia,
>> 
>> Thanks for this KIP.
>> 
>> 11. These seem to me to be small misspellings, please double-check:
>> s/MM2 main features/MM2's main features
>> s/syncing consumer group offset/syncing consumer group offsets
>> s/relays/relies
>> s/recored's offset/recorded offsets
>> s/clusters without need for/clusters without the need for
>> s/creating internal topic./creating an internal topic.
>> s/This KIP propose that/This KIP proposes that
>> 
>> 12. The use of the word "customers" seems a bit odd to me in this context.
>> Did you perhaps mean one of "use-cases", "users" or "operators"?
>> 
>> 13. "They still left with part#1 of this feature which add cost to
>> the progress of their replication."
>> I'm unsure what this means. Do you mean to say that
>> MirrorCheckpointConnector is disabled but MirrorSourceConnector is not?
>> Could you also clarify where the additional cost comes from?
>> 
>> 14. This is probably more ignorance of mine: it doesn't seem obvious in
>> the KIP how increasing offset.lag.max  to INT_MAX helps reduce latency.
>> I'm guessing it's related to KAFKA-14610 but after having a look I
>> still couldn't understand why.
>> 
>> 
>> --
>> Igor
>> 
>> On Wed, Apr 17, 2024, at 3:22 PM, Omnia Ibrahim wrote:
>>> Thanks Chris and Mickael for the votes. 
>>> Can I please get one last +1 binding vote please?
>>> 
>>> Thanks
>>> Omnia
>>> 
>>>> On 12 Apr 2024, at 13:21, Chris Egerton  wrote:
>>>> 
>>>> +1 (binding), thanks Omnia!
>>>> 
>>>> On Fri, Apr 12, 2024, 03:46 Mickael Maison  
>>>> wrote:
>>>> 
>>>>> Hi Omnia,
>>>>> 
>>>>> +1 (binding), thanks for the KIP!
>>>>> 
>>>>> Mickael
>>>>> 
>>>>> On Fri, Apr 12, 2024 at 9:01 AM Omnia Ibrahim 
>>>>> wrote:
>>>>>> 
>>>>>> Hi everyone, I would like to start a voting thread for KIP-1031: Control
>>>>> offset translation in MirrorSourceConnector
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector
>>>>>> 
>>>>>> For comments or feedback please check the discussion thread here
>>>>> https://lists.apache.org/thread/ym6zr0wrhglft5c000x9c8ych098s7h6
>>>>>> 
>>>>>> Thanks
>>>>>> Omnia
>>>>>> 
>>>>> 
>>> 
>>> 
> 



Re: [VOTE] KIP-1031: Control offset translation in MirrorSourceConnector

2024-04-24 Thread Omnia Ibrahim
Thanks Igor for the suggestions. I updated the KIP with some of them. 

> s/recored's offset/recorded offsets
I actually mean record’s offset and not recorded offset. MirrorSourceConnector 
store the offset of the committed record on the destination cluster in an 
internal topic. 

> 12. The use of the word "customers" seems a bit odd to me in this context.
> Did you perhaps mean one of "use-cases", "users" or "operators”?
Changed this to use-cases 

> 13. "They still left with part#1 of this feature which add cost to
> the progress of their replication."
> I'm unsure what this means. Do you mean to say that
> MirrorCheckpointConnector is disabled but MirrorSourceConnector is not?
> Could you also clarify where the additional cost comes from?
Yes currently MM2 can disable step 2 of this feature by not running 
MirrorCheckpointConnector or by disabling emit.checkpoints.enabled  and 
sync.group.offsets.enabled  however as long as MirrorSourceConnector is running 
this 1st step of the feature will never be disabled and this feature is costing 
1. Create internal topic 2. Latency as we need to produce/queue translated 
offsets 
> 14. This is probably more ignorance of mine: it doesn't seem obvious in
> the KIP how increasing offset.lag.max  to INT_MAX helps reduce latency.

Based on my knowledge and understanding, offset.lag.max controls the size of 
queued offsets to be synced which are stored in `pendingOffsetSyncs`. Then
1. MM2 will try to send these periodically in `commitRecord` using 
"MirrorSourceTask ::firePendingOffsetSyncs”. However any as `commitRecord` 
isn’t blocker then any failed offset sync will be skipped.
2. To fix this then in `commit` the connector which is a blocker method will 
retry publish any skipped offsets.   

Now If we sit  offset.lag.max  to high value then translated queued offset 
wouldn’t be considered stale and as a result wouldn’t be added to 
`pendingOffsetSyncs` which what `firePendingOffsetSyncs` will try to commit. 
Hope this clarify it.

> On 20 Apr 2024, at 09:41, Igor Soarez  wrote:
> 
> Hi Omnia,
> 
> Thanks for this KIP.
> 
> 11. These seem to me to be small misspellings, please double-check:
> s/MM2 main features/MM2's main features
> s/syncing consumer group offset/syncing consumer group offsets
> s/relays/relies
> s/recored's offset/recorded offsets
> s/clusters without need for/clusters without the need for
> s/creating internal topic./creating an internal topic.
> s/This KIP propose that/This KIP proposes that
> 
> 12. The use of the word "customers" seems a bit odd to me in this context.
> Did you perhaps mean one of "use-cases", "users" or "operators"?
> 
> 13. "They still left with part#1 of this feature which add cost to
> the progress of their replication."
> I'm unsure what this means. Do you mean to say that
> MirrorCheckpointConnector is disabled but MirrorSourceConnector is not?
> Could you also clarify where the additional cost comes from?
> 
> 14. This is probably more ignorance of mine: it doesn't seem obvious in
> the KIP how increasing offset.lag.max  to INT_MAX helps reduce latency.
> I'm guessing it's related to KAFKA-14610 but after having a look I
> still couldn't understand why.
> 
> 
> --
> Igor
> 
> On Wed, Apr 17, 2024, at 3:22 PM, Omnia Ibrahim wrote:
>> Thanks Chris and Mickael for the votes. 
>> Can I please get one last +1 binding vote please?
>> 
>> Thanks
>> Omnia
>> 
>>> On 12 Apr 2024, at 13:21, Chris Egerton  wrote:
>>> 
>>> +1 (binding), thanks Omnia!
>>> 
>>> On Fri, Apr 12, 2024, 03:46 Mickael Maison  wrote:
>>> 
>>>> Hi Omnia,
>>>> 
>>>> +1 (binding), thanks for the KIP!
>>>> 
>>>> Mickael
>>>> 
>>>> On Fri, Apr 12, 2024 at 9:01 AM Omnia Ibrahim 
>>>> wrote:
>>>>> 
>>>>> Hi everyone, I would like to start a voting thread for KIP-1031: Control
>>>> offset translation in MirrorSourceConnector
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector
>>>>> 
>>>>> For comments or feedback please check the discussion thread here
>>>> https://lists.apache.org/thread/ym6zr0wrhglft5c000x9c8ych098s7h6
>>>>> 
>>>>> Thanks
>>>>> Omnia
>>>>> 
>>>> 
>> 
>> 



Re: [DISCUSS] KIP-1039: Disable automatic topic creation for MirrorMaker2 consumers

2024-04-19 Thread Omnia Ibrahim
Hi Aaron,
You mentioned that there is no public interface changes however changing the 
default value of a config should be considered as a public change. You can 
check other KIP where we changed the default config value for a reference.

Can you please list what is the impact of changing the behaviour of the topic 
creation along side  as well as is there any rejected alternatives like can’t 
customer disable allow.auto.create.topics manually for example as a workaround? 

Thanks
Omnia

> On 19 Apr 2024, at 10:37, aaron ai  wrote:
> 
> Hi all,
> 
> Here is the KIP-1039: Disable automatic topic creation for MirrorMaker2
> consumers
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1039%3A+Disable+automatic+topic+creation+for+MirrorMaker2+consumers
>> 
> 
> Looking forward to your feedback!
> 
> Best regards,
> Aaron



Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-17 Thread Omnia Ibrahim
Hi Alieh, 
Thanks for the KIP! I have couple of comments
- You mentioned in the KIP motivation, 
> Another example for which a production exception handler could be useful is 
> if a user tries to write into a non-existing topic, which returns a retryable 
> error code; with infinite retries, the producer would hang retrying forever. 
> A handler could help to break the infinite retry loop.

How the handler can differentiate between something that is temporary and it 
should keep retrying and something permanent like forgot to create the topic? 
temporary here could be 
 the producer get deployed before the topic creation finish (specially if the 
topic creation is handled via IaC)
 temporary offline partitions 
 leadership changing
Isn’t this putting the producer at risk of dropping records 
unintentionally?
- Can you elaborate more on what is written in the compatibility / migration 
plan section please by explaining in bit more details what is the changing 
behaviour and how this will impact client who are upgrading?
- In the proposal changes can you elaborate in the KIP where in the producer 
lifecycle will ClientExceptionHandler and TransactionExceptionHandler get 
triggered, and how will the producer configure them to point to costumed 
implementation.

Thanks 
Omnia

> On 17 Apr 2024, at 13:13, Alieh Saeedi  wrote:
> 
> Hi all,
> Here is the KIP-1038: Add Custom Error Handler to Producer.
> 
> I look forward to your feedback!
> 
> Cheers,
> Alieh



Re: [VOTE] KIP-1031: Control offset translation in MirrorSourceConnector

2024-04-17 Thread Omnia Ibrahim
Thanks Chris and Mickael for the votes. 
Can I please get one last +1 binding vote please?

Thanks
Omnia

> On 12 Apr 2024, at 13:21, Chris Egerton  wrote:
> 
> +1 (binding), thanks Omnia!
> 
> On Fri, Apr 12, 2024, 03:46 Mickael Maison  wrote:
> 
>> Hi Omnia,
>> 
>> +1 (binding), thanks for the KIP!
>> 
>> Mickael
>> 
>> On Fri, Apr 12, 2024 at 9:01 AM Omnia Ibrahim 
>> wrote:
>>> 
>>> Hi everyone, I would like to start a voting thread for KIP-1031: Control
>> offset translation in MirrorSourceConnector
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector
>>> 
>>> For comments or feedback please check the discussion thread here
>> https://lists.apache.org/thread/ym6zr0wrhglft5c000x9c8ych098s7h6
>>> 
>>> Thanks
>>> Omnia
>>> 
>> 



Re: [VOTE] KIP-899: Allow producer and consumer clients to rebootstrap

2024-04-17 Thread Omnia Ibrahim
Hi Ivan, 
Thanks for the KIP this is a very nice feature to have. 
+1(non-binding) 
Omnia
> On 15 Apr 2024, at 14:33, Andrew Schofield  wrote:
> 
> Thanks for the KIP
> 
> +1 (non-binding)
> 
> Andrew
> 
>> On 15 Apr 2024, at 14:16, Chris Egerton  wrote:
>> 
>> Hi Ivan,
>> 
>> Thanks for the KIP. After the recent changes, this LGTM. +1 (binding)
>> 
>> Cheers,
>> 
>> Chris
>> 
>> On Wed, Aug 2, 2023 at 12:15 AM Ivan Yurchenko 
>> wrote:
>> 
>>> Hello,
>>> 
>>> The discussion [1] for KIP-899 [2] has been open for quite some time. I'd
>>> like to put the KIP up for a vote.
>>> 
>>> Best,
>>> Ivan
>>> 
>>> [1] https://lists.apache.org/thread/m0ncbmfxs5m87sszby2jbmtjx2bdpcdl
>>> [2]
>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-899%3A+Allow+producer+and+consumer+clients+to+rebootstrap
>>> 
> 



Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-04-15 Thread Omnia Ibrahim
Hi Claude, 
Thanks for the implementation of the LayeredBloomFilter in apache commons. 

> Define a new configuration option "producer.id.quota.window.count" as
> the number of windows active in window.size.seconds.
What is the different between “producer.id.quota.window.count” and 
producer.id.quota.window.num

> Basically the kip says, if the PID is found in either of the Bloom filters
> then no action is taken
> If the PID is not found then it is added and the quota rating metrics are
> incremented.
> In this case long running PIDs will be counted multiple times.

The PID is considered not encountered if both frames of the window don’t have 
it. If you checked the diagram of for `Caching layer to track active PIDs per 
KafkaPrincipal` you will see that each window will have 2 bloom layers and the 
first created one will be disposed only when we start the next window. Which 
means window2 is starting from the 2nd bloom. Basically the bloom filter in the 
KIP is trying to implement a sliding window pattern. 

>  think the question is not whether or not we have seen a given PID before
> but rather how many unique PIDs did the principal create in the last hour.
> Perhaps more exactly it is: did the Principal create more than X PIDS in
> the last Y time units?
We don’t really care about the count of unique PIDs per user. The KIP is trying 
to follow and build on top of ClientQuotaManager which already have a patter 
for throttling that the producer client is aware of so we don’t need to upgrade 
old clients for brokers to throttle them and they can respect the throttling. 

The pattern for throttling is that we record the activities by incrementing a 
metric sensor and only when we catch `QuotaViolationException` from the quota 
sensor we will be sending a throttleTimeMs to the client. 
For bandwidth throttling for example we increment the sensor by the size of the 
request. For PID the KIP is aiming to call 
`QuotaManagers::producerIdQuotaManager::maybeRecordAndGetThrottleTimeMs` to 
increment by +1 every time we encounter a new PID and if and if 
`Sensor::record` returned `QuotaViolationException` then we will send back to 
the producer the trolling time that the client should wait for before sending a 
new request with a new PID. 
I hope this make sense. 

> This question can be quickly answered by a CPC datasketch [1].  The
> solution would be something like:
> Break the Y time units into a set of Y' smaller partitions (e.g. 60
> 1-minute partitions for an hour).  Create a circular queue of Y' CPC
> datasketches for each principal.  Implement a queue entry selector based on
> the modulus of the system by the resolution of the Y' partitions. On each
> call:
I didn’t evaluate CPC datasketch or any counter solution as I explained above 
the aim is not to build a counter specially the Kafka Sensor can be enough to 
indicate if we are violating the quota or not. 

Thanks 
Omnia 

> On 15 Apr 2024, at 10:35, Claude Warren  wrote:
> 
> After thinking about his KIP over the weekend I think that there is another
> lighter weight approach.
> 
> I think the question is not whether or not we have seen a given PID before
> but rather how many unique PIDs did the principal create in the last hour.
> Perhaps more exactly it is: did the Principal create more than X PIDS in
> the last Y time units?
> 
> This question can be quickly answered by a CPC datasketch [1].  The
> solution would be something like:
> Break the Y time units into a set of Y' smaller partitions (e.g. 60
> 1-minute partitions for an hour).  Create a circular queue of Y' CPC
> datasketches for each principal.  Implement a queue entry selector based on
> the modulus of the system by the resolution of the Y' partitions. On each
> call:
> 
> On queue entry selector change clear the CPC (makes it empty)
> Add the latest PID to the current queue entry.
> Sum up the CPCs and check if the max (or min) estimate of unique counts
> exceeds the limit for the user.
> 
> When the CPC returns a zero estimated count then the principal has gone
> away and the principal/CPC-queue pair can be removed from the tracking
> system.
> 
> I believe that this code solution is smaller and faster than the Bloom
> filter implementation.
> 
> [1] https://datasketches.apache.org/docs/CPC/CPC.html
> 
> 
> 
> On Fri, Apr 12, 2024 at 3:10 PM Claude Warren  wrote:
> 
>> I think there is an issue in the KIP.
>> 
>> Basically the kip says, if the PID is found in either of the Bloom filters
>> then no action is taken
>> If the PID is not found then it is added and the quota rating metrics are
>> incremented.
>> 
>> In this case long running PIDs will be counted multiple times.
>> 
>> Let's assume a 30 minute window with 2 15-minute frames.  So for the first
>> 15 minutes all PIDs are placed in the first Bloom filter and for the 2nd 15
>> minutes all new PIDs are placed in the second bloom filter.  At the 3rd 15
>> minutes the first filter is removed and a new empty one created.
>> 
>> 

[VOTE] KIP-1031: Control offset translation in MirrorSourceConnector

2024-04-12 Thread Omnia Ibrahim
Hi everyone, I would like to start a voting thread for KIP-1031: Control offset 
translation in MirrorSourceConnector 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector

For comments or feedback please check the discussion thread here 
https://lists.apache.org/thread/ym6zr0wrhglft5c000x9c8ych098s7h6

Thanks
Omnia



Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-10 Thread Omnia Ibrahim
 used was
>>> deterministic rather than a race to create the first member. There are
>>> already other uses of the group protocol
>>> such as Kafka Connect, so it’s all a bit confusing even today.
>>> 
>>> It is actually KIP-848 which introduces configurations for group resources
>>> and KIP-932 is just building on
>>> the idea. I think that MM2 will need to sync these configurations. The
>>> question of whether `group.type` is
>>> a sensible configuration I think is separate.
>>> 
>>> Imagine that we do have `group.type` as a group configuration. How would
>>> we end up with groups with
>>> the same ID but different types on the two ends of MM2? Assuming that both
>>> ends have KIP-932 enabled,
>>> either the configuration was not set, and a consumer group was made on one
>>> end while a share group was
>>> made on the other, OR, the configuration was set but its value changed,
>>> and again we get a divergence.
>>> 
>>> I think that on balance, having `group.type` as a configuration does at
>>> least mean there’s a better chance that
>>> the two ends of MM2 do agree on the type of group. I’m happy to consider
>>> other ways to do this better. The
>>> fact that we have different kinds of group in the same namespace is the
>>> tricky thing. I think this was possible
>>> before this KIP, but it’s much more likely now.
>>> 
>>> 
>>> Onto the question of memory. There are several different parts to this,
>>> all of which are distributed across
>>> the cluster.
>>> 
>>> * For the group coordinator, the memory consumption will be affected by
>>> the number of groups,
>>> the number of members and the number of topic-partitions to be assigned to
>>> the members. The
>>> group coordinator is concerned with membership and assignment, so the
>>> memory per topic-partition
>>> will be small.
>>> * For the share coordinator, the memory consumption will be affected by
>>> the number of groups, the
>>> number of topic-partitions being consumed in the group, and the number of
>>> in-flight records, but not
>>> the number of members. We should be talking about no more than kilobytes
>>> per topic-partition.
>>> * For the share-partition leader, the memory consumption will be affected
>>> by the number of share group
>>> members assigned the topic-partition and the number of in-flight records.
>>> Again, we should be talking
>>> about no more than kilobytes per topic-partition.
>>> 
>>> Of these, the factor that is not directly under control is the number of
>>> topic-partitions. The reason is that
>>> I wanted to avoid a situation where the number of partitions in a topic
>>> was increased and suddenly
>>> consumption in a share group hit a limit that was not anticipated.
>>> 
>>> I could introduce a configuration for controlling the number of topics
>>> allowed to be subscribed in a share
>>> group. Personally, I think 1 would be a good starting point.
>>> 
>>> Let me know what you think.
>>> 
>>> Thanks,
>>> Andrew
>>> 
>>> 
>>>> On 2 Apr 2024, at 15:39, Omnia Ibrahim  wrote:
>>>> 
>>>> Hi Andrew,
>>>> Thanks for the KIP it is definitely an interesting read. I have few
>>> questions
>>>> As the KIP proposing extending `AdminClient.incrementalAlterConfigs` to
>>> add an explicit `group.type` what would this means for DR feature in MM2
>>> offering?
>>>> Right now MM2 sync consumer group offsets from source to destination
>>> cluster. And it also offer sync ACLs which contribute to DR feature. Would
>>> this KIP means MM2 needs to also sync the type of groups to destination?
>>>> As `AdminClient.incrementalAlterConfigs` means "when a new group is
>>> created with this name, it must have this type”. What will happened if
>>> clusters on both ends of MM2 has same group id but with different types?
>>>> If this concern is out of the scope we might need to call this out
>>> somewhere in the KIP.
>>>> While the number of share-group and the number of consumers in
>>> share-group is limited by `group.share.max.groups`and
>>> `group.share.max.size` the total number of share-group state records that
>>> might need to be loaded in-memeory has another factor which is the number
>>> of partitions. In cases where group is consuming 

Re: [DISCUSS] KIP-1031: Control offset translation in MirrorSourceConnector

2024-04-08 Thread Omnia Ibrahim
Hi Chris, 
Validation method is a good call. I updated the KIP to state that the 
checkpoint connector will fail if the configs aren’t correct. And updated the 
description of the new config to explain the impact of it on checkpoint 
connector as well. 

If there is no any other feedback from anyone I would like to start the voting 
thread in few days. 
Thanks 
Omnia

> On 8 Apr 2024, at 06:31, Chris Egerton  wrote:
> 
> Hi Omnia,
> 
> Ah, good catch. I think failing to start the checkpoint connector if offset
> syncs are disabled is fine. We'd probably want to do that via the
> Connector::validate [1] method in order to be able to catch invalid configs
> during preflight validation, but it's not necessary to get that specific in
> the KIP (especially since we may add other checks as well).
> 
> [1] -
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/Connector.html#validate(java.util.Map)
> 
> Cheers,
> 
> Chris
> 
> On Thu, Apr 4, 2024 at 8:07 PM Omnia Ibrahim 
> wrote:
> 
>> Thanks Chris for the feedback
>>> 1. It'd be nice to mention that increasing the max offset lag to INT_MAX
>>> could work as a partial workaround for users on existing versions (though
>>> of course this wouldn't prevent creation of the syncs topic).
>> I updated the KIP
>> 
>>> 2. Will it be illegal to disable offset syncs if other features that rely
>>> on offset syncs are explicitly enabled in the connector config? If
>> they're
>>> not explicitly enabled then it should probably be fine to silently
>> disable
>>> them, but I'd be interested in your thoughts.
>> The rest of the features that relays on this is controlled by
>> emit.checkpoints.enabled (enabled by default) and
>> sync.group.offsets.enabled (disabled by default) which are part of
>> MirrorCheckpointConnector config not MirrorSourceConnector, I was thinking
>> that MirrorCheckpointConnector should fail to start if
>> emit.offset-syncs.enabled is disabled while emit.checkpoints.enabled and/or
>> sync.group.offsets.enabled are enabled as no point of creating this
>> connector if the main part is disabled. WDYT?
>> 
>> Thanks
>> Omnia
>> 
>>> On 3 Apr 2024, at 12:45, Chris Egerton  wrote:
>>> 
>>> Hi Omnia,
>>> 
>>> Thanks for the KIP! Two small things come to mind:
>>> 
>>> 1. It'd be nice to mention that increasing the max offset lag to INT_MAX
>>> could work as a partial workaround for users on existing versions (though
>>> of course this wouldn't prevent creation of the syncs topic).
>>> 
>>> 2. Will it be illegal to disable offset syncs if other features that rely
>>> on offset syncs are explicitly enabled in the connector config? If
>> they're
>>> not explicitly enabled then it should probably be fine to silently
>> disable
>>> them, but I'd be interested in your thoughts.
>>> 
>>> Cheers,
>>> 
>>> Chris
>>> 
>>> On Wed, Apr 3, 2024, 20:41 Luke Chen  wrote:
>>> 
>>>> Hi Omnia,
>>>> 
>>>> Thanks for the KIP!
>>>> It LGTM!
>>>> But I'm not an expert of MM2, it would be good to see if there is any
>> other
>>>> comment from MM2 experts.
>>>> 
>>>> Thanks.
>>>> Luke
>>>> 
>>>> On Thu, Mar 14, 2024 at 6:08 PM Omnia Ibrahim 
>>>> wrote:
>>>> 
>>>>> Hi everyone, I would like to start a discussion thread for KIP-1031:
>>>>> Control offset translation in MirrorSourceConnector
>>>>> 
>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector
>>>>> 
>>>>> Thanks
>>>>> Omnia
>>>>> 
>>>> 
>> 
>> 



Re: [DISCUSS] KIP-1031: Control offset translation in MirrorSourceConnector

2024-04-04 Thread Omnia Ibrahim
Thanks Chris for the feedback
> 1. It'd be nice to mention that increasing the max offset lag to INT_MAX
> could work as a partial workaround for users on existing versions (though
> of course this wouldn't prevent creation of the syncs topic).
I updated the KIP

> 2. Will it be illegal to disable offset syncs if other features that rely
> on offset syncs are explicitly enabled in the connector config? If they're
> not explicitly enabled then it should probably be fine to silently disable
> them, but I'd be interested in your thoughts.
The rest of the features that relays on this is controlled by 
emit.checkpoints.enabled (enabled by default) and sync.group.offsets.enabled 
(disabled by default) which are part of MirrorCheckpointConnector config not 
MirrorSourceConnector, I was thinking that MirrorCheckpointConnector should 
fail to start if emit.offset-syncs.enabled is disabled while 
emit.checkpoints.enabled and/or sync.group.offsets.enabled are enabled as no 
point of creating this connector if the main part is disabled. WDYT?

Thanks 
Omnia
 
> On 3 Apr 2024, at 12:45, Chris Egerton  wrote:
> 
> Hi Omnia,
> 
> Thanks for the KIP! Two small things come to mind:
> 
> 1. It'd be nice to mention that increasing the max offset lag to INT_MAX
> could work as a partial workaround for users on existing versions (though
> of course this wouldn't prevent creation of the syncs topic).
> 
> 2. Will it be illegal to disable offset syncs if other features that rely
> on offset syncs are explicitly enabled in the connector config? If they're
> not explicitly enabled then it should probably be fine to silently disable
> them, but I'd be interested in your thoughts.
> 
> Cheers,
> 
> Chris
> 
> On Wed, Apr 3, 2024, 20:41 Luke Chen  wrote:
> 
>> Hi Omnia,
>> 
>> Thanks for the KIP!
>> It LGTM!
>> But I'm not an expert of MM2, it would be good to see if there is any other
>> comment from MM2 experts.
>> 
>> Thanks.
>> Luke
>> 
>> On Thu, Mar 14, 2024 at 6:08 PM Omnia Ibrahim 
>> wrote:
>> 
>>> Hi everyone, I would like to start a discussion thread for KIP-1031:
>>> Control offset translation in MirrorSourceConnector
>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector
>>> 
>>> Thanks
>>> Omnia
>>> 
>> 



Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-02 Thread Omnia Ibrahim
Hi Andrew, 
Thanks for the KIP it is definitely an interesting read. I have few questions 
As the KIP proposing extending `AdminClient.incrementalAlterConfigs` to add an 
explicit `group.type` what would this means for DR feature in MM2 offering? 
Right now MM2 sync consumer group offsets from source to destination cluster. 
And it also offer sync ACLs which contribute to DR feature. Would this KIP 
means MM2 needs to also sync the type of groups to destination? 
As `AdminClient.incrementalAlterConfigs` means "when a new group is created 
with this name, it must have this type”. What will happened if clusters on both 
ends of MM2 has same group id but with different types? 
If this concern is out of the scope we might need to call this out somewhere in 
the KIP. 
While the number of share-group and the number of consumers in share-group is 
limited by `group.share.max.groups`and `group.share.max.size` the total number 
of share-group state records that might need to be loaded in-memeory has 
another factor which is the number of partitions. In cases where group is 
consuming from large number of topics with large number of partitions what will 
be the impact on coordinator memory?

Thanks 
Omnia


> On 25 Mar 2024, at 10:23, Andrew Schofield 
>  wrote:
> 
> Hi Justine,
> Thanks for your questions.
> 
> There are several limits in this KIP. With consumer groups, we see problems
> where there are huge numbers of consumer groups, and we also see problems
> when there are huge number of members in a consumer group.
> 
> There’s a limit on the number of members in share group. When the limit is 
> reached,
> additional members are not admitted to the group. The members heartbeat to 
> remain
> in the group and that enables timely expiration.
> 
> There’s also a limit of the number of share groups in a cluster. Initially, 
> this limit has been
> set low. As a result, it would be possible to create sufficient groups to 
> reach the limit,
> and then creating additional groups will fail. It will be possible to delete 
> a share group
> administratively, but share groups do not automatically expire (just like 
> topics do not
> expire and queues in message-queuing systems do not expire).
> 
> The `kafka-console-share-consumer.sh` tool in the KIP defaults the group name 
> to
> “share”. This has two benefits. First, it means that the trivial exploratory 
> use of it running
> multiple concurrent copies will naturally get sharing of the records consumed.
> Second, it means that only one share group is being create, rather than 
> generating another
> group ID for each execution.
> 
> Please do re-read the read-committed section. I’ll grateful for all the 
> thoughtful reviews
> that the community is able to provide. The KIP says that broker-side filtering
> removes the records for aborted transactions. This is obviously quite a 
> difference compared
> with consumers in consumer groups. It think it would also be possible to do 
> it client-side
> but the records fetched from the replica manager are distributed among the 
> consumers,
> and I’m concerned that it would be difficult to distribute the list of 
> aborted transactions
> relevant to the records each consumer receives. I’m considering prototyping 
> client-side
> filtering to see how well it works in practice.
> 
> I am definitely thoughtful about the inter-broker hops in order to persist 
> the share-group
> state. Originally, I did look at writing the state directly into the user’s 
> topic-partitions
> because this means the share-partition leader would be able to write directly.
> This has downsides as documented in the “Rejected Alternatives” section of 
> the KIP.
> 
> We do have opportunities for pipelining and batching which I expect we will 
> exploit
> in order to improve the performance.
> 
> This KIP is only the beginning. I expect a future KIP will address storage of 
> metadata
> in a more performant way.
> 
> Thanks,
> Andrew
> 
>> On 21 Mar 2024, at 15:40, Justine Olshan  
>> wrote:
>> 
>> Thanks Andrew,
>> 
>> That answers some of the questions I have.
>> 
>> With respect to the limits -- how will this be implemented? One issue we
>> saw with producers is "short-lived" producers that send one message and
>> disconnect.
>> Due to how expiration works for producer state, if we have a simple limit
>> for producer IDs, all new producers are blocked until the old ones expire.
>> Will we block new group members as well if we reach our limit?
>> 
>> In the consumer case, we have a heartbeat which can be used for expiration
>> behavior and avoid the headache we see on the producer side, but I can
>> imagine a case where misuse of the groups themselves could occur -- ie
>> creating a short lived share group that I believe will take some time to
>> expire. Do we have considerations for this case?
>> 
>> I also plan to re-read the read-committed section and may have further
>> questions there.
>> 
>> You also mentioned in the KIP how there are a few 

Re: [VOTE] KIP-981: Manage Connect topics with custom implementation of Admin

2024-03-27 Thread Omnia Ibrahim
hat might need this is only Stream for the 
initialising `InternalTopicManager` which create internal topics for the 
topology. 
These 3 APIs are bypassing disabling “auto.create.topics.enable” by using admin 
client which for people who run Kafka ecosystem this doesn’t always make sense 
as most of the time they disable “auto.create.topics.enable” because they have 
some sort of federation/provision/capacity planning layer to control the 
clusters. 

Clients like consumer/producer never needed AdminClient to create/alter their 
resources and as far as I know no one requested this feature in the community. 
And I don’t see the need for these clients to have it. It’s mostly frameworks 
that need these type of power.

> This KIP appearing as a follow-up to KIP-787 is evidence that the
> problem is more general than the proposed solution.
As KIP-787 is built based on the idea that having a class delegate resource 
management to AdminClient isn’t new to Kafka what would your suggestion means 
for this KIP which was voted and approved long time ago?

> At this time I'm -1 for this proposal. I'm happy to discuss this more
> in the DISCUSS thread.
I would keep it here as this will help others to decide how to vote forward. 

Thanks 
Omnia

> On 14 Mar 2024, at 19:36, Greg Harris  wrote:
> 
> Hi Omnia,
> 
> Thanks for the KIP.
> 
> I don't think we can adapt the ForwardingAdmin as-is for use as a
> first-class Connect plugin.
> 1. It doesn't have a default constructor, and so can't be included in
> the existing plugin discovery mechanisms.
> 2. It doesn't implement Versioned, and so won't have version
> information exposed in the REST API
> 
> I also don't think that we should make the ForwardingAdmin a
> second-class Connect plugin.
> 1. Having some plugins but not others benefit from classloader
> isolation would be a "gotcha" for anyone familiar with existing
> Connect plugins
> 2. Some future implementations may have a use-case for classloader
> isolation (such as depending on their own HTTP/json library) and
> retrofitting isolation would be more complicated than including it
> initially.
> 
> I also have concerns about the complexity of the implementation as a
> superclass instead of an interface, especially when considering the
> evolution of the Admin interface.
> 
> I don't think the original proposal included the rejected alternative
> of having the existing AdminClient talk to the federation layer, which
> could implement a Kafka-compatible endpoint.
> If a federation layer needs to intercept the Admin client behaviors,
> It sounds more reasonable for that to be addressed for all Admin
> clients at the network boundary rather than one-by-one updating the
> Java APIs to use this new plugin.
> This KIP appearing as a follow-up to KIP-787 is evidence that the
> problem is more general than the proposed solution.
> 
> At this time I'm -1 for this proposal. I'm happy to discuss this more
> in the DISCUSS thread.
> 
> Thanks,
> Greg
> 
> On Thu, Mar 14, 2024 at 11:07 AM Mickael Maison
>  wrote:
>> 
>> Hi Omnia,
>> 
>> +1 (binding), thanks for the KIP
>> 
>> Mickael
>> 
>> On Tue, Mar 5, 2024 at 10:46 AM Omnia Ibrahim  
>> wrote:
>>> 
>>> Hi everyone, I would like to start the vote on KIP-981: Manage Connect 
>>> topics with custom implementation of Admin 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-981%3A+Manage+Connect+topics+with+custom+implementation+of+Admin
>>> 
>>> Thanks
>>> Omnia



[DISCUSS] KIP-1031: Control offset translation in MirrorSourceConnector

2024-03-14 Thread Omnia Ibrahim
Hi everyone, I would like to start a discussion thread for KIP-1031:
Control offset translation in MirrorSourceConnector
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector

Thanks
Omnia


Re: [VOTE] KIP-939: Support Participation in 2PC

2024-03-13 Thread Omnia Ibrahim
I had a look at the discussion thread and the KIP looks exciting.
+1 non-binding

Best
Omnia

On 1 Dec 2023, at 19:06, Artem Livshits 
wrote:

Hello,

This is a voting thread for
https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
.

The KIP proposes extending Kafka transaction support (that already uses 2PC
under the hood) to enable atomicity of dual writes to Kafka and an external
database, and helps to fix a long standing Flink issue.

An example of code that uses the dual write recipe with JDBC and should
work for most SQL databases is here
https://github.com/apache/kafka/pull/14231.

The FLIP for the sister fix in Flink is here
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710

-Artem


[VOTE] KIP-981: Manage Connect topics with custom implementation of Admin

2024-03-05 Thread Omnia Ibrahim
Hi everyone, I would like to start the vote on KIP-981: Manage Connect topics 
with custom implementation of Admin 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-981%3A+Manage+Connect+topics+with+custom+implementation+of+Admin
 

Thanks
Omnia

[jira] [Created] (KAFKA-16254) Allow MM2 to fully disable offset sync feature

2024-02-14 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-16254:
-

 Summary: Allow MM2 to fully disable offset sync feature
 Key: KAFKA-16254
 URL: https://issues.apache.org/jira/browse/KAFKA-16254
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.6.0, 3.5.0, 3.7.0
Reporter: Omnia Ibrahim
Assignee: Omnia Ibrahim


*Background:* 
At the moment syncing offsets feature in MM2 is broken to 2 parts
 # One is in `MirrorSourceTask` where we store the new recored's offset on 
target cluster to {{offset_syncs}} internal topic after mirroring the record. 
Before KAFKA-14610 in 3.5 MM2 used to just queue the offsets and publish them 
later but since 3.5 this behaviour changed we now publish any offset syncs that 
we've queued up, but have not yet been able to publish when 
`MirrorSourceTask.commit` get invoked. This introduced an over head to commit 
process.
 # The second part is in checkpoints source task where we use the new record 
offsets from {{offset_syncs}} and update {{checkpoints}} and 
{{__consumer_offsets}} topics.

*Problem:*
For customers who only use MM2 for mirroring data and not interested in syncing 
offsets feature they now can disable the second part of this feature which is 
by disabling {{emit.checkpoints.enabled}} and/or {{sync.group.offsets.enabled}} 
to disable emitting {{__consumer_offsets}} topic but nothing disabling 1st part 
of the feature. 

The problem get worse if they disabled MM2 from creating offset syncs internal 
topic as 
1. this will increase throughput as MM2 will try to force trying to update the 
offset with every mirrored batch which impacting the performance of our MM2.
2. Get too many error logs because they don't create the sync offset topic as 
they don't use the feature.

*Possible solution:*
Allow customers to fully disable the feature if they don't really need it 
similar to how we fully can disable other MM2 features like heartbeat feature 
by adding a new config.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-981: Manage Connect topics with custom implementation of Admin

2024-02-09 Thread Omnia Ibrahim
Hi everyone, I'm still keen on implementing this. I'll start a vote if I
don't hear back in the next few days

Thanks
Omnia

On Mon, Jan 15, 2024 at 8:12 PM Omnia Ibrahim 
wrote:

> Hi It has been a while! but can I have a feedback on this. It would be
> nice to unify this between MM2 and Connect as well.
>
> Thanks
>
> On Thu, Oct 19, 2023 at 3:14 PM Omnia Ibrahim 
> wrote:
>
>> Hi, any thoughts on this kip?
>>
>> Thanks
>>
>> On Tue, Sep 19, 2023 at 6:04 PM Omnia Ibrahim 
>> wrote:
>>
>>> Hi everyone,
>>> I want to start the discussion of the KIP-981 to extend Connect to use
>>> org.apache.kafka.clients.admin.ForwardingAdminClient instead of
>>> KafkaAdminClient 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-981%3A+Manage+Connect+topics+with+custom+implementation+of+Admin
>>>
>>>
>>> Thanks for your time and feedback
>>> Omnia
>>>
>>


[jira] [Resolved] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7

2024-02-09 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim resolved KAFKA-16162.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> New created topics are unavailable after upgrading to 3.7
> -
>
> Key: KAFKA-16162
> URL: https://issues.apache.org/jira/browse/KAFKA-16162
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: Gaurav Narula
>Priority: Blocker
> Fix For: 3.7.0
>
>
> In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration 
> request will include the `LogDirs` fields with UUID for each log dir in each 
> broker. This info will be stored in the controller and used to identify if 
> the log dir is known and online while handling AssignReplicasToDirsRequest 
> [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093].
>  
> While upgrading from old version, the kafka cluster will run in 3.7 binary 
> with old metadata version, and then upgrade to newer version using 
> kafka-features.sh. That means, while brokers startup and send the 
> brokerRegistration request, it'll be using older metadata version without 
> `LogDirs` fields included. And it makes the controller has no log dir info 
> for all brokers. Later, after upgraded, if new topic is created, the flow 
> will go like this:
> 1. Controller assign replicas and adds in metadata log
> 2. brokers fetch the metadata and apply it
> 3. ReplicaManager#maybeUpdateTopicAssignment will update topic assignment
> 4. After sending ASSIGN_REPLICAS_TO_DIRS to controller with replica 
> assignment, controller will think the log dir in current replica is offline, 
> so triggering offline handler, and reassign leader to another replica, and 
> offline, until no more replicas to assign, so assigning leader to -1 (i.e. no 
> leader) 
> So, the results will be that new created topics are unavailable (with no 
> leader) because the controller thinks all log dir are offline.
> {code:java}
> lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic 
> quickstart-events3 --bootstrap-server localhost:9092  
> 
> Topic: quickstart-events3 TopicId: s8s6tEQyRvmjKI6ctNTgPg PartitionCount: 
> 3   ReplicationFactor: 3Configs: segment.bytes=1073741824
>   Topic: quickstart-events3   Partition: 0Leader: none
> Replicas: 7,2,6 Isr: 6
>   Topic: quickstart-events3   Partition: 1Leader: none
> Replicas: 2,6,7 Isr: 6
>   Topic: quickstart-events3   Partition: 2Leader: none
> Replicas: 6,7,2 Isr: 6
> {code}
> The log snippet in the controller :
> {code:java}
> # handling 1st assignReplicaToDirs request
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] 
> offline-dir-assignment: changing partition(s): quickstart-events3-0, 
> quickstart-events3-2, quickstart-events3-1 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
> quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
> [AA, AA, AA] -> 
> [7K5JBERyyqFFxIXSXYluJA, AA, AA], 
> partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition 
> change PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, 
> isr=null, leader=-2, replicas=null, removingReplicas=null, 
> addingReplicas=null, leaderRecoveryState=-1, 
> directories=[7K5JBERyyqFFxIXSXYluJA, AA, 
> AA], eligibleLeaderReplicas=null, lastKnownELR=null) for 
> topic quickstart-events3 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18

[jira] [Resolved] (KAFKA-14616) Topic recreation with offline broker causes permanent URPs

2024-02-09 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim resolved KAFKA-14616.
---
Fix Version/s: 3.7.0
 Assignee: Colin McCabe
   Resolution: Fixed

> Topic recreation with offline broker causes permanent URPs
> --
>
> Key: KAFKA-14616
> URL: https://issues.apache.org/jira/browse/KAFKA-14616
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>    Reporter: Omnia Ibrahim
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.7.0
>
>
> We are facing an odd situation when we delete and recreate a topic while 
> broker is offline in KRAFT mode. 
> Here’s what we saw step by step
>  # Created topic {{foo.test}} with 10 partitions and 4 replicas — Topic 
> {{foo.test}} was created with topic ID {{MfuZbwdmSMaiSa0g6__TPg}}
>  # Took broker 4 offline — which held replicas for partitions  {{0, 3, 4, 5, 
> 7, 8, 9}}
>  # Deleted topic {{foo.test}} — The deletion process was successful, despite 
> the fact that broker 4 still held replicas for partitions {{0, 3, 4, 5, 7, 8, 
> 9}} on local disk.
>  # Recreated topic {{foo.test}} with 10 partitions and 4 replicas. — Topic 
> {{foo.test}} was created with topic ID {{RzalpqQ9Q7ub2M2afHxY4Q}} and 
> partitions {{0, 1, 2, 7, 8, 9}} got assigned to broker 4 (which was still 
> offline). Notice here that partitions {{0, 7, 8, 9}} are common between the 
> assignment of the deleted topic ({{{}topic_id: MfuZbwdmSMaiSa0g6__TPg{}}}) 
> and the recreated topic ({{{}topic_id: RzalpqQ9Q7ub2M2afHxY4Q{}}}).
>  # Brough broker 4 back online.
>  # Broker started to create new partition replicas for the recreated topic 
> {{foo.test}} ({{{}topic_id: RzalpqQ9Q7ub2M2afHxY4Q{}}})
>  # The broker hit the following error {{Tried to assign topic ID 
> RzalpqQ9Q7ub2M2afHxY4Q to log for topic partition foo.test-9,but log already 
> contained topic ID MfuZbwdmSMaiSa0g6__TPg}} . As a result of this error the 
> broker decided to rename log dir for partitions {{0, 3, 4, 5, 7, 8, 9}} to 
> {{{}-.-delete{}}}.
>  # Ran {{ls }}
> {code:java}
> foo.test-0.658f87fb9a2e42a590b5d7dcc28862b5-delete/
> foo.test-1/
> foo.test-2/
> foo.test-3.a68f05d05bcc4e579087551b539af311-delete/
> foo.test-4.79ce30a5310d4950ad1b28f226f74895-delete/
> foo.test-5.76ed04da75bf46c3a63342be1eb44450-delete/
> foo.test-6/
> foo.test-7.c2d33db3bf844e9ebbcd9ef22f5270da-delete/
> foo.test-8.33836969ac714b41b69b5334a5068ce0-delete/
> foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/{code}
>       9. Waited until the deletion of the old topic was done and ran {{ls 
> }} again, now we were expecting to see log dir for partitions 
> {{0, 1, 2, 7, 8, 9}} however the result is:
> {code:java}
> foo.test-1/
> foo.test-2/
> foo.test-6/{code}
>      10. Ran {{kafka-topics.sh --command-config cmd.properties 
> --bootstrap-server  --describe --topic foo.test}}
> {code:java}
> Topic: foo.test TopicId: RzalpqQ9Q7ub2M2afHxY4Q PartitionCount: 10 
> ReplicationFactor: 4 Configs: 
> min.insync.replicas=2,segment.bytes=1073741824,max.message.bytes=3145728,unclean.leader.election.enable=false,retention.bytes=10
> Topic: foo.test Partition: 0 Leader: 2 Replicas: 2,3,4,5 Isr: 2,3,5
> Topic: foo.test Partition: 1 Leader: 3 Replicas: 3,4,5,6 Isr: 3,5,6,4
> Topic: foo.test Partition: 2 Leader: 5 Replicas: 5,4,6,1 Isr: 5,6,1,4
> Topic: foo.test Partition: 3 Leader: 5 Replicas: 5,6,1,2 Isr: 5,6,1,2
> Topic: foo.test Partition: 4 Leader: 6 Replicas: 6,1,2,3 Isr: 6,1,2,3
> Topic: foo.test Partition: 5 Leader: 1 Replicas: 1,6,2,5 Isr: 1,6,2,5
> Topic: foo.test Partition: 6 Leader: 6 Replicas: 6,2,5,4 Isr: 6,2,5,4
> Topic: foo.test Partition: 7 Leader: 2 Replicas: 2,5,4,3 Isr: 2,5,3
> Topic: foo.test Partition: 8 Leader: 5 Replicas: 5,4,3,1 Isr: 5,3,1
> Topic: foo.test Partition: 9 Leader: 3 Replicas: 3,4,1,6 Isr: 3,1,6{code}
> Here’s a sample of broker logs
>  
> {code:java}
> {"timestamp":"2023-01-11T15:19:53,620Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
>  log for partition foo.test-9 in 
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete.","logger":"kafka.log.LogManager"}
> {"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
>  time index 
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/

Re: [DISCUSS] KIP-1016 Make MM2 heartbeats topic name configurable

2024-02-02 Thread Omnia Ibrahim
I noticed you have something addressed in the rejected alternatives in your KIP 
regarding KIP-690, however, as the KIP is implementing one of the previous 
rejected alternatives for KIP-690 I think it would be nice to update the 
motivation section in the new KIP with some clarification for why what was 
mentioned in the rejected alternatives in KIP-690 isn’t valid anymore.

> On 2 Feb 2024, at 15:23, Omnia Ibrahim  wrote:
> 
> Hi 
> Thanks for the KIP. I don’t know if you had a look before into the rejection 
> alternatives in KIP-690 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention#KIP690:AddadditionalconfigurationtocontrolMirrorMaker2internaltopicsnamingconvention-RejectedAlternatives
>  or not but one of the things we were worried about is the amount of 
> configuration MM2 already has at the moment specially for advanced and 
> complicated replication between multiple clusters. This why we bin-backing on 
> the existing of replication policy configs to allow users to define their own 
> replication policy with their customised internal topics. 
> 
> Is the assumption now different meaning that we are okay to add more configs 
> for mm2? If so can you mention this in the KIP as it was rejected before in 
> another KIP
> 
> Also wouldn’t this mean that we now have two different ways to set the 
> `heartbeat` topic name one by this config and another one by overriding the 
> replication policy class? It maybe worth mentioning in your KIP why you are 
> adding this other path to configure heartbeat topic with single config 
> instead of the existing route which providing replication policy with the new 
> names of internal topics. And when users should opt-in for each one of them.
> 
> Thanks
> 
>> On 22 Jan 2024, at 19:52, Chris Egerton  wrote:
>> 
>> Hi Berci,
>> 
>> Thanks for the KIP!
>> 
>> IMO we don't need the "default." prefix for the new property, and it
>> deviates a bit from the precedent set by properties like
>> "replication.policy.internal.topic.separator.enabled". I think we can just
>> call it "replication.policy.heartbeats.topic", or if we really want to be
>> precise, "replication.policy.heartbeats.topic.name".
>> 
>> Regarding multiple source->target pairs, won't we get support for this for
>> free if we add the new property to the DefaultReplicationPolicy class? IIRC
>> it's already possible to configure replication policies on a
>> per-replication-flow basis with that syntax, I don't see why this wouldn't
>> be the case for the new property.
>> 
>> I'm also a little hazy on the motivation for the change. Just out of
>> curiosity, what exactly is meant by "the "heartbeats" topics of other
>> systems" in the Jira ticket's description? Are we trying to better
>> accommodate cases where other harder-to-configure systems (like a picky
>> source connector, for example) create and use a "heartbeats" topic, or are
>> we trying to enable multiple MM2 heartbeat connectors to target the same
>> Kafka cluster? I can understand the former as a niche but possible scenario
>> and one that we can make room for, but the latter is a bit harder to
>> justify short of, e.g., fine-tuning the heartbeat emission interval based
>> on the eventual target of the replication flow that will be reading from
>> the heartbeats topic.
>> 
>> I don't raise the above to cast doubt on the KIP, really I'm just curious
>> about how people are using MM2.
>> 
>> Cheers,
>> 
>> Chris
>> 
>> On Thu, Jan 18, 2024 at 6:11 AM Kondrát Bertalan  wrote:
>> 
>>> Hi Viktor,
>>> 
>>> Let me address your points one by one.
>>> 
>>>   1. The current implementation does not support the source->target pair
>>>   based configuration, it is global.
>>>   2. Yes, I introduced that property both in the client and in the
>>>   connectors
>>>   3. This is a great idea, I am going to do that, and also I tried to
>>>   construct the property name in a way that makes this clear for the
>>> users: '
>>>   default.replication.policy.heartbeats.topic.name'
>>>   4. Yeah, that was my impression too.
>>> 
>>> Thanks,
>>> Berci
>>> 
>>> On Wed, Jan 17, 2024 at 4:51 PM Viktor Somogyi-Vass
>>>  wrote:
>>> 
>>>> Hi Bertalan,
>>>> 
>>>> Thanks for creating this KIP.
>>>> A couple of observations/questions:
>>>> 1. If I have multiple sourc

Re: [DISCUSS] KIP-1016 Make MM2 heartbeats topic name configurable

2024-02-02 Thread Omnia Ibrahim
Hi 
Thanks for the KIP. I don’t know if you had a look before into the rejection 
alternatives in KIP-690 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention#KIP690:AddadditionalconfigurationtocontrolMirrorMaker2internaltopicsnamingconvention-RejectedAlternatives
 or not but one of the things we were worried about is the amount of 
configuration MM2 already has at the moment specially for advanced and 
complicated replication between multiple clusters. This why we bin-backing on 
the existing of replication policy configs to allow users to define their own 
replication policy with their customised internal topics. 

Is the assumption now different meaning that we are okay to add more configs 
for mm2? If so can you mention this in the KIP as it was rejected before in 
another KIP

Also wouldn’t this mean that we now have two different ways to set the 
`heartbeat` topic name one by this config and another one by overriding the 
replication policy class? It maybe worth mentioning in your KIP why you are 
adding this other path to configure heartbeat topic with single config instead 
of the existing route which providing replication policy with the new names of 
internal topics. And when users should opt-in for each one of them.

Thanks

> On 22 Jan 2024, at 19:52, Chris Egerton  wrote:
> 
> Hi Berci,
> 
> Thanks for the KIP!
> 
> IMO we don't need the "default." prefix for the new property, and it
> deviates a bit from the precedent set by properties like
> "replication.policy.internal.topic.separator.enabled". I think we can just
> call it "replication.policy.heartbeats.topic", or if we really want to be
> precise, "replication.policy.heartbeats.topic.name".
> 
> Regarding multiple source->target pairs, won't we get support for this for
> free if we add the new property to the DefaultReplicationPolicy class? IIRC
> it's already possible to configure replication policies on a
> per-replication-flow basis with that syntax, I don't see why this wouldn't
> be the case for the new property.
> 
> I'm also a little hazy on the motivation for the change. Just out of
> curiosity, what exactly is meant by "the "heartbeats" topics of other
> systems" in the Jira ticket's description? Are we trying to better
> accommodate cases where other harder-to-configure systems (like a picky
> source connector, for example) create and use a "heartbeats" topic, or are
> we trying to enable multiple MM2 heartbeat connectors to target the same
> Kafka cluster? I can understand the former as a niche but possible scenario
> and one that we can make room for, but the latter is a bit harder to
> justify short of, e.g., fine-tuning the heartbeat emission interval based
> on the eventual target of the replication flow that will be reading from
> the heartbeats topic.
> 
> I don't raise the above to cast doubt on the KIP, really I'm just curious
> about how people are using MM2.
> 
> Cheers,
> 
> Chris
> 
> On Thu, Jan 18, 2024 at 6:11 AM Kondrát Bertalan  wrote:
> 
>> Hi Viktor,
>> 
>> Let me address your points one by one.
>> 
>>   1. The current implementation does not support the source->target pair
>>   based configuration, it is global.
>>   2. Yes, I introduced that property both in the client and in the
>>   connectors
>>   3. This is a great idea, I am going to do that, and also I tried to
>>   construct the property name in a way that makes this clear for the
>> users: '
>>   default.replication.policy.heartbeats.topic.name'
>>   4. Yeah, that was my impression too.
>> 
>> Thanks,
>> Berci
>> 
>> On Wed, Jan 17, 2024 at 4:51 PM Viktor Somogyi-Vass
>>  wrote:
>> 
>>> Hi Bertalan,
>>> 
>>> Thanks for creating this KIP.
>>> A couple of observations/questions:
>>> 1. If I have multiple source->target pairs, can I set this property per
>>> cluster by prefixing with "source->target" as many other configs or is it
>>> global?
>>> 2. The replication policy must be set in MirrorClient as well. Is your
>>> change applicable to both MirrorClient and the connectors as well?
>>> 3. It might be worth pointing out (both in the docs and the KIP) that if
>>> the user overrides the replication policy to any other than
>>> DefaultReplicationPolicy, then this config has no effect.
>>> 4. With regards to integration tests, I tend to lean towards that we
>> don't
>>> need them if we can cover this well with unit tests and mocking.
>>> 
>>> Thanks,
>>> Viktor
>>> 
>>> On Wed, Jan 17, 2024 at 12:23 AM Ryanne Dolan 
>>> wrote:
>>> 
 Makes sense to me, +1.
 
 On Tue, Jan 16, 2024 at 5:04 PM Kondrát Bertalan 
 wrote:
 
> Hey Team,
> 
> I would like to start a discussion thread about the *KIP-1016 Make MM2
> heartbeats topic name configurable
> <
> 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1016+Make+MM2+heartbeats+topic+name+configurable
>> *
> .
> 
> This KIP aims to make the 

Re: [VOTE] 3.7.0 RC2

2024-01-29 Thread Omnia Ibrahim
Hi Stan and Gaurav, 
Just to clarify some points mentioned here before 
 KAFKA-14616: I raised a year ago so it's not related to JBOD work. It is 
rather a blocker bug for KRAFT in general. The PR from Colin should fix this. 
Am not sure if it is a blocker for 3.7 per-say as it was a major bug since 3.3 
and got missed from all other releases.
 
Regarding the JBOD's work: 
KAFKA-16082:  Is not a blocker for 3.7 instead it's nice fix. The pr 
https://github.com/apache/kafka/pull/15136 is quite a small one and was 
approved by Proven and I but it is waiting for a committer's approval.
KAFKA-16162: This is a blocker for 3.7.  Same it’s a small pr 
https://github.com/apache/kafka/pull/15270 and it is approved Proven and I and 
the PR is waiting for committer's approval. 
KAFKA-16157: This is a blocker for 3.7. There is one small suggestion for the 
pr https://github.com/apache/kafka/pull/15263 but I don't think any of the 
current feedback is blocking the pr from getting approved. Assuming we get a 
committer's approval on it. 
KAFKA-16195:  Same it's a blocker but it has approval from Proven and I and we 
are waiting for committer's approval on the pr 
https://github.com/apache/kafka/pull/15262. 

If we can’t get a committer approval for KAFKA-16162, KAFKA-16157 and 
KAFKA-16195  in time for 3.7 then we can mark JBOD as early release assuming we 
merge at least KAFKA-16195.

Regards, 
Omnia

> On 26 Jan 2024, at 15:39, ka...@gnarula.com wrote:
> 
> Apologies, I duplicated KAFKA-16157 twice in my previous message. I intended 
> to mention KAFKA-16195
> with the PR at https://github.com/apache/kafka/pull/15262 as the second JIRA.
> 
> Thanks,
> Gaurav
> 
>> On 26 Jan 2024, at 15:34, ka...@gnarula.com wrote:
>> 
>> Hi Stan,
>> 
>> I wanted to share some updates about the bugs you shared earlier.
>> 
>> - KAFKA-14616: I've reviewed and tested the PR from Colin and have observed
>> the fix works as intended.
>> - KAFKA-16162: I reviewed Proven's PR and found some gaps in the proposed 
>> fix. I've
>> therefore raised https://github.com/apache/kafka/pull/15270 following a 
>> discussion with Luke in JIRA.
>> - KAFKA-16082: I don't think this is marked as a blocker anymore. I'm 
>> awaiting
>> feedback/reviews at https://github.com/apache/kafka/pull/15136
>> 
>> In addition to the above, there are 2 JIRAs I'd like to bring everyone's 
>> attention to:
>> 
>> - KAFKA-16157: This is similar to KAFKA-14616 and is marked as a blocker. 
>> I've raised
>> https://github.com/apache/kafka/pull/15263 and am awaiting reviews on it.
>> - KAFKA-16157: I raised this yesterday and have addressed feedback from 
>> Luke. This should
>> hopefully get merged soon.
>> 
>> Regards,
>> Gaurav
>> 
>> 
>>> On 24 Jan 2024, at 11:51, ka...@gnarula.com wrote:
>>> 
>>> Hi Stanislav,
>>> 
>>> Thanks for bringing these JIRAs/PRs up.
>>> 
>>> I'll be testing the open PRs for KAFKA-14616 and KAFKA-16162 this week and 
>>> I hope to have some feedback
>>> by Friday. I gather the latter JIRA is marked as a WIP by Proven and he's 
>>> away. I'll try to build on his work in the meantime.
>>> 
>>> As for KAFKA-16082, we haven't been able to deduce a data loss scenario. 
>>> There's a PR open
>>> by me for promoting an abandoned future replica with approvals from Omnia 
>>> and Proven,
>>> so I'd appreciate a committer reviewing it.
>>> 
>>> Regards,
>>> Gaurav
>>> 
>>> On 23 Jan 2024, at 20:17, Stanislav Kozlovski 
>>>  wrote:
 
 Hey all, I figured I'd give an update about what known blockers we have
 right now:
 
 - KAFKA-16101: KRaft migration rollback documentation is incorrect -
 https://github.com/apache/kafka/pull/15193; This need not block RC
 creation, but we need the docs updated so that people can test properly
 - KAFKA-14616: Topic recreation with offline broker causes permanent URPs -
 https://github.com/apache/kafka/pull/15230 ; I am of the understanding that
 this is blocking JBOD for 3.7
 - KAFKA-16162: New created topics are unavailable after upgrading to 3.7 -
 a strict blocker with an open PR https://github.com/apache/kafka/pull/15232
 - although I understand Proveen is out of office
 - KAFKA-16082: JBOD: Possible dataloss when moving leader partition - I am
 hearing mixed opinions on whether this is a blocker (
 https://github.com/apache/kafka/pull/15136)
 
 Given that there are 3 JBOD blocker bugs, and I am not confident they will
 all be merged this week - I am on the edge of voting to revert JBOD from
 this release, or mark it early access.
 
 By all accounts, it seems that if we keep with JBOD the release will have
 to spill into February, which is a month extra from the time-based release
 plan we had of start of January.
 
 Can I ask others for an opinion?
 
 Best,
 Stan
 
 On Thu, Jan 18, 2024 at 1:21 PM Luke Chen  wrote:
 
> Hi all,
> 
> I think I've found another blocker issue: 

Re: [DISCUSS] KIP-981: Manage Connect topics with custom implementation of Admin

2024-01-15 Thread Omnia Ibrahim
Hi It has been a while! but can I have a feedback on this. It would be nice
to unify this between MM2 and Connect as well.

Thanks

On Thu, Oct 19, 2023 at 3:14 PM Omnia Ibrahim 
wrote:

> Hi, any thoughts on this kip?
>
> Thanks
>
> On Tue, Sep 19, 2023 at 6:04 PM Omnia Ibrahim 
> wrote:
>
>> Hi everyone,
>> I want to start the discussion of the KIP-981 to extend Connect to use
>> org.apache.kafka.clients.admin.ForwardingAdminClient instead of
>> KafkaAdminClient 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-981%3A+Manage+Connect+topics+with+custom+implementation+of+Admin
>>
>>
>> Thanks for your time and feedback
>> Omnia
>>
>


[jira] [Resolved] (KAFKA-15858) Broker stays fenced until all assignments are correct

2023-12-07 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim resolved KAFKA-15858.
---
Resolution: Won't Fix

`BrokerHeartbeatManager.calculateNextBrokerState` already keeps the broker 
fenced (even if the broker asked to be unfenced) if it didn't catch up with the 
metadata.

> Broker stays fenced until all assignments are correct
> -
>
> Key: KAFKA-15858
> URL: https://issues.apache.org/jira/browse/KAFKA-15858
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>    Assignee: Omnia Ibrahim
>Priority: Major
>
> Until there the broker has caught up with metadata AND corrected any 
> incorrect directory assignments, it should continue to want to stay fenced.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15895) Move DynamicBrokerConfig to server module

2023-11-25 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-15895:
-

 Summary: Move DynamicBrokerConfig to server module
 Key: KAFKA-15895
 URL: https://issues.apache.org/jira/browse/KAFKA-15895
 Project: Kafka
  Issue Type: Sub-task
Reporter: Omnia Ibrahim
Assignee: Omnia Ibrahim






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-981: Manage Connect topics with custom implementation of Admin

2023-10-19 Thread Omnia Ibrahim
Hi, any thoughts on this kip?

Thanks

On Tue, Sep 19, 2023 at 6:04 PM Omnia Ibrahim 
wrote:

> Hi everyone,
> I want to start the discussion of the KIP-981 to extend Connect to use
> org.apache.kafka.clients.admin.ForwardingAdminClient instead of
> KafkaAdminClient 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-981%3A+Manage+Connect+topics+with+custom+implementation+of+Admin
>
>
> Thanks for your time and feedback
> Omnia
>


[DISCUSS] KIP-981: Manage Connect topics with custom implementation of Admin

2023-09-19 Thread Omnia Ibrahim
Hi everyone,
I want to start the discussion of the KIP-981 to extend Connect to use
org.apache.kafka.clients.admin.ForwardingAdminClient instead of
KafkaAdminClient
https://cwiki.apache.org/confluence/display/KAFKA/KIP-981%3A+Manage+Connect+topics+with+custom+implementation+of+Admin


Thanks for your time and feedback
Omnia


[jira] [Created] (KAFKA-15478) Update connect to use ForwardingAdmin

2023-09-19 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-15478:
-

 Summary: Update connect to use ForwardingAdmin
 Key: KAFKA-15478
 URL: https://issues.apache.org/jira/browse/KAFKA-15478
 Project: Kafka
  Issue Type: New Feature
Reporter: Omnia Ibrahim


Connect uses AdminClients to create topics; while this simplifies the 
implementation of Connect it has the following problems 
 * It assumes that whoever runs Connect must have admin access to both source 
and destination clusters. This assumption is not necessarily valid all the time.
 * It creates conflict in use-cases where centralised systems or tools manage 
Kafka resources. 

It would be easier if customers could provide how they want to manage Kafka 
topics through admin client or using their centralised system or tools. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-936: Throttle number of active PIDs

2023-08-30 Thread Omnia Ibrahim
so there is no wasted space.
>
> Maybe am missing something here but I can't find anything in the
> > `LayerManager` code that point to how often will the eviction function
> > runs. Do you mean that the eviction function runs every minute? If so,
> can
> > we control this?
> >
>
> The LayerManager.Builder has a setCleanup() method.  That method is run
> whenever a new layer is added to the filter.  This means that you can use
> whatever process you want to delete old filters (including none:
> LayerManager.Cleanup.noCleanup()).  The LayeredBloomFilterTest is an
> example of advancing by time (the one minute intervals) and cleaning by
> time (1 hr).  It also creates a TimestampedBloomFilter to track the time.
> If we need an explicit mechanism to remove filters from the LayerManager we
> can probably add one.
>
> I hope this answers your questions.
>
> I am currently working on getting layered Bloom filters added to commons.
> A recent change set the stage for this change so it should be in soon.
>
> I look forward to hearing from you,
> Claude
>
> [1] https://hur.st/bloomfilter/?n=1000=0.1==
>
> On Sun, Jul 16, 2023 at 1:00 PM Omnia Ibrahim 
> wrote:
>
> > Thanks Claude for the feedback and the raising this implementation to
> > Apache commons-collections.
> > I had a look into your layered bloom filter and at first glance, I think
> it
> > would be a better improvement, however, regarding the following
> suggestion
> >
> > > By hashing the principal and PID into the filter as a single hash only
> > one Bloom filter is required.
> >
> > I am not sure how this will work when we have different producer-id-rate
> > for different KafkaPrincipal as proposed in the KIP.
> > For example `userA` had producer-id-rate of 1000 per hour while `user2`
> has
> > a quota of 100 producer ids per hour. How will we configure the max
> entries
> > for the Shape?
> >
> > The only thing that comes to my mind to maintain this desired behavior in
> > the KIP is to NOT hash PID with KafkaPrincipal and keep a
> > Map
> > then each one of these bloom filters is controlled with
> > `Shape(, 0.1)`.
> >
> > Does that make sense? WDYT?
> >
> > Also regarding the eviction function
> > > (evict function)
> > > The evict function will determine if it has been a minute since the
> last
> > > time we created a new layer, if so create a new layer and evict all
> > layers
> > > older than 1 hour.  Since the layers are a time ordered list this is
> > simply
> > > removing the elderly layers from the front of the list.
> >
> > Maybe am missing something here but I can't find anything in the
> > `LayerManager` code that point to how often will the eviction function
> > runs. Do you mean that the eviction function runs every minute? If so,
> can
> > we control this?
> >
> > Cheers,
> > Omnia
> >
> > On Wed, Jun 21, 2023 at 11:43 AM Claude Warren  wrote:
> >
> > > I think that the either using a Stable bloom filter or a Layered bloom
> > > filter constructed as follows:
> > >
> > >
> > >- Each layer is configured for the maximum number of principal-PID
> > pairs
> > >expected in a single minute.
> > >- Expect 60 layers (one for each minute)
> > >- If the layer becomes fully populated add another layer.
> > >- When the time to insert into the current layer expires, remove the
> > >layers that are older than an hour.
> > >
> > > This will provide a sliding window of one hour with the ability to
> handle
> > > bursts above the expected rate of inserts without additional false
> > > positives.
> > >
> > > By hashing the principal and PID into the filter as a single hash only
> > one
> > > Bloom filter is required.
> > >
> > > The code I pointed to earlier uses the common-collections4 Bloom filter
> > > implementation.  So a rough pseudo code for the implementation is:
> > >
> > > Shape shap = Shape.fromNP( 1000, 0.1 ); // 1000 entries, 0.1 false
> > positive
> > > rate
> > > LayeredBloomFilter filter = LayeredBloomFilter( shape, 60, evictFunc );
> > //
> > > 60 initial layers, eviction function.
> > >
> > > (on PID)
> > >
> > > long[2] buff = Murmur3Hash.hash128x64( String.format("%s%s", principal,
> > PID
> > > ).getBytes(java.nio.charset.Charset.UTF8));
> > > Hasher hasher = new EnhancedDoubleHasher( buff[0], buff[1] );
> > >

Re: [VOTE] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-08-02 Thread Omnia Ibrahim
Thanks, Mickael.
Am closing the voting now with 3 binding votes (from Chris, Greg and
Mickale) and 1 non-binding (from Federico).

On Wed, Aug 2, 2023 at 2:52 PM Mickael Maison 
wrote:

> +1 (binding)
>
> Thanks for the KIP
>
> On Tue, Aug 1, 2023 at 1:26 PM Omnia Ibrahim 
> wrote:
> >
> > Thanks for the binding vote, Greg, We now need one extra binding vote to
> > get this KIP accepted.
> >
> > On Tue, Jul 25, 2023 at 8:10 PM Greg Harris  >
> > wrote:
> >
> > > Hey Omnia,
> > >
> > > Thanks for the KIP!
> > >
> > > I think that MM2 is responsible for providing an upgrade path for
> > > users, even if it isn't backwards-compatible by default due to a
> > > mistake.
> > > The non-configuration-based strategies I could think of aren't viable
> > > due to the danger of inferring the incorrect topic name, and inherent
> > > complexity which makes them hard to backport.
> > > I also support the decision to backport this to 3.1 - 3.5, so that MM2
> > > users can upgrade in minor version increments after those patch
> > > releases go out.
> > >
> > > I'm +1 (binding).
> > >
> > > Thanks,
> > > Greg
> > >
> > > On Mon, Jul 24, 2023 at 7:21 AM Omnia Ibrahim  >
> > > wrote:
> > > >
> > > > Hi Chris, I updated the KIP to address your feedback. Thanks for the
> > > vote.
> > > >
> > > > On Mon, Jul 24, 2023 at 1:30 PM Chris Egerton
> 
> > > > wrote:
> > > >
> > > > > Hi Omnia,
> > > > >
> > > > > I think there's a few clarifications that should still be made on
> the
> > > KIP,
> > > > > but assuming these are agreeable, I'm +1 (binding)
> > > > >
> > > > > - In the description for the
> > > > > replication.policy.internal.topic.separator.enabled property (in
> the
> > > > > "Public Interfaces" section), we should specify that it affects
> only
> > > the
> > > > > checkpoints and offset syncs topic
> > > > > - We can remove the code snippet from the "Proposed Changes"
> section
> > > (right
> > > > > now it's a little buggy; there's two different implementations for
> the
> > > same
> > > > > "internalSuffix" method, and there are references to an
> > > "internalSeparator"
> > > > > method but no implementation for it); since we don't usually
> require
> > > > > specific code changes in KIPs, I think as long as we can describe
> the
> > > > > changes we're proposing in the "Public Interfaces" section, that
> > > should be
> > > > > enough for this KIP
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Mon, Jul 24, 2023 at 2:04 AM Federico Valeri <
> fedeval...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > +1 (non binding)
> > > > > >
> > > > > > Thanks
> > > > > > Fede
> > > > > >
> > > > > >
> > > > > > On Sun, Jul 23, 2023 at 6:30 PM Omnia Ibrahim <
> > > o.g.h.ibra...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi everyone,
> > > > > > > I would like to open a vote for KIP-949. The proposal is here
> > > > > > >
> > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy
> > > > > > > .
> > > > > > > <
> > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy
> > > > > > >
> > > > > > >
> > > > > > > Thanks
> > > > > > > Omnia
> > > > > >
> > > > >
> > >
>


Re: [VOTE] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-08-01 Thread Omnia Ibrahim
Thanks for the binding vote, Greg, We now need one extra binding vote to
get this KIP accepted.

On Tue, Jul 25, 2023 at 8:10 PM Greg Harris 
wrote:

> Hey Omnia,
>
> Thanks for the KIP!
>
> I think that MM2 is responsible for providing an upgrade path for
> users, even if it isn't backwards-compatible by default due to a
> mistake.
> The non-configuration-based strategies I could think of aren't viable
> due to the danger of inferring the incorrect topic name, and inherent
> complexity which makes them hard to backport.
> I also support the decision to backport this to 3.1 - 3.5, so that MM2
> users can upgrade in minor version increments after those patch
> releases go out.
>
> I'm +1 (binding).
>
> Thanks,
> Greg
>
> On Mon, Jul 24, 2023 at 7:21 AM Omnia Ibrahim 
> wrote:
> >
> > Hi Chris, I updated the KIP to address your feedback. Thanks for the
> vote.
> >
> > On Mon, Jul 24, 2023 at 1:30 PM Chris Egerton 
> > wrote:
> >
> > > Hi Omnia,
> > >
> > > I think there's a few clarifications that should still be made on the
> KIP,
> > > but assuming these are agreeable, I'm +1 (binding)
> > >
> > > - In the description for the
> > > replication.policy.internal.topic.separator.enabled property (in the
> > > "Public Interfaces" section), we should specify that it affects only
> the
> > > checkpoints and offset syncs topic
> > > - We can remove the code snippet from the "Proposed Changes" section
> (right
> > > now it's a little buggy; there's two different implementations for the
> same
> > > "internalSuffix" method, and there are references to an
> "internalSeparator"
> > > method but no implementation for it); since we don't usually require
> > > specific code changes in KIPs, I think as long as we can describe the
> > > changes we're proposing in the "Public Interfaces" section, that
> should be
> > > enough for this KIP
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Mon, Jul 24, 2023 at 2:04 AM Federico Valeri 
> > > wrote:
> > >
> > > > +1 (non binding)
> > > >
> > > > Thanks
> > > > Fede
> > > >
> > > >
> > > > On Sun, Jul 23, 2023 at 6:30 PM Omnia Ibrahim <
> o.g.h.ibra...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Hi everyone,
> > > > > I would like to open a vote for KIP-949. The proposal is here
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy
> > > > > .
> > > > > <
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy
> > > > >
> > > > >
> > > > > Thanks
> > > > > Omnia
> > > >
> > >
>


Re: [VOTE] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-24 Thread Omnia Ibrahim
Hi Chris, I updated the KIP to address your feedback. Thanks for the vote.

On Mon, Jul 24, 2023 at 1:30 PM Chris Egerton 
wrote:

> Hi Omnia,
>
> I think there's a few clarifications that should still be made on the KIP,
> but assuming these are agreeable, I'm +1 (binding)
>
> - In the description for the
> replication.policy.internal.topic.separator.enabled property (in the
> "Public Interfaces" section), we should specify that it affects only the
> checkpoints and offset syncs topic
> - We can remove the code snippet from the "Proposed Changes" section (right
> now it's a little buggy; there's two different implementations for the same
> "internalSuffix" method, and there are references to an "internalSeparator"
> method but no implementation for it); since we don't usually require
> specific code changes in KIPs, I think as long as we can describe the
> changes we're proposing in the "Public Interfaces" section, that should be
> enough for this KIP
>
> Cheers,
>
> Chris
>
> On Mon, Jul 24, 2023 at 2:04 AM Federico Valeri 
> wrote:
>
> > +1 (non binding)
> >
> > Thanks
> > Fede
> >
> >
> > On Sun, Jul 23, 2023 at 6:30 PM Omnia Ibrahim 
> > wrote:
> > >
> > > Hi everyone,
> > > I would like to open a vote for KIP-949. The proposal is here
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy
> > > .
> > > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy
> > >
> > >
> > > Thanks
> > > Omnia
> >
>


Re: [DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-23 Thread Omnia Ibrahim
Thanks for both of your feedback and time, I updated the rejected
alternative section in the KIP and opened a voting thread here
https://lists.apache.org/thread/cy04n445noyp0pqztlp8rk74crvhlrk7
I'll work on the PR in meanwhile so we are ready to go once we get 3
binding votes in order to get into 3.6 release.

Cheers
Omnia

On Fri, Jul 21, 2023 at 3:43 PM Federico Valeri 
wrote:

> Hi, the point that the legacy approach can only be taken once is
> valid, so LGTM. Thanks.
>
> Cheers
> Fede
>
> On Fri, Jul 21, 2023 at 4:28 PM Chris Egerton 
> wrote:
> >
> > Hi Omnia,
> >
> > LGTM, thanks! We may want to note the LegacyReplicationPolicy option in
> the
> > rejected alternatives section in case others prefer that option.
> >
> > Given that we'd like this to land in time for 3.6.0, you may want to
> start
> > a vote thread soon.
> >
> > Cheers,
> >
> > Chris
> >
> > On Fri, Jul 21, 2023 at 10:08 AM Omnia Ibrahim 
> > wrote:
> >
> > > Hi Chris and Federico,
> > > thinking about I think Chris's concern is valid and the bigger concern
> here
> > > is that having this `LegacyReplicationPolicy` will eventually open the
> door
> > > for diversion at some point between this `LegacyReplicationPolicy` and
> the
> > > default one.
> > > For now, let's have the flag properly fix this bug and we can keep it
> as an
> > > option for people to switch between both behaviours. I know having a
> > > bug-fix property is not great but we can treat it as a backward
> > > compatibility property instead in order to keep old mirrors using the
> old
> > > internal topics.
> > >
> > > Hope this is reasonable for the time being.
> > >
> > > Cheers,
> > > Omnia
> > >
> > > On Wed, Jul 19, 2023 at 11:16 PM Chris Egerton  >
> > > wrote:
> > >
> > > > Hi Federico,
> > > >
> > > > Ah yes, sorry about that! You're correct that this would keep the two
> > > > classes in line and largely eliminate the concern I posed about
> porting
> > > > changes to both. Still, I'm a bit hesitant, and I'm not actually
> certain
> > > > that this alternative is more intuitive. The name isn't very
> descriptive,
> > > > and this is the kind of approach we can only really take once; if we
> > > break
> > > > compatibility again, would we introduce a
> LegacyLegacyReplicationPolicy?
> > > > LegacyReplicationPolicy2? Finally, it seems a bit strange to
> introduce a
> > > > new class to implement a change in behavior this small.
> > > >
> > > > That said, I don't think this is worth blocking on and wouldn't be
> > > opposed
> > > > if others felt strongly that a new replication policy class is
> superior
> > > to
> > > > a new property on the existing class.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Wed, Jul 19, 2023 at 2:53 PM Federico Valeri <
> fedeval...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Chris, the KIP says it would be a subclass of
> > > DefaultReplicationPolicy
> > > > > that overrides the ReplicationPolicy.offsetSyncsTopic and
> > > > > ReplicationPolicy.checkpointsTopic. So, not much to maintain and it
> > > would
> > > > > be more intuitive, as you say.
> > > > >
> > > > > On Wed, Jul 19, 2023, 4:50 PM Chris Egerton
> 
> > > > > wrote:
> > > > >
> > > > > > HI all,
> > > > > >
> > > > > > I'm not sure I understand the benefits of introducing a separate
> > > > > > replication policy class, besides maybe being more
> readable/intuitive
> > > > to
> > > > > > users who would want to know when to use one or the other. It
> feels
> > > > like
> > > > > > we've swapped out a "fix the bug" property for an entire "fix the
> > > bug"
> > > > > > class, and it still leaves the problem of graceful migration from
> > > > legacy
> > > > > > behavior to new behavior unsolved. It would also require us to
> > > consider
> > > > > > whether any future changes we make to the
> DefaultReplicationPolicy
> > > > class
> > > > > > would have to be ported over to the LegacyReplicationPolicy
> class as

[VOTE] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-23 Thread Omnia Ibrahim
Hi everyone,
I would like to open a vote for KIP-949. The proposal is here
https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy
.


Thanks
Omnia


Re: [DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-21 Thread Omnia Ibrahim
Hi Chris and Federico,
thinking about I think Chris's concern is valid and the bigger concern here
is that having this `LegacyReplicationPolicy` will eventually open the door
for diversion at some point between this `LegacyReplicationPolicy` and the
default one.
For now, let's have the flag properly fix this bug and we can keep it as an
option for people to switch between both behaviours. I know having a
bug-fix property is not great but we can treat it as a backward
compatibility property instead in order to keep old mirrors using the old
internal topics.

Hope this is reasonable for the time being.

Cheers,
Omnia

On Wed, Jul 19, 2023 at 11:16 PM Chris Egerton 
wrote:

> Hi Federico,
>
> Ah yes, sorry about that! You're correct that this would keep the two
> classes in line and largely eliminate the concern I posed about porting
> changes to both. Still, I'm a bit hesitant, and I'm not actually certain
> that this alternative is more intuitive. The name isn't very descriptive,
> and this is the kind of approach we can only really take once; if we break
> compatibility again, would we introduce a LegacyLegacyReplicationPolicy?
> LegacyReplicationPolicy2? Finally, it seems a bit strange to introduce a
> new class to implement a change in behavior this small.
>
> That said, I don't think this is worth blocking on and wouldn't be opposed
> if others felt strongly that a new replication policy class is superior to
> a new property on the existing class.
>
> Cheers,
>
> Chris
>
> On Wed, Jul 19, 2023 at 2:53 PM Federico Valeri 
> wrote:
>
> > Hi Chris, the KIP says it would be a subclass of DefaultReplicationPolicy
> > that overrides the ReplicationPolicy.offsetSyncsTopic and
> > ReplicationPolicy.checkpointsTopic. So, not much to maintain and it would
> > be more intuitive, as you say.
> >
> > On Wed, Jul 19, 2023, 4:50 PM Chris Egerton 
> > wrote:
> >
> > > HI all,
> > >
> > > I'm not sure I understand the benefits of introducing a separate
> > > replication policy class, besides maybe being more readable/intuitive
> to
> > > users who would want to know when to use one or the other. It feels
> like
> > > we've swapped out a "fix the bug" property for an entire "fix the bug"
> > > class, and it still leaves the problem of graceful migration from
> legacy
> > > behavior to new behavior unsolved. It would also require us to consider
> > > whether any future changes we make to the DefaultReplicationPolicy
> class
> > > would have to be ported over to the LegacyReplicationPolicy class as
> > well.
> > >
> > > Perhaps I'm missing something; are there other benefits of introducing
> a
> > > separate replication policy class?
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Wed, Jul 19, 2023 at 5:45 AM Omnia Ibrahim  >
> > > wrote:
> > >
> > > > Hi Federico,
> > > > I like the idea of implementing `LegacyReplicationPolicy` and
> avoiding
> > > bug
> > > > fixes properties. We can drop the idea of the property and just go
> > ahead
> > > > with introducing the `LegacyReplicationPolicy` and any user upgrade
> > from
> > > > pre-KIP-690 can use this policy instead of default and no impact will
> > > > happen to users upgrading from 3.x to post-KIP-949. We can mark
> > > > `LegacyReplicationPolicy` as deprecated later if we want (but not in
> > 4.0
> > > as
> > > > this is very soon) and we can drop it entirely at some point.
> > > >
> > > > If there's an agreement on this approach I can upgrade the KIP.
> > > >
> > > > Cheers.
> > > > Omnia
> > > >
> > > > On Wed, Jul 19, 2023 at 8:52 AM Federico Valeri <
> fedeval...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi, one way to avoid the "fix the bug property" would be to provide
> > > > > and document an additional LegacyReplicationPolicy, but still
> keeping
> > > > > the current DefaultReplicationPolicy as replication.policy.class
> > > > > default value, which is basically one of the workarounds suggested
> in
> > > > > the KIP.
> > > > >
> > > > > On Tue, Jul 18, 2023 at 9:49 PM Chris Egerton
> >  > > >
> > > > > wrote:
> > > > > >
> > > > > > Hi Federico/Omnia,
> > > > > >
> > > > > > Generally I like the idea of deprecating and eventually remov

Re: [DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-19 Thread Omnia Ibrahim
Hi Federico,
I like the idea of implementing `LegacyReplicationPolicy` and avoiding bug
fixes properties. We can drop the idea of the property and just go ahead
with introducing the `LegacyReplicationPolicy` and any user upgrade from
pre-KIP-690 can use this policy instead of default and no impact will
happen to users upgrading from 3.x to post-KIP-949. We can mark
`LegacyReplicationPolicy` as deprecated later if we want (but not in 4.0 as
this is very soon) and we can drop it entirely at some point.

If there's an agreement on this approach I can upgrade the KIP.

Cheers.
Omnia

On Wed, Jul 19, 2023 at 8:52 AM Federico Valeri 
wrote:

> Hi, one way to avoid the "fix the bug property" would be to provide
> and document an additional LegacyReplicationPolicy, but still keeping
> the current DefaultReplicationPolicy as replication.policy.class
> default value, which is basically one of the workarounds suggested in
> the KIP.
>
> On Tue, Jul 18, 2023 at 9:49 PM Chris Egerton 
> wrote:
> >
> > Hi Federico/Omnia,
> >
> > Generally I like the idea of deprecating and eventually removing "fix the
> > bug" properties like this, but 4.0 may be a bit soon. I'm also unsure of
> > how we would instruct users who are relying on the property being set to
> > "false" to migrate to a version of MM2 that doesn't support support it,
> > beyond simply implementing their own replication policy--at which point,
> > would we really be doing anyone a favor by forcing them to fork the
> default
> > policy just to preserve a property we removed?
> >
> > I guess right now I'd rather play it safe and not immediately deprecate
> the
> > property. If we can find an easy migration path for users who are relying
> > on it, then I'd be happy to deprecate and schedule for removal.
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Jul 18, 2023 at 12:54 PM Omnia Ibrahim 
> > wrote:
> >
> > > Hi Federico,
> > > I don't have any strong opinion one way or the other. The pro of
> > > deprecation is not adding more configs to MM2 as it has too many
> configs
> > > already. However, we need to think about old MM2 migrating their
> internal
> > > topics to 4.0 with less impact.
> > >
> > > @Chris what do you think?
> > >
> > > Cheers
> > > Omnia
> > >
> > > On Fri, Jul 14, 2023 at 2:38 PM Federico Valeri 
> > > wrote:
> > >
> > > > Hi Omnia and Chris, I agree with setting
> > > > "replication.policy.internal.topic.separator.enabled=true" by
> default,
> > > > but I was wondering if we should also deprecate and remove in Kafka
> 4.
> > > > If there is agreement in having the same behavior for internal and
> > > > non-internal topics, then it should be fine, and we won't need to
> keep
> > > > an additional configuration around. Wdyt?
> > > >
> > > > Cheers
> > > > Fede
> > > >
> > > > On Fri, Jul 14, 2023 at 1:51 PM Omnia Ibrahim <
> o.g.h.ibra...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Hi Chris, I added a section for backport plan here
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy#KIP949:AddflagtoenabletheusageoftopicseparatorinMM2DefaultReplicationPolicy-Backportingplan
> > > > >
> > > > > Cheers,
> > > > > Omnia
> > > > >
> > > > > > On 13 Jul 2023, at 19:33, Chris Egerton  >
> > > > wrote:
> > > > > >
> > > > > > Hi Omnia,
> > > > > >
> > > > > > Yes, I think we ought to state the backport plan in the KIP,
> since
> > > it's
> > > > > > highly unusual for KIP changes or new configuration properties
> to be
> > > > > > backported and we should get the approval of the community
> (including
> > > > > > binding and non-binding voters) before implementing it.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Thu, Jul 13, 2023 at 7:13 AM Omnia Ibrahim <
> > > o.g.h.ibra...@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> Hi Chris,
> > > > > >> The implementation should be very small so backporting this to

Re: [DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-18 Thread Omnia Ibrahim
Hi Federico,
I don't have any strong opinion one way or the other. The pro of
deprecation is not adding more configs to MM2 as it has too many configs
already. However, we need to think about old MM2 migrating their internal
topics to 4.0 with less impact.

@Chris what do you think?

Cheers
Omnia

On Fri, Jul 14, 2023 at 2:38 PM Federico Valeri 
wrote:

> Hi Omnia and Chris, I agree with setting
> "replication.policy.internal.topic.separator.enabled=true" by default,
> but I was wondering if we should also deprecate and remove in Kafka 4.
> If there is agreement in having the same behavior for internal and
> non-internal topics, then it should be fine, and we won't need to keep
> an additional configuration around. Wdyt?
>
> Cheers
> Fede
>
> On Fri, Jul 14, 2023 at 1:51 PM Omnia Ibrahim 
> wrote:
> >
> > Hi Chris, I added a section for backport plan here
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy#KIP949:AddflagtoenabletheusageoftopicseparatorinMM2DefaultReplicationPolicy-Backportingplan
> >
> > Cheers,
> > Omnia
> >
> > > On 13 Jul 2023, at 19:33, Chris Egerton 
> wrote:
> > >
> > > Hi Omnia,
> > >
> > > Yes, I think we ought to state the backport plan in the KIP, since it's
> > > highly unusual for KIP changes or new configuration properties to be
> > > backported and we should get the approval of the community (including
> > > binding and non-binding voters) before implementing it.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Thu, Jul 13, 2023 at 7:13 AM Omnia Ibrahim  >
> > > wrote:
> > >
> > >> Hi Chris,
> > >> The implementation should be very small so backporting this to 3.1
> and 3.2
> > >> would be perfect for this case if you or any other committer are okay
> with
> > >> approving the backporting. Do we need to state this in KIP as well or
> not?
> > >>
> > >> Also, I’ll open a vote for the KIP today and prepare the pr for it so
> we
> > >> can merge it as soon as we can.
> > >>
> > >> Thanks,
> > >>
> > >> Omnia
> > >>
> > >> On Wed, Jul 12, 2023 at 4:31 PM Chris Egerton  >
> > >> wrote:
> > >>
> > >>> Hi Omnia,
> > >>>
> > >>> Thanks for changing the default, LGTM 
> > >>>
> > >>> As far as backporting goes, we probably won't be doing another
> release
> > >> for
> > >>> 3.1, and possibly not for 3.2 either; however, if we can make the
> > >>> implementation focused enough (which I don't think would be too
> > >> difficult,
> > >>> but correct me if I'm wrong), then we can still backport through 3.1.
> > >> Even
> > >>> if we don't do another release it can make life easier for people
> who are
> > >>> maintaining parallel forks. Obviously this shouldn't be taken as a
> > >> blanket
> > >>> precedent but in this case it seems like the benefits may outweigh
> the
> > >>> costs. What are your thoughts?
> > >>>
> > >>> Cheers,
> > >>>
> > >>> Chris
> > >>>
> > >>> On Wed, Jul 12, 2023 at 9:05 AM Omnia Ibrahim <
> o.g.h.ibra...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Chris, thanks for the feedback.
> > >>>> 1. regarding the default value I had the same conflict of which
> version
> > >>> to
> > >>>> break the backward compatibility with. We can just say that this KIP
> > >>> gives
> > >>>> the release Pre KIP-690 the ability to keep the old behaviour with
> one
> > >>>> config and keep the backwards compatibility from post-KIP-690 the
> same
> > >> so
> > >>>> we don't break at least the last 3 versions. I will update the KIP
> to
> > >>>> switch the default value to true.
> > >>>> 2. For the backporting, which versions can we backport these to?
> > >> Usually,
> > >>>> Kafka supports bugfix releases as needed for the last 3 releases.
> Now
> > >> we
> > >>> @
> > >>>> 3.5 so the last 3 are 3.4, 3.3 and 3.2 is this correct?
> > >>>> 3. I'll add a Jira for updating the docs for this KIP so we don't
> > >

Re: [DISCUSS] KIP-936: Throttle number of active PIDs

2023-07-16 Thread Omnia Ibrahim
Thanks Claude for the feedback and the raising this implementation to
Apache commons-collections.
I had a look into your layered bloom filter and at first glance, I think it
would be a better improvement, however, regarding the following suggestion

> By hashing the principal and PID into the filter as a single hash only
one Bloom filter is required.

I am not sure how this will work when we have different producer-id-rate
for different KafkaPrincipal as proposed in the KIP.
For example `userA` had producer-id-rate of 1000 per hour while `user2` has
a quota of 100 producer ids per hour. How will we configure the max entries
for the Shape?

The only thing that comes to my mind to maintain this desired behavior in
the KIP is to NOT hash PID with KafkaPrincipal and keep a
Map
then each one of these bloom filters is controlled with
`Shape(, 0.1)`.

Does that make sense? WDYT?

Also regarding the eviction function
> (evict function)
> The evict function will determine if it has been a minute since the last
> time we created a new layer, if so create a new layer and evict all layers
> older than 1 hour.  Since the layers are a time ordered list this is
simply
> removing the elderly layers from the front of the list.

Maybe am missing something here but I can't find anything in the
`LayerManager` code that point to how often will the eviction function
runs. Do you mean that the eviction function runs every minute? If so, can
we control this?

Cheers,
Omnia

On Wed, Jun 21, 2023 at 11:43 AM Claude Warren  wrote:

> I think that the either using a Stable bloom filter or a Layered bloom
> filter constructed as follows:
>
>
>- Each layer is configured for the maximum number of principal-PID pairs
>expected in a single minute.
>- Expect 60 layers (one for each minute)
>- If the layer becomes fully populated add another layer.
>- When the time to insert into the current layer expires, remove the
>layers that are older than an hour.
>
> This will provide a sliding window of one hour with the ability to handle
> bursts above the expected rate of inserts without additional false
> positives.
>
> By hashing the principal and PID into the filter as a single hash only one
> Bloom filter is required.
>
> The code I pointed to earlier uses the common-collections4 Bloom filter
> implementation.  So a rough pseudo code for the implementation is:
>
> Shape shap = Shape.fromNP( 1000, 0.1 ); // 1000 entries, 0.1 false positive
> rate
> LayeredBloomFilter filter = LayeredBloomFilter( shape, 60, evictFunc ); //
> 60 initial layers, eviction function.
>
> (on PID)
>
> long[2] buff = Murmur3Hash.hash128x64( String.format("%s%s", principal, PID
> ).getBytes(java.nio.charset.Charset.UTF8));
> Hasher hasher = new EnhancedDoubleHasher( buff[0], buff[1] );
> if (filter.contains(hasher)) {
> // handle case where principal-pid was already seen
> }
> filter.merge( hasher ); // ensures that principal-pid remains in seen for
> the next hour.
>
> (evict function)
> The evict function will determine if it has been a minute since the last
> time we created a new layer, if so create a new layer and evict all layers
> older than 1 hour.  Since the layers are a time ordered list this is simply
> removing the elderly layers from the front of the list.
>
> if it has not been an hour and the current filter is full (e.g. has 1000
> entries) create a new layer
>
> This should be very fast and space efficient.
>
>
> On Wed, Jun 21, 2023 at 11:13 AM Claude Warren  wrote:
>
> > I have an implementation of a layered Bloom filter in [1] (note the
> > layered branch).  This should handle the layering Bloom filter and allow
> > for layers that
> >
> >1. Do not become over populated and thus yield too many false
> >positives.
> >2. Expire and are removed automatically.
> >
> > The layered Bloom filter also does not need another thread to remove the
> > old filters as this is amortized across all the inserts.
> >
> > In addition, the number of Layered Bloom filter instances can be reduced
> > by hashing the Kafka Principal and the ID together into the Bloom filter
> to
> > look for.
> >
> > [1] https://github.com/Claudenw/BloomFilters/tree/layered
> >
> > On Sun, Jun 18, 2023 at 10:18 PM Omnia Ibrahim 
> > wrote:
> >
> >> Hi Haruki, Thanks for having a look at the KIP.
> >> > 1. Do you have any memory-footprint estimation for
> >> TimeControlledBloomFilter?
> >> I don't at the moment have any estimate as I don't have a full
> >> implementation of this one at the moment. I can work on one if it's
> >> required.
> >>
> >> > * If I read the KIP correc

Re: [DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-14 Thread Omnia Ibrahim
Hi Chris, I added a section for backport plan here 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy#KIP949:AddflagtoenabletheusageoftopicseparatorinMM2DefaultReplicationPolicy-Backportingplan

Cheers, 
Omnia 

> On 13 Jul 2023, at 19:33, Chris Egerton  wrote:
> 
> Hi Omnia,
> 
> Yes, I think we ought to state the backport plan in the KIP, since it's
> highly unusual for KIP changes or new configuration properties to be
> backported and we should get the approval of the community (including
> binding and non-binding voters) before implementing it.
> 
> Cheers,
> 
> Chris
> 
> On Thu, Jul 13, 2023 at 7:13 AM Omnia Ibrahim 
> wrote:
> 
>> Hi Chris,
>> The implementation should be very small so backporting this to 3.1 and 3.2
>> would be perfect for this case if you or any other committer are okay with
>> approving the backporting. Do we need to state this in KIP as well or not?
>> 
>> Also, I’ll open a vote for the KIP today and prepare the pr for it so we
>> can merge it as soon as we can.
>> 
>> Thanks,
>> 
>> Omnia
>> 
>> On Wed, Jul 12, 2023 at 4:31 PM Chris Egerton 
>> wrote:
>> 
>>> Hi Omnia,
>>> 
>>> Thanks for changing the default, LGTM 
>>> 
>>> As far as backporting goes, we probably won't be doing another release
>> for
>>> 3.1, and possibly not for 3.2 either; however, if we can make the
>>> implementation focused enough (which I don't think would be too
>> difficult,
>>> but correct me if I'm wrong), then we can still backport through 3.1.
>> Even
>>> if we don't do another release it can make life easier for people who are
>>> maintaining parallel forks. Obviously this shouldn't be taken as a
>> blanket
>>> precedent but in this case it seems like the benefits may outweigh the
>>> costs. What are your thoughts?
>>> 
>>> Cheers,
>>> 
>>> Chris
>>> 
>>> On Wed, Jul 12, 2023 at 9:05 AM Omnia Ibrahim 
>>> wrote:
>>> 
>>>> Hi Chris, thanks for the feedback.
>>>> 1. regarding the default value I had the same conflict of which version
>>> to
>>>> break the backward compatibility with. We can just say that this KIP
>>> gives
>>>> the release Pre KIP-690 the ability to keep the old behaviour with one
>>>> config and keep the backwards compatibility from post-KIP-690 the same
>> so
>>>> we don't break at least the last 3 versions. I will update the KIP to
>>>> switch the default value to true.
>>>> 2. For the backporting, which versions can we backport these to?
>> Usually,
>>>> Kafka supports bugfix releases as needed for the last 3 releases. Now
>> we
>>> @
>>>> 3.5 so the last 3 are 3.4, 3.3 and 3.2 is this correct?
>>>> 3. I'll add a Jira for updating the docs for this KIP so we don't
>> forget
>>>> about it.
>>>> 
>>>> Thanks
>>>> Omnia
>>>> 
>>>> 
>>>> On Mon, Jul 10, 2023 at 5:33 PM Chris Egerton >> 
>>>> wrote:
>>>> 
>>>>> Hi Omnia,
>>>>> 
>>>>> Thanks for taking this on! I have some thoughts but the general
>>> approach
>>>>> looks good.
>>>>> 
>>>>> 1. Default value
>>>>> 
>>>>> One thing I'm wrestling with is what the default value of the new
>>>> property
>>>>> should be. I know on the Jira ticket I proposed that it should be
>>> false,
>>>>> but I'm having second thoughts. Technically we'd preserve backward
>>>>> compatibility with pre-KIP-690 releases by defaulting to false, but
>> at
>>>> the
>>>>> same time, we'd break compatibility with post-KIP-690 releases. And
>> if
>>> we
>>>>> default to true, the opposite would be true: compatibility would be
>>>> broken
>>>>> with pre-KIP-690 releases, but preserved with post-KIP-690 releases.
>>>>> 
>>>>> One argument against defaulting to false (which, again, would
>> preserve
>>>> the
>>>>> behavior of MM2 before we accidentally broke compatibility with
>>> KIP-690)
>>>> is
>>>>> that this change could possibly cause a single MM2 setup to break
>>>>> twice--once when upgrading from a pre-KIP-690 release to an existing

[Vote] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-13 Thread Omnia Ibrahim
Hi Everyone! I would like to start the vote on KIP-949 details is here  
https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy

Thanks
Omnia

Re: [DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-13 Thread Omnia Ibrahim
Hi Chris,
The implementation should be very small so backporting this to 3.1 and 3.2
would be perfect for this case if you or any other committer are okay with
approving the backporting. Do we need to state this in KIP as well or not?

Also, I’ll open a vote for the KIP today and prepare the pr for it so we
can merge it as soon as we can.

Thanks,

 Omnia

On Wed, Jul 12, 2023 at 4:31 PM Chris Egerton 
wrote:

> Hi Omnia,
>
> Thanks for changing the default, LGTM 
>
> As far as backporting goes, we probably won't be doing another release for
> 3.1, and possibly not for 3.2 either; however, if we can make the
> implementation focused enough (which I don't think would be too difficult,
> but correct me if I'm wrong), then we can still backport through 3.1. Even
> if we don't do another release it can make life easier for people who are
> maintaining parallel forks. Obviously this shouldn't be taken as a blanket
> precedent but in this case it seems like the benefits may outweigh the
> costs. What are your thoughts?
>
> Cheers,
>
> Chris
>
> On Wed, Jul 12, 2023 at 9:05 AM Omnia Ibrahim 
> wrote:
>
> > Hi Chris, thanks for the feedback.
> > 1. regarding the default value I had the same conflict of which version
> to
> > break the backward compatibility with. We can just say that this KIP
> gives
> > the release Pre KIP-690 the ability to keep the old behaviour with one
> > config and keep the backwards compatibility from post-KIP-690 the same so
> > we don't break at least the last 3 versions. I will update the KIP to
> > switch the default value to true.
> > 2. For the backporting, which versions can we backport these to? Usually,
> > Kafka supports bugfix releases as needed for the last 3 releases. Now we
> @
> > 3.5 so the last 3 are 3.4, 3.3 and 3.2 is this correct?
> > 3. I'll add a Jira for updating the docs for this KIP so we don't forget
> > about it.
> >
> > Thanks
> > Omnia
> >
> >
> > On Mon, Jul 10, 2023 at 5:33 PM Chris Egerton 
> > wrote:
> >
> > > Hi Omnia,
> > >
> > > Thanks for taking this on! I have some thoughts but the general
> approach
> > > looks good.
> > >
> > > 1. Default value
> > >
> > > One thing I'm wrestling with is what the default value of the new
> > property
> > > should be. I know on the Jira ticket I proposed that it should be
> false,
> > > but I'm having second thoughts. Technically we'd preserve backward
> > > compatibility with pre-KIP-690 releases by defaulting to false, but at
> > the
> > > same time, we'd break compatibility with post-KIP-690 releases. And if
> we
> > > default to true, the opposite would be true: compatibility would be
> > broken
> > > with pre-KIP-690 releases, but preserved with post-KIP-690 releases.
> > >
> > > One argument against defaulting to false (which, again, would preserve
> > the
> > > behavior of MM2 before we accidentally broke compatibility with
> KIP-690)
> > is
> > > that this change could possibly cause a single MM2 setup to break
> > > twice--once when upgrading from a pre-KIP-690 release to an existing
> > > release, and again when upgrading from that existing release to a
> version
> > > that reverted (by default) to pre-KIP-690 behavior. On the other hand,
> if
> > > we default to true (which would preserve the existing behavior that
> > breaks
> > > compatibility with pre-KIP-690 releases), then any given setup will
> only
> > be
> > > broken once.
> > >
> > > In addition, if we default to true right now, then we don't have to
> worry
> > > about changing that default in 4.0 to a more intuitive value (I hope we
> > can
> > > all agree that, for new clusters, it makes sense to set this property
> to
> > > true and not to distinguish between internal and non-internal topics).
> > >
> > > With that in mind, I'm now leaning more towards defaulting to true, but
> > > would be interested in your thoughts.
> > >
> > >
> > > 2. Backport?
> > >
> > > It's highly unlikely to backport changes for a KIP, but given the
> impact
> > of
> > > the compatibility break that we're trying to address here, and the
> > > extremely low risk of the proposed changes, I think we should consider
> > > backporting the proposed fix to all affected release branches (i.e.,
> 3.1
> > > through 3.5).
> > >
> > >
> > > 3. Extra steps
> > >
> > > I also think we can take these a

Re: [DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-12 Thread Omnia Ibrahim
Hi Chris, thanks for the feedback.
1. regarding the default value I had the same conflict of which version to
break the backward compatibility with. We can just say that this KIP gives
the release Pre KIP-690 the ability to keep the old behaviour with one
config and keep the backwards compatibility from post-KIP-690 the same so
we don't break at least the last 3 versions. I will update the KIP to
switch the default value to true.
2. For the backporting, which versions can we backport these to? Usually,
Kafka supports bugfix releases as needed for the last 3 releases. Now we @
3.5 so the last 3 are 3.4, 3.3 and 3.2 is this correct?
3. I'll add a Jira for updating the docs for this KIP so we don't forget
about it.

Thanks
Omnia


On Mon, Jul 10, 2023 at 5:33 PM Chris Egerton 
wrote:

> Hi Omnia,
>
> Thanks for taking this on! I have some thoughts but the general approach
> looks good.
>
> 1. Default value
>
> One thing I'm wrestling with is what the default value of the new property
> should be. I know on the Jira ticket I proposed that it should be false,
> but I'm having second thoughts. Technically we'd preserve backward
> compatibility with pre-KIP-690 releases by defaulting to false, but at the
> same time, we'd break compatibility with post-KIP-690 releases. And if we
> default to true, the opposite would be true: compatibility would be broken
> with pre-KIP-690 releases, but preserved with post-KIP-690 releases.
>
> One argument against defaulting to false (which, again, would preserve the
> behavior of MM2 before we accidentally broke compatibility with KIP-690) is
> that this change could possibly cause a single MM2 setup to break
> twice--once when upgrading from a pre-KIP-690 release to an existing
> release, and again when upgrading from that existing release to a version
> that reverted (by default) to pre-KIP-690 behavior. On the other hand, if
> we default to true (which would preserve the existing behavior that breaks
> compatibility with pre-KIP-690 releases), then any given setup will only be
> broken once.
>
> In addition, if we default to true right now, then we don't have to worry
> about changing that default in 4.0 to a more intuitive value (I hope we can
> all agree that, for new clusters, it makes sense to set this property to
> true and not to distinguish between internal and non-internal topics).
>
> With that in mind, I'm now leaning more towards defaulting to true, but
> would be interested in your thoughts.
>
>
> 2. Backport?
>
> It's highly unlikely to backport changes for a KIP, but given the impact of
> the compatibility break that we're trying to address here, and the
> extremely low risk of the proposed changes, I think we should consider
> backporting the proposed fix to all affected release branches (i.e., 3.1
> through 3.5).
>
>
> 3. Extra steps
>
> I also think we can take these additional steps to try to help prevent
> users from being bitten by this change:
>
> - Add a note to our upgrade instructions [1] for all affected versions that
> instructs users on how to safely upgrade to a post-KIP-690 release, for
> versions that both do and do not include the changes from this KIP
> - Log a warning message on MM2 startup if the config contains an explicit
> value for "replication.policy.separator" but does not contain an explicit
> value for "replication.policy.internal.topic.separator.enabled"
>
> These details don't necessarily have to be codified in the KIP, but they're
> worth taking into account when considering how to design any functional
> changes in order to better try to gauge how well this could go for our
> users.
>
> [1] - https://kafka.apache.org/documentation.html#upgrade
>
>
> Thanks again for the KIP!
>
> Cheers,
>
> Chris
>
> On Fri, Jul 7, 2023 at 10:12 AM Omnia Ibrahim 
> wrote:
>
> > Hi everyone,
> > I want to start the discussion of the KIP-949. The proposal is here
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy
> > >
> >
> > Thanks for your time and feedback.
> > Omnia
> >
>


[DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-07 Thread Omnia Ibrahim
Hi everyone,
I want to start the discussion of the KIP-949. The proposal is here
https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy


Thanks for your time and feedback.
Omnia


Re: [DISCUSS] KIP-936: Throttle number of active PIDs

2023-06-18 Thread Omnia Ibrahim
Hi Haruki, Thanks for having a look at the KIP.
> 1. Do you have any memory-footprint estimation for
TimeControlledBloomFilter?
I don't at the moment have any estimate as I don't have a full
implementation of this one at the moment. I can work on one if it's
required.

> * If I read the KIP correctly, TimeControlledBloomFilter will be
> allocated per KafkaPrincipal so the size should be reasonably small
> considering clusters which have a large number of users.
The Map stored in the cache has 2 dimensions one is vertical which is
KafkaPrincipal (producers only) and the second horizontal which is the time
of the windows.
- Horizontally we will add only PIDs to the TimeControlledBloomFilter only
if KafkaPrincipal didn't hit the quota and we control the bloom filter by
time to expire the oldest set at some point when it's not needed anymore.
- Vertically is the tricky one if the cluster has an insane number of
KafkaPrincipals used for producing. And if the number of KafkaPrincipals is
huge we can control the memory used by the cache by throttling more
aggressively and I would argue that they will never going to be an insane
number that could cause OOM.

 >* i.e. What false-positive rate do you plan to choose as the default?
Am planning on using 0.1 as default.

> 2. What do you think about rotating windows on produce-requests arrival
instead of scheduler?
> * If we do rotation in scheduler threads, my concern is potential
> scheduler threads occupation which could make other background tasks to
> delay
This is a valid concern. We can consider disposing of the oldest bloom when
we add a new PID to the TimeControlledBloomFilter. However, I would still
need a scheduler to clean up any inactive KafkaPrincipal from the cache
layer `i.e. ProducerIdQuotaManagerCache`. Do you have the same concern
about this one too?

> 3. Why the default producer.id.quota.window.size.seconds is 1 hour?
>  * Unlike other quota types (1 second)
Mostly because 1 sec doesn't make sense for this type of quota.
Misconfigured or misbehaving producers usually don't allocate new PIDs on
the leader every sec but over a period of time.

Thanks

On Tue, Jun 6, 2023 at 5:21 PM Haruki Okada  wrote:

> Hi, Omnia.
>
> Thanks for the KIP.
> The feature sounds indeed helpful and the strategy to use bloom-filter
> looks good.
>
> I have three questions:
>
> 1. Do you have any memory-footprint estimation
> for TimeControlledBloomFilter?
> * If I read the KIP correctly, TimeControlledBloomFilter will be
> allocated per KafkaPrincipal so the size should be reasonably small
> considering clusters which have a large number of users.
> * i.e. What false-positive rate do you plan to choose as the default?
> 2. What do you think about rotating windows on produce-requests arrival
> instead of scheduler?
> * If we do rotation in scheduler threads, my concern is potential
> scheduler threads occupation which could make other background tasks to
> delay
> 3. Why the default producer.id.quota.window.size.seconds is 1 hour?
> * Unlike other quota types (1 second)
>
> Thanks,
>
> 2023年6月6日(火) 23:55 Omnia Ibrahim :
>
> > Hi everyone,
> > I want to start the discussion of the KIP-936 to throttle the number of
> > active PIDs per KafkaPrincipal. The proposal is here
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
> > >
> >
> > Thanks for your time and feedback.
> > Omnia
> >
>
>
> --
> 
> Okada Haruki
> ocadar...@gmail.com
> 
>


[DISCUSS] KIP-936: Throttle number of active PIDs

2023-06-06 Thread Omnia Ibrahim
Hi everyone,
I want to start the discussion of the KIP-936 to throttle the number of
active PIDs per KafkaPrincipal. The proposal is here
https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs


Thanks for your time and feedback.
Omnia


[jira] [Created] (KAFKA-15063) Throttle number of active PIDs

2023-06-06 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-15063:
-

 Summary: Throttle number of active PIDs
 Key: KAFKA-15063
 URL: https://issues.apache.org/jira/browse/KAFKA-15063
 Project: Kafka
  Issue Type: New Feature
  Components: core, producer 
Affects Versions: 3.4.0, 3.2.0, 3.0.0, 3.1.0, 2.8.0, 3.3
Reporter: Omnia Ibrahim


{color:#172b4d}Ticket to track to track KIP-936. Since KIP-679 
i{color:#172b4d}dempotent{color} {color:#172b4d}producers became the default in 
Kafka {color:#172b4d}as a result of this all producer instances will be 
assigned PID. The increase of number of PIDs stored in Kafka brokers by 
{color}{{ProducerStateManager}}{color:#172b4d} exposes the broker to OOM errors 
if it has a high number of producers, rogue or misconfigured client(s).{color} 
{color}{color}

{color:#172b4d}{color:#172b4d}{color:#172b4d}{color:#172b4d}{color:#172b4d}{color:#172b4d}The
 broker is still exposed to OOM{color}{color}{color} even after KIP-854 
introduced a separated config to expire PID from transaction IDs if there is 
high number of PID before {color}{{producer.id.expiration.ms}}{color:#172b4d} 
is exceeded. 
{color}{color}{color}

As a result of this the broker will keep experincing OOM and become offline. 
The only way to recover from this is to increase the heap.  

 

{color:#172b4d}KIP-936 is proposing throttling number of PIDs per 
KafkaPrincipal {color}

{color:#172b4d}See 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs]
 {color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] solutions for broker OOM caused by many producer IDs

2023-06-06 Thread Omnia Ibrahim
bloom filter.
> And when new PID created for a userA, we update the map to add PID into
> the cache value (i.e. the bloom filter)
> When the window passed half of the time, we created another bloom filter,
> and this time, when new PID comes, we check if this new PID existed in
> previous bloom filter, if not, we add into the new bloom filter. And in the
> meantime, we track the "new created" count (filtered by previous bloom
> filter) for throttling the users.
> Is my understanding correct?
>
> 2. what will user get when they can't allocate new PID due to throttling?
> You might need to address in the KIP.
>
> 3. This config: producer.id.quota.window.num is unclear to me?
> "The number of samples to retain in memory for alter producer id quotas"
> What's the "samples" mean here? Do you mean we only track the top 11
> kafkaPrinciple usage each window?
>
> Finally, I think this KIP is good to have an official KIP discuss thread
> for community review.
> Thanks for the KIP!
>
> Luke
>
>
>
>
> On Mon, Jun 5, 2023 at 11:44 PM Omnia Ibrahim 
> wrote:
>
>> Hi Justine, Thanks for having a look
>> > One question I have is how will we handle a scenario where potentially
>> each new client has a new Kafka Principal? Is that simply not covered by
>> throttling?
>> if any new client setup a new principal they will be throttled based on
>> the throttling for `/config/users/` or
>> /config/users/
>>
>>
>> On Wed, May 31, 2023 at 6:50 PM Justine Olshan 
>> wrote:
>>
>>> Hey Omnia,
>>>
>>> I was doing a bit of snooping (I actually just get updates for the KIP
>>> page) and I saw this draft was in progress. I shared it with some of my
>>> colleagues as well who I previously discussed the issue with.
>>>
>>> The initial look was pretty promising to me. I appreciate the detailing
>>> of the rejected options since we had quite a few we worked through :)
>>>
>>> One question I have is how will we handle a scenario where potentially
>>> each new client has a new Kafka Principal? Is that simply not covered by
>>> throttling?
>>>
>>> Thanks,
>>> Justine
>>>
>>> On Wed, May 31, 2023 at 10:08 AM Omnia Ibrahim 
>>> wrote:
>>>
>>>> Hi Justine and Luke,
>>>>
>>>> I started a KIP draft here
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
>>>> for a proposal would appreciate it if you could provide any initial
>>>> feedback before opening a broader discussion.
>>>>
>>>> Thanks
>>>>
>>>> On Wed, Feb 22, 2023 at 4:35 PM Omnia Ibrahim 
>>>> wrote:
>>>>
>>>>>
>>>>> *Hi Justine, *
>>>>>
>>>>> *My initial thought of throttling the initProducerId was to get ripped
>>>>> off the problem at the source (which creates too many PIDs per client) and
>>>>> fail faster but if having this on the produce request level is easier this
>>>>> should be fine. I am guessing it will be the same direction as we may
>>>>> ClientQuotaManage for Produce throttling with a different quota window 
>>>>> than
>>>>> `quota.window.size.seconds `. *
>>>>>
>>>>> *If this is good as an initial solution I can put start a KIP and see
>>>>> what the wider community feels about this. *
>>>>>
>>>>> *Also, I noticed that at some point one of us hit "Replay" instead of
>>>>> "Replay to All" :)  So here are the previous conversations*
>>>>>
>>>>> *On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan >>>> > wrote:*
>>>>>
>>>>>> Hey Omnia,
>>>>>>
>>>>>> Thanks for the response. I think I understand your explanations here
>>>>>> with respect to principal and clientId usage.
>>>>>>
>>>>>> For the throttling -- handleInitProducerIdRequest will allocate the
>>>>>> ID to the producer, but we don't actually store it on the broker or
>>>>>> increment our metric until the first produce request for that producer is
>>>>>> sent (or sent again after previously expiring). Would you consider
>>>>>> throttling the produce request instead? It may be hard to get any metrics
>>>>>> from the transaction coordinator where the initProducerId request is
>

Re: [DISCUSS] solutions for broker OOM caused by many producer IDs

2023-06-05 Thread Omnia Ibrahim
Hi Justine, Thanks for having a look
> One question I have is how will we handle a scenario where potentially
each new client has a new Kafka Principal? Is that simply not covered by
throttling?
if any new client setup a new principal they will be throttled based on the
throttling for `/config/users/` or
/config/users/


On Wed, May 31, 2023 at 6:50 PM Justine Olshan  wrote:

> Hey Omnia,
>
> I was doing a bit of snooping (I actually just get updates for the KIP
> page) and I saw this draft was in progress. I shared it with some of my
> colleagues as well who I previously discussed the issue with.
>
> The initial look was pretty promising to me. I appreciate the detailing of
> the rejected options since we had quite a few we worked through :)
>
> One question I have is how will we handle a scenario where potentially
> each new client has a new Kafka Principal? Is that simply not covered by
> throttling?
>
> Thanks,
> Justine
>
> On Wed, May 31, 2023 at 10:08 AM Omnia Ibrahim 
> wrote:
>
>> Hi Justine and Luke,
>>
>> I started a KIP draft here
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
>> for a proposal would appreciate it if you could provide any initial
>> feedback before opening a broader discussion.
>>
>> Thanks
>>
>> On Wed, Feb 22, 2023 at 4:35 PM Omnia Ibrahim 
>> wrote:
>>
>>>
>>> *Hi Justine, *
>>>
>>> *My initial thought of throttling the initProducerId was to get ripped
>>> off the problem at the source (which creates too many PIDs per client) and
>>> fail faster but if having this on the produce request level is easier this
>>> should be fine. I am guessing it will be the same direction as we may
>>> ClientQuotaManage for Produce throttling with a different quota window than
>>> `quota.window.size.seconds `. *
>>>
>>> *If this is good as an initial solution I can put start a KIP and see
>>> what the wider community feels about this. *
>>>
>>> *Also, I noticed that at some point one of us hit "Replay" instead of
>>> "Replay to All" :)  So here are the previous conversations*
>>>
>>> *On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan >> > wrote:*
>>>
>>>> Hey Omnia,
>>>>
>>>> Thanks for the response. I think I understand your explanations here
>>>> with respect to principal and clientId usage.
>>>>
>>>> For the throttling -- handleInitProducerIdRequest will allocate the ID
>>>> to the producer, but we don't actually store it on the broker or increment
>>>> our metric until the first produce request for that producer is sent (or
>>>> sent again after previously expiring). Would you consider throttling the
>>>> produce request instead? It may be hard to get any metrics from the
>>>> transaction coordinator where the initProducerId request is handled.
>>>>
>>>> Justine
>>>
>>>
>>> *On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim >> > wrote:*
>>>
>>>> Hey Justine,
>>>> > If I understand your message correctly, there are issues with
>>>> identifying the source of the rogue clients? So you propose to add a new
>>>> metric for that?
>>>> > And also proposing to throttle based on clientId as a potential
>>>> follow up?
>>>> I want to identify rogue clients by KafkaPrincipal (and/or clientId)
>>>> similarly to how we identify clients in Fetch/Produce/Request
>>>> QuotaManagers. Using KafkaPrincipal should give cluster admin the ability
>>>> to throttle later based on principal which is most likely to be a smaller
>>>> set than clientIds. My initial thought was to add a metrics that represent
>>>> how many InitProducerIDRequest are sent by KafkaPrincipal (and/or clientId)
>>>> similar to Fetch/Produce QuotaManagers.
>>>> Then as a follow-up, we can throttle based on either KafkaPrinciple or
>>>> clientId (maybe default as well to align this with other QuotaManagers in
>>>> Kafka).
>>>>
>>>> >1. Does we rely on the client using the same ID? What if there are
>>>> many clients that all use different client IDs?
>>>> This is why I want to use the combination of KafkaPrincipal or clientId
>>>> similar to some other quotas we have in Kafka already. This will be a
>>>> similar risk to Fetch/Produce quota in Kafka which also relay on the client
>>>> to use the same clie

Re: [DISCUSS] solutions for broker OOM caused by many producer IDs

2023-05-31 Thread Omnia Ibrahim
Hi Justine and Luke,

I started a KIP draft here
https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
for a proposal would appreciate it if you could provide any initial
feedback before opening a broader discussion.

Thanks

On Wed, Feb 22, 2023 at 4:35 PM Omnia Ibrahim 
wrote:

>
> *Hi Justine, *
>
> *My initial thought of throttling the initProducerId was to get ripped off
> the problem at the source (which creates too many PIDs per client) and fail
> faster but if having this on the produce request level is easier this
> should be fine. I am guessing it will be the same direction as we may
> ClientQuotaManage for Produce throttling with a different quota window than
> `quota.window.size.seconds `. *
>
> *If this is good as an initial solution I can put start a KIP and see what
> the wider community feels about this. *
>
> *Also, I noticed that at some point one of us hit "Replay" instead of
> "Replay to All" :)  So here are the previous conversations*
>
> *On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan  > wrote:*
>
>> Hey Omnia,
>>
>> Thanks for the response. I think I understand your explanations here with
>> respect to principal and clientId usage.
>>
>> For the throttling -- handleInitProducerIdRequest will allocate the ID to
>> the producer, but we don't actually store it on the broker or increment our
>> metric until the first produce request for that producer is sent (or sent
>> again after previously expiring). Would you consider throttling the produce
>> request instead? It may be hard to get any metrics from the transaction
>> coordinator where the initProducerId request is handled.
>>
>> Justine
>
>
> *On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim  > wrote:*
>
>> Hey Justine,
>> > If I understand your message correctly, there are issues with
>> identifying the source of the rogue clients? So you propose to add a new
>> metric for that?
>> > And also proposing to throttle based on clientId as a potential follow
>> up?
>> I want to identify rogue clients by KafkaPrincipal (and/or clientId)
>> similarly to how we identify clients in Fetch/Produce/Request
>> QuotaManagers. Using KafkaPrincipal should give cluster admin the ability
>> to throttle later based on principal which is most likely to be a smaller
>> set than clientIds. My initial thought was to add a metrics that represent
>> how many InitProducerIDRequest are sent by KafkaPrincipal (and/or clientId)
>> similar to Fetch/Produce QuotaManagers.
>> Then as a follow-up, we can throttle based on either KafkaPrinciple or
>> clientId (maybe default as well to align this with other QuotaManagers in
>> Kafka).
>>
>> >1. Does we rely on the client using the same ID? What if there are many
>> clients that all use different client IDs?
>> This is why I want to use the combination of KafkaPrincipal or clientId
>> similar to some other quotas we have in Kafka already. This will be a
>> similar risk to Fetch/Produce quota in Kafka which also relay on the client
>> to use the same clientId and KafkaPrincipal.
>>
>> >2. Are there places where high cardinality of this metric is a concern?
>> I can imagine many client IDs in the system. Would we treat this as a rate
>> metric (ie, when we get an init producer ID and return a new producer ID we
>> emit a count for that client id?) Or something else?
>> My initial thought here was to follow the steps of ClientQuotaManager and
>> ClientRequestQuotaManager and use a rate metric. However, I think we can
>> emit it either
>>
>>1. when we return the new PID. However, I have concerns that we may
>>circle back to the previous concerns with OMM due to keeping track of
>>ACTIVE PIDs per KafkaPrincipal(and/or) clientId in the future. Also this
>>would be the first time Kafka throttle IDs for any client.
>>2. or once we recieve initProducerIDRequest and throttle before even
>>hitting `handleInitProducerIdRequest`. Going this direction we may need to
>>throttle it within a different quota window than `
>>quota.window.size.seconds ` as throttling INIT_PRODUCER_ID request
>>per second wouldn't be efficient for most cases. I personally think this
>>direction is easier as it seems more consistent with the existing quota
>>implementation. Specially that Kafka has already the concept of throttling
>>subset of requests (in ControllerMutationQuotaManager) but never had any
>>concept of throttling active IDs for any client.
>>
>>
>> What do you think?
>>
>> Than

Re: [DISCUSS] solutions for broker OOM caused by many producer IDs

2023-02-22 Thread Omnia Ibrahim
*Hi Justine, *

*My initial thought of throttling the initProducerId was to get ripped off
the problem at the source (which creates too many PIDs per client) and fail
faster but if having this on the produce request level is easier this
should be fine. I am guessing it will be the same direction as we may
ClientQuotaManage for Produce throttling with a different quota window than
`quota.window.size.seconds `. *

*If this is good as an initial solution I can put start a KIP and see what
the wider community feels about this. *

*Also, I noticed that at some point one of us hit "Replay" instead of
"Replay to All" :)  So here are the previous conversations*

*On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan > wrote:*

> Hey Omnia,
>
> Thanks for the response. I think I understand your explanations here with
> respect to principal and clientId usage.
>
> For the throttling -- handleInitProducerIdRequest will allocate the ID to
> the producer, but we don't actually store it on the broker or increment our
> metric until the first produce request for that producer is sent (or sent
> again after previously expiring). Would you consider throttling the produce
> request instead? It may be hard to get any metrics from the transaction
> coordinator where the initProducerId request is handled.
>
> Justine


*On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim > wrote:*

> Hey Justine,
> > If I understand your message correctly, there are issues with
> identifying the source of the rogue clients? So you propose to add a new
> metric for that?
> > And also proposing to throttle based on clientId as a potential follow
> up?
> I want to identify rogue clients by KafkaPrincipal (and/or clientId)
> similarly to how we identify clients in Fetch/Produce/Request
> QuotaManagers. Using KafkaPrincipal should give cluster admin the ability
> to throttle later based on principal which is most likely to be a smaller
> set than clientIds. My initial thought was to add a metrics that represent
> how many InitProducerIDRequest are sent by KafkaPrincipal (and/or clientId)
> similar to Fetch/Produce QuotaManagers.
> Then as a follow-up, we can throttle based on either KafkaPrinciple or
> clientId (maybe default as well to align this with other QuotaManagers in
> Kafka).
>
> >1. Does we rely on the client using the same ID? What if there are many
> clients that all use different client IDs?
> This is why I want to use the combination of KafkaPrincipal or clientId
> similar to some other quotas we have in Kafka already. This will be a
> similar risk to Fetch/Produce quota in Kafka which also relay on the client
> to use the same clientId and KafkaPrincipal.
>
> >2. Are there places where high cardinality of this metric is a concern? I
> can imagine many client IDs in the system. Would we treat this as a rate
> metric (ie, when we get an init producer ID and return a new producer ID we
> emit a count for that client id?) Or something else?
> My initial thought here was to follow the steps of ClientQuotaManager and
> ClientRequestQuotaManager and use a rate metric. However, I think we can
> emit it either
>
>1. when we return the new PID. However, I have concerns that we may
>circle back to the previous concerns with OMM due to keeping track of
>ACTIVE PIDs per KafkaPrincipal(and/or) clientId in the future. Also this
>would be the first time Kafka throttle IDs for any client.
>2. or once we recieve initProducerIDRequest and throttle before even
>hitting `handleInitProducerIdRequest`. Going this direction we may need to
>throttle it within a different quota window than `
>quota.window.size.seconds ` as throttling INIT_PRODUCER_ID request per
>second wouldn't be efficient for most cases. I personally think this
>direction is easier as it seems more consistent with the existing quota
>implementation. Specially that Kafka has already the concept of throttling
>subset of requests (in ControllerMutationQuotaManager) but never had any
>concept of throttling active IDs for any client.
>
>
> What do you think?
>
> Thanks
> Omnia
>

*On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan > wrote:*

> Hey Omnia,
> Sorry for losing track of this.
>
> If I understand your message correctly, there are issues with identifying
> the source of the rogue clients? So you propose to add a new metric for
> that?
> And also proposing to throttle based on clientId as a potential follow up?
>
> I think both of these make sense. The only things I can think of for the
> metric are:
> 1. Does we rely on the client using the same ID? What if there are many
> clients that all use different client IDs?
> 2. Are there places where high cardinality of this metric i

Re: [DISCUSS] solutions for broker OOM caused by many producer IDs

2023-02-02 Thread Omnia Ibrahim
Hi Luke and Justine,
Are there any thoughts or updates on this? I would love to help with this
as we are hitting this more frequently now.

best,

On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim 
wrote:

> Hi Luke and Justine,
>
>> For (3), you said:
>> > - I have some concerns about the impact of this option on the
>> transactional
>> producers, for example, what will happen to an ongoing transaction
>> associated with an expired PID? Would this leave the transactions in a
>> "hanging" state?
>>
>> - How will we notify the client that the transaction can't continue due to
>> an expired PID?
>>
>> - If PID got marked as `expired` this will mean that
>> `admin.DescribeProducers` will not list them which will make
>> *`kafka-transactions.sh
>> --list`* a bit tricky as we can't identify if there are transactions
>> linked
>> to this expired PID or not. The same concern applies to
>> *`kafka-transactions.sh
>> --find-hanging`*.
>>
>> --> Yes, you're right. Those are also concerns for this solution.
>> Currently, there's no way to notify clients about the expiration.
>> Also, the ongoing transactions will be hanging. For the admin cli, we've
>> never thought about it. Good point.
>> In summary, to adopt this solution, there are many issues needed to get
>> fixed.
>>
>
> Justin already clarified that if PID is attached to a transaction it will
> not expire so identifying the transactions shouldn't be a concern anymore.
> The only concern here will be that this solution will not solve the
> problem if the rouge producer is a transactional producer with hanging
> transactions.
> If anyone faced this situation they will need to abort the hanging
> transactions manually and then the solution to expire a PID can then work.
>
> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>> Yes, We were thinking about throttling by KafkaPrinciple. Client Id is
>> also
>> workable.
>> It's just these 2 attributes are not required.
>> That is, it's possible we take all clients as the same one: {default
>> KafkaPrinciple + default clientID}, and apply throttling on it.
>> Do you have any thoughts about it?
>> Maybe skip throttling for {default KafkaPrinciple + default clientID}
>>
>
> Throttling for default KafkaPrinciple and default ClientID is useful when
> we need to have a hard limit on the whole cluster and whoever is running
> the cluster doesn't knowclientsntIDs or if a KafkaPrinciple is reused
> between different producer applications.
> I usually find it helpful to have a way to apply throttling only on the
> rough clients only once I identify them without punishing everyone on the
> cluster. However, there are two problems with this
> - There's no easy way at the moment to link PIDs to clientId or
> KafkaPrinciple. This need to be addressed first.
> - Is Justin's comment on the throttling, and the fact that will mean we
> either block all requests or have to store the request in memory which in
> both cases has side downs on the producer experince.
>
> I recently had another discussion with my team and it does seem like there
>> should be a way to make it more clear to the clients what is going on. A
>> lot of this protocol is implicit. I'm wondering if maybe there is a way to
>> improve the story for newer clients. (Ie if we choose to expire based on a
>> size limit, we should include a response indicating the ID has expired.)
>> We
>> also discussed ways to redefine the guarantees so that users who have
>> stronger idempotency requirements can ensure them (over
>> availability/memory
>> concerns). Let me know if you have any ideas here.
>>
>
> It may be easier to improve the experience for new clients. However, if we
> improved only the new clients we may need a way to help teams who run Kafka
> with rough clients on old versions by at least giving them an easy way to
> identify the clientId/ or KafkaPrinciple that generated these PIDs.
>
> For context, it's very tricky to even identify which clientId is creating
> all these PIDs that caused OOM, which is a contributing part of the issue
> at the moment. So maybe one option here could be adding a new metric that
> tracks the number of generated PIDs per clientId. This will help the team
> who runs the Kafka cluster to
> - contact these rough clients and ask them to fix their clients or upgrade
> to a new client if the new client version has a better experience.
> - or if ended with a throttling solution this may help identify which
> clientId needs to be throttled.
>
> Maybe we can start with a solution for identifying the rough clients first

[jira] [Created] (KAFKA-14616) Topic recreation with offline broker causes permanent URPs

2023-01-12 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-14616:
-

 Summary: Topic recreation with offline broker causes permanent URPs
 Key: KAFKA-14616
 URL: https://issues.apache.org/jira/browse/KAFKA-14616
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.3.1
Reporter: Omnia Ibrahim


We are facing an odd situation when we delete and recreate a topic while broker 
is offline in KRAFT mode. 
Here’s what we saw step by step
 # Created topic {{foo.test}} with 10 partitions and 4 replicas — Topic 
{{foo.test}} was created with topic ID {{MfuZbwdmSMaiSa0g6__TPg}}
 # Took broker 4 offline — which held replicas for partitions __ {{0, 3, 4, 5, 
7, 8, 9}}
 # Deleted topic {{foo.test}} — The deletion process was successful, despite 
the fact that broker 4 still held replicas for partitions {{0, 3, 4, 5, 7, 8, 
9}} on local disk.
 # Recreated topic {{foo.test}} with 10 partitions and 4 replicas. — Topic 
{{foo.test}} was created with topic ID {{RzalpqQ9Q7ub2M2afHxY4Q}} and 
partitions {{0, 1, 2, 7, 8, 9}} got assigned to broker 4 (which was still 
offline). Notice here that partitions {{0, 7, 8, 9}} are common between the 
assignment of the deleted topic ({{{}topic_id: MfuZbwdmSMaiSa0g6__TPg{}}}) and 
the recreated topic {{{}topic_id: RzalpqQ9Q7ub2M2afHxY4Q{}}}).

 # Brough broker 4 back online.
 # Broker started to create new partition replicas for the recreated topic 
{{foo.test}} ({{{}topic_id: RzalpqQ9Q7ub2M2afHxY4Q{}}})
 # The broker hit the following error {{Tried to assign topic ID 
RzalpqQ9Q7ub2M2afHxY4Q to log for topic partition foo.test-9,but log already 
contained topic ID MfuZbwdmSMaiSa0g6__TPg}} . As a result of this error the 
broker decided to rename log dir for partitions {{0, 3, 4, 5, 7, 8, 9}} to 
{{{}-.-delete{}}}.
 # Ran {{ls }}

{code:java}
foo.test-0.658f87fb9a2e42a590b5d7dcc28862b5-delete/
foo.test-1/
foo.test-2/
foo.test-3.a68f05d05bcc4e579087551b539af311-delete/
foo.test-4.79ce30a5310d4950ad1b28f226f74895-delete/
foo.test-5.76ed04da75bf46c3a63342be1eb44450-delete/
foo.test-6/
foo.test-7.c2d33db3bf844e9ebbcd9ef22f5270da-delete/
foo.test-8.33836969ac714b41b69b5334a5068ce0-delete/
foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/{code}
 # Waited until the deletion of the old topic was done and ran {{ls 
}} again, now we were expecting to see log dir for partitions 
{{0, 1, 2, 7, 8, 9}} however the result is:

{code:java}
foo.test-1/
foo.test-2/
foo.test-6/{code}
 # Ran {{kafka-topics.sh --command-config cmd.properties --bootstrap-server 
 --describe --topic foo.test}}

{code:java}
Topic: foo.test TopicId: RzalpqQ9Q7ub2M2afHxY4Q PartitionCount: 10 
ReplicationFactor: 4 Configs: 
min.insync.replicas=2,segment.bytes=1073741824,max.message.bytes=3145728,unclean.leader.election.enable=false,retention.bytes=10
Topic: foo.test Partition: 0 Leader: 2 Replicas: 2,3,4,5 Isr: 2,3,5
Topic: foo.test Partition: 1 Leader: 3 Replicas: 3,4,5,6 Isr: 3,5,6,4
Topic: foo.test Partition: 2 Leader: 5 Replicas: 5,4,6,1 Isr: 5,6,1,4
Topic: foo.test Partition: 3 Leader: 5 Replicas: 5,6,1,2 Isr: 5,6,1,2
Topic: foo.test Partition: 4 Leader: 6 Replicas: 6,1,2,3 Isr: 6,1,2,3
Topic: foo.test Partition: 5 Leader: 1 Replicas: 1,6,2,5 Isr: 1,6,2,5
Topic: foo.test Partition: 6 Leader: 6 Replicas: 6,2,5,4 Isr: 6,2,5,4
Topic: foo.test Partition: 7 Leader: 2 Replicas: 2,5,4,3 Isr: 2,5,3
Topic: foo.test Partition: 8 Leader: 5 Replicas: 5,4,3,1 Isr: 5,3,1
Topic: foo.test Partition: 9 Leader: 3 Replicas: 3,4,1,6 Isr: 3,1,6{code}
Here’s a sample of broker logs

 
{code:java}
{"timestamp":"2023-01-11T15:19:53,620Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
 log for partition foo.test-9 in 
/kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete.","logger":"kafka.log.LogManager"}
{"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
 time index 
/kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/.timeindex.deleted.","logger":"kafka.log.LogSegment"}
{"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
 offset index 
/kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/.index.deleted.","logger":"kafka.log.LogSegment"}
{"timestamp":"2023-01-11T15:19:53,615Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
 log 
/kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/.log.deleted.","logger&qu

Re: [DISCUSS] solutions for broker OOM caused by many producer IDs

2022-10-31 Thread Omnia Ibrahim
saction is ongoing and do not expire the
> producer ID if it has an ongoing transaction. I suspect we will continue to
> do this with any solution we pick.
>
> My team members and I looked a bit into the throttling case and it can get
> a bit tricky since it means we need to throttle the produce request before
> it is processed. This means we either block all requests or have to store
> the request in memory (which is not great if we are trying to save memory).
>
> I recently had another discussion with my team and it does seem like there
> should be a way to make it more clear to the clients what is going on. A
> lot of this protocol is implicit. I'm wondering if maybe there is a way to
> improve the story for newer clients. (Ie if we choose to expire based on a
> size limit, we should include a response indicating the ID has expired.) We
> also discussed ways to redefine the guarantees so that users who have
> stronger idempotency requirements can ensure them (over availability/memory
> concerns). Let me know if you have any ideas here.
>
> Thanks again for commenting here, hopefully we can come to a good solution.
>
> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen  wrote:
>
> > Hi Omnia,
> >
> > Thanks for your reply.
> >
> > For (3), you said:
> > > - I have some concerns about the impact of this option on the
> > transactional
> > producers, for example, what will happen to an ongoing transaction
> > associated with an expired PID? Would this leave the transactions in a
> > "hanging" state?
> >
> > - How will we notify the client that the transaction can't continue due
> to
> > an expired PID?
> >
> > - If PID got marked as `expired` this will mean that
> > `admin.DescribeProducers` will not list them which will make
> > *`kafka-transactions.sh
> > --list`* a bit tricky as we can't identify if there are transactions
> linked
> > to this expired PID or not. The same concern applies to
> > *`kafka-transactions.sh
> > --find-hanging`*.
> >
> > --> Yes, you're right. Those are also concerns for this solution.
> > Currently, there's no way to notify clients about the expiration.
> > Also, the ongoing transactions will be hanging. For the admin cli, we've
> > never thought about it. Good point.
> > In summary, to adopt this solution, there are many issues needed to get
> > fixed.
> >
> >
> > For (5), you said:
> > > I am assuming you mean KafkaPrincipal here! If so is your concern here
> > that
> > those good clients that use the same principal as a rogue one will get
> > throttled?
> >
> > If this is the case, then I believe it should be okay as other throttling
> > in Kafka on *`/config/users/`* has the same behaviour.
> >
> >
> > What about applying limit/throttling to
> > *`/config/users//clients/`
> > *similar to what we have with client quota? This should reduce the
> concern
> > about throttling good clients, right?
> >
> > --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
> > Yes, We were thinking about throttling by KafkaPrinciple. Client Id is
> > also workable.
> > It's just these 2 attributes are not required.
> > That is, it's possible we take all clients as the same one: {default
> > KafkaPrinciple + default clientID}, and apply throttling on it.
> > Do you have any thoughts about it?
> > Maybe skip throttling for {default KafkaPrinciple + default clientID} ?
> >
> > Luke
> >
> >
> >
> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim 
> > wrote:
> >
> >> Hi Luke & Justine,
> >> Thanks for looking into this issue, we have been experiencing this
> because
> >> of rouge clients as well.
> >>
> >> > 3. Having a limit to the number of active producer IDs (sort of like
> an
> >> LRU
> >> >cache)
> >> >-> The idea here is that if we hit a misconfigured client, we will
> expire
> >> >the older entries. The concern here is we have risks to lose
> idempotency
> >> >guarantees, and currently, we don't have a way to notify clients about
> >> >losing idempotency guarantees. Besides, the least  recently used
> entries
> >> >got removed are not always from the "bad" clients.
> >>
> >> - I have some concerns about the impact of this option on the
> >> transactional
> >> producers, for example, what will happen to an ongoing transaction
> >> associated with an expired PID? Would this leave the transactions in a
> >> "hangin

[jira] [Created] (KAFKA-14344) Build EmbeddedKafkaCluster with common configs used for all clients

2022-10-31 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-14344:
-

 Summary: Build EmbeddedKafkaCluster with common configs used for 
all clients
 Key: KAFKA-14344
 URL: https://issues.apache.org/jira/browse/KAFKA-14344
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect, unit tests
Reporter: Omnia Ibrahim
Assignee: Omnia Ibrahim


Connect and MirrorMaker's integration test use `EmbeddedKafkaCluster` and 
`EmbeddedConnectCluster` to setup connect cluster during testing. Both classes 
are easy to setup if the test needs vanilla clusters, however, it's a lot of 
work to make it set it up with more advanced config (for example authentication 
and authorization) where admin, consumer and producer clients need more 
configuration. 

1. I am proposing adding extra parameter `additionalClientConfigs` to 
`EmbeddedKafkaCluster` constructor. The new parameter will be used
 - Setup Producer Client in `EmbeddedKafkaCluster.doStart` which is 
initializing `producer` client that is used in `EmbeddedKafkaCluster.produce`

 - Setup Producer Client in `EmbeddedKafkaCluster.createProducer` used in 
`EmbeddedKafkaCluster.transactionalProducer`

 - Setup Admin Client in `EmbeddedKafkaCluster.createAdminClient` used in 
`EmbeddedKafkaCluster.createTopic`, `EmbeddedKafkaCluster.consumeAll`, 
`EmbeddedKafkaCluster.describeTopics` and `EmbeddedKafkaCluster.deleteTopic`

 - Setup Consumer Client in `EmbeddedKafkaCluster.createConsumer` used in 
`EmbeddedKafkaCluster.createConsumerAndSubscribeTo` and 
`EmbeddedKafkaCluster.consumeAll`

2. And add `EmbeddedConnectCluster.Builder.additionalKafkaClusterClientConfigs`.

 

Tests impacted by this 
- MirrorMaker integration tests
- `org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] solutions for broker OOM caused by many producer IDs

2022-10-14 Thread Omnia Ibrahim
Hi Luke & Justine,
Thanks for looking into this issue, we have been experiencing this because
of rouge clients as well.

> 3. Having a limit to the number of active producer IDs (sort of like an
LRU
>cache)
>-> The idea here is that if we hit a misconfigured client, we will expire
>the older entries. The concern here is we have risks to lose idempotency
>guarantees, and currently, we don't have a way to notify clients about
>losing idempotency guarantees. Besides, the least  recently used entries
>got removed are not always from the "bad" clients.

- I have some concerns about the impact of this option on the transactional
producers, for example, what will happen to an ongoing transaction
associated with an expired PID? Would this leave the transactions in a
"hanging" state?

- How will we notify the client that the transaction can't continue due to
an expired PID?

- If PID got marked as `expired` this will mean that
`admin.DescribeProducers` will not list them which will make
*`kafka-transactions.sh
--list`* a bit tricky as we can't identify if there are transactions linked
to this expired PID or not. The same concern applies to *`kafka-transactions.sh
--find-hanging`*.


>5. limit/throttling the producer id based on the principle
>-> Although we can limit the impact to a certain principle with this idea,
>same concern still exists as solution #1 #2.

I am assuming you mean KafkaPrincipal here! If so is your concern here that
those good clients that use the same principal as a rogue one will get
throttled?

If this is the case, then I believe it should be okay as other throttling
in Kafka on *`/config/users/`* has the same behaviour.


What about applying limit/throttling to
*`/config/users//clients/`
*similar to what we have with client quota? This should reduce the concern
about throttling good clients, right?

best,

Omnia

On Tue, Oct 11, 2022 at 4:18 AM Luke Chen  wrote:

> Bump this thread to see if there are any comments/thoughts.
> Thanks.
>
> Luke
>
> On Mon, Sep 26, 2022 at 11:06 AM Luke Chen  wrote:
>
> > Hi devs,
> >
> > As stated in the motivation section in KIP-854
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry
> >:
> >
> > With idempotent producers becoming the default in Kafka, this means that
> > unless otherwise specified, all new producers will be given producer IDs.
> > Some (inefficient) applications may now create many non-transactional
> > idempotent producers. Each of these producers will be assigned a producer
> > ID and these IDs and their metadata are stored in the broker memory,
> which
> > might cause brokers out of memory.
> >
> > Justine (in cc.) and I and some other team members are working on the
> > solutions for this issue. But none of them solves it completely without
> > side effects. Among them, "availability" VS "idempotency guarantees" is
> > what we can't decide which to sacrifice. Some of these solutions
> sacrifice
> > availability of produce (1,2,5) and others sacrifice idempotency
> guarantees
> > (3). It could be useful to know if people generally have a preference one
> > way or the other. Or what other better solutions there might be.
> >
> > Here are the proposals we came up with:
> >
> > 1. Limit the total active producer ID allocation number.
> > -> This is the simplest solution. But since the OOM issue is usually
> > caused by a rogue or misconfigured client, and this solution might
> "punish"
> > the good client from sending messages.
> >
> > 2. Throttling the producer ID allocation rate
> > -> Same concern as the solution #1.
> >
> > 3. Having a limit to the number of active producer IDs (sort of like an
> > LRU cache)
> > -> The idea here is that if we hit a misconfigured client, we will expire
> > the older entries. The concern here is we have risks to lose idempotency
> > guarantees, and currently, we don't have a way to notify clients about
> > losing idempotency guarantees. Besides, the least  recently used entries
> > got removed are not always from the "bad" clients.
> >
> > 4. allow clients to "close" the producer ID usage
> > -> We can provide a way for producer to "close" producerID usage.
> > Currently, we only have a way to INIT_PRODUCER_ID requested to allocate
> > one. After that, we'll keep the producer ID metadata in broker even if
> the
> > producer is "closed". Having a closed API (ex: END_PRODUCER_ID), we can
> > remove the entry from broker side. In client side, we can send it when
> > producer closing. The concern is, the old clients (including non-java
> > clients) will still suffer from the OOM issue.
> >
> > 5. limit/throttling the producer id based on the principle
> > -> Although we can limit the impact to a certain principle with this
> idea,
> > same concern still exists as solution #1 #2.
> >
> > Any thoughts/feedback are welcomed.
> >
> > Thank you.
> > Luke
> >
>


Re: [Vote] KIP-787 - MM2 Interface to manage Kafka resources

2022-07-07 Thread Omnia Ibrahim
Close the voting with 3 +1 binding and +1 non-binding.

Thanks, everyone

On Wed, Jul 6, 2022 at 8:16 PM David Jacot 
wrote:

> Thanks for the KIP, Omnia! +1 (binding)
>
> On Wed, Jul 6, 2022 at 5:02 PM Omnia Ibrahim 
> wrote:
> >
> > Hi,
> > Can we have one last binding vote for this KIP, please?
> >
> > Omnia
> >
> > On Tue, Jun 28, 2022 at 3:36 PM Omnia Ibrahim 
> > wrote:
> >
> > > Thanks, Tom, I have updated the KIP to reflect these minor points.
> > >
> > > On Tue, Jun 28, 2022 at 10:58 AM Tom Bentley 
> wrote:
> > >
> > >> Hi again Omnia,
> > >>
> > >> I left a couple of really minor points in the discussion thread.
> Assuming
> > >> you're happy with those I am +1 (binding).
> > >>
> > >> Thanks for your patience on this. Kind regards,
> > >>
> > >> Tom
> > >>
> > >> On Tue, 28 Jun 2022 at 10:14, Omnia Ibrahim 
> > >> wrote:
> > >>
> > >> > Hi,
> > >> > I did a small change to the KIP interface to address some
> discussions on
> > >> > the KIP. If all is good now can I get more votes on this, please?
> > >> >
> > >> > Thanks
> > >> > Omnia
> > >> >
> > >> > On Tue, Jun 21, 2022 at 10:34 AM Omnia Ibrahim <
> o.g.h.ibra...@gmail.com
> > >> >
> > >> > wrote:
> > >> >
> > >> > > Hi,
> > >> > > Can I get more votes on this, please?
> > >> > >
> > >> > > Thanks
> > >> > >
> > >> > > On Sun, Jun 12, 2022 at 2:40 PM Federico Valeri <
> fedeval...@gmail.com
> > >> >
> > >> > > wrote:
> > >> > >
> > >> > >> Hi Omnia, this will be really useful, especially in cloud
> > >> environment.
> > >> > >>
> > >> > >> +1 non binding
> > >> > >>
> > >> > >> Thanks
> > >> > >> Fede
> > >> > >>
> > >> > >> On Tue, May 31, 2022 at 5:28 PM Mickael Maison <
> > >> > mickael.mai...@gmail.com>
> > >> > >> wrote:
> > >> > >> >
> > >> > >> > Hi Omnia,
> > >> > >> >
> > >> > >> > I think the approach you settled on is the best option, this
> will
> > >> > >> > allow integrating MirrorMaker in more environments.
> > >> > >> >
> > >> > >> > +1 binding
> > >> > >> >
> > >> > >> > Thanks for the KIP (and your persistence!)
> > >> > >> > Mickael
> > >> > >> >
> > >> > >> > On Mon, May 30, 2022 at 12:09 PM Omnia Ibrahim <
> > >> > o.g.h.ibra...@gmail.com>
> > >> > >> wrote:
> > >> > >> > >
> > >> > >> > > Hi,
> > >> > >> > > Can I get a vote on this, please?
> > >> > >> > > Thanks
> > >> > >> > >
> > >> > >> > > On Wed, May 25, 2022 at 11:15 PM Omnia Ibrahim <
> > >> > >> o.g.h.ibra...@gmail.com>
> > >> > >> > > wrote:
> > >> > >> > >
> > >> > >> > > > Hi,
> > >> > >> > > > I'd like to start a vote on KIP-787
> > >> > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-787
> > >> > >> > > > %3A+MM2+Interface+to+manage+Kafka+resources
> > >> > >> > > > <
> > >> > >>
> > >> >
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
> > >> > >> >
> > >> > >> > > >
> > >> > >> > > > Thanks
> > >> > >> > > > Omnia
> > >> > >> > > >
> > >> > >>
> > >> > >
> > >> >
> > >>
> > >
>


Re: [Vote] KIP-787 - MM2 Interface to manage Kafka resources

2022-07-06 Thread Omnia Ibrahim
Hi,
Can we have one last binding vote for this KIP, please?

Omnia

On Tue, Jun 28, 2022 at 3:36 PM Omnia Ibrahim 
wrote:

> Thanks, Tom, I have updated the KIP to reflect these minor points.
>
> On Tue, Jun 28, 2022 at 10:58 AM Tom Bentley  wrote:
>
>> Hi again Omnia,
>>
>> I left a couple of really minor points in the discussion thread. Assuming
>> you're happy with those I am +1 (binding).
>>
>> Thanks for your patience on this. Kind regards,
>>
>> Tom
>>
>> On Tue, 28 Jun 2022 at 10:14, Omnia Ibrahim 
>> wrote:
>>
>> > Hi,
>> > I did a small change to the KIP interface to address some discussions on
>> > the KIP. If all is good now can I get more votes on this, please?
>> >
>> > Thanks
>> > Omnia
>> >
>> > On Tue, Jun 21, 2022 at 10:34 AM Omnia Ibrahim > >
>> > wrote:
>> >
>> > > Hi,
>> > > Can I get more votes on this, please?
>> > >
>> > > Thanks
>> > >
>> > > On Sun, Jun 12, 2022 at 2:40 PM Federico Valeri > >
>> > > wrote:
>> > >
>> > >> Hi Omnia, this will be really useful, especially in cloud
>> environment.
>> > >>
>> > >> +1 non binding
>> > >>
>> > >> Thanks
>> > >> Fede
>> > >>
>> > >> On Tue, May 31, 2022 at 5:28 PM Mickael Maison <
>> > mickael.mai...@gmail.com>
>> > >> wrote:
>> > >> >
>> > >> > Hi Omnia,
>> > >> >
>> > >> > I think the approach you settled on is the best option, this will
>> > >> > allow integrating MirrorMaker in more environments.
>> > >> >
>> > >> > +1 binding
>> > >> >
>> > >> > Thanks for the KIP (and your persistence!)
>> > >> > Mickael
>> > >> >
>> > >> > On Mon, May 30, 2022 at 12:09 PM Omnia Ibrahim <
>> > o.g.h.ibra...@gmail.com>
>> > >> wrote:
>> > >> > >
>> > >> > > Hi,
>> > >> > > Can I get a vote on this, please?
>> > >> > > Thanks
>> > >> > >
>> > >> > > On Wed, May 25, 2022 at 11:15 PM Omnia Ibrahim <
>> > >> o.g.h.ibra...@gmail.com>
>> > >> > > wrote:
>> > >> > >
>> > >> > > > Hi,
>> > >> > > > I'd like to start a vote on KIP-787
>> > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-787
>> > >> > > > %3A+MM2+Interface+to+manage+Kafka+resources
>> > >> > > > <
>> > >>
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
>> > >> >
>> > >> > > >
>> > >> > > > Thanks
>> > >> > > > Omnia
>> > >> > > >
>> > >>
>> > >
>> >
>>
>


Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-06-28 Thread Omnia Ibrahim
Hi Tom,
I added Configuration Properties sections that document the properties and
a couple of examples.

On Tue, Jun 28, 2022 at 10:57 AM Tom Bentley  wrote:

> Hi Omnia,
>
> Please could you add to the KIP the documentation that forward.admin.class
> config will have, and include that the signature of the class named by
> forward.admin.class must have a constructor that takes a `Map Object>`?
>
> Many thanks,
>
> Tom
>
> On Wed, 22 Jun 2022 at 11:30,  wrote:
>
> > Hi Tom
> > My initial thought was that the constructor with config and delegator
> > would be for testing while the one with config is the one that will be
> used
> > by MM2.
> > I can removed one of them and keep only one.
> >
> > Sent from my iPhone
> >
> > > On 22 Jun 2022, at 08:15, Tom Bentley  wrote:
> > >
> > > Hi Omnia,
> > >
> > > Thanks for those answers. I'm a bit confused by the changes you've made
> > for
> > > 1 though. It's MM2 that's going to instantiate the class named in
> > > forwarding.admin.class, so it's MM2 that needs to know the constructor
> > > signature. The easiest way of doing this is to specify the signature as
> > > part of the contract for forwarding.admin.class. But ForwardingAdmin
> now
> > > has two constructors, which is confusing for someone who wants to
> > subclass
> > > it. Surely only one is needed? It would also be a good idea to show the
> > > documentation that forward.admin.class will have.
> > >
> > > Kind regards,
> > >
> > > Tom
> > >
> > >> On Tue, 21 Jun 2022 at 17:06, Omnia Ibrahim 
> > wrote:
> > >>
> > >> Hi Tom
> > >>
> > >>> 1. I assume there's an undocumented requirement in the KIP that
> > whatever
> > >>> class is named for forwarding.admin.class it has a public single
> > argument
> > >>> constructor that takes an Admin instance?
> > >>>
> > >> you are right, I updated the KIP to reflect the missing one.
> > >> 3. What if the implementation needs to distinguish between creating
> > ACLs on
> > >> the source cluster and creating them on the destination cluster? E.g.
> > the
> > >> one should be done one way, but the other using a different mechanism?
> > >> Users will need to define 2 implementations, one for source and
> another
> > for
> > >> target and configure each using
> .forwarding.admin.class.
> > for
> > >> example
> > >> - target.forwarding.admin.class = TargetAdmin
> > >> - source.forwarding.admin.class = SourceAdmin
> > >>
> > >>> On Tue, Jun 21, 2022 at 2:14 PM Tom Bentley 
> > wrote:
> > >>>
> > >>> Hi Omnia,
> > >>>
> > >>> Thanks for the KIP! I'm sorry for the delay in this response. I have
> a
> > >> few
> > >>> questions:
> > >>>
> > >>> 1. I assume there's an undocumented requirement in the KIP that
> > whatever
> > >>> class is named for forwarding.admin.class it has a public single
> > argument
> > >>> constructor that takes an Admin instance?
> > >>> 2. If 1 is correct then what about an implementation that requires
> > extra
> > >>> configuration, e.g. for whatever infra-as-code API it needs to use
> > >> (instead
> > >>> of using an Admin instance directly) how does it learn about that
> > >> non-Kafka
> > >>> config when it's only receiving an Admin instance?
> > >>> 3. What if the implementation needs to distinguish between creating
> > ACLs
> > >> on
> > >>> the source cluster and creating them on the destination cluster? E.g.
> > the
> > >>> one should be done one way, but the other using a different
> mechanism?
> > >>>
> > >>> Kind regards,
> > >>>
> > >>> Tom
> > >>>
> > >>>
> > >>> On Mon, 20 Jun 2022 at 17:13, Omnia Ibrahim  >
> > >>> wrote:
> > >>>
> > >>>> Hi Chris, sorry for the late reply.
> > >>>> ..Hi,
> > >>>>
> > >>>>> 1. I might be missing something, but can you give a concrete Java
> > >>> example
> > >>>>> of how the proposed ForwardingAdmin class is more convenient than
> > >>>>> subclassing the KafkaAdmi

Re: [Vote] KIP-787 - MM2 Interface to manage Kafka resources

2022-06-28 Thread Omnia Ibrahim
Thanks, Tom, I have updated the KIP to reflect these minor points.

On Tue, Jun 28, 2022 at 10:58 AM Tom Bentley  wrote:

> Hi again Omnia,
>
> I left a couple of really minor points in the discussion thread. Assuming
> you're happy with those I am +1 (binding).
>
> Thanks for your patience on this. Kind regards,
>
> Tom
>
> On Tue, 28 Jun 2022 at 10:14, Omnia Ibrahim 
> wrote:
>
> > Hi,
> > I did a small change to the KIP interface to address some discussions on
> > the KIP. If all is good now can I get more votes on this, please?
> >
> > Thanks
> > Omnia
> >
> > On Tue, Jun 21, 2022 at 10:34 AM Omnia Ibrahim 
> > wrote:
> >
> > > Hi,
> > > Can I get more votes on this, please?
> > >
> > > Thanks
> > >
> > > On Sun, Jun 12, 2022 at 2:40 PM Federico Valeri 
> > > wrote:
> > >
> > >> Hi Omnia, this will be really useful, especially in cloud environment.
> > >>
> > >> +1 non binding
> > >>
> > >> Thanks
> > >> Fede
> > >>
> > >> On Tue, May 31, 2022 at 5:28 PM Mickael Maison <
> > mickael.mai...@gmail.com>
> > >> wrote:
> > >> >
> > >> > Hi Omnia,
> > >> >
> > >> > I think the approach you settled on is the best option, this will
> > >> > allow integrating MirrorMaker in more environments.
> > >> >
> > >> > +1 binding
> > >> >
> > >> > Thanks for the KIP (and your persistence!)
> > >> > Mickael
> > >> >
> > >> > On Mon, May 30, 2022 at 12:09 PM Omnia Ibrahim <
> > o.g.h.ibra...@gmail.com>
> > >> wrote:
> > >> > >
> > >> > > Hi,
> > >> > > Can I get a vote on this, please?
> > >> > > Thanks
> > >> > >
> > >> > > On Wed, May 25, 2022 at 11:15 PM Omnia Ibrahim <
> > >> o.g.h.ibra...@gmail.com>
> > >> > > wrote:
> > >> > >
> > >> > > > Hi,
> > >> > > > I'd like to start a vote on KIP-787
> > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-787
> > >> > > > %3A+MM2+Interface+to+manage+Kafka+resources
> > >> > > > <
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
> > >> >
> > >> > > >
> > >> > > > Thanks
> > >> > > > Omnia
> > >> > > >
> > >>
> > >
> >
>


Re: [Vote] KIP-787 - MM2 Interface to manage Kafka resources

2022-06-28 Thread Omnia Ibrahim
Hi,
I did a small change to the KIP interface to address some discussions on
the KIP. If all is good now can I get more votes on this, please?

Thanks
Omnia

On Tue, Jun 21, 2022 at 10:34 AM Omnia Ibrahim 
wrote:

> Hi,
> Can I get more votes on this, please?
>
> Thanks
>
> On Sun, Jun 12, 2022 at 2:40 PM Federico Valeri 
> wrote:
>
>> Hi Omnia, this will be really useful, especially in cloud environment.
>>
>> +1 non binding
>>
>> Thanks
>> Fede
>>
>> On Tue, May 31, 2022 at 5:28 PM Mickael Maison 
>> wrote:
>> >
>> > Hi Omnia,
>> >
>> > I think the approach you settled on is the best option, this will
>> > allow integrating MirrorMaker in more environments.
>> >
>> > +1 binding
>> >
>> > Thanks for the KIP (and your persistence!)
>> > Mickael
>> >
>> > On Mon, May 30, 2022 at 12:09 PM Omnia Ibrahim 
>> wrote:
>> > >
>> > > Hi,
>> > > Can I get a vote on this, please?
>> > > Thanks
>> > >
>> > > On Wed, May 25, 2022 at 11:15 PM Omnia Ibrahim <
>> o.g.h.ibra...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi,
>> > > > I'd like to start a vote on KIP-787
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-787
>> > > > %3A+MM2+Interface+to+manage+Kafka+resources
>> > > > <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
>> >
>> > > >
>> > > > Thanks
>> > > > Omnia
>> > > >
>>
>


Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-06-21 Thread Omnia Ibrahim
Hi Tom

> 1. I assume there's an undocumented requirement in the KIP that whatever
> class is named for forwarding.admin.class it has a public single argument
> constructor that takes an Admin instance?
>
you are right, I updated the KIP to reflect the missing one.
3. What if the implementation needs to distinguish between creating ACLs on
the source cluster and creating them on the destination cluster? E.g. the
one should be done one way, but the other using a different mechanism?
Users will need to define 2 implementations, one for source and another for
target and configure each using .forwarding.admin.class. for
example
- target.forwarding.admin.class = TargetAdmin
- source.forwarding.admin.class = SourceAdmin

On Tue, Jun 21, 2022 at 2:14 PM Tom Bentley  wrote:

> Hi Omnia,
>
> Thanks for the KIP! I'm sorry for the delay in this response. I have a few
> questions:
>
> 1. I assume there's an undocumented requirement in the KIP that whatever
> class is named for forwarding.admin.class it has a public single argument
> constructor that takes an Admin instance?
> 2. If 1 is correct then what about an implementation that requires extra
> configuration, e.g. for whatever infra-as-code API it needs to use (instead
> of using an Admin instance directly) how does it learn about that non-Kafka
> config when it's only receiving an Admin instance?
> 3. What if the implementation needs to distinguish between creating ACLs on
> the source cluster and creating them on the destination cluster? E.g. the
> one should be done one way, but the other using a different mechanism?
>
> Kind regards,
>
> Tom
>
>
> On Mon, 20 Jun 2022 at 17:13, Omnia Ibrahim 
> wrote:
>
> > Hi Chris, sorry for the late reply.
> > ..Hi,
> >
> > > 1. I might be missing something, but can you give a concrete Java
> example
> > > of how the proposed ForwardingAdmin class is more convenient than
> > > subclassing the KafkaAdminClient class? AFAICT the two would be
> virtually
> > > identical.
> >
> > I might be misunderstanding exactly how that class will be used;
> > > I'm envisioning it as the pluggable class that users will implement for
> > > custom administration logic and specify as the value for the
> > > "forwarding.admin.class" (or ".forwarding.admin.class")
> > property,
> > > and that it will be instantiated with a KafkaAdminClient instance that
> > can
> > > be used to get the same logic that MM2 provides today. In the case you
> > > mentioned (KafkaAdminClient for read/describe, custom Admin for
> > > create/update), I'd imagine one could override the createTopics,
> > > deleteTopics, createAcls, deleteAcls (maybe?), alterConfigs (maybe?),
> > etc.
> > > methods, and then leave other methods such as listTopics,
> describeTopics,
> > > describeCluster, etc. as they are.
> > >
> >  The Forwarding decorator is one alternative for inheritance so you are
> > right to say that they look identical. However, I would like to point out
> > few points
> > 1. that Kafka codebase has few wrappers or decorators around AdminClient
> > instead of inheritance so using decorator over inheritance isn't new
> > proposal for example
> >
> >- -
> `org.apache.kafka.streams.processor.internals.InternalTopicManager`
> >which has AdminClient as parameter instead of inheatance
> >- - `org.apache.kafka.connect.util.SharedTopicAdmin` and
> >`org.apache.kafka.connect.util.TopicAdmin` don't inherit
> > KafkaAdminClient
> >instead initialize KafkaAdminClient.
> >
> > 2. Using ForwardingAdmin will make it easier to test which methods use
> > KafkaAdminClient and which don't this make the test for any customized
> > implementation easier. We can't have this with inheatcancs
> > 3. Inhearting KafkaAdminClient has the following limitation
> >a. KafkaAdminClient doesn't have a public default constructor, so
> > isn't gonna be easy to have contractor that initialize both
> > KafkaAdminClient (to be used for read/describe/list) and customized
> > fedrated client (to be used for create/update). (Note I don't want to
> touch
> > anything in KafkaAdminClient code base to change that)
> >   b. the only way to initialize instance is by using`createInternal`
> > which is static and can't be overridden to include creating customized
> > fedrated client. This is the method used by `Admin.create` to
> > initialize `KafkaAdminClient`in
> > most MM2 and Kafka codebase.
> >
> > 3. KIP-158 deals with the topics that source connectors write to, not the
> > > internal topic

Re: [Vote] KIP-787 - MM2 Interface to manage Kafka resources

2022-06-21 Thread Omnia Ibrahim
Hi,
Can I get more votes on this, please?

Thanks

On Sun, Jun 12, 2022 at 2:40 PM Federico Valeri 
wrote:

> Hi Omnia, this will be really useful, especially in cloud environment.
>
> +1 non binding
>
> Thanks
> Fede
>
> On Tue, May 31, 2022 at 5:28 PM Mickael Maison 
> wrote:
> >
> > Hi Omnia,
> >
> > I think the approach you settled on is the best option, this will
> > allow integrating MirrorMaker in more environments.
> >
> > +1 binding
> >
> > Thanks for the KIP (and your persistence!)
> > Mickael
> >
> > On Mon, May 30, 2022 at 12:09 PM Omnia Ibrahim 
> wrote:
> > >
> > > Hi,
> > > Can I get a vote on this, please?
> > > Thanks
> > >
> > > On Wed, May 25, 2022 at 11:15 PM Omnia Ibrahim <
> o.g.h.ibra...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > > I'd like to start a vote on KIP-787
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-787
> > > > %3A+MM2+Interface+to+manage+Kafka+resources
> > > > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
> >
> > > >
> > > > Thanks
> > > > Omnia
> > > >
>


Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-06-20 Thread Omnia Ibrahim
 custom administration logic and specify as the value for the
> "forwarding.admin.class" (or ".forwarding.admin.class") property,
> and that it will be instantiated with a KafkaAdminClient instance that can
> be used to get the same logic that MM2 provides today. In the case you
> mentioned (KafkaAdminClient for read/describe, custom Admin for
> create/update), I'd imagine one could override the createTopics,
> deleteTopics, createAcls, deleteAcls (maybe?), alterConfigs (maybe?), etc.
> methods, and then leave other methods such as listTopics, describeTopics,
> describeCluster, etc. as they are.
>
> 3. KIP-158 deals with the topics that source connectors write to, not the
> internal topics used by the Connect framework. IIUC this includes the
> topics that MM2 mirrors. I think it's still fine if we want to leave
> support for this out and say that this KIP only addresses code that lives
> inside the connect/mirror (and possibly connect/mirror-client?) modules, I
> just want to make sure that whatever behavior we settle on is specified
> clearly in the KIP and user-facing documentation.
>
> 4. That's fine  Just out of curiosity, is the motivation for this
> decision to simplify the implementation? I can imagine it'd be easier to
> modify the MM2 codebase exclusively and not have to worry about touching
> the Connect framework as well given the possibility for unintended
> consequences for other connectors with the latter. Wondering if there's a
> distinction on the feature front as well that makes MM2 internal topics
> different from Connect internal topics.
>
> Cheers,
>
> Chris
>
> On Mon, Jun 6, 2022 at 6:36 AM Omnia Ibrahim 
> wrote:
>
> > Hi Chris, Thanks for having the time to look into this.
> >
> > 1. Is the introduction of the new "ForwardingAdmin" class necessary, or
> can
> > > the same behavior can be achieved by subclassing the existing
> > > KafkaAdminClient class?
> >
> > forwarding decorators give more flexibility than the inheritance, in this
> > case, ForwardingAdmin gives the ability to use the default
> KafkaAdminClient
> > for reading/describing resources and at the same time configure another
> > client to connect to the federated solution to create and update
> resources.
> > Using forwarding seems cleaner and more flexible for this use case than
> > inheritance.
> >
> > 2. Would it be just as accurate to name the new Mirror Maker 2 property
> > > "admin.class" instead of "forwarding.admin.class"? I think brevity may
> > work
> > > in our favor here
> > >
> > I don't mind renaming it to "admin.class" if this is better.
> >
> > 3. Would the admin class specified by the user also take effect for
> KIP-158
> > > [1] style automatic topic creation? (Forgive me if this isn't
> applicable
> > > for Mirror Maker 2; I'm asking solely based on the knowledge that MM2
> can
> > > be run as a source connector and has its own source task class [2].)
> >
> > No this only control creating/updating mirrored topics and MM2 internal
> > topics. And not going to affect connect runtime's internal topics that
> are
> > needed by connect cluster that runs MM2.
> >
> > 4. Would the admin class specified by the user also take effect for
> > > internal topics created by the Connect framework (i.e., the statue,
> > config,
> > > and offsets topics)?
> >
> >  No this wouldn't address connect as connect "realtime" uses TopicAdmin
> to
> > manage topics needed for KafkaOffsetBackingStore, KafkaStatusBackingStore
> > and KafkaConfigBackingStore directly. These 3 topics aren't a huge
> concern
> > for MM2 as they are a small set of topics and can be created up-front as
> a
> > one-time job. However, the main concern of the KIP is addressing the
> > mirrored topics, synced configs, synced ACLs and the internal topics of
> MM2
> > which are needed for MM2 features like heartbeat and offset mapping
> between
> > Kafka clusters.
> >
> > The KIP states that Mirror Maker 2 will "Use
> > > ForwardingAdmin in MirrorUtils instead of TopicAdmin to create internal
> > > compacted topics", but IIUC these topics (the ones created with the
> > > MirrorUtils class) are Mirror Maker 2-specific and different from the
> > > Connect framework's internal topics.
> >
> > MM2 has been using 2 patterns to create topics
> > 1- Use AdminClient directly to create/update mirrored topics and their
> ACLs
> > 2- Use TopicAdmin in MirrorUtils to create MM2 internal topics which are
> > heartbeat, mm2-o

Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-06-06 Thread Omnia Ibrahim
Hi Chris, Thanks for having the time to look into this.

1. Is the introduction of the new "ForwardingAdmin" class necessary, or can
> the same behavior can be achieved by subclassing the existing
> KafkaAdminClient class?

forwarding decorators give more flexibility than the inheritance, in this
case, ForwardingAdmin gives the ability to use the default KafkaAdminClient
for reading/describing resources and at the same time configure another
client to connect to the federated solution to create and update resources.
Using forwarding seems cleaner and more flexible for this use case than
inheritance.

2. Would it be just as accurate to name the new Mirror Maker 2 property
> "admin.class" instead of "forwarding.admin.class"? I think brevity may work
> in our favor here
>
I don't mind renaming it to "admin.class" if this is better.

3. Would the admin class specified by the user also take effect for KIP-158
> [1] style automatic topic creation? (Forgive me if this isn't applicable
> for Mirror Maker 2; I'm asking solely based on the knowledge that MM2 can
> be run as a source connector and has its own source task class [2].)

No this only control creating/updating mirrored topics and MM2 internal
topics. And not going to affect connect runtime's internal topics that are
needed by connect cluster that runs MM2.

4. Would the admin class specified by the user also take effect for
> internal topics created by the Connect framework (i.e., the statue, config,
> and offsets topics)?

 No this wouldn't address connect as connect "realtime" uses TopicAdmin to
manage topics needed for KafkaOffsetBackingStore, KafkaStatusBackingStore
and KafkaConfigBackingStore directly. These 3 topics aren't a huge concern
for MM2 as they are a small set of topics and can be created up-front as a
one-time job. However, the main concern of the KIP is addressing the
mirrored topics, synced configs, synced ACLs and the internal topics of MM2
which are needed for MM2 features like heartbeat and offset mapping between
Kafka clusters.

The KIP states that Mirror Maker 2 will "Use
> ForwardingAdmin in MirrorUtils instead of TopicAdmin to create internal
> compacted topics", but IIUC these topics (the ones created with the
> MirrorUtils class) are Mirror Maker 2-specific and different from the
> Connect framework's internal topics.

MM2 has been using 2 patterns to create topics
1- Use AdminClient directly to create/update mirrored topics and their ACLs
2- Use TopicAdmin in MirrorUtils to create MM2 internal topics which are
heartbeat, mm2-offset-syncs..internal and
.checkpoints.internal

This KIP will only replace AdminClient and TopicAdmin in the MM2 codebase
by ForwardingAdmin and not connect related topics.
As Colin mentioned before we can have a feature KIP where we use
ForwardingAdmin outside MM2 but this is not addressed in this KIP.

Hope this answered your questions.

Best
Omnia



On Wed, Jun 1, 2022 at 2:14 AM Chris Egerton 
wrote:

> Hi Omnia,
>
> Thank you for your patience with this KIP! I have a few quick thoughts:
>
> 1. Is the introduction of the new "ForwardingAdmin" class necessary, or can
> the same behavior can be achieved by subclassing the existing
> KafkaAdminClient class?
>
> 2. Would it be just as accurate to name the new Mirror Maker 2 property
> "admin.class" instead of "forwarding.admin.class"? I think brevity may work
> in our favor here
>
> 3. Would the admin class specified by the user also take effect for KIP-158
> [1] style automatic topic creation? (Forgive me if this isn't applicable
> for Mirror Maker 2; I'm asking solely based on the knowledge that MM2 can
> be run as a source connector and has its own source task class [2].)
>
> 4. Would the admin class specified by the user also take effect for
> internal topics created by the Connect framework (i.e., the statue, config,
> and offsets topics)? The KIP states that Mirror Maker 2 will "Use
> ForwardingAdmin in MirrorUtils instead of TopicAdmin to create internal
> compacted topics", but IIUC these topics (the ones created with the
> MirrorUtils class) are Mirror Maker 2-specific and different from the
> Connect framework's internal topics.
>
> [1] -
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics
> [2] -
>
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
>
> Cheers,
>
> Chris
>
> On Wed, May 25, 2022 at 5:26 AM Omnia Ibrahim 
> wrote:
>
> > Hi everyone, If there's no major concern anymore, I'll start the
> > voting process.
> >
> > On Fri, May 20, 2022 at 

Re: [Vote] KIP-787 - MM2 Interface to manage Kafka resources

2022-05-30 Thread Omnia Ibrahim
Hi,
Can I get a vote on this, please?
Thanks

On Wed, May 25, 2022 at 11:15 PM Omnia Ibrahim 
wrote:

> Hi,
> I'd like to start a vote on KIP-787
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787
> %3A+MM2+Interface+to+manage+Kafka+resources
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources>
>
> Thanks
> Omnia
>


[Vote] KIP-787 - MM2 Interface to manage Kafka resources

2022-05-25 Thread Omnia Ibrahim
Hi,
I'd like to start a vote on KIP-787
https://cwiki.apache.org/confluence/display/KAFKA/KIP-787
%3A+MM2+Interface+to+manage+Kafka+resources


Thanks
Omnia


Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-05-25 Thread Omnia Ibrahim
Hi everyone, If there's no major concern anymore, I'll start the
voting process.

On Fri, May 20, 2022 at 5:58 PM Omnia Ibrahim 
wrote:

> Hi Colin,
>
> >Thanks for the clarification. I agree it's reasonable for people to want
> to use their own implementations of Admin. And we could have a config for
> this, so that it becomes pluggable (possibly in other places than
> MirrorMaker, although we don't have to do that in this KIP).
> >
> Allowing people to plug custom implementation of Admin in other places
> sounds like a neat idea indeed. It can be nice addition for example `
> org.apache.kafka.connect.util.SharedTopicAdmin` in Connect to use custom
> Admin as well. But agree no need to have it in this KIP.
> >We could even try to make this easier on developers. For example, we
> could provide a public ForwardingAdmin class that forwards all requests to
> the regular KafkaAdminClient. Then, people could make their custom class
> inherit from ForwardingAdmin and override >just the specific methods that
> they wanted to override. So they don't have to implement all the methods,
> but just the ones that are different for them.
> >
> >I just wanted to make sure we weren't creating a second Admin client
> interface -- I think that would really be hard for us to support long-term.
>
> Forwarding would defiantly make it easier. I have updated the KIP to
> introduce ForwardingAdmin as well.
>
> regards,
> Omnia
>
> On Mon, May 16, 2022 at 9:31 PM Colin McCabe  wrote:
>
>> On Mon, May 16, 2022, at 10:24, Omnia Ibrahim wrote:
>> > Hi Colin,
>> >
>> > Thanks for your reply.
>> >
>> > This KIP doesn’t aim to solve any security concerns, but rather a
>> conflict
>> > of responsibilities within any Kafka ecosystem that includes MM2 and any
>> > resource management solution. I’m not sure that was clear, so I’m
>> concerned
>> > about the motivation for your suggestion to close this KIP.
>> >
>>
>> Hi Omnia,
>>
>> Thanks for the clarification. I agree it's reasonable for people to want
>> to use their own implementations of Admin. And we could have a config for
>> this, so that it becomes pluggable (possibly in other places than
>> MirrorMaker, although we don't have to do that in this KIP).
>>
>> We could even try to make this easier on developers. For example, we
>> could provide a public ForwardingAdmin class that forwards all requests to
>> the regular KafkaAdminClient. Then, people could make their custom class
>> inherit from ForwardingAdmin and override just the specific methods that
>> they wanted to override. So they don't have to implement all the methods,
>> but just the ones that are different for them.
>>
>> I just wanted to make sure we weren't creating a second Admin client
>> interface -- I think that would really be hard for us to support long-term.
>>
>> >
>> > It is generally accepted that resource management should be centralized,
>> > especially on the scale of mirroring N number of clusters. The point of
>> > this KIP is that any sort of topic management / federate solution /
>> > up-front capacity planning system will be at odds with MM2 if MM2 keeps
>> > using the Admin client directly.
>> >
>>
>> Thanks for the explanation. That makes sense.
>>
>> >
>> > I understand your concern that the interface proposed in the first
>> approach
>> > may become too similar to the existing Admin interface. I’ll update the
>> > proposal by moving Ryanne’s previous suggestion to re-use the Admin
>> > interface and add configuration to accept a custom implementation.
>> >
>>
>> +1.
>>
>> >
>> > If you still feel this KIP should be closed but can understand its
>> > motivation I can close this one and create a new one.
>> >
>>
>> I think it's reasonable to keep this one open and make the changes you
>> talked about above.
>>
>> regards,
>> Colin
>>
>> >
>> > Thanks,
>> > Omnia
>> >
>> > On Fri, May 13, 2022 at 6:10 PM Colin McCabe 
>> wrote:
>> >
>> >> On Wed, May 11, 2022, at 15:07, Omnia Ibrahim wrote:
>> >> > Hi Colin,
>> >> > I don't mind the idea of MM2 users implementing the AdminClient
>> >> interface.
>> >> > However, there're two disadvantages to this.
>> >> >
>> >> >1. Having around 70 methods definitions to have "NotImplemented"
>> is
>> >> one
>> >> >

Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-05-20 Thread Omnia Ibrahim
Hi Colin,

>Thanks for the clarification. I agree it's reasonable for people to want
to use their own implementations of Admin. And we could have a config for
this, so that it becomes pluggable (possibly in other places than
MirrorMaker, although we don't have to do that in this KIP).
>
Allowing people to plug custom implementation of Admin in other places
sounds like a neat idea indeed. It can be nice addition for example `
org.apache.kafka.connect.util.SharedTopicAdmin` in Connect to use custom
Admin as well. But agree no need to have it in this KIP.
>We could even try to make this easier on developers. For example, we could
provide a public ForwardingAdmin class that forwards all requests to the
regular KafkaAdminClient. Then, people could make their custom class
inherit from ForwardingAdmin and override >just the specific methods that
they wanted to override. So they don't have to implement all the methods,
but just the ones that are different for them.
>
>I just wanted to make sure we weren't creating a second Admin client
interface -- I think that would really be hard for us to support long-term.

Forwarding would defiantly make it easier. I have updated the KIP to
introduce ForwardingAdmin as well.

regards,
Omnia

On Mon, May 16, 2022 at 9:31 PM Colin McCabe  wrote:

> On Mon, May 16, 2022, at 10:24, Omnia Ibrahim wrote:
> > Hi Colin,
> >
> > Thanks for your reply.
> >
> > This KIP doesn’t aim to solve any security concerns, but rather a
> conflict
> > of responsibilities within any Kafka ecosystem that includes MM2 and any
> > resource management solution. I’m not sure that was clear, so I’m
> concerned
> > about the motivation for your suggestion to close this KIP.
> >
>
> Hi Omnia,
>
> Thanks for the clarification. I agree it's reasonable for people to want
> to use their own implementations of Admin. And we could have a config for
> this, so that it becomes pluggable (possibly in other places than
> MirrorMaker, although we don't have to do that in this KIP).
>
> We could even try to make this easier on developers. For example, we could
> provide a public ForwardingAdmin class that forwards all requests to the
> regular KafkaAdminClient. Then, people could make their custom class
> inherit from ForwardingAdmin and override just the specific methods that
> they wanted to override. So they don't have to implement all the methods,
> but just the ones that are different for them.
>
> I just wanted to make sure we weren't creating a second Admin client
> interface -- I think that would really be hard for us to support long-term.
>
> >
> > It is generally accepted that resource management should be centralized,
> > especially on the scale of mirroring N number of clusters. The point of
> > this KIP is that any sort of topic management / federate solution /
> > up-front capacity planning system will be at odds with MM2 if MM2 keeps
> > using the Admin client directly.
> >
>
> Thanks for the explanation. That makes sense.
>
> >
> > I understand your concern that the interface proposed in the first
> approach
> > may become too similar to the existing Admin interface. I’ll update the
> > proposal by moving Ryanne’s previous suggestion to re-use the Admin
> > interface and add configuration to accept a custom implementation.
> >
>
> +1.
>
> >
> > If you still feel this KIP should be closed but can understand its
> > motivation I can close this one and create a new one.
> >
>
> I think it's reasonable to keep this one open and make the changes you
> talked about above.
>
> regards,
> Colin
>
> >
> > Thanks,
> > Omnia
> >
> > On Fri, May 13, 2022 at 6:10 PM Colin McCabe  wrote:
> >
> >> On Wed, May 11, 2022, at 15:07, Omnia Ibrahim wrote:
> >> > Hi Colin,
> >> > I don't mind the idea of MM2 users implementing the AdminClient
> >> interface.
> >> > However, there're two disadvantages to this.
> >> >
> >> >1. Having around 70 methods definitions to have "NotImplemented" is
> >> one
> >> >downside, and keep up with these if the AdminClient interface
> changes.
> >> >2. It makes it hard to list what admin functionality MM2 uses as
> MM2
> >> >interactions with AdminClient in the codebase are in many places.
> >> >
> >> > I guess it's OK for MM2 users who want to build their admin client to
> >> carry
> >> > this burden, as I explained in my previous response to the discussion
> >> > thread. And we can do some cleanup to the codebase to have all Admin
> >> > interactions in MM2 in 

Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-05-16 Thread Omnia Ibrahim
Hi Colin,

Thanks for your reply.

This KIP doesn’t aim to solve any security concerns, but rather a conflict
of responsibilities within any Kafka ecosystem that includes MM2 and any
resource management solution. I’m not sure that was clear, so I’m concerned
about the motivation for your suggestion to close this KIP.

It is generally accepted that resource management should be centralized,
especially on the scale of mirroring N number of clusters. The point of
this KIP is that any sort of topic management / federate solution /
up-front capacity planning system will be at odds with MM2 if MM2 keeps
using the Admin client directly.

I understand your concern that the interface proposed in the first approach
may become too similar to the existing Admin interface. I’ll update the
proposal by moving Ryanne’s previous suggestion to re-use the Admin
interface and add configuration to accept a custom implementation.

If you still feel this KIP should be closed but can understand its
motivation I can close this one and create a new one.

Thanks,
Omnia

On Fri, May 13, 2022 at 6:10 PM Colin McCabe  wrote:

> On Wed, May 11, 2022, at 15:07, Omnia Ibrahim wrote:
> > Hi Colin,
> > I don't mind the idea of MM2 users implementing the AdminClient
> interface.
> > However, there're two disadvantages to this.
> >
> >1. Having around 70 methods definitions to have "NotImplemented" is
> one
> >downside, and keep up with these if the AdminClient interface changes.
> >2. It makes it hard to list what admin functionality MM2 uses as MM2
> >interactions with AdminClient in the codebase are in many places.
> >
> > I guess it's OK for MM2 users who want to build their admin client to
> carry
> > this burden, as I explained in my previous response to the discussion
> > thread. And we can do some cleanup to the codebase to have all Admin
> > interactions in MM2 in a utils class or something like that to make it
> > easier to navigate what MM2 needs from the Admin interface.
> >
>
> Hi Omnia,
>
> Anyone who wants to extend Kafka with proprietary tooling does need to
> keep up with the Kafka API. We have done everything we can to make this
> easier. We rigorously define what the API is through the KIP process, and
> make it possible to extend by making it an interface rather than concrete
> class. We also have a pretty lengthy deprecation process for these APIs.
>
> >
> > Maybe I'm misunderstanding the use-case you're describing here. But it
> >> seems to me that if you create a proxy that has the ability to do any
> admin
> >> operation, and give MM2 access to that proxy, the security model is the
> >> same as just giving MM2 admin access. (Or it may be worse if the
> sysadmin
> >> doesn't know what this proxy is doing, and doesn't lock it down...)
> >>
> >
> > MM2 runs with the assumption that it has
> >
> >- "CREATE" ACLs for topics on the source clusters to create
> `heartbeat`
> >topics.
> >- "CREATE"  and "ALTER" ACLs to create topics, add partitions, update
> >topics' config and topics' ACLs (in future, will also include group
> ACLS as
> >Mikael mentioned before in the thread) on the destination clusters.
> >
> > Most organisations have some resource management or federated solutions
> > (some would even have a budget system as part of these systems) to manage
> > Kafka resources, and these systems are usually the only application
> allowed
> > to initializing a client with "CREATE" and "ALTER" ACLs. They don't grant
> > these ACLs to any other teams/groups/applications to create such a client
> > outside these systems, so assuming MM2 can bypass these systems and use
> the
> > AdminClient directly to create/update resources isn't valid. This is the
> > primary concern here.
> >
> > The KIP is trying to give MM2 more flexibility to allow organisations to
> > integrate MM2 with their resource management system as they see fit
> without
> > forcing them to disable most MM2 features.
> >
> > Hope this make sense and clear it up.
> >
>
> The point I was trying to make is that there is no additional security
> here. If you have some agent that has all the permissions, and MM2 can talk
> to that agent and tell it what to do, then that is equivalent to just
> giving MM2 all the permissions. So while there may be other reasons to use
> this kind of agent-based architecture, added security isn't one.
>
> In any case, I think we should close this KIP since we already have an
> Admin API. There isn't a need to create a public API for admin operations.
>
> best,
>

Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-05-12 Thread Omnia Ibrahim
I updated the KIP to reflect the options we have been discussing since Oct
2021 for people who didn't read the discussion thread.

On Wed, May 11, 2022 at 11:07 PM Omnia Ibrahim 
wrote:

> Hi Colin,
> I don't mind the idea of MM2 users implementing the AdminClient interface.
> However, there're two disadvantages to this.
>
>1. Having around 70 methods definitions to have "NotImplemented" is
>one downside, and keep up with these if the AdminClient interface changes.
>2. It makes it hard to list what admin functionality MM2 uses as MM2
>interactions with AdminClient in the codebase are in many places.
>
> I guess it's OK for MM2 users who want to build their admin client to
> carry this burden, as I explained in my previous response to the discussion
> thread. And we can do some cleanup to the codebase to have all Admin
> interactions in MM2 in a utils class or something like that to make it
> easier to navigate what MM2 needs from the Admin interface.
>
> Maybe I'm misunderstanding the use-case you're describing here. But it
>> seems to me that if you create a proxy that has the ability to do any admin
>> operation, and give MM2 access to that proxy, the security model is the
>> same as just giving MM2 admin access. (Or it may be worse if the sysadmin
>> doesn't know what this proxy is doing, and doesn't lock it down...)
>>
>
> MM2 runs with the assumption that it has
>
>- "CREATE" ACLs for topics on the source clusters to create
>`heartbeat` topics.
>- "CREATE"  and "ALTER" ACLs to create topics, add partitions, update
>topics' config and topics' ACLs (in future, will also include group ACLS as
>Mikael mentioned before in the thread) on the destination clusters.
>
> Most organisations have some resource management or federated solutions
> (some would even have a budget system as part of these systems) to manage
> Kafka resources, and these systems are usually the only application allowed
> to initializing a client with "CREATE" and "ALTER" ACLs. They don't grant
> these ACLs to any other teams/groups/applications to create such a client
> outside these systems, so assuming MM2 can bypass these systems and use the
> AdminClient directly to create/update resources isn't valid. This is the
> primary concern here.
>
> The KIP is trying to give MM2 more flexibility to allow organisations to
> integrate MM2 with their resource management system as they see fit without
> forcing them to disable most MM2 features.
>
> Hope this make sense and clear it up.
>
>
> On Wed, May 11, 2022 at 9:09 PM Colin McCabe  wrote:
>
>> Hi Omnia Ibrahim,
>>
>> I'm sorry, but I am -1 on adding competing Admin interfaces. This would
>> create confusion and a heavier maintenance burden for the project.
>>
>> Since the org.apache.kafka.clients.admin.Admin interface is a Java
>> interface, any third-party software that wants to insert its own
>> implementation of the interface can do so already.
>>
>> A KIP to make the Admin class used pluggable for MM2 would be reasonable.
>> Adding a competing admin API is not.
>>
>> It's true that there are many Admin methods, but you do not need to
>> implement all of them -- just the ones that MirrorMaker uses. The other
>> ones can throw a NotImplementedException or similar.
>>
>> > The current approach also assumes that the user running MM2 has the
>> Admin right to
>> > create/update topics, which is only valid if the user who runs MM2 also
>> manages both
>> > source and destination clusters.
>>
>> Maybe I'm misunderstanding the use-case you're describing here. But it
>> seems to me that if you create a proxy that has the ability to do any admin
>> operation, and give MM2 access to that proxy, the security model is the
>> same as just giving MM2 admin access. (Or it may be worse if the sysadmin
>> doesn't know what this proxy is doing, and doesn't lock it down...)
>>
>> best,
>> Colin
>>
>>
>> On Mon, May 9, 2022, at 13:21, Omnia Ibrahim wrote:
>> > Hi, I gave the KIP another look after talking to some people at the
>> Kafka
>> > Summit in London. And I would like to clear up the motivation of this
>> KIP.
>> >
>> >
>> > At the moment, MM2 has some opinionated decisions that are creating
>> issues
>> > for teams that use IaC, federated solutions or have a capacity/budget
>> > planning system for Kafka destination clusters. To explain it better,
>> let's
>> > assume we have MM2 with the following configurations to highlight these
>> > pro

Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-05-11 Thread Omnia Ibrahim
Hi Colin,
I don't mind the idea of MM2 users implementing the AdminClient interface.
However, there're two disadvantages to this.

   1. Having around 70 methods definitions to have "NotImplemented" is one
   downside, and keep up with these if the AdminClient interface changes.
   2. It makes it hard to list what admin functionality MM2 uses as MM2
   interactions with AdminClient in the codebase are in many places.

I guess it's OK for MM2 users who want to build their admin client to carry
this burden, as I explained in my previous response to the discussion
thread. And we can do some cleanup to the codebase to have all Admin
interactions in MM2 in a utils class or something like that to make it
easier to navigate what MM2 needs from the Admin interface.

Maybe I'm misunderstanding the use-case you're describing here. But it
> seems to me that if you create a proxy that has the ability to do any admin
> operation, and give MM2 access to that proxy, the security model is the
> same as just giving MM2 admin access. (Or it may be worse if the sysadmin
> doesn't know what this proxy is doing, and doesn't lock it down...)
>

MM2 runs with the assumption that it has

   - "CREATE" ACLs for topics on the source clusters to create `heartbeat`
   topics.
   - "CREATE"  and "ALTER" ACLs to create topics, add partitions, update
   topics' config and topics' ACLs (in future, will also include group ACLS as
   Mikael mentioned before in the thread) on the destination clusters.

Most organisations have some resource management or federated solutions
(some would even have a budget system as part of these systems) to manage
Kafka resources, and these systems are usually the only application allowed
to initializing a client with "CREATE" and "ALTER" ACLs. They don't grant
these ACLs to any other teams/groups/applications to create such a client
outside these systems, so assuming MM2 can bypass these systems and use the
AdminClient directly to create/update resources isn't valid. This is the
primary concern here.

The KIP is trying to give MM2 more flexibility to allow organisations to
integrate MM2 with their resource management system as they see fit without
forcing them to disable most MM2 features.

Hope this make sense and clear it up.


On Wed, May 11, 2022 at 9:09 PM Colin McCabe  wrote:

> Hi Omnia Ibrahim,
>
> I'm sorry, but I am -1 on adding competing Admin interfaces. This would
> create confusion and a heavier maintenance burden for the project.
>
> Since the org.apache.kafka.clients.admin.Admin interface is a Java
> interface, any third-party software that wants to insert its own
> implementation of the interface can do so already.
>
> A KIP to make the Admin class used pluggable for MM2 would be reasonable.
> Adding a competing admin API is not.
>
> It's true that there are many Admin methods, but you do not need to
> implement all of them -- just the ones that MirrorMaker uses. The other
> ones can throw a NotImplementedException or similar.
>
> > The current approach also assumes that the user running MM2 has the
> Admin right to
> > create/update topics, which is only valid if the user who runs MM2 also
> manages both
> > source and destination clusters.
>
> Maybe I'm misunderstanding the use-case you're describing here. But it
> seems to me that if you create a proxy that has the ability to do any admin
> operation, and give MM2 access to that proxy, the security model is the
> same as just giving MM2 admin access. (Or it may be worse if the sysadmin
> doesn't know what this proxy is doing, and doesn't lock it down...)
>
> best,
> Colin
>
>
> On Mon, May 9, 2022, at 13:21, Omnia Ibrahim wrote:
> > Hi, I gave the KIP another look after talking to some people at the Kafka
> > Summit in London. And I would like to clear up the motivation of this
> KIP.
> >
> >
> > At the moment, MM2 has some opinionated decisions that are creating
> issues
> > for teams that use IaC, federated solutions or have a capacity/budget
> > planning system for Kafka destination clusters. To explain it better,
> let's
> > assume we have MM2 with the following configurations to highlight these
> > problems.
> >
> > ```
> >
> > topics = .*
> >
> > refresh.topics.enabled = true
> >
> > sync.topic.configs.enabled = true
> >
> > sync.topic.acls.enabled = true
> >
> > // Maybe in futrue we can have sync.group.acls.enabled = true
> >
> > ```
> >
> >
> > These configurations allow us to run MM2 with the value of its full
> > features. However, there are two main concerns when we run on a scale
> with
> > these configs:
> >
> > 1. *Capacity/Budgeting Planning:*
>

Re: [DISCUSS] KIP-781: Allow MirrorMaker 2 to override the client configurations

2022-05-11 Thread Omnia Ibrahim
Hi Dongjin, nice spot for this bug. I wonder if this really needs a KIP as
it seems like a clear bug in MM2's interpretation for the config. If you're
going to keep the KIP can you please add some details in the Public
interface or Proposed changes to list which part of MM2 codebase will
change to fix this issue?

Thanks

On Mon, Oct 11, 2021 at 9:43 PM Dongjin Lee  wrote:

> Hi Kafka dev,
>
> I hope to initiate the discussion of KIP-781: Allow MirrorMaker 2 to
> override the client configurations.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-781%3A+Allow+MirrorMaker+2+to+override+the+client+configurations
>
> I found this problem while testing the MirrorMaker 2 deployments; in short,
> the configurations like `us-east.producer.acks = all` are not working now.
> It seems like to make it working like the documentation, a configuration
> overriding policy is needed.
>
> All kinds of feedbacks are greatly appreciated!
>
> Thanks,
> Dongjin
>
> --
> *Dongjin Lee*
>
> *A hitchhiker in the mathematical world.*
>
>
>
> *github:  github.com/dongjinleekr
> keybase: https://keybase.io/dongjinleekr
> linkedin: kr.linkedin.com/in/dongjinleekr
> speakerdeck:
> speakerdeck.com/dongjin
> *
>


Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-05-09 Thread Omnia Ibrahim
 a specific topic's configuration from being
   synced.


I hope this clears up the problem better. Please let me know what do you
think.


On Mon, Feb 7, 2022 at 3:26 PM Omnia Ibrahim 
wrote:

> Hi Mickael. Thanks for the feedback. I address some of your points below.
>
> *> This seems to address a relatively advanced and specific use case*
> The main point of the KIP is that MM2 is making a massive assumption that
> it has the right to alter/create resources. This assumption isn't valid in
> the world of Infra-as-Code, federated solutions and popularity of OS Kafka
> Kubernetes operators; these infra/resource management solutions aren't
> advanced use-cases anymore nowadays. These concerns had been raised in the
> past, especially regarding the assumption that MM2 can create topics on the
> destination cluster. For example,
> https://issues.apache.org/jira/browse/KAFKA-12753 and
> https://www.mail-archive.com/dev@kafka.apache.org/msg119340.html.
> The primary motivation is giving some power to data
> platform/infrastructure team to make MM2 part of their internal Kafka
> ecosystem without dropping the main features that make MM2 valuable, like
> syncing topic configs. For example, if someone uses any OS Kafka k8s
> operator, they can implement the class to interact with the k8s operator to
> create these resources.
>
> *> My initial concern is this may make it hard to evolve MirrorMaker as
> we'll likely need to update this new interface if new features are added.*
> I agree it's a disadvantage to adding a new interface however adding more
> admin interactions from MM2 to alter/create resources and access will feed
> the main issue as I mentioned above with the popularity of IaC and
> federated solutions; most data platform/infrastructure teams will endup
> disabling these new features.
> Also, at the moment, most MM2 interactions with the admin client are
> scattered across the codebase so having one place where all admin
> interactions are listed isn't a bad thing.
>
> *> For example if we wanted to sync group ACLs.*
> As I mentioned before, altering any resource's configurations with MM2 is
> the one main concern for any data platform/infrastructure team that wants
> to have control over their clusters and use MM2. So the main question with
> adding any new altering feature like sync group ACLs will raise the same
> question of how many teams will actually enable this feature.
>
>
>
>
>
>
>
>
>
> *>Regarding the proposed API, I have a few suggestions: >- What about
> using configure() instead of the constructor to pass the >configuration,
> especially as it's implementing Configurable >- It's not clear what all the
> arguments of createTopicPartitions()>are. What's the difference between
> partitionCounts and newPartitions?>Should we have separate methods for
> creating topics and partitions? >- Do we really need
> createCompactedTopic()? >- Instead of updateTopicConfigs() and updateAcls()
> should we use the >"alter" prefix to stay consistent with Admin?*
>
> These are good suggestions that will update the KIP to address these.
> Regarding the `createCompactedTopic` MM2 is using this method to create
> internal topics.
>
> Thanks
>
> On Wed, Jan 26, 2022 at 1:55 PM Mickael Maison 
> wrote:
>
>> Hi Omnia,
>>
>> Thanks for the KIP, sorry for taking so long to comment. I've only had
>> time to take a quick look so far.
>>
>> This seems to address a relatively advanced and specific use case. My
>> initial concern is this may make it hard to evolve MirrorMaker as
>> we'll likely need to update this new interface if new features are
>> added. For example if we wanted to sync group ACLs.
>> I'm wondering if it's something you've thought about. I'm not saying
>> it's a blocker but we have to weigh the pros and cons when introducing
>> new features.
>>
>> Regarding the proposed API, I have a few suggestions:
>> - What about using configure() instead of the constructor to pass the
>> configuration, especially as it's implementing Configurable
>> - It's not clear what all the arguments of createTopicPartitions()
>> are. What's the difference between partitionCounts and newPartitions?
>> Should we have separate methods for creating topics and partitions?
>> - Do we really need createCompactedTopic()?
>> - Instead of updateTopicConfigs() and updateAcls() should we use the
>> "alter" prefix to stay consistent with Admin?
>>
>> Thanks,
>> Mickael
>>
>> On Wed, Jan 26, 2022 at 11:26 AM Omnia Ibrahim 
>> wrote:
>> >
>> > Hi,
>> > If there are no more concerns regarding the propo

Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-02-07 Thread Omnia Ibrahim
Hi Mickael. Thanks for the feedback. I address some of your points below.

*> This seems to address a relatively advanced and specific use case*
The main point of the KIP is that MM2 is making a massive assumption that
it has the right to alter/create resources. This assumption isn't valid in
the world of Infra-as-Code, federated solutions and popularity of OS Kafka
Kubernetes operators; these infra/resource management solutions aren't
advanced use-cases anymore nowadays. These concerns had been raised in the
past, especially regarding the assumption that MM2 can create topics on the
destination cluster. For example,
https://issues.apache.org/jira/browse/KAFKA-12753 and
https://www.mail-archive.com/dev@kafka.apache.org/msg119340.html.
The primary motivation is giving some power to data platform/infrastructure
team to make MM2 part of their internal Kafka ecosystem without dropping
the main features that make MM2 valuable, like syncing topic configs. For
example, if someone uses any OS Kafka k8s operator, they can implement the
class to interact with the k8s operator to create these resources.

*> My initial concern is this may make it hard to evolve MirrorMaker as
we'll likely need to update this new interface if new features are added.*
I agree it's a disadvantage to adding a new interface however adding more
admin interactions from MM2 to alter/create resources and access will feed
the main issue as I mentioned above with the popularity of IaC and
federated solutions; most data platform/infrastructure teams will endup
disabling these new features.
Also, at the moment, most MM2 interactions with the admin client are
scattered across the codebase so having one place where all admin
interactions are listed isn't a bad thing.

*> For example if we wanted to sync group ACLs.*
As I mentioned before, altering any resource's configurations with MM2 is
the one main concern for any data platform/infrastructure team that wants
to have control over their clusters and use MM2. So the main question with
adding any new altering feature like sync group ACLs will raise the same
question of how many teams will actually enable this feature.









*>Regarding the proposed API, I have a few suggestions: >- What about using
configure() instead of the constructor to pass the >configuration,
especially as it's implementing Configurable >- It's not clear what all the
arguments of createTopicPartitions()>are. What's the difference between
partitionCounts and newPartitions?>Should we have separate methods for
creating topics and partitions? >- Do we really need
createCompactedTopic()? >- Instead of updateTopicConfigs() and updateAcls()
should we use the >"alter" prefix to stay consistent with Admin?*

These are good suggestions that will update the KIP to address these.
Regarding the `createCompactedTopic` MM2 is using this method to create
internal topics.

Thanks

On Wed, Jan 26, 2022 at 1:55 PM Mickael Maison 
wrote:

> Hi Omnia,
>
> Thanks for the KIP, sorry for taking so long to comment. I've only had
> time to take a quick look so far.
>
> This seems to address a relatively advanced and specific use case. My
> initial concern is this may make it hard to evolve MirrorMaker as
> we'll likely need to update this new interface if new features are
> added. For example if we wanted to sync group ACLs.
> I'm wondering if it's something you've thought about. I'm not saying
> it's a blocker but we have to weigh the pros and cons when introducing
> new features.
>
> Regarding the proposed API, I have a few suggestions:
> - What about using configure() instead of the constructor to pass the
> configuration, especially as it's implementing Configurable
> - It's not clear what all the arguments of createTopicPartitions()
> are. What's the difference between partitionCounts and newPartitions?
> Should we have separate methods for creating topics and partitions?
> - Do we really need createCompactedTopic()?
> - Instead of updateTopicConfigs() and updateAcls() should we use the
> "alter" prefix to stay consistent with Admin?
>
> Thanks,
> Mickael
>
> On Wed, Jan 26, 2022 at 11:26 AM Omnia Ibrahim 
> wrote:
> >
> > Hi,
> > If there are no more concerns regarding the proposal can I get some
> votes on the KIP here
> https://lists.apache.org/thread/950lpxjb5kbjm8qdszlpxm9h4j4sfyjx
> >
> > Thanks
> >
> > On Wed, Oct 27, 2021 at 3:54 PM Ryanne Dolan 
> wrote:
> >>
> >> Well I'm convinced! Thanks for looking into it.
> >>
> >> Ryanne
> >>
> >> On Wed, Oct 27, 2021, 8:49 AM Omnia Ibrahim 
> wrote:
> >>
> >> > I checked the difference between the number of methods in the Admin
> >> > interface and the number of methods MM2 invokes from Admin, and this
> diff
> &

Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2022-01-26 Thread Omnia Ibrahim
Hi,
If there are no more concerns regarding the proposal can I get some votes
on the KIP here
https://lists.apache.org/thread/950lpxjb5kbjm8qdszlpxm9h4j4sfyjx
<https://lists.apache.org/thread/950lpxjb5kbjm8qdszlpxm9h4j4sfyjx>

Thanks

On Wed, Oct 27, 2021 at 3:54 PM Ryanne Dolan  wrote:

> Well I'm convinced! Thanks for looking into it.
>
> Ryanne
>
> On Wed, Oct 27, 2021, 8:49 AM Omnia Ibrahim 
> wrote:
>
> > I checked the difference between the number of methods in the Admin
> > interface and the number of methods MM2 invokes from Admin, and this diff
> > is enormous (it's more than 70 methods).
> > As far as I can see, the following methods MM2 depends on (in
> > MirrorSourceConnector, MirrorMaker, MirrorCheckpointTask and
> > MirrorCheckpointConnector), this will leave 73 methods on the Admin
> > interface that customer will need to return dummy data for,
> >
> >1. create(conf)
> >2. close
> >3. listTopics()
> >4. createTopics(newTopics, createTopicsOptions)
> >5. createPartitions(newPartitions)
> >6. alterConfigs(configs)  // this method is marked for deprecation in
> >Admin and the ConfigResource MM2 use is only TOPIC
> >7. createAcls(aclBindings) // the list of bindings always filtered by
> >TOPIC
> >8. describeAcls(aclBindingFilter) // filter is always ANY_TOPIC_ACL
> >9. describeConfigs(configResources) // Always for TOPIC resources
> >10. listConsumerGroupOffsets(groupId)
> >11. listConsumerGroups()
> >12. alterConsumerGroupOffsets(groupId, offsets)
> >13. describeCluster() // this is invoked from
> > ConnectUtils.lookupKafkaClusterId(conf),
> >but MM2 isn't the one that initialize the AdminClient
> >
> > Going with the Admin interface in practice will make any custom Admin
> > implementation humongous even for a fringe use case because of the number
> > of methods that need to return dummy data,
> >
> > I am still leaning toward a new interface as it abstract all MM2's
> > interaction with Kafka Resources in one place; this makes it easier to
> > maintain and make it easier for the use cases where customers wish to
> > provide a different method to handle resources.
> >
> > Omnia
> >
> > On Tue, Oct 26, 2021 at 4:10 PM Ryanne Dolan 
> > wrote:
> >
> > > I like the idea of failing-fast whenever a custom impl is provided,
> but I
> > > suppose that that could be done for Admin as well. I agree your
> proposal
> > is
> > > more ergonomic, but maybe it's okay to have a little friction in such
> > > fringe use-cases.
> > >
> > > Ryanne
> > >
> > >
> > > On Tue, Oct 26, 2021, 6:23 AM Omnia Ibrahim 
> > > wrote:
> > >
> > > > Hey Ryanne, Thanks fo the quick feedback.
> > > > Using the Admin interface would make everything easier, as MM2 will
> > need
> > > > only to configure the classpath for the new implementation and use it
> > > > instead of AdminClient.
> > > > However, I have two concerns
> > > > 1. The Admin interface is enormous, and the MM2 users will need to
> know
> > > the
> > > > list of methods MM2 depends on and override these only instead of
> > > > implementing the whole Admin interface.
> > > > 2. MM2 users will need keep an eye on any changes to Admin interface
> > that
> > > > impact MM2 for example deprecating methods.
> > > > Am not sure if adding these concerns on the users is acceptable or
> not.
> > > > One solution to address these concerns could be adding some checks to
> > > make
> > > > sure the methods MM2 uses from the Admin interface exists to fail
> > faster.
> > > > What do you think
> > > >
> > > > Omnia
> > > >
> > > >
> > > > On Mon, Oct 25, 2021 at 11:24 PM Ryanne Dolan  >
> > > > wrote:
> > > >
> > > > > Thanks Omnia, neat idea. I wonder if we could use the existing
> Admin
> > > > > interface instead of defining a new one?
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Mon, Oct 25, 2021, 12:54 PM Omnia Ibrahim <
> > o.g.h.ibra...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hey everyone,
> > > > > > Please take a look at KIP-787
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
> > > > > > <
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
> > > > > > >
> > > > > >
> > > > > > Thanks for the feedback and support
> > > > > > Omnia
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [Vote] KIP-787 - MM2 Interface to manage Kafka resources Kafka/KIPs

2022-01-11 Thread Omnia Ibrahim
Hi
Can I get a vote on this, please?

Best

On Mon, Nov 22, 2021 at 4:59 PM Omnia Ibrahim 
wrote:

> Hi All,
>
> Just thought of bumping this voting thread again to see if we can form a
> consensus around this.
>
> Thanks
>
>
> On Tue, Nov 16, 2021 at 2:16 PM Omnia Ibrahim 
> wrote:
>
>> Hi,
>> I'd like to start a vote on KIP-787
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources>
>>
>> Thanks
>> Omnia
>>
>


Re: [Vote] KIP-787 - MM2 Interface to manage Kafka resources Kafka/KIPs

2021-11-22 Thread Omnia Ibrahim
Hi All,

Just thought of bumping this voting thread again to see if we can form a
consensus around this.

Thanks


On Tue, Nov 16, 2021 at 2:16 PM Omnia Ibrahim 
wrote:

> Hi,
> I'd like to start a vote on KIP-787
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources>
>
> Thanks
> Omnia
>


Re: Track topic deletion state without ZK

2021-11-18 Thread Omnia Ibrahim
Hi,
To clarify here, what I meant by "stuck" for deletion is regarding the
state of LogDir deletion. When the cluster receives a topic deletion
request, the KRAFT will delete the topic's metadata and eventually delete
all LogDirs on any broker/disk at some point (correct me if I am wrong).
My question is how to confirm the deletion state of these LogDirs. For
example, what will happen for the following case?

   - Topic-A has one partition with four replicas on broker1, 2, 3, 4
   - The owner of Topic-A fired a topic deletion request.
   - Broker1,2,3 have no issues; however, the disk on broker 4 isn't
   reachable.
   - Broker1,2,3 deleted any LogDir they had for Topic-A.
   - Broker 4 can't delete the LogDir (for example, the disk went to
   read-only mode).
   - Now Topic-A's data isn't entirely deleted until this last LogDir on
   broker-4 is deleted.
   - For data compliance, we need to confirm the deletion of the data.


Will KRAFT keeps listing the Topic-A until the last LogDir on broker-4 gets
deleted? If this is the case, then this topic should be marked "Pending for
deletion" for transparency; If KRAFT does not list the topic anymore, how
will we confirm that all topic's LogDirs have been deleted?

Thanks,
Omnia

On Tue, Nov 16, 2021 at 9:19 PM Colin McCabe  wrote:

> Hi Omnia,
>
> Topic deletion doesn't get stuck if a broker is down, when using KRaft.
> There is no "deleting" state, only deleted or not deleted.
>
> best,
> Colin
>
> On Tue, Nov 2, 2021, at 09:24, Omnia Ibrahim wrote:
> > Hi Colin, thanks for your response.
> > Regards your point that the topic gets deleted immediately, I got that we
> > do this if the cluster is healthy.
> > However, if there's a hardware failure with the disk or the broker is
> > unreachable and has a replica; In these cases, deleting the log files
> from
> > the failed disk or unreachable broker will be impossible to delete until
> we
> > fix the hardware issue,
> > So during troubleshooting, how will we know which topic is stuck for
> > deletion because we can't delete some replicas because of hardware
> > failures?
> >
> > Thanks
> >
> >
> > On Mon, Nov 1, 2021 at 8:57 PM Colin McCabe  wrote:
> >
> >> Hi Omnia,
> >>
> >> It is not necessary to know which topics are marked for deletion in when
> >> in KRaft mode, because topic deletion happens immediately.
> >>
> >> best,
> >> Colin
> >>
> >> On Thu, Oct 28, 2021, at 06:57, Omnia Ibrahim wrote:
> >> > Hi,
> >> >
> >> > Kafka topicCommand used to report which topic is marked for deletion
> by
> >> > checking the znode on zookeeper; this feature has been deprecated
> without
> >> > replacement as part of KAFKA-12596 Remove deprecated --zookeeper in
> >> > topicCommands <https://issues.apache.org/jira/browse/KAFKA-12596>.
> >> >
> >> > Also as far as I can see, there's no equivalent for this with KIP-500
> as
> >> > well.
> >> >
> >> > Is there any other way to know the state of deletion for Kafka with ZK
> >> and
> >> > Without ZK?
> >> >
> >> > Is possible to leverage `RemoveTopicRecord` on the metadata topic to
> >> > provide the same feature?
> >> >
> >> >
> >> > Thanks
> >> >
> >> > Omnia
> >>
>


[Vote] KIP-787 - MM2 Interface to manage Kafka resources Kafka/KIPs

2021-11-16 Thread Omnia Ibrahim
Hi,
I'd like to start a vote on KIP-787
https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources


Thanks
Omnia


Re: Track topic deletion state without ZK

2021-11-02 Thread Omnia Ibrahim
Hi Colin, thanks for your response.
Regards your point that the topic gets deleted immediately, I got that we
do this if the cluster is healthy.
However, if there's a hardware failure with the disk or the broker is
unreachable and has a replica; In these cases, deleting the log files from
the failed disk or unreachable broker will be impossible to delete until we
fix the hardware issue,
So during troubleshooting, how will we know which topic is stuck for
deletion because we can't delete some replicas because of hardware
failures?

Thanks


On Mon, Nov 1, 2021 at 8:57 PM Colin McCabe  wrote:

> Hi Omnia,
>
> It is not necessary to know which topics are marked for deletion in when
> in KRaft mode, because topic deletion happens immediately.
>
> best,
> Colin
>
> On Thu, Oct 28, 2021, at 06:57, Omnia Ibrahim wrote:
> > Hi,
> >
> > Kafka topicCommand used to report which topic is marked for deletion by
> > checking the znode on zookeeper; this feature has been deprecated without
> > replacement as part of KAFKA-12596 Remove deprecated --zookeeper in
> > topicCommands <https://issues.apache.org/jira/browse/KAFKA-12596>.
> >
> > Also as far as I can see, there's no equivalent for this with KIP-500 as
> > well.
> >
> > Is there any other way to know the state of deletion for Kafka with ZK
> and
> > Without ZK?
> >
> > Is possible to leverage `RemoveTopicRecord` on the metadata topic to
> > provide the same feature?
> >
> >
> > Thanks
> >
> > Omnia
>


[jira] [Created] (KAFKA-13416) Add topic ids to any metrics that has topic and partition tags

2021-10-28 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-13416:
-

 Summary: Add topic ids to any metrics that has topic and partition 
tags
 Key: KAFKA-13416
 URL: https://issues.apache.org/jira/browse/KAFKA-13416
 Project: Kafka
  Issue Type: Bug
Reporter: Omnia Ibrahim


With 
[KIP-516|https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers]
 introducing Topic ID to more APIs, broker and client' metrics will need to 
expose topic UUID as part of any metric with the topic and partition tags.

Currently, metrics will be a bit confusing when the topic gets re-created as it 
wouldn't be evident that the topic got re-created.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13415) Track topic deletion state without ZK

2021-10-28 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-13415:
-

 Summary: Track topic deletion state without ZK
 Key: KAFKA-13415
 URL: https://issues.apache.org/jira/browse/KAFKA-13415
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 3.0.0
Reporter: Omnia Ibrahim


Kafka topicCommand used to report which topic is marked for deletion by 
checking the znode on zookeeper; this feature has been silently deprecated 
without replacement as part of KAFKA-12596 Remove deprecated --zookeeper in 
topicCommands.

Also as far as I can see, there's no equivalent for this with KIP-500 as well.

 

Is there any other way to know the state of deletion for Kafka with ZK and 
Without ZK?

Is possible to leverage `RemoveTopicRecord` on the metadata topic to provide 
the same feature?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Track topic deletion state without ZK

2021-10-28 Thread Omnia Ibrahim
Hi,

Kafka topicCommand used to report which topic is marked for deletion by
checking the znode on zookeeper; this feature has been deprecated without
replacement as part of KAFKA-12596 Remove deprecated --zookeeper in
topicCommands .

Also as far as I can see, there's no equivalent for this with KIP-500 as
well.

Is there any other way to know the state of deletion for Kafka with ZK and
Without ZK?

Is possible to leverage `RemoveTopicRecord` on the metadata topic to
provide the same feature?


Thanks

Omnia


Re: [DISCUSS] KIP-787 - MM2 Interface to manage Kafka resources

2021-10-27 Thread Omnia Ibrahim
I checked the difference between the number of methods in the Admin
interface and the number of methods MM2 invokes from Admin, and this diff
is enormous (it's more than 70 methods).
As far as I can see, the following methods MM2 depends on (in
MirrorSourceConnector, MirrorMaker, MirrorCheckpointTask and
MirrorCheckpointConnector), this will leave 73 methods on the Admin
interface that customer will need to return dummy data for,

   1. create(conf)
   2. close
   3. listTopics()
   4. createTopics(newTopics, createTopicsOptions)
   5. createPartitions(newPartitions)
   6. alterConfigs(configs)  // this method is marked for deprecation in
   Admin and the ConfigResource MM2 use is only TOPIC
   7. createAcls(aclBindings) // the list of bindings always filtered by
   TOPIC
   8. describeAcls(aclBindingFilter) // filter is always ANY_TOPIC_ACL
   9. describeConfigs(configResources) // Always for TOPIC resources
   10. listConsumerGroupOffsets(groupId)
   11. listConsumerGroups()
   12. alterConsumerGroupOffsets(groupId, offsets)
   13. describeCluster() // this is invoked from
ConnectUtils.lookupKafkaClusterId(conf),
   but MM2 isn't the one that initialize the AdminClient

Going with the Admin interface in practice will make any custom Admin
implementation humongous even for a fringe use case because of the number
of methods that need to return dummy data,

I am still leaning toward a new interface as it abstract all MM2's
interaction with Kafka Resources in one place; this makes it easier to
maintain and make it easier for the use cases where customers wish to
provide a different method to handle resources.

Omnia

On Tue, Oct 26, 2021 at 4:10 PM Ryanne Dolan  wrote:

> I like the idea of failing-fast whenever a custom impl is provided, but I
> suppose that that could be done for Admin as well. I agree your proposal is
> more ergonomic, but maybe it's okay to have a little friction in such
> fringe use-cases.
>
> Ryanne
>
>
> On Tue, Oct 26, 2021, 6:23 AM Omnia Ibrahim 
> wrote:
>
> > Hey Ryanne, Thanks fo the quick feedback.
> > Using the Admin interface would make everything easier, as MM2 will need
> > only to configure the classpath for the new implementation and use it
> > instead of AdminClient.
> > However, I have two concerns
> > 1. The Admin interface is enormous, and the MM2 users will need to know
> the
> > list of methods MM2 depends on and override these only instead of
> > implementing the whole Admin interface.
> > 2. MM2 users will need keep an eye on any changes to Admin interface that
> > impact MM2 for example deprecating methods.
> > Am not sure if adding these concerns on the users is acceptable or not.
> > One solution to address these concerns could be adding some checks to
> make
> > sure the methods MM2 uses from the Admin interface exists to fail faster.
> > What do you think
> >
> > Omnia
> >
> >
> > On Mon, Oct 25, 2021 at 11:24 PM Ryanne Dolan 
> > wrote:
> >
> > > Thanks Omnia, neat idea. I wonder if we could use the existing Admin
> > > interface instead of defining a new one?
> > >
> > > Ryanne
> > >
> > > On Mon, Oct 25, 2021, 12:54 PM Omnia Ibrahim 
> > > wrote:
> > >
> > > > Hey everyone,
> > > > Please take a look at KIP-787
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
> > > > <
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-787%3A+MM2+Interface+to+manage+Kafka+resources
> > > > >
> > > >
> > > > Thanks for the feedback and support
> > > > Omnia
> > > >
> > >
> >
>


  1   2   >