Request for Create KIP permission

2018-07-31 Thread Xiongqi Wu
Hi,
This is Xiongqi (Wesley) Wu from the LinkedIn Kafka dev team.
I would like to request permission to create KIPs for the log compaction project we
are currently working on here at LinkedIn.

My wiki id is: xiongqiwu

--Xiongqi (Wesley) Wu



Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-13 Thread xiongqi wu
Hi Kafka,

Just updated the Confluence page to include the link to this KIP.

Any comments will be appreciated:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy

Thank you.

Xiongqi (Wesley) Wu

On Thu, Aug 9, 2018 at 4:18 PM, xiongqi wu  wrote:

> Hi Kafka,
>
> This KIP tries to address a GDPR concern: fulfilling deletion requests on time
> through time-based log compaction on a compaction-enabled topic:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 354%3A+Time-based+log+compaction+policy
>
> Any feedback will be appreciated.
>
>
> Xiongqi (Wesley) Wu
>


Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-15 Thread xiongqi wu
Eno, Dong,

I have updated the KIP. We decided not to address the issue that we might
have for topics with both compaction and time-based retention enabled (see
rejected alternative item 2). This KIP will only ensure that a log can be
compacted after a specified time interval.

As suggested by Dong, we will also enforce that "max.compaction.lag.ms" is not
less than "min.compaction.lag.ms".

https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy
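
For illustration, assuming the config names stay as proposed ("max.compaction.lag.ms"
alongside the existing "min.compaction.lag.ms"), setting both on a topic might look
roughly like this with the Java AdminClient (topic name and values are made up; this
is only a sketch):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class CompactionLagConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "pii-profile");
            Config config = new Config(Arrays.asList(
                new ConfigEntry("cleanup.policy", "compact"),
                new ConfigEntry("min.compaction.lag.ms", "86400000"),     // 1 day
                new ConfigEntry("max.compaction.lag.ms", "604800000")));  // 7 days, >= min
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}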


On Tue, Aug 14, 2018 at 5:01 PM, xiongqi wu  wrote:

>
> Per discussion with Dong: he made a very good point that if compaction
> and time-based retention are both enabled on a topic, compaction might
> prevent records from being deleted on time. The reason is that when compacting
> multiple segments into one single segment, the newly created segment will
> have the same last-modified timestamp as the latest original segment. We lose the
> timestamps of all original segments except the last one. As a result,
> records might not be deleted as they should be through time-based retention.
>
> With the current KIP proposal, if we want to ensure timely deletion, we
> have the following configurations:
> 1) enable time-based log compaction only: deletion is done through
> overriding the same key
> 2) enable time-based log retention only: deletion is done through
> time-based retention
> 3) enable both log compaction and time-based retention: deletion is not
> guaranteed.
>
> I am not sure whether we have use case 3 and also want deletion to happen on time.
> There are several options to address the deletion issue when both
> compaction and retention are enabled:
> A) During log compaction, look at record timestamps to delete expired
> records. This can be done in the compaction logic itself or by using
> AdminClient.deleteRecords(). But this assumes we have record timestamps.
> B) Retain the lastModified time of the original segments during log compaction.
> This requires extra metadata to record the information, or not grouping
> multiple segments into one during compaction.
>
> If we have use case 3 in general, I would prefer solution A and rely on
> record timestamps.
>
>
> Two questions:
> Do we have use case 3? Is it nice-to-have or must-have?
> If we have use case 3 and want to go with solution A, should we introduce
> a new configuration to enforce deletion by timestamp?
>
>
> On Tue, Aug 14, 2018 at 1:52 PM, xiongqi wu  wrote:
>
>> Dong,
>>
>> Thanks for the comment.
>>
>> There are two retention policy: log compaction and time based retention.
>>
>> Log compaction:
>>
>> we have use cases to keep infinite retention of a topic (only
>> compaction).  GDPR cares about deletion of PII  (personal identifiable
>> information) data.
>> Since Kafka doesn't know what records contain PII, it relies on upper
>> layer to delete those records.
>> For those infinite retention uses uses,  kafka needs to provide a way to
>> enforce compaction on time. This is what we try to address in this KIP.
>>
>> Time based retention,
>>
>> There are also use cases that users of Kafka might want to expire all
>> their data.
>> In those cases, they can use time based retention of their topics.
>>
>>
>> Regarding your first question,  if a user wants to delete a key in the
>> log compaction topic,  the user has to send a deletion using the same key.
>> Kafka only makes sure the deletion will happen under a certain time
>> periods (like 2 days/7 days).
>>
>> Regarding your second question.  In most cases, we might want to delete
>> all duplicated keys at the same time.
>> Compaction might be more efficient since we need to scan the log and find
>> all duplicates.  However,  the expected use case is to set the time based
>> compaction interval on the order of days,  and be larger than 'min
>> compaction lag".  We don't want log compaction to happen frequently since
>> it is expensive.  The purpose is to help low production rate topic to get
>> compacted on time.  For the topic with "normal" incoming message message
>> rate, the "min dirty ratio" might have triggered the compaction before this
>> time based compaction policy takes effect.
>>
>>
>> Eno,
>>
>> For your question,  like I mentioned we have long time retention use case
>> for log compacted topic, but we want to provide ability to delete certain
>> PII records on time.
>> Kafka itself doesn't know whether a record contains sensitive information
>> and relies on the user for deletion.
>>
>>
>> On Mon, Aug 13, 2018 at 6:58 PM, Dong Lin  wrote:
>>
>>> Hey Xiongqi,
>&

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-15 Thread xiongqi wu
Brett,

Thank you for your comments.
I was thinking that, since we already have an immediate-compaction setting (by
setting the min dirty ratio to 0), I would use "0" as the disabled state.
I am OK with going with the -1 (disabled), 0 (immediate) options.

For the implementation, there are a few differences between mine and
"Xiaohe Dong"'s:
1) I use the estimated creation time of a log segment, instead of the largest
timestamp of a log, to determine compaction eligibility, because a log segment
might stay as the active segment for up to "max compaction lag" (see the KIP
for details).
2) I measure how many bytes we must clean to follow the "max compaction lag"
rule, and use that to determine the order of compaction.
3) I force the active segment to roll to honor the "max compaction lag".

I can share my code so we can coordinate.
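
As a rough illustration of 1) and 2), the per-partition check is conceptually
something like the following (plain Java sketch, not the actual LogCleaner code;
all names here are made up):

import java.util.List;

class MaxCompactionLagSketch {
    // Minimal holder for the two per-segment facts the check needs.
    static class SegmentInfo {
        long estimatedCreateTimeMs; // estimated creation time of the segment, see 1)
        long sizeInBytes;
    }

    // A segment must be cleaned once it is older than max.compaction.lag.ms.
    static boolean mustClean(SegmentInfo s, long maxCompactionLagMs, long nowMs) {
        return maxCompactionLagMs > 0 && nowMs - s.estimatedCreateTimeMs > maxCompactionLagMs;
    }

    // Total bytes that must be cleaned to honor the rule; used to order partitions
    // so the most overdue logs get compacted first, see 2).
    static long mustCleanBytes(List<SegmentInfo> segments, long maxCompactionLagMs, long nowMs) {
        long bytes = 0;
        for (SegmentInfo s : segments) {
            if (mustClean(s, maxCompactionLagMs, nowMs)) {
                bytes += s.sizeInBytes;
            }
        }
        return bytes;
    }
}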

I haven't thought about a new API to force a compaction. What is the use case
for that?


On Wed, Aug 15, 2018 at 5:33 PM, Brett Rann 
wrote:

> We've been looking into this too.
>
> Mailing list:
> https://lists.apache.org/thread.html/ed7f6a6589f94e8c2a705553f364ef
> 599cb6915e4c3ba9b561e610e4@%3Cdev.kafka.apache.org%3E
> jira wish: https://issues.apache.org/jira/browse/KAFKA-7137
> confluent slack discussion:
> https://confluentcommunity.slack.com/archives/C49R61XMM/p153076012139
>
> A person on my team has started on code so you might want to coordinate:
> https://github.com/dongxiaohe/kafka/tree/dongxiaohe/log-
> cleaner-compaction-max-lifetime-2.0
>
>  He's been working with Jason Gustafson and James Chen around the changes.
> You can ping him on confluent slack as Xiaohe Dong.
>
> It's great to know others are thinking on it as well.
>
> You've added the requirement to force a segment roll which we hadn't gotten
> to yet, which is great. I was content with it not including the active
> segment.
>
> > Adding topic level configuration "max.compaction.lag.ms",  and
> corresponding broker configuration "log.cleaner.max.compaction.lag.ms",
> which is set to 0 (disabled) by default.
>
> Glancing at some other settings convention seems to me to be -1 for
> disabled (or infinite, which is more meaningful here).  0 to me implies
> instant, a little quicker than 1.
>
> We've been trying to think about a way to trigger compaction as well
> through an API call, which would need to be flagged somewhere (ZK admin/
> space?) but we're struggling to think how that would be coordinated across
> brokers and partitions.  Have you given any thought to that?
>
>
>
>
>
>
> On Thu, Aug 16, 2018 at 8:44 AM xiongqi wu  wrote:
>
> > Eno, Dong,
> >
> > I have updated the KIP. We decide not to address the issue that we might
> > have for both compaction and time retention enabled topics (see the
> > rejected alternative item 2). This KIP will only ensure log can be
> > compacted after a specified time-interval.
> >
> > As suggested by Dong, we will also enforce "max.compaction.lag.ms" is
> not
> > less than "min.compaction.lag.ms".
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy
> >
> >
> > On Tue, Aug 14, 2018 at 5:01 PM, xiongqi wu  wrote:
> >
> > >
> > > Per discussion with Dong, he made a very good point that if compaction
> > > and time based retention are both enabled on a topic, the compaction
> > might
> > > prevent records from being deleted on time. The reason is when
> compacting
> > > multiple segments into one single segment, the newly created segment
> will
> > > have same lastmodified timestamp as latest original segment. We lose
> the
> > > timestamp of all original segments except the last one. As a result,
> > > records might not be deleted as it should be through time based
> > retention.
> > >
> > > With the current KIP proposal, if we want to ensure timely deletion, we
> > > have the following configurations:
> > > 1) enable time based log compaction only : deletion is done though
> > > overriding the same key
> > > 2) enable time based log retention only: deletion is done though
> > > time-based retention
> > > 3) enable both log compaction and time based retention: Deletion is not
> > > guaranteed.
> > >
> > > Not sure if we have use case 3 and also want deletion to happen on
> time.
> > > There are several options to address deletion issue when enable both
> > > compaction and retention:
&

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-13 Thread xiongqi wu
Yes. We want to enforce a maximum time interval from a message's arrival time to
the time the corresponding log segment must be compacted.

Today, if the message arrival rate is low for a log-compacted topic, the
dirty ratio increases very slowly. As a result, a log segment might stay
un-compacted for a long time.
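
As a rough illustration (numbers made up): with 50 GB of already-compacted data and
only ~100 MB of new writes per day, the dirty ratio grows by roughly 0.002 per day,
so a min.cleanable.dirty.ratio of 0.5 would not trigger compaction for more than a
year, and a tombstone sent today could sit in the log that whole time.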

Xiongqi (Wesley) Wu

On Mon, Aug 13, 2018 at 2:46 PM, Guozhang Wang  wrote:

> Guess I need to carefully read the wiki page before asking :) Thanks!
>
> Another qq after reading the proposal: is it complementary to KIP-58 (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-58+-+Make+Log+Compaction+Point+Configurable),
> just that KIP-58 is an "upper bound" on what messages CAN be compacted, and
> this is a "lower bound" on what messages NEED to be compacted?
>
>
> Guozhang
>
> On Mon, Aug 13, 2018 at 2:31 PM, xiongqi wu  wrote:
>
> > HI Guozhang,
> >
> > As I mentioned in the motivation section, KIP-280 focuses on how to
> compact
> > the log segment to resolve the out of order messages compaction issue.
> > The issue we try to address in this KIP is different:  we want to
> introduce
> > a compaction policy so that a log segment can be pickup for compaction
> > after a specified time interval.  One use case is for GDPR to ensure
> timely
> > deletion of user record.
> >
> > There is no conflict and overlapping between this KIP and KIP-280.
> >
> > Thank you!
> >
> >
> > On Mon, Aug 13, 2018 at 1:33 PM, Guozhang Wang 
> wrote:
> >
> > > Hello Xiongqi,
> > >
> > > I think this KIP is already been covered in KIP-280? Could you check
> out
> > > that one and see if it is the case.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Aug 13, 2018 at 1:23 PM, xiongqi wu 
> wrote:
> > >
> > > > Hi Kafka,
> > > >
> > > > Just updated the confluence page to include the link to this KIP.
> > > >
> > > > Any comment will be appreciated:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A
> > > > +Time-based+log+compaction+policy
> > > >
> > > > Thank you.
> > > >
> > > > Xiongqi (Wesley) Wu
> > > >
> > > > On Thu, Aug 9, 2018 at 4:18 PM, xiongqi wu 
> > wrote:
> > > >
> > > > > Hi Kafka,
> > > > >
> > > > > This KIP tries to address GDPR concern to fulfill deletion request
> on
> > > > time
> > > > > through time-based log compaction on a compaction enabled topic:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 354%3A+Time-based+log+compaction+policy
> > > > >
> > > > > Any feedback will be appreciated.
> > > > >
> > > > >
> > > > > Xiongqi (Wesley) Wu
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-13 Thread xiongqi wu
Hi Eno,

The GDPR requirement we are getting here at LinkedIn is: if we receive a request
to delete a record on a log-compacted topic (by producing a tombstone, i.e. a null
value for that key), we want the record to be removed via compaction within a given
time period, e.g. 2 days (whatever the policy requires).
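
For context, the delete request here is just a tombstone: a record produced with the
key to be deleted and a null value. A minimal sketch (topic and key names are made up):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class TombstoneExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A null value marks the key for deletion; compaction eventually removes
            // all records with this key. The KIP's goal is to bound "eventually" by
            // max.compaction.lag.ms (e.g., 2 days).
            producer.send(new ProducerRecord<>("member-profile", "member-12345", null));
        }
    }
}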

There might be other issues (such as orphaned log segments under certain
conditions) that lead to GDPR problems, but those are things we need to fix
anyway, regardless of GDPR.


-- Xiongqi (Wesley) Wu

On Mon, Aug 13, 2018 at 2:56 PM, Eno Thereska 
wrote:

> Hello,
>
> Thanks for the KIP. I'd like to see a more precise definition of what part
> of GDPR you are targeting as well as some sort of verification that this
> KIP actually addresses the problem. Right now I find this a bit vague:
>
> "Ability to delete a log message through compaction in a timely manner has
> become an important requirement in some use cases (e.g., GDPR)"
>
>
> Is there any guarantee that after this KIP the GDPR problem is solved or do
> we need to do something else as well, e.g., more KIPs?
>
>
> Thanks
>
> Eno
>
>
>
> On Thu, Aug 9, 2018 at 4:18 PM, xiongqi wu  wrote:
>
> > Hi Kafka,
> >
> > This KIP tries to address GDPR concern to fulfill deletion request on
> time
> > through time-based log compaction on a compaction enabled topic:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 354%3A+Time-based+log+compaction+policy
> >
> > Any feedback will be appreciated.
> >
> >
> > Xiongqi (Wesley) Wu
> >
>


Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-13 Thread xiongqi wu
Hi Guozhang,

As I mentioned in the motivation section, KIP-280 focuses on how to compact
log segments to resolve the out-of-order message compaction issue.
The issue we try to address in this KIP is different: we want to introduce
a compaction policy so that a log segment can be picked up for compaction
after a specified time interval. One use case is GDPR, to ensure timely
deletion of user records.

There is no conflict or overlap between this KIP and KIP-280.

Thank you!


On Mon, Aug 13, 2018 at 1:33 PM, Guozhang Wang  wrote:

> Hello Xiongqi,
>
> I think this KIP may already be covered by KIP-280? Could you check out
> that one and see if that is the case.
>
>
> Guozhang
>
>
> On Mon, Aug 13, 2018 at 1:23 PM, xiongqi wu  wrote:
>
> > Hi Kafka,
> >
> > Just updated the confluence page to include the link to this KIP.
> >
> > Any comment will be appreciated:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A
> > +Time-based+log+compaction+policy
> >
> > Thank you.
> >
> > Xiongqi (Wesley) Wu
> >
> > On Thu, Aug 9, 2018 at 4:18 PM, xiongqi wu  wrote:
> >
> > > Hi Kafka,
> > >
> > > This KIP tries to address GDPR concern to fulfill deletion request on
> > time
> > > through time-based log compaction on a compaction enabled topic:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 354%3A+Time-based+log+compaction+policy
> > >
> > > Any feedback will be appreciated.
> > >
> > >
> > > Xiongqi (Wesley) Wu
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-14 Thread xiongqi wu
Dong,

Thanks for the comment.

There are two retention policies: log compaction and time-based retention.

Log compaction:

We have use cases that keep infinite retention of a topic (compaction only).
GDPR cares about deletion of PII (personally identifiable information) data.
Since Kafka doesn't know which records contain PII, it relies on the upper layer
to delete those records.
For those infinite-retention use cases, Kafka needs to provide a way to
enforce compaction in time. This is what we try to address in this KIP.

Time-based retention:

There are also use cases where users of Kafka want to expire all their
data.
In those cases, they can use time-based retention on their topics.


Regarding your first question: if a user wants to delete a key in a
log-compacted topic, the user has to send a deletion (tombstone) for that same key.
Kafka only makes sure the deletion will happen within a certain time period
(like 2 days/7 days).

Regarding your second question: in most cases, we want to delete all
duplicates of a key at the same time.
Compaction might be more efficient, since we need to scan the log and find
all duplicates anyway. However, the expected use case is to set the time-based
compaction interval on the order of days, and larger than "min compaction lag".
We don't want log compaction to happen frequently, since it is expensive.
The purpose is to help low-production-rate topics get compacted on time.
For topics with a "normal" incoming message rate, the "min dirty ratio" will
likely have triggered compaction before this time-based compaction policy
takes effect.


Eno,

For your question: as I mentioned, we have long-retention use cases for
log-compacted topics, but we want to provide the ability to delete certain
PII records on time.
Kafka itself doesn't know whether a record contains sensitive information
and relies on the user for deletion.


On Mon, Aug 13, 2018 at 6:58 PM, Dong Lin  wrote:

> Hey Xiongqi,
>
> Thanks for the KIP. I have two questions regarding the use-case for meeting
> GDPR requirement.
>
> 1) If I recall correctly, one of the GDPR requirement is that we can not
> keep messages longer than e.g. 30 days in storage (e.g. Kafka). Say there
> exists a partition p0 which contains message1 with key1 and message2 with
> key2. And then user keeps producing messages with key=key2 to this
> partition. Since message1 with key1 is never overridden, sooner or later we
> will want to delete message1 and keep the latest message with key=key2. But
> currently it looks like log compact logic in Kafka will always put these
> messages in the same segment. Will this be an issue?
>
> 2) The current KIP intends to provide the capability to delete a given
> message in log compacted topic. Does such use-case also require Kafka to
> keep the messages produced before the given message? If yes, then we can
> probably just use AdminClient.deleteRecords() or time-based log retention
> to meet the use-case requirement. If no, do you know what is the GDPR's
> requirement on time-to-deletion after user explicitly requests the deletion
> (e.g. 1 hour, 1 day, 7 day)?
>
> Thanks,
> Dong
>
>
> On Mon, Aug 13, 2018 at 3:44 PM, xiongqi wu  wrote:
>
> > Hi Eno,
> >
> > The GDPR request we are getting here at linkedin is if we get a request
> to
> > delete a record through a null key on a log compacted topic,
> > we want to delete the record via compaction in a given time period like 2
> > days (whatever is required by the policy).
> >
> > There might be other issues (such as orphan log segments under certain
> > conditions)  that lead to GDPR problem but they are more like something
> we
> > need to fix anyway regardless of GDPR.
> >
> >
> > -- Xiongqi (Wesley) Wu
> >
> > On Mon, Aug 13, 2018 at 2:56 PM, Eno Thereska 
> > wrote:
> >
> > > Hello,
> > >
> > > Thanks for the KIP. I'd like to see a more precise definition of what
> > part
> > > of GDPR you are targeting as well as some sort of verification that
> this
> > > KIP actually addresses the problem. Right now I find this a bit vague:
> > >
> > > "Ability to delete a log message through compaction in a timely manner
> > has
> > > become an important requirement in some use cases (e.g., GDPR)"
> > >
> > >
> > > Is there any guarantee that after this KIP the GDPR problem is solved
> or
> > do
> > > we need to do something else as well, e.g., more KIPs?
> > >
> > >
> > > Thanks
> > >
> > > Eno
> > >
> > >
> > >
> > > On Thu, Aug 9, 2018 at 4:18 PM, xiongqi wu 
> wrote:
> > >
> > > > Hi Kafka,
> > > >
> > > > This KIP tries to address GDPR concern to fulfill deletion request on
> > > time
> > > > through time-based log compaction on a compaction enabled topic:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 354%3A+Time-based+log+compaction+policy
> > > >
> > > > Any feedback will be appreciated.
> > > >
> > > >
> > > > Xiongqi (Wesley) Wu
> > > >
> > >
> >
>



-- 
Xiongqi (Wesley) Wu


Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-14 Thread xiongqi wu
Per discussion with Dong: he made a very good point that if compaction and
time-based retention are both enabled on a topic, compaction might prevent
records from being deleted on time. The reason is that when compacting
multiple segments into one single segment, the newly created segment will
have the same last-modified timestamp as the latest original segment. We lose the
timestamps of all original segments except the last one. As a result,
records might not be deleted as they should be through time-based retention.

With the current KIP proposal, if we want to ensure timely deletion, we
have the following configurations:
1) enable time-based log compaction only: deletion is done through
overriding the same key
2) enable time-based log retention only: deletion is done through time-based
retention
3) enable both log compaction and time-based retention: deletion is not
guaranteed.

I am not sure whether we have use case 3 and also want deletion to happen on time.
There are several options to address the deletion issue when both compaction
and retention are enabled:
A) During log compaction, look at record timestamps to delete expired
records. This can be done in the compaction logic itself or by using
AdminClient.deleteRecords(). But this assumes we have record timestamps.
B) Retain the lastModified time of the original segments during log compaction.
This requires extra metadata to record the information, or not grouping
multiple segments into one during compaction.

If we have use case 3 in general, I would prefer solution A and rely on
record timestamps.


Two questions:
Do we have use case 3? Is it nice-to-have or must-have?
If we have use case 3 and want to go with solution A, should we introduce
a new configuration to enforce deletion by timestamp?
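
For what it's worth, a minimal sketch of what option A's per-record check could look
like during compaction (illustration only, assuming record timestamps are available;
names are made up and this is not part of the current proposal):

class OptionASketch {
    // Option A (sketch): while compacting, drop a record either because a newer
    // record with the same key exists (normal compaction) or because the record
    // itself is older than the topic's retention.ms.
    static boolean shouldRetain(long recordTimestampMs, boolean isLatestForKey,
                                long retentionMs, long nowMs) {
        if (retentionMs > 0 && nowMs - recordTimestampMs > retentionMs) {
            return false;       // expired: remove even the latest value for the key
        }
        return isLatestForKey;  // otherwise keep only the latest record per key
    }
}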


On Tue, Aug 14, 2018 at 1:52 PM, xiongqi wu  wrote:

> Dong,
>
> Thanks for the comment.
>
> There are two retention policy: log compaction and time based retention.
>
> Log compaction:
>
> we have use cases to keep infinite retention of a topic (only
> compaction).  GDPR cares about deletion of PII  (personal identifiable
> information) data.
> Since Kafka doesn't know what records contain PII, it relies on upper
> layer to delete those records.
> For those infinite retention uses uses,  kafka needs to provide a way to
> enforce compaction on time. This is what we try to address in this KIP.
>
> Time based retention,
>
> There are also use cases that users of Kafka might want to expire all
> their data.
> In those cases, they can use time based retention of their topics.
>
>
> Regarding your first question,  if a user wants to delete a key in the log
> compaction topic,  the user has to send a deletion using the same key.
> Kafka only makes sure the deletion will happen under a certain time
> periods (like 2 days/7 days).
>
> Regarding your second question.  In most cases, we might want to delete
> all duplicated keys at the same time.
> Compaction might be more efficient since we need to scan the log and find
> all duplicates.  However,  the expected use case is to set the time based
> compaction interval on the order of days,  and be larger than 'min
> compaction lag".  We don't want log compaction to happen frequently since
> it is expensive.  The purpose is to help low production rate topic to get
> compacted on time.  For the topic with "normal" incoming message message
> rate, the "min dirty ratio" might have triggered the compaction before this
> time based compaction policy takes effect.
>
>
> Eno,
>
> For your question,  like I mentioned we have long time retention use case
> for log compacted topic, but we want to provide ability to delete certain
> PII records on time.
> Kafka itself doesn't know whether a record contains sensitive information
> and relies on the user for deletion.
>
>
> On Mon, Aug 13, 2018 at 6:58 PM, Dong Lin  wrote:
>
>> Hey Xiongqi,
>>
>> Thanks for the KIP. I have two questions regarding the use-case for
>> meeting
>> GDPR requirement.
>>
>> 1) If I recall correctly, one of the GDPR requirement is that we can not
>> keep messages longer than e.g. 30 days in storage (e.g. Kafka). Say there
>> exists a partition p0 which contains message1 with key1 and message2 with
>> key2. And then user keeps producing messages with key=key2 to this
>> partition. Since message1 with key1 is never overridden, sooner or later
>> we
>> will want to delete message1 and keep the latest message with key=key2.
>> But
>> currently it looks like log compact logic in Kafka will always put these
>> messages in the same segment. Will this be an issue?
>>
>> 2) The current KIP intends to provide the capability to delete a given
>> message in log compacted topic. Does such use-case als

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-16 Thread xiongqi wu
Hi Brett,

In my opinion, we don't need a very accurate estimate of the first record's
timestamp, because the compaction processing itself might take some time.
There is no strict guarantee that after the time interval "T" the compaction
is finished; rather, after time "T" + some compaction job interval, the
compaction will start processing.
For this reason, using the largest timestamp of the previous segment to
estimate the creation time of the next segment looks sufficient to me.

One major principle is not to introduce a lot of I/O on partitions that
don't need to be compacted.
I want to avoid looking into records or timestamps as much as possible.

Partition movement can alter the segment modification time. However, I see
this as the data being in a transient state.
After another time interval T, when the data has stabilized in its new location,
the compaction will be launched.
We also use largestTimestamp in time-based retention.
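
In code, the estimate is roughly the following (sketch only, not the actual patch):

class SegmentCreateTimeEstimate {
    // largestTimestampMs[i] = max record timestamp (or last-modified time) of segment i.
    // Segment i can only have been created after segment i-1 was rolled, so the largest
    // timestamp of segment i-1 is a reasonable (possibly slightly early) estimate of when
    // segment i started receiving data. For the very first segment, one fallback is to
    // use its own largest timestamp.
    static long estimatedCreateTimeMs(long[] largestTimestampMs, int i) {
        return i > 0 ? largestTimestampMs[i - 1] : largestTimestampMs[i];
    }
}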

On Wed, Aug 15, 2018 at 9:08 PM, Brett Rann 
wrote:

> > (segment.largestTimestamp is lastModified time of the log segment or max
> timestamp we see for the log segment. Due to the lack of record timestamp,
> segment.largestTimestamp might be earlier than the actual timestamp of
> latest record of that segment.).
>
> I'm curious about the mention of last modified time of the segment.  As
> noted back in here
> https://cwiki-test.apache.org/confluence/display/KAFKA/KIP-
> 33+-+Add+a+time+based+log+index
> using the creation/modified time of files is unreliable in cases of
> partitions being moved. It's why all those .timeindex files for each
> partition appeared in 0.10.* I wonder if there's a better way using that to
> get at a timestamp?
>
> On Thu, Aug 16, 2018 at 11:30 AM Brett Rann  wrote:
>
> > An API was suggested by Gwen and James when I discussed it with them. For
> > me I can think of it as a use case for scheduling compaction rather than
> > relying on an config based time trigger.  We're looking at creating some
> > potentially very large compacted topics for event sourcing and from an
> > operators perspective I'm concerned about potential impact of long
> running
> > compaction, especially if multiple topics run back to back.  Having the
> > ability to schedule them at my own time gives some peace of mind for that
> > concern.  Another use case might be a more urgent delete. That could be
> > done manually now by just modifying the max time and waiting for
> compaction
> > to run. But hitting an API end point is a bit cleaner.
> >
> > But in thinking about what that mechanism would be it started to feel
> like
> > it would be a complicated implementation so we've put it aside for now.
> But
> > maybe we just haven't seen the clean way yet.
> >
> >
> >
> > On Thu, Aug 16, 2018 at 11:22 AM xiongqi wu  wrote:
> >
> >> Brett,
> >>
> >> Thank you for your comments.
> >> I was thinking since we already has immediate compaction setting by
> >> setting
> >> min dirty ratio to 0, so I decide to use "0" as disabled state.
> >> I am ok to go with -1(disable), 0 (immediate) options.
> >>
> >> For the implementation, there are a few differences between mine and
> >> "Xiaohe Dong"'s :
> >> 1) I used the estimated creation time of a log segment instead of
> largest
> >> timestamp of a log to determine the compaction eligibility, because a
> log
> >> segment might stay as an active segment up to "max compaction lag". (see
> >> the KIP for detail).
> >> 2) I measure how much bytes that we must clean to follow the "max
> >> compaction lag" rule, and use that to determine the order of compaction.
> >> 3) force active segment to roll to follow the "max compaction lag"
> >>
> >> I can share my code so we can coordinate.
> >>
> >> I haven't think about a new API to force a compaction. what is the use
> >> case
> >> for this one?
> >>
> >>
> >> On Wed, Aug 15, 2018 at 5:33 PM, Brett Rann 
> >> wrote:
> >>
> >> > We've been looking into this too.
> >> >
> >> > Mailing list:
> >> > https://lists.apache.org/thread.html/ed7f6a6589f94e8c2a705553f364ef
> >> <https://lists.apache.org/thread.html/ed7f6a6589f94e8c2a705553f364ef>
> >> > 599cb6915e4c3ba9b561e610e4@%3Cdev.kafka.apache.org%3E
> >> > jira wish: https://issues.apache.org/jira/browse/KAFKA-7137
> >> <https://issues.apache.org/jira/browse/KAFKA-7137>
> >> > confluent slack discussion:
> >> >
> >>

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-16 Thread xiongqi wu
Hi Xiaohe,

Quick notes:
1) Use the minimum of segment.ms and max.compaction.lag.ms to decide when to
roll the active segment (see the sketch at the end of this message).

2) I am not sure I fully get your second question. First, we have jitter when
we roll the active segment. Second, on each compaction, we compact up to what
the offset map allows. So this will not lead to a perfect compaction storm
over time. In addition, I expect max.compaction.lag.ms to be set on the
order of days.

3) I don't have access to the Confluent community Slack for now. I am
reachable via Google Hangouts.
To avoid duplicating effort, here is my plan:
a) Collect more feedback and feature requirements on the KIP.
b) Wait until this KIP is approved.
c) Address any additional requirements in the implementation. (My current
implementation only covers what is described in the KIP now.)
d) Share the code with you and the community, to see if you want to add
anything.
e) Submit it for review by a committer.
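
To make 1) and 2) above concrete, the intended roll condition for the active segment
is roughly the following (sketch; names are illustrative, not the actual code):

class ActiveSegmentRollSketch {
    // The active segment is rolled after min(segment.ms, max.compaction.lag.ms), so
    // max.compaction.lag.ms also bounds how long records can sit in the active segment.
    // rollJitterMs is the existing per-segment jitter, which spreads the rolls (and the
    // compactions that follow) over time instead of lining them up.
    static boolean shouldRoll(long segmentCreateTimeMs, long segmentMs,
                              long maxCompactionLagMs, long rollJitterMs, long nowMs) {
        long maxAgeMs = maxCompactionLagMs > 0
                ? Math.min(segmentMs, maxCompactionLagMs)
                : segmentMs;
        return nowMs - segmentCreateTimeMs > maxAgeMs - rollJitterMs;
    }
}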


On Wed, Aug 15, 2018 at 11:42 PM, XIAOHE DONG  wrote:

> Hi Xiongqi
>
> Thanks for thinking about implementing this as well. :)
>
> I was thinking about using `segment.ms` to trigger the segment roll.
> Also, its value can be the largest time bias for the record deletion. For
> example, if the `segment.ms` is 1 day and `max.compaction.ms` is 30 days,
> the compaction may happen around 31 days.
>
> For my curiosity, is there a way we can do some performance test for this
> and any tools you can recommend. As you know, previously, it is cleaned up
> by respecting dirty ratio, but now it may happen anytime if max lag has
> passed for each message. I wonder what would happen if clients send huge
> amount of tombstone records at the same time.
>
> I am looking forward to have a quick chat with you to avoid double effort
> on this. I am in confluent community slack during the work time. My name is
> Xiaohe Dong. :)
>
> Rgds
> Xiaohe Dong
>
>
>
> On 2018/08/16 01:22:22, xiongqi wu  wrote:
> > Brett,
> >
> > Thank you for your comments.
> > I was thinking since we already has immediate compaction setting by
> setting
> > min dirty ratio to 0, so I decide to use "0" as disabled state.
> > I am ok to go with -1(disable), 0 (immediate) options.
> >
> > For the implementation, there are a few differences between mine and
> > "Xiaohe Dong"'s :
> > 1) I used the estimated creation time of a log segment instead of largest
> > timestamp of a log to determine the compaction eligibility, because a log
> > segment might stay as an active segment up to "max compaction lag". (see
> > the KIP for detail).
> > 2) I measure how much bytes that we must clean to follow the "max
> > compaction lag" rule, and use that to determine the order of compaction.
> > 3) force active segment to roll to follow the "max compaction lag"
> >
> > I can share my code so we can coordinate.
> >
> > I haven't think about a new API to force a compaction. what is the use
> case
> > for this one?
> >
> >
> > On Wed, Aug 15, 2018 at 5:33 PM, Brett Rann 
> > wrote:
> >
> > > We've been looking into this too.
> > >
> > > Mailing list:
> > > https://lists.apache.org/thread.html/ed7f6a6589f94e8c2a705553f364ef
> > > 599cb6915e4c3ba9b561e610e4@%3Cdev.kafka.apache.org%3E
> > > jira wish: https://issues.apache.org/jira/browse/KAFKA-7137
> > > confluent slack discussion:
> > > https://confluentcommunity.slack.com/archives/C49R61XMM/
> p153076012139
> > >
> > > A person on my team has started on code so you might want to
> coordinate:
> > > https://github.com/dongxiaohe/kafka/tree/dongxiaohe/log-
> > > cleaner-compaction-max-lifetime-2.0
> > >
> > >  He's been working with Jason Gustafson and James Chen around the
> changes.
> > > You can ping him on confluent slack as Xiaohe Dong.
> > >
> > > It's great to know others are thinking on it as well.
> > >
> > > You've added the requirement to force a segment roll which we hadn't
> gotten
> > > to yet, which is great. I was content with it not including the active
> > > segment.
> > >
> > > > Adding topic level configuration "max.compaction.lag.ms",  and
> > > corresponding broker configuration "log.cleaner.max.compaction.lag.ms
> ",
> > > which is set to 0 (disabled) by default.
> > >
> > > Glancing at some other settings convention seems to me to be -1 for
> > > disabled (or infinite, which is more meaningful here).  0 to me implies
> > > instant, a little quicker than 1.
> > >
> > > We've been

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-16 Thread xiongqi wu
1) The owner of the data (in this sense, Kafka is not the owner of the data)
should keep track of the lifecycle of the data in some external storage/DB.
The owner determines when to delete the data and sends the delete request to
Kafka. Kafka doesn't know about the content of the data; it only provides a
means for deletion.

2) Each time compaction runs, it starts from the first segment (whether or
not it has already been compacted). The time estimation here is only used to
determine whether we should run compaction on this log partition at all, so we
only need to estimate timestamps for the un-compacted segments.

On Thu, Aug 16, 2018 at 5:35 PM, Dong Lin  wrote:

> Hey Xiongqi,
>
> Thanks for the update. I have two questions for the latest KIP.
>
> 1) The motivation section says that one use case is to delete PII (Personal
> Identifiable information) data within 7 days while keeping non-PII
> indefinitely in compacted format. I suppose the use-case depends on the
> application to determine when to delete those PII data. Could you explain
> how can application reliably determine the set of keys that should be
> deleted? Is application required to always messages from the topic after
> every restart and determine the keys to be deleted by looking at message
> timestamp, or is application supposed to persist the key-> timstamp
> information in a separate persistent storage system?
>
> 2) It is mentioned in the KIP that "we only need to estimate earliest
> message timestamp for un-compacted log segments because the deletion
> requests that belong to compacted segments have already been processed".
> Not sure if it is correct. If a segment is compacted before user sends
> message to delete a key in this segment, it seems that we still need to
> ensure that the segment will be compacted again within the given time after
> the deletion is requested, right?
>
> Thanks,
> Dong
>
> On Thu, Aug 16, 2018 at 10:27 AM, xiongqi wu  wrote:
>
> > Hi Xiaohe,
> >
> > Quick note:
> > 1) Use minimum of segment.ms and max.compaction.lag.ms
> > <http://max.compaction.ms>
> >
> > 2) I am not sure if I get your second question.  first, we have jitter
> when
> > we roll the active segment. second, on each  compaction, we compact upto
> > the offsetmap could allow.  Those will not lead to perfect compaction
> storm
> > overtime.  In addition, I expect we are setting max.compaction.lag.ms on
> > the order of days.
> >
> > 3) I don't have access to the confluent community slack for now. I am
> > reachable via the google handle out.
> > To avoid the double effort, here is my plan:
> > a) Collect more feedback and feature requriement on the KIP.
> > b) Wait unitl this KIP is approved.
> > c) I will address any additional requirements in the implementation.  (My
> > current implementation only complies to whatever described in the KIP
> now)
> > d) I can share the code with the you and community see you want to add
> > anything.
> > e) submission through committee
> >
> >
> > On Wed, Aug 15, 2018 at 11:42 PM, XIAOHE DONG 
> > wrote:
> >
> > > Hi Xiongqi
> > >
> > > Thanks for thinking about implementing this as well. :)
> > >
> > > I was thinking about using `segment.ms` to trigger the segment roll.
> > > Also, its value can be the largest time bias for the record deletion.
> For
> > > example, if the `segment.ms` is 1 day and `max.compaction.ms` is 30
> > days,
> > > the compaction may happen around 31 days.
> > >
> > > For my curiosity, is there a way we can do some performance test for
> this
> > > and any tools you can recommend. As you know, previously, it is cleaned
> > up
> > > by respecting dirty ratio, but now it may happen anytime if max lag has
> > > passed for each message. I wonder what would happen if clients send
> huge
> > > amount of tombstone records at the same time.
> > >
> > > I am looking forward to have a quick chat with you to avoid double
> effort
> > > on this. I am in confluent community slack during the work time. My
> name
> > is
> > > Xiaohe Dong. :)
> > >
> > > Rgds
> > > Xiaohe Dong
> > >
> > >
> > >
> > > On 2018/08/16 01:22:22, xiongqi wu  wrote:
> > > > Brett,
> > > >
> > > > Thank you for your comments.
> > > > I was thinking since we already has immediate compaction setting by
> > > setting
> > > > min dirty ratio to 0, so I decide to use "0" as disabled state.
> > > > I am ok to go with -1(disable), 0 (immediate) options.
> >

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-16 Thread xiongqi wu
On 2):
The offset map is built starting from the first dirty segment.
The compaction itself starts from the beginning of the log partition. That's how
it ensures the deletion of tombstoned keys.
I will double-check tomorrow.
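
Roughly, the idea is the sketch below (illustration only, not the LogCleaner code):
the offset map covers the dirty range, and the retain/discard scan walks the whole
log from the start, which is how older occurrences of a tombstoned key get removed.

import java.util.Map;

class CleaningScanSketch {
    // latestOffsetForKey is built by scanning the dirty segments (first dirty offset
    // up to what the map's capacity allows). The cleaning scan then starts from the
    // beginning of the partition: a record is kept only if its key is absent from the
    // map or it is still the newest occurrence of that key.
    static <K> boolean retain(Map<K, Long> latestOffsetForKey, K key, long offset) {
        Long latest = latestOffsetForKey.get(key);
        return latest == null || offset >= latest;
    }
}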

Xiongqi (Wesley) Wu


On Thu, Aug 16, 2018 at 6:46 PM Brett Rann 
wrote:

> To just clarify a bit on 1.  whether there's an external storage/DB isn't
> relevant here.
> Compacted topics allow a tombstone record to be sent (a null value for a
> key) which
> currently will result in old values for that key being deleted if some
> conditions are met.
> There are existing controls to make sure the old values will stay around
> for a minimum
> time at least, but no dedicated control to ensure the tombstone will delete
> within a
> maximum time.
>
> One popular reason that maximum time for deletion is desirable right now is
> GDPR with
> PII. But we're not proposing any GDPR awareness in kafka, just being able
> to guarantee
> a max time where a tombstoned key will be removed from the compacted topic.
>
> on 2)
> huh, i thought it kept track of the first dirty segment and didn't
> recompact older "clean" ones.
> But I didn't look at code or test for that.
>
> On Fri, Aug 17, 2018 at 10:57 AM xiongqi wu  wrote:
>
> > 1, Owner of data (in this sense, kafka is the not the owner of data)
> > should keep track of lifecycle of the data in some external storage/DB.
> > The owner determines when to delete the data and send the delete request
> to
> > kafka. Kafka doesn't know about the content of data but to provide a mean
> > for deletion.
> >
> > 2 , each time compaction runs, it will start from first segments (no
> > matter if it is compacted or not). The time estimation here is only used
> > to determine whether we should run compaction on this log partition. So
> we
> > only need to estimate uncompacted segments.
> >
> > On Thu, Aug 16, 2018 at 5:35 PM, Dong Lin  wrote:
> >
> > > Hey Xiongqi,
> > >
> > > Thanks for the update. I have two questions for the latest KIP.
> > >
> > > 1) The motivation section says that one use case is to delete PII
> > (Personal
> > > Identifiable information) data within 7 days while keeping non-PII
> > > indefinitely in compacted format. I suppose the use-case depends on the
> > > application to determine when to delete those PII data. Could you
> explain
> > > how can application reliably determine the set of keys that should be
> > > deleted? Is application required to always messages from the topic
> after
> > > every restart and determine the keys to be deleted by looking at
> message
> > > timestamp, or is application supposed to persist the key-> timstamp
> > > information in a separate persistent storage system?
> > >
> > > 2) It is mentioned in the KIP that "we only need to estimate earliest
> > > message timestamp for un-compacted log segments because the deletion
> > > requests that belong to compacted segments have already been
> processed".
> > > Not sure if it is correct. If a segment is compacted before user sends
> > > message to delete a key in this segment, it seems that we still need to
> > > ensure that the segment will be compacted again within the given time
> > after
> > > the deletion is requested, right?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Thu, Aug 16, 2018 at 10:27 AM, xiongqi wu 
> > wrote:
> > >
> > > > Hi Xiaohe,
> > > >
> > > > Quick note:
> > > > 1) Use minimum of segment.ms and max.compaction.lag.ms
> > > > <http://max.compaction.ms
> > <http://max.compaction.ms>>
> > > >
> > > > 2) I am not sure if I get your second question. first, we have jitter
> > > when
> > > > we roll the active segment. second, on each compaction, we compact
> upto
> > > > the offsetmap could allow. Those will not lead to perfect compaction
> > > storm
> > > > overtime. In addition, I expect we are setting max.compaction.lag.ms
> > on
> > > > the order of days.
> > > >
> > > > 3) I don't have access to the confluent community slack for now. I am
> > > > reachable via the google handle out.
> > > > To avoid the double effort, here is my plan:
> > > > a) Collect more feedback and feature requriement on the KIP.
> > > > b) Wait unitl this KIP is approved.
> > > > c) I will address any additional requirements in the implementation.
> > (My
> > > > curren

[DISCUSS] KIP-354 Time-based log compaction policy

2018-08-09 Thread xiongqi wu
Hi Kafka,

This KIP tries to address a GDPR concern: fulfilling deletion requests on time
through time-based log compaction on a compaction-enabled topic:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy

Any feedback will be appreciated.


Xiongqi (Wesley) Wu


Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-08-27 Thread xiongqi wu
Hi All,

Do you have any additional comments on this KIP?


On Thu, Aug 16, 2018 at 9:17 PM, xiongqi wu  wrote:

> on 2)
> The offsetmap is built starting from dirty segment.
> The compaction starts from the beginning of the log partition.  That's how
> it ensure the deletion of tomb keys.
> I will double check tomorrow.
>
> Xiongqi (Wesley) Wu
>
>
> On Thu, Aug 16, 2018 at 6:46 PM Brett Rann 
> wrote:
>
>> To just clarify a bit on 1.  whether there's an external storage/DB isn't
>> relevant here.
>> Compacted topics allow a tombstone record to be sent (a null value for a
>> key) which
>> currently will result in old values for that key being deleted if some
>> conditions are met.
>> There are existing controls to make sure the old values will stay around
>> for a minimum
>> time at least, but no dedicated control to ensure the tombstone will
>> delete
>> within a
>> maximum time.
>>
>> One popular reason that maximum time for deletion is desirable right now
>> is
>> GDPR with
>> PII. But we're not proposing any GDPR awareness in kafka, just being able
>> to guarantee
>> a max time where a tombstoned key will be removed from the compacted
>> topic.
>>
>> on 2)
>> huh, i thought it kept track of the first dirty segment and didn't
>> recompact older "clean" ones.
>> But I didn't look at code or test for that.
>>
>> On Fri, Aug 17, 2018 at 10:57 AM xiongqi wu  wrote:
>>
>> > 1, Owner of data (in this sense, kafka is the not the owner of data)
>> > should keep track of lifecycle of the data in some external storage/DB.
>> > The owner determines when to delete the data and send the delete
>> request to
>> > kafka. Kafka doesn't know about the content of data but to provide a
>> mean
>> > for deletion.
>> >
>> > 2 , each time compaction runs, it will start from first segments (no
>> > matter if it is compacted or not). The time estimation here is only used
>> > to determine whether we should run compaction on this log partition. So
>> we
>> > only need to estimate uncompacted segments.
>> >
>> > On Thu, Aug 16, 2018 at 5:35 PM, Dong Lin  wrote:
>> >
>> > > Hey Xiongqi,
>> > >
>> > > Thanks for the update. I have two questions for the latest KIP.
>> > >
>> > > 1) The motivation section says that one use case is to delete PII
>> > (Personal
>> > > Identifiable information) data within 7 days while keeping non-PII
>> > > indefinitely in compacted format. I suppose the use-case depends on
>> the
>> > > application to determine when to delete those PII data. Could you
>> explain
>> > > how can application reliably determine the set of keys that should be
>> > > deleted? Is application required to always messages from the topic
>> after
>> > > every restart and determine the keys to be deleted by looking at
>> message
>> > > timestamp, or is application supposed to persist the key-> timstamp
>> > > information in a separate persistent storage system?
>> > >
>> > > 2) It is mentioned in the KIP that "we only need to estimate earliest
>> > > message timestamp for un-compacted log segments because the deletion
>> > > requests that belong to compacted segments have already been
>> processed".
>> > > Not sure if it is correct. If a segment is compacted before user sends
>> > > message to delete a key in this segment, it seems that we still need
>> to
>> > > ensure that the segment will be compacted again within the given time
>> > after
>> > > the deletion is requested, right?
>> > >
>> > > Thanks,
>> > > Dong
>> > >
>> > > On Thu, Aug 16, 2018 at 10:27 AM, xiongqi wu 
>> > wrote:
>> > >
>> > > > Hi Xiaohe,
>> > > >
>> > > > Quick note:
>> > > > 1) Use minimum of segment.ms and max.compaction.lag.ms
>> > > > <http://max.compaction.ms
>> > <http://max.compaction.ms>>
>> > > >
>> > > > 2) I am not sure if I get your second question. first, we have
>> jitter
>> > > when
>> > > > we roll the active segment. second, on each compaction, we compact
>> upto
>> > > > the offsetmap could allow. Those will not lead to perfect compaction
>> > > storm
>> > > > overtime. In addition, I expect

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-04 Thread xiongqi wu
Colin,

I will keep the title for now, since all the previous discussions and links
are tied to this title.

I can change the title at the end or add a clarification note in the doc.

Xiongqi (Wesley) Wu


On Tue, Sep 4, 2018 at 5:47 PM xiongqi wu  wrote:

> Colin,
> Thank you for comments.
> see my inline reply below.
>
> Xiongqi (Wesley) Wu
>
>
> On Tue, Sep 4, 2018 at 5:24 PM Colin McCabe  wrote:
>
>> Hi Xiongqi,
>>
>> Thanks for this KIP.
>>
>> The name seems a bit ambiguous.  Our compaction policies are already
>> time-based, after all.  It seems like this change is focused around adding
>> a “max.compaction.lag.ms."  Perhaps the KIP title should be something
>> like "add maximum compaction lag time"?
>>
>> ==> sure. I will change the title.
>
> > The active segment is forced to roll when either "max.compaction.lag.ms"
>> > or "segment.ms" (log.roll.ms and log.roll.hours) has reached.
>>
>> If the max.compaction.lag.ms is low, it seems like segments will be
>> rolled very frequently.  This can be a source of problems in the cluster,
>> since creating many different small log segments consumes a huge amount of
>> cluster resources.  Therefore, I would suggest adding a broker-level
>> configuration which allows us to set a minimum value for
>> max.compaction.lag.ms.  If we let users set it on a per-topic basis,
>> someone could set a value of 1 ms or something, and cause chaos.
>>
>> =>  this applies to segment.ms as well. Today users can set "
> segment.ms" to a very low value, and cause a frequent rolling of active
> segments.  In my option, the minimum of "max.compaction.lag.ms" should be
> based on the minimum of "segment.ms".  Since today the minimum of
> segment.ms is 1, "max.compaction.lag.ms" also starts with 1.  "0" means
> disable.  I can use -1 as disable, but it is hard to define the meaning of
> 0 because we cannot just roll the active segment immediately.
>
>  > -- Note that an alternative configuration is to use -1 as "disabled"
>> and 0
>>  > as "immediate compaction". Because compaction lag is still determined
>>  > based on min.compaction.lag and how long to roll an active segment,
>> the
>>  > actual lag for compaction is undetermined if we use "0".  On the other
>>  > hand, we can already set "min.cleanable.dirty.ratio" to achieve the
>> same
>>  > goal.  So here we choose "0" as "disabled".
>>
>> I would prefer -1 to be the invalid setting.  Treating 0 differently than
>> 1 seems strange to me.
>>
>> => see my previous comment,  I am not strongly against, but 0 is not
> a valid configuration in my option. So I use "0" as disabled state.
>
> best,
>> Colin
>>
>>
>> On Tue, Sep 4, 2018, at 15:04, xiongqi wu wrote:
>> > Let's VOTE for this KIP.
>> > KIP:
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-354
>> > %3A+Time-based+log+compaction+policy
>> >
>> > Implementation:
>> >
>> > https://github.com/apache/kafka/pull/5611
>> >
>> >
>> >
>> > Xiongqi (Wesley) Wu
>>
>


Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-04 Thread xiongqi wu
Colin,
Thank you for comments.
see my inline reply below.

Xiongqi (Wesley) Wu


On Tue, Sep 4, 2018 at 5:24 PM Colin McCabe  wrote:

> Hi Xiongqi,
>
> Thanks for this KIP.
>
> The name seems a bit ambiguous.  Our compaction policies are already
> time-based, after all.  It seems like this change is focused around adding
> a “max.compaction.lag.ms."  Perhaps the KIP title should be something
> like "add maximum compaction lag time"?
>
> ==> sure. I will change the title.

> The active segment is forced to roll when either "max.compaction.lag.ms"
> > or "segment.ms" (log.roll.ms and log.roll.hours) has reached.
>
> If the max.compaction.lag.ms is low, it seems like segments will be
> rolled very frequently.  This can be a source of problems in the cluster,
> since creating many different small log segments consumes a huge amount of
> cluster resources.  Therefore, I would suggest adding a broker-level
> configuration which allows us to set a minimum value for
> max.compaction.lag.ms.  If we let users set it on a per-topic basis,
> someone could set a value of 1 ms or something, and cause chaos.
>
> =>  this applies to segment.ms as well. Today users can set
"segment.ms" to a very low value and cause frequent rolling of active
segments.  In my opinion, the minimum of "max.compaction.lag.ms" should be
based on the minimum of "segment.ms".  Since today the minimum of segment.ms
is 1, "max.compaction.lag.ms" also starts with 1.  "0" means disabled.  I
could use -1 as disabled, but it is hard to define the meaning of 0 because we
cannot just roll the active segment immediately.

 > -- Note that an alternative configuration is to use -1 as "disabled" and
> 0
>  > as "immediate compaction". Because compaction lag is still determined
>  > based on min.compaction.lag and how long to roll an active segment,
> the
>  > actual lag for compaction is undetermined if we use "0".  On the other
>  > hand, we can already set "min.cleanable.dirty.ratio" to achieve the
> same
>  > goal.  So here we choose "0" as "disabled".
>
> I would prefer -1 to be the invalid setting.  Treating 0 differently than
> 1 seems strange to me.
>
> => see my previous comment. I am not strongly against it, but 0 is not a
valid configuration in my opinion, so I use "0" as the disabled state.

best,
> Colin
>
>
> On Tue, Sep 4, 2018, at 15:04, xiongqi wu wrote:
> > Let's VOTE for this KIP.
> > KIP:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-354
> > %3A+Time-based+log+compaction+policy
> >
> > Implementation:
> >
> > https://github.com/apache/kafka/pull/5611
> >
> >
> >
> > Xiongqi (Wesley) Wu
>


Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-04 Thread xiongqi wu
Thanks for the comments.

Today, when creating a topic, the client only does simple local validation
and doesn't check against the broker's configuration.

We cannot just let users create a configuration in ZooKeeper and then
dishonor the user's choice on the broker side.

I agree we need a better way to enforce that the right value is set for
settings such as segment.ms, but it is not through a simple override on the
broker side.

If you have a better solution, let me know. If it requires more discussion,
I would rather track this issue outside this KIP.


Xiongqi (Wesley) Wu


On Tue, Sep 4, 2018 at 6:38 PM Colin McCabe  wrote:

> On Tue, Sep 4, 2018, at 17:47, xiongqi wu wrote:
> > Colin,
> > Thank you for comments.
> > see my inline reply below.
> >
> > Xiongqi (Wesley) Wu
> >
> >
> > On Tue, Sep 4, 2018 at 5:24 PM Colin McCabe  wrote:
> >
> > > Hi Xiongqi,
> > >
> > > Thanks for this KIP.
> > >
> > > The name seems a bit ambiguous.  Our compaction policies are already
> > > time-based, after all.  It seems like this change is focused around
> adding
> > > a “max.compaction.lag.ms."  Perhaps the KIP title should be something
> > > like "add maximum compaction lag time"?
> > >
> > > ==> sure. I will change the title.
> >
> > > The active segment is forced to roll when either "
> max.compaction.lag.ms"
> > > > or "segment.ms" (log.roll.ms and log.roll.hours) has reached.
> > >
> > > If the max.compaction.lag.ms is low, it seems like segments will be
> > > rolled very frequently.  This can be a source of problems in the
> cluster,
> > > since creating many different small log segments consumes a huge
> amount of
> > > cluster resources.  Therefore, I would suggest adding a broker-level
> > > configuration which allows us to set a minimum value for
> > > max.compaction.lag.ms.  If we let users set it on a per-topic basis,
> > > someone could set a value of 1 ms or something, and cause chaos.
> > >
> > > =>  this applies to segment.ms as well. Today users can set "
> > segment.ms" to a very low value, and cause a frequent rolling of active
> > segments.
>
> Hi Xiongqi,
>
> I agree that this is an existing problem with segment.ms.  However, that
> doesn't mean that we shouldn't fix it.  As you noted, there will be more
> interest in these topic-level retention settings as a result of GDPR.  It
> seems likely that pre-existing problems will cause more trouble.
>
> The fix seems relatively straightforward here -- add a broker-level
> minimum segment.ms that overrides per-topic minimums.  We can also fail
> with a helpful error message when someone attempts to set an invalid
> configuration.
>
> >  In my option, the minimum of "max.compaction.lag.ms" should be
> > based on the minimum of "segment.ms".  Since today the minimum of
> segment.ms
> > is 1, "max.compaction.lag.ms" also starts with 1.  "0" means disable.  I
> > can use -1 as disable, but it is hard to define the meaning of 0 because
> we
> > cannot just roll the active segment immediately.
>
> That's a fair point.  We should make 0 = disable, to be consistent with
> the other settings.
>
> best,
> Colin
>
> >
> >  > -- Note that an alternative configuration is to use -1 as "disabled"
> and
> > > 0
> > >  > as "immediate compaction". Because compaction lag is still
> determined
> > >  > based on min.compaction.lag and how long to roll an active segment,
> > > the
> > >  > actual lag for compaction is undetermined if we use "0".  On the
> other
> > >  > hand, we can already set "min.cleanable.dirty.ratio" to achieve the
> > > same
> > >  > goal.  So here we choose "0" as "disabled".
> > >
> > > I would prefer -1 to be the invalid setting.  Treating 0 differently
> than
> > > 1 seems strange to me.
> > >
> > > => see my previous comment,  I am not strongly against, but 0 is
> not a
> > valid configuration in my option. So I use "0" as disabled state.
> >
> > best,
> > > Colin
> > >
> > >
> > > On Tue, Sep 4, 2018, at 15:04, xiongqi wu wrote:
> > > > Let's VOTE for this KIP.
> > > > KIP:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-354
> > > > %3A+Time-based+log+compaction+policy
> > > >
> > > > Implementation:
> > > >
> > > > https://github.com/apache/kafka/pull/5611
> > > >
> > > >
> > > >
> > > > Xiongqi (Wesley) Wu
> > >
>


[VOTE] KIP-354 Time-based log compaction policy

2018-09-04 Thread xiongqi wu
Let's VOTE for this KIP.
KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-354
%3A+Time-based+log+compaction+policy

Implementation:

https://github.com/apache/kafka/pull/5611



Xiongqi (Wesley) Wu


Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-05 Thread xiongqi wu
If we use 0 to indicate immediate compaction, the compaction lag is
determined by segment.ms in the worst case.  If segment.ms is 24 hours,
"immediate compaction" is a weaker guarantee than setting any value less
than 24 hours.  By the definition of "max compaction lag", we cannot have
zero lag.  So I use 0 to indicate "disabled".



Xiongqi (Wesley) Wu


On Wed, Sep 5, 2018 at 8:34 AM Colin McCabe  wrote:

> On Tue, Sep 4, 2018, at 22:11, Brett Rann wrote:
> > > That's a fair point. We should make 0 = disable, to be consistent with
> > the other settings.
> >
> > -1 is used elsewhere for disable and when seeing it in a config it's
> clear
> > that it's a special meaning. 0 doesn't have to mean instant, it just
> means
> > as quickly as possible. I don't think 0 is intuitive for disabled and it
> > will be confusing.  I wasn't aware segment.ms=0 == disabled, but I think
> > that is also unintuitive.
>
> I think there is an argument for keeping these two configurations
> consistent, since they are so similar.  I agree that 0 was an unfortunate
> choice.,
>
> best,
> Colin
>
> >
> > On Wed, Sep 5, 2018 at 11:38 AM Colin McCabe  wrote:
> >
> > > On Tue, Sep 4, 2018, at 17:47, xiongqi wu wrote:
> > > > Colin,
> > > > Thank you for comments.
> > > > see my inline reply below.
> > > >
> > > > Xiongqi (Wesley) Wu
> > > >
> > > >
> > > > On Tue, Sep 4, 2018 at 5:24 PM Colin McCabe 
> wrote:
> > > >
> > > > > Hi Xiongqi,
> > > > >
> > > > > Thanks for this KIP.
> > > > >
> > > > > The name seems a bit ambiguous. Our compaction policies are already
> > > > > time-based, after all. It seems like this change is focused around
> > > adding
> > > > > a “max.compaction.lag.ms." Perhaps the KIP title should be
> something
> > > > > like "add maximum compaction lag time"?
> > > > >
> > > > > ==> sure. I will change the title.
> > > >
> > > > > The active segment is forced to roll when either "
> > > max.compaction.lag.ms"
> > > > > > or "segment.ms" (log.roll.ms and log.roll.hours) has reached.
> > > > >
> > > > > If the max.compaction.lag.ms is low, it seems like segments will
> be
> > > > > rolled very frequently. This can be a source of problems in the
> > > cluster,
> > > > > since creating many different small log segments consumes a huge
> > > amount of
> > > > > cluster resources. Therefore, I would suggest adding a broker-level
> > > > > configuration which allows us to set a minimum value for
> > > > > max.compaction.lag.ms. If we let users set it on a per-topic
> basis,
> > > > > someone could set a value of 1 ms or something, and cause chaos.
> > > > >
> > > > > => this applies to segment.ms as well. Today users can
> set "
> > > > segment.ms" to a very low value, and cause a frequent rolling of
> active
> > > > segments.
> > >
> > > Hi Xiongqi,
> > >
> > > I agree that this is an existing problem with segment.ms. However,
> that
> > > doesn't mean that we shouldn't fix it. As you noted, there will be more
> > > interest in these topic-level retention settings as a result of GDPR.
> It
> > > seems likely that pre-existing problems will cause more trouble.
> > >
> > > The fix seems relatively straightforward here -- add a broker-level
> > > minimum segment.ms that overrides per-topic minimums. We can also fail
> > > with a helpful error message when someone attempts to set an invalid
> > > configuration.
> > >
> > > > In my option, the minimum of "max.compaction.lag.ms" should be
> > > > based on the minimum of "segment.ms". Since today the minimum of
> > > segment.ms
> > > > is 1, "max.compaction.lag.ms" also starts with 1. "0" means
> disable. I
> > > > can use -1 as disable, but it is hard to define the meaning of 0
> because
> > > we
> > > > cannot just roll the active segment immediately.
> > >
> > > That's a fair point. We should make 0 = disable, to be consistent with
> the
> > > other settings.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > &g

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-05 Thread xiongqi wu
Colin,

When a user creates a topic, the user doesn't know the broker's
configuration. If there are 100 brokers, the user can't contact all of
them to find out the minimum allowed value for segment.ms.  Today, topic
creation is done via ZooKeeper, and at that stage the user doesn't have
the right view of the broker's configuration.

In the long term, we do need a way to enforce the minimum of segment.ms.
But I think it is not a trivial change, so I would rather track this item
as a separate PR.


Xiongqi (Wesley) Wu


On Wed, Sep 5, 2018 at 8:33 AM Colin McCabe  wrote:

> On Tue, Sep 4, 2018, at 20:25, xiongqi wu wrote:
> > Thanks for comments.
> >
> > Today, when creating topic, client only does simple local validation
> > and doesn't check against broker's configurations.
> >
> > We cannot just let users to create a configuration in zookeeper and
> > dishonor the user's choice in broker side.
>
> Hi xiongqi,
>
> I wasn't suggesting overriding the user's choices.  Given that this is
> intended for GDPR purposes, that would indeed be a very bad idea.  I was
> suggesting that we disallow users from creating topics with invalid
> settings.  The broker could have a configuration that specifies the minimum
> segment rotation time it is willing to accept.  This could be a dynamic
> config to make it easier to manage.
>
> best,
> Colin
>
> >
> > I agree we need a better way to enforce the right value is set such as
> > segment.ms, but it is not through a simple override in the broker side.
> >
> > If you have better solution, let me know.  If it is require more
> > discussions,  I would rather track this issue outside this KIP.
> >
> >
> > Xiongqi (Wesley) Wu
> >
> >
> > On Tue, Sep 4, 2018 at 6:38 PM Colin McCabe  wrote:
> >
> > > On Tue, Sep 4, 2018, at 17:47, xiongqi wu wrote:
> > > > Colin,
> > > > Thank you for comments.
> > > > see my inline reply below.
> > > >
> > > > Xiongqi (Wesley) Wu
> > > >
> > > >
> > > > On Tue, Sep 4, 2018 at 5:24 PM Colin McCabe 
> wrote:
> > > >
> > > > > Hi Xiongqi,
> > > > >
> > > > > Thanks for this KIP.
> > > > >
> > > > > The name seems a bit ambiguous.  Our compaction policies are
> already
> > > > > time-based, after all.  It seems like this change is focused around
> > > adding
> > > > > a “max.compaction.lag.ms."  Perhaps the KIP title should be
> something
> > > > > like "add maximum compaction lag time"?
> > > > >
> > > > > ==> sure. I will change the title.
> > > >
> > > > > The active segment is forced to roll when either "
> > > max.compaction.lag.ms"
> > > > > > or "segment.ms" (log.roll.ms and log.roll.hours) has reached.
> > > > >
> > > > > If the max.compaction.lag.ms is low, it seems like segments will
> be
> > > > > rolled very frequently.  This can be a source of problems in the
> > > cluster,
> > > > > since creating many different small log segments consumes a huge
> > > amount of
> > > > > cluster resources.  Therefore, I would suggest adding a
> broker-level
> > > > > configuration which allows us to set a minimum value for
> > > > > max.compaction.lag.ms.  If we let users set it on a per-topic
> basis,
> > > > > someone could set a value of 1 ms or something, and cause chaos.
> > > > >
> > > > > =>  this applies to segment.ms as well. Today users can
> set "
> > > > segment.ms" to a very low value, and cause a frequent rolling of
> active
> > > > segments.
> > >
> > > Hi Xiongqi,
> > >
> > > I agree that this is an existing problem with segment.ms.  However,
> that
> > > doesn't mean that we shouldn't fix it.  As you noted, there will be
> more
> > > interest in these topic-level retention settings as a result of GDPR.
> It
> > > seems likely that pre-existing problems will cause more trouble.
> > >
> > > The fix seems relatively straightforward here -- add a broker-level
> > > minimum segment.ms that overrides per-topic minimums.  We can also
> fail
> > > with a helpful error message when someone attempts to set an invalid
> > > configuration.
> > >
> > > >  In my option, the minimum of "max.compaction.lag.ms" should be
> > > > bas

[DISCUSS] KIP-370: Remove Orphan Partitions

2018-09-05 Thread xiongqi wu
This KIP enables the broker to remove orphan partitions automatically.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-370%3A+Remove+Orphan+Partitions


Xiongqi (Wesley) Wu


Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-05 Thread xiongqi wu
I want to honor the minimum value of segment.ms (which is 1 ms) when
force-rolling an active segment.
So if we set "max.compaction.lag.ms" to any value > 0, the minimum of
max.compaction.lag.ms and segment.ms will be used to seal an active
segment.

If we set max.compaction.lag.ms to 0, the current implementation will
treat it as disabled.

It is a little bit weird to treat max.compaction.lag=0 the same as
max.compaction.lag=1.

There might be a reason why we set the minimum of segment.ms to 1, and I
don't want to break this assumption.
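
To illustrate the intended semantics, here is a rough Java sketch
(illustration only; names such as maxCompactionLagMs and segmentCreateTimeMs
are my own and this is not the actual broker code):

class MaxCompactionLagRollSketch {
    // When max.compaction.lag.ms > 0, the active segment is force-rolled once it
    // is older than min(segment.ms, max.compaction.lag.ms); 0 disables the feature.
    static long effectiveRollMs(long segmentMs, long maxCompactionLagMs) {
        if (maxCompactionLagMs <= 0) {
            return segmentMs;                  // disabled: only segment.ms applies
        }
        return Math.min(segmentMs, maxCompactionLagMs);
    }

    static boolean shouldRollActiveSegment(long nowMs, long segmentCreateTimeMs,
                                           long segmentMs, long maxCompactionLagMs) {
        return nowMs - segmentCreateTimeMs >= effectiveRollMs(segmentMs, maxCompactionLagMs);
    }
}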



Xiongqi (Wesley) Wu


On Wed, Sep 5, 2018 at 7:54 PM Brett Rann  wrote:

> You're rolling a new segment if the condition is met right? So I'm
> struggling to understand the relevance of segment.ms here. Maybe an
> example
> would help my understanding:
>
> segment.ms=999
> *min.cleanable.dirty.ratio=1*
> max.compaction.lag.ms=1
>
> When a duplicate message comes in, after 1ms the topic should be eligible
> for compaction when the log compaction thread gets around to evaluating the
> topic.
>
> if we have
> segment.ms=999
> *min.cleanable.dirty.ratio=1*
> max.compaction.lag.ms=0
>
> When a duplicate message comes in, after 0ms the topic should be eligible
> for compaction when the log compaction thread gets around to evaluating the
> topic.
>
> In both of those cases the change would mean a new segment is rolled so the
> new message would be part of the compaction task. 0 and 1 are practically
> the same meaning since neither is providing an actual guarantee at such low
> MS settings, but effectively tying it to both the frequency of the log
> cleaner running and the priority of the given topic being the highest
> priority of all topics that are evaluated for cleaning on the next cycle.
> You've captured that nuance with careful "skipped" wording in the KIP
> here "controls
> the max time interval a message/segment can be skipped for log compaction".
>
> How is 0 different to 1, practically? And how is it relating to segment.ms
> ?
> Is it that you're proposing to have 0 mean "use segment.ms instead?" as a
> kind of third option?
>
>
>
> On Thu, Sep 6, 2018 at 11:34 AM xiongqi wu  wrote:
>
> > To make it clear,
> > I don't against using -1 as disabled, but we need to come up with the
> > meaning of "0".
> > If "0" means immediate compaction, but the actual compaction lag will be
> > segment.ms.
> > It has longer lag than setting the value to be half of segment.ms.
> > We cannot provide "0" as max compaction lag.
> >
> > Here are two options.
> > Option 1:
> > Keep 0 as disabled
> > Option 2:
> > -1 (disabled), 0 (max compaction lag = segment.ms), and others.
> >
> >
> >
> > Xiongqi (Wesley) Wu
> >
> >
> > On Wed, Sep 5, 2018 at 5:49 PM Brett Rann 
> > wrote:
> >
> > > -1 is consistent as "special" with these settings for example:
> > >
> > > log.retention.bytes
> > > socket.received.buffer.bytes
> > > socket.send.buffer.bytes
> > > queued.max.request.bytes
> > > retention.bytes
> > > retention.ms
> > >
> > > and acks.
> > >
> > > Where it may mean no limit, use OS defaults, max (acks), etc. I don't
> see
> > > much convention of 0 meaning those things.
> > >
> > > There are some NULLs but it seems convetion there is NULL is used where
> > > there's another setting in the hierarchy that would be used instead.
> > >
> > >
> > >
> > >
> > > On Thu, Sep 6, 2018 at 10:42 AM Brett Rann  wrote:
> > >
> > > > If segment.ms can't be set to 0, then we're not being consistent
> > > > by using 0 for this new setting? I throw out -1 for consideration
> > > > again :)
> > > >
> > > > On Thu, Sep 6, 2018 at 10:03 AM xiongqi wu 
> > wrote:
> > > >
> > > >> Thanks. I will document after PR is merged.
> > > >>
> > > >> BTW, Kafka enforce the minimum of "segment.ms" to 1, we cannot set
> "
> > > >> segment.ms" to 0.
> > > >>
> > > >> I also updated the title of this KIP.
> > > >>
> > > >> Xiongqi (Wesley) Wu
> > > >>
> > > >>
> > > >> On Wed, Sep 5, 2018 at 4:34 PM Brett Rann  >
> > > >> wrote:
> > > >>
> > > >> > I withdraw my comments on -1 since i'm in the minority. :) Can we
> > > >> > make sure 0 gets document

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-05 Thread xiongqi wu
To make it clear,
I am not against using -1 as disabled, but we need to come up with the
meaning of "0".
If "0" means immediate compaction, the actual compaction lag will still be
segment.ms, which is a longer lag than setting the value to half of
segment.ms.
So we cannot provide a true "0" max compaction lag.

Here are two options.
Option 1:
Keep 0 as disabled
Option 2:
-1 (disabled), 0 (max compaction lag = segment.ms),  and others.



Xiongqi (Wesley) Wu


On Wed, Sep 5, 2018 at 5:49 PM Brett Rann  wrote:

> -1 is consistent as "special" with these settings for example:
>
> log.retention.bytes
> socket.received.buffer.bytes
> socket.send.buffer.bytes
> queued.max.request.bytes
> retention.bytes
> retention.ms
>
> and acks.
>
> Where it may mean no limit, use OS defaults, max (acks), etc. I don't see
> much convention of 0 meaning those things.
>
> There are some NULLs but it seems convetion there is NULL is used where
> there's another setting in the hierarchy that would be used instead.
>
>
>
>
> On Thu, Sep 6, 2018 at 10:42 AM Brett Rann  wrote:
>
> > If segment.ms can't be set to 0, then we're not being consistent
> > by using 0 for this new setting? I throw out -1 for consideration
> > again :)
> >
> > On Thu, Sep 6, 2018 at 10:03 AM xiongqi wu  wrote:
> >
> >> Thanks. I will document after PR is merged.
> >>
> >> BTW, Kafka enforce the minimum of "segment.ms" to 1, we cannot set "
> >> segment.ms" to 0.
> >>
> >> I also updated the title of this KIP.
> >>
> >> Xiongqi (Wesley) Wu
> >>
> >>
> >> On Wed, Sep 5, 2018 at 4:34 PM Brett Rann 
> >> wrote:
> >>
> >> > I withdraw my comments on -1 since i'm in the minority. :) Can we
> >> > make sure 0 gets documented as meaning disabled here:
> >> > https://kafka.apache.org/documentation/#brokerconfigs ?
> >> > And while there it would be good if segment.ms is documented
> >> > that 0 is disabled too. (there's some hierarchy of configs for that
> too
> >> > if its not set and null for others means disabled!)
> >> >
> >> >
> >> > On Thu, Sep 6, 2018 at 4:44 AM xiongqi wu 
> wrote:
> >> >
> >> > > If we use 0 to indicate immediate compaction, the compaction lag is
> >> > > determined by segment.ms in worst case. If segment.ms is 24 hours,
> >> > > "immediate compaction" is a weaker guarantee than setting any value
> >> less
> >> > > than 24 hours. By the definition of "max compaction lag", we cannot
> >> have
> >> > > zero lag. So I use 0 to indicate "disable".
> >> > >
> >> > >
> >> > >
> >> > > Xiongqi (Wesley) Wu
> >> > >
> >> > >
> >> > > On Wed, Sep 5, 2018 at 8:34 AM Colin McCabe 
> >> wrote:
> >> > >
> >> > > > On Tue, Sep 4, 2018, at 22:11, Brett Rann wrote:
> >> > > > > > That's a fair point. We should make 0 = disable, to be
> >> consistent
> >> > > with
> >> > > > > the other settings.
> >> > > > >
> >> > > > > -1 is used elsewhere for disable and when seeing it in a config
> >> it's
> >> > > > clear
> >> > > > > that it's a special meaning. 0 doesn't have to mean instant, it
> >> just
> >> > > > means
> >> > > > > as quickly as possible. I don't think 0 is intuitive for
> disabled
> >> and
> >> > > it
> >> > > > > will be confusing. I wasn't aware segment.ms=0 == disabled,
> but I
> >> > > think
> >> > > > > that is also unintuitive.
> >> > > >
> >> > > > I think there is an argument for keeping these two configurations
> >> > > > consistent, since they are so similar. I agree that 0 was an
> >> > unfortunate
> >> > > > choice.,
> >> > > >
> >> > > > best,
> >> > > > Colin
> >> > > >
> >> > > > >
> >> > > > > On Wed, Sep 5, 2018 at 11:38 AM Colin McCabe <
> cmcc...@apache.org>
> >> > > wrote:
> >> > > > >
> >> > > > > >

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-05 Thread xiongqi wu
Thanks.  I will document it after the PR is merged.

BTW, Kafka enforces a minimum of 1 for "segment.ms"; we cannot set
"segment.ms" to 0.

I also updated the title of this KIP.

Xiongqi (Wesley) Wu


On Wed, Sep 5, 2018 at 4:34 PM Brett Rann  wrote:

> I withdraw my comments on -1 since i'm in the minority.  :) Can we
> make sure 0 gets documented as meaning disabled here:
> https://kafka.apache.org/documentation/#brokerconfigs ?
> And while there it would be good if  segment.ms is documented
>  that 0 is disabled too.  (there's some hierarchy of configs for that too
>  if its not set and null for others means disabled!)
>
>
> On Thu, Sep 6, 2018 at 4:44 AM xiongqi wu  wrote:
>
> > If we use 0 to indicate immediate compaction, the compaction lag is
> > determined by segment.ms in worst case. If segment.ms is 24 hours,
> > "immediate compaction" is a weaker guarantee than setting any value less
> > than 24 hours. By the definition of "max compaction lag", we cannot have
> > zero lag. So I use 0 to indicate "disable".
> >
> >
> >
> > Xiongqi (Wesley) Wu
> >
> >
> > On Wed, Sep 5, 2018 at 8:34 AM Colin McCabe  wrote:
> >
> > > On Tue, Sep 4, 2018, at 22:11, Brett Rann wrote:
> > > > > That's a fair point. We should make 0 = disable, to be consistent
> > with
> > > > the other settings.
> > > >
> > > > -1 is used elsewhere for disable and when seeing it in a config it's
> > > clear
> > > > that it's a special meaning. 0 doesn't have to mean instant, it just
> > > means
> > > > as quickly as possible. I don't think 0 is intuitive for disabled and
> > it
> > > > will be confusing. I wasn't aware segment.ms=0 == disabled, but I
> > think
> > > > that is also unintuitive.
> > >
> > > I think there is an argument for keeping these two configurations
> > > consistent, since they are so similar. I agree that 0 was an
> unfortunate
> > > choice.,
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > On Wed, Sep 5, 2018 at 11:38 AM Colin McCabe 
> > wrote:
> > > >
> > > > > On Tue, Sep 4, 2018, at 17:47, xiongqi wu wrote:
> > > > > > Colin,
> > > > > > Thank you for comments.
> > > > > > see my inline reply below.
> > > > > >
> > > > > > Xiongqi (Wesley) Wu
> > > > > >
> > > > > >
> > > > > > On Tue, Sep 4, 2018 at 5:24 PM Colin McCabe 
> > > wrote:
> > > > > >
> > > > > > > Hi Xiongqi,
> > > > > > >
> > > > > > > Thanks for this KIP.
> > > > > > >
> > > > > > > The name seems a bit ambiguous. Our compaction policies are
> > already
> > > > > > > time-based, after all. It seems like this change is focused
> > around
> > > > > adding
> > > > > > > a “max.compaction.lag.ms." Perhaps the KIP title should be
> > > something
> > > > > > > like "add maximum compaction lag time"?
> > > > > > >
> > > > > > > ==> sure. I will change the title.
> > > > > >
> > > > > > > The active segment is forced to roll when either "
> > > > > max.compaction.lag.ms"
> > > > > > > > or "segment.ms" (log.roll.ms and log.roll.hours) has
> reached.
> > > > > > >
> > > > > > > If the max.compaction.lag.ms is low, it seems like segments
> will
> > > be
> > > > > > > rolled very frequently. This can be a source of problems in the
> > > > > cluster,
> > > > > > > since creating many different small log segments consumes a
> huge
> > > > > amount of
> > > > > > > cluster resources. Therefore, I would suggest adding a
> > broker-level
> > > > > > > configuration which allows us to set a minimum value for
> > > > > > > max.compaction.lag.ms. If we let users set it on a per-topic
> > > basis,
> > > > > > > someone could set a value of 1 ms or something, and cause
> chaos.
> > > > > > >
> > > > > > > => this applies to segment.ms as well. Today users can
> > > set "
> > > > > > segme

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-10 Thread xiongqi wu
Thank you for the comments.  I will use '0' for now.

If we create topics through the admin client in the future, we might perform
some useful checks (but the assumption is that all brokers in the same cluster
have the same default configuration values; otherwise, it might still be
tricky to do such a cross-validation check).


Xiongqi (Wesley) Wu


On Mon, Sep 10, 2018 at 11:15 AM Colin McCabe  wrote:

> I don't have a strong opinion.  But I think we should probably be
> consistent with how segment.ms works, and just use 0.
>
> best,
> Colin
>
>
> On Wed, Sep 5, 2018, at 21:19, Brett Rann wrote:
> > OK thanks for that clarification. I see why you're uncomfortable with 0
> now.
> >
> > I'm not really fussed. I just prefer consistency in configuration
> options.
> >
> > Personally I lean towards treating 0 and 1 similarly in that scenario,
> > because it favours the person thinking about setting the configurations,
> > and a person doesn't care about a 1ms edge case especially when the
> context
> > is the true minimum is tied to the log cleaner cadence.
> >
> > Introducing 0 to mean "disabled" because there is some uniquness in
> > segment.ms not being able to be set to 0, reduces configuration
> consistency
> > in favour of capturing a MS gap in an edge case that nobody would ever
> > notice. For someone to understand why everywhere else -1 is used to
> > disable, but here 0 is used, they would need to learn about segment.ms
> > having a 1ms minimum and then after learning would think "who cares about
> > 1ms?" in this context. I would anyway :)
> >
> > my 2c anyway. Will again defer to majority. Curious which way Colin falls
> > now.
> >
> > Don't want to spend more time on this though, It's well  into
> bikeshedding
> > territory now.  :)
> >
> >
> >
> > On Thu, Sep 6, 2018 at 1:31 PM xiongqi wu  wrote:
> >
> > > I want to honor the minimum value of segment.ms (which is 1ms) to
> force
> > > roll an active segment.
> > > So if we set "max.compaction.lag.ms" any value > 0, the minimum of
> > > max.compaction.lag.ms and segment.ms will be used to seal an active
> > > segment.
> > >
> > > If we set max.compaction.lag.ms to 0, the current implementation will
> > > treat it as disabled.
> > >
> > > It is a little bit weird to treat max.compaction.lag=0 the same as
> > > max.compaction.lag=1.
> > >
> > > There might be a reason why we set the minimum of segment.ms to 1,
> and I
> > > don't want to break this assumption.
> > >
> > >
> > >
> > > Xiongqi (Wesley) Wu
> > >
> > >
> > > On Wed, Sep 5, 2018 at 7:54 PM Brett Rann 
> > > wrote:
> > >
> > > > You're rolling a new segment if the condition is met right? So I'm
> > > > struggling to understand the relevance of segment.ms here. Maybe an
> > > > example
> > > > would help my understanding:
> > > >
> > > > segment.ms=999
> > > > *min.cleanable.dirty.ratio=1*
> > > > max.compaction.lag.ms=1
> > > >
> > > > When a duplicate message comes in, after 1ms the topic should be
> eligible
> > > > for compaction when the log compaction thread gets around to
> evaluating
> > > the
> > > > topic.
> > > >
> > > > if we have
> > > > segment.ms=999
> > > > *min.cleanable.dirty.ratio=1*
> > > > max.compaction.lag.ms=0
> > > >
> > > > When a duplicate message comes in, after 0ms the topic should be
> eligible
> > > > for compaction when the log compaction thread gets around to
> evaluating
> > > the
> > > > topic.
> > > >
> > > > In both of those cases the change would mean a new segment is rolled
> so
> > > the
> > > > new message would be part of the compaction task. 0 and 1 are
> practically
> > > > the same meaning since neither is providing an actual guarantee at
> such
> > > low
> > > > MS settings, but effectively tying it to both the frequency of the
> log
> > > > cleaner running and the priority of the given topic being the highest
> > > > priority of all topics that are evaluated for cleaning on the next
> cycle.
> > > > You've captured that nuance with careful "skipped" wording in the KIP
> > > > here "controls
> > > > the max time interval a message/segment can be skipped for log
> &

Re: [DISCUSS] KIP-370: Remove Orphan Partitions

2018-09-10 Thread xiongqi wu
Here is the implementation for KIP-370.

https://github.com/xiowu0/kafka/commit/f1bd3085639f41a7af02567550a8e3018cfac3e9


The purpose is to do a one-time cleanup (after a configured delay) of orphan
partitions when a broker starts up.
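
As a rough illustration of the cleanup idea, a hypothetical Java sketch
(names and details are my own assumptions; the actual logic is in the commit
above):

import java.util.HashSet;
import java.util.Set;

class OrphanPartitionCleanupSketch {
    // A partition directory found on disk is an "orphan" if the broker has not
    // been told (via LeaderAndIsr requests) that it should host that partition.
    // These directories become candidates for deletion after the configured delay.
    static Set<String> orphanPartitions(Set<String> partitionDirsOnDisk,
                                        Set<String> partitionsAssignedToThisBroker) {
        Set<String> orphans = new HashSet<>(partitionDirsOnDisk);
        orphans.removeAll(partitionsAssignedToThisBroker);
        return orphans;
    }
}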


Xiongqi (Wesley) Wu


On Wed, Sep 5, 2018 at 10:51 AM xiongqi wu  wrote:

>
> This KIP enables broker to remove orphan partitions automatically.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-370%3A+Remove+Orphan+Partitions
>
>
> Xiongqi (Wesley) Wu
>


Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-10-16 Thread xiongqi wu
Mayuresh,

Thanks for the comments.
The requirement is that we need to pick up segments that are older than
maxCompactionLagMs for compaction.
maxCompactionLagMs is an upper bound, which implies that picking up
segments for compaction earlier doesn't violate the policy.
We use the creation time of a segment as an estimate of its records'
arrival time, so these records can be compacted no later than
maxCompactionLagMs.

On the other hand, compaction is an expensive operation, so we don't want to
compact the log partition whenever a new segment is sealed.
Therefore, we want to pick up a segment for compaction only when the segment
is close to the mandatory max compaction lag (which is why we use segment
creation time as an estimate).
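
For illustration, a minimal Java sketch of this selection rule (method and
variable names such as firstRecordTimestampMs and maxSegmentMs are my own
assumptions, not the actual LogCleaner code):

class CompactionEligibilitySketch {
    // Estimate the earliest record timestamp of an un-compacted segment.
    static long estimatedEarliestTimestampMs(long firstRecordTimestampMs,  // -1 if unknown
                                             long largestTimestampMs,
                                             long maxSegmentMs) {
        if (firstRecordTimestampMs >= 0) {
            return firstRecordTimestampMs;
        }
        // Conservative fallback: the real first timestamp may be later, so at worst
        // the segment is picked up earlier than strictly required.
        return largestTimestampMs - maxSegmentMs;
    }

    // A segment must be picked up once its estimated age exceeds the max lag.
    static boolean requiresCompaction(long nowMs, long estimatedEarliestMs,
                                      long maxCompactionLagMs) {
        return maxCompactionLagMs > 0 && nowMs - estimatedEarliestMs >= maxCompactionLagMs;
    }
}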


Xiongqi (Wesley) Wu


On Mon, Oct 15, 2018 at 5:54 PM Mayuresh Gharat 
wrote:

> Hi Wesley,
>
> Thanks for the KIP and sorry for being late to the party.
>  I wanted to understand, the scenario you mentioned in Proposed changes :
>
> -
> >
> > Estimate the earliest message timestamp of an un-compacted log segment.
> we
> > only need to estimate earliest message timestamp for un-compacted log
> > segments to ensure timely compaction because the deletion requests that
> > belong to compacted segments have already been processed.
> >
> >1.
> >
> >for the first (earliest) log segment:  The estimated earliest
> >timestamp is set to the timestamp of the first message if timestamp is
> >present in the message. Otherwise, the estimated earliest timestamp
> is set
> >to "segment.largestTimestamp - maxSegmentMs”
> > (segment.largestTimestamp is lastModified time of the log segment or
> max
> >timestamp we see for the log segment.). In the later case, the actual
> >timestamp of the first message might be later than the estimation,
> but it
> >is safe to pick up the log for compaction earlier.
> >
> > When we say "actual timestamp of the first message might be later than
> the
> estimation, but it is safe to pick up the log for compaction earlier.",
> doesn't that violate the assumption that we will consider a segment for
> compaction only if the time of creation the segment has crossed the "now -
> maxCompactionLagMs" ?
>
> Thanks,
>
> Mayuresh
>
> On Mon, Sep 3, 2018 at 7:28 PM Brett Rann 
> wrote:
>
> > Might also be worth moving to a vote thread? Discussion seems to have
> gone
> > as far as it can.
> >
> > > On 4 Sep 2018, at 12:08, xiongqi wu  wrote:
> > >
> > > Brett,
> > >
> > > Yes, I will post PR tomorrow.
> > >
> > > Xiongqi (Wesley) Wu
> > >
> > >
> > > On Sun, Sep 2, 2018 at 6:28 PM Brett Rann 
> > wrote:
> > >
> > > > +1 (non-binding) from me on the interface. I'd like to see someone
> > familiar
> > > > with
> > > > the code comment on the approach, and note there's a couple of
> > different
> > > > approaches: what's documented in the KIP, and what Xiaohe Dong was
> > working
> > > > on
> > > > here:
> > > >
> > > >
> >
> https://github.com/dongxiaohe/kafka/tree/dongxiaohe/log-cleaner-compaction-max-lifetime-2.0
> > > >
> > > > If you have code working already Xiongqi Wu could you share a PR? I'd
> > be
> > > > happy
> > > > to start testing.
> > > >
> > > > On Tue, Aug 28, 2018 at 5:57 AM xiongqi wu 
> > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > Do you have any additional comments on this KIP?
> > > > >
> > > > >
> > > > > On Thu, Aug 16, 2018 at 9:17 PM, xiongqi wu 
> > wrote:
> > > > >
> > > > > > on 2)
> > > > > > The offsetmap is built starting from dirty segment.
> > > > > > The compaction starts from the beginning of the log partition.
> > That's
> > > > how
> > > > > > it ensure the deletion of tomb keys.
> > > > > > I will double check tomorrow.
> > > > > >
> > > > > > Xiongqi (Wesley) Wu
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 16, 2018 at 6:46 PM Brett Rann
> > 
> > > > > > wrote:
> > > > > >
> > > > > >> To just clarify a bit on 1. whether there's an external
> storage/DB
> > > > isn't
> > > > > >> relevant here.
> > > > > >> Compacted topics allow a tom

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-10-29 Thread xiongqi wu
Hi Dong,

Thank you for your comment.  See my inline comments.
I will update the KIP shortly.

Xiongqi (Wesley) Wu


On Sun, Oct 28, 2018 at 9:17 PM Dong Lin  wrote:

> Hey Xiongqi,
>
> Sorry for late reply. I have some comments below:
>
> 1) As discussed earlier in the email list, if the topic is configured with
> both deletion and compaction, in some cases messages produced a long time
> ago can not be deleted based on time. This is a valid use-case because we
> actually have topic which is configured with both deletion and compaction
> policy. And we should enforce the semantics for both policy. Solution A
> sounds good. We do not need interface change (e.g. extra config) to enforce
> solution A. All we need is to update implementation so that when broker
> compacts a topic, if the message has timestamp (which is the common case),
> messages that are too old (based on the time-based retention config) will
> be discarded. Since this is a valid issue and it is also related to the
> guarantee of when a message can be deleted, can we include the solution of
> this problem in the KIP?
>
==  This makes sense.  We can use a similar approach to advance the log
start offset.
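
For example, the per-record check during compaction might look roughly like
this (a Java sketch for illustration only; the names are my own, not the
actual cleaner code):

class CompactionRetentionSketch {
    // While copying records during compaction, drop any record whose timestamp
    // is already past the time-based retention limit for the topic.
    static boolean retainDuringCompaction(long recordTimestampMs, long nowMs, long retentionMs) {
        if (retentionMs <= 0) {
            return true;                       // time-based retention disabled
        }
        return nowMs - recordTimestampMs <= retentionMs;
    }
}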

>
> 2) It is probably OK to assume that all messages have timestamp. The
> per-message timestamp was introduced into Kafka 0.10.0 with KIP-31 and
> KIP-32 as of Feb 2016. Kafka 0.10.0 or earlier versions are no longer
> supported. Also, since the use-case for this feature is primarily for GDPR,
> we can assume that client library has already been upgraded to support SSL,
> which feature is added after KIP-31 and KIP-32.
>
>  =>  Ok. We can use message timestamp to delete expired records if
both compaction and retention are enabled.


3) In Proposed Change section 2.a, it is said that segment.largestTimestamp
> - maxSegmentMs can be used to determine the timestamp of the earliest
> message. Would it be simpler to just use the create time of the file to
> determine the time?
>
> >  Linux/Java doesn't provide an API for file creation time because
some filesystem types don't provide file creation time.


> 4) The KIP suggests to use must-clean-ratio to select the partition to be
> compacted. Unlike dirty ratio which is mostly for performance, the logs
> whose "must-clean-ratio" is non-zero must be compacted immediately for
> correctness reason (and for GDPR). And if this can no be achieved because
> e.g. broker compaction throughput is too low, investigation will be needed.
> So it seems simpler to first compact logs which has segment whose earliest
> timetamp is earlier than now - max.compaction.lag.ms, instead of defining
> must-clean-ratio and sorting logs based on this value.
>
>
==>  Good suggestion. This can simplify the implementation quite a bit if
we are not too concerned about the compaction of a GDPR-required partition
being queued behind some large partition.  The actual compaction completion
time is not guaranteed anyway.


> 5) The KIP says max.compaction.lag.ms is 0 by default and it is also
> suggested that 0 means disable. Should we set this value to MAX_LONG by
> default to effectively disable the feature added in this KIP?
>
> > I would rather use 0 so the corresponding code path will not be
exercised.  By using MAX_LONG, we would theoretically go through related
code to find out whether the partition is required to be compacted to
satisfy MAX_LONG.

6) It is probably cleaner and readable not to include in Public Interface
> section those configs whose meaning is not changed.
>
> > I will clean that up.

7) The goal of this KIP is to ensure that log segment whose earliest
> message is earlier than a given threshold will be compacted. This goal may
> not be achieved if the compact throughput can not catchup with the total
> bytes-in-rate for the compacted topics on the broker. Thus we need an easy
> way to tell operator whether this goal is achieved. If we don't already
> have such metric, maybe we can include metrics to show 1) the total number
> of log segments (or logs) which needs to be immediately compacted as
> determined by max.compaction.lag; and 2) the maximum value of now -
> earliest_time_stamp_of_segment among all segments that needs to be
> compacted.
>
> ===> good suggestion.  I will update KIP for these metrics.
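
For illustration, a rough Java sketch of the two suggested metrics (the
names are my own assumptions; the exact metric definitions will be in the
KIP):

import java.util.List;

class CompactionDelayMetricsSketch {
    // 1) Total number of logs that already have an un-compacted segment older
    //    than max.compaction.lag.ms, i.e. that need to be compacted immediately.
    static int logsRequiringImmediateCompaction(List<Long> earliestUncompactedTsMs,
                                                long nowMs, long maxCompactionLagMs) {
        int count = 0;
        for (long ts : earliestUncompactedTsMs) {
            if (nowMs - ts > maxCompactionLagMs) {
                count++;
            }
        }
        return count;
    }

    // 2) Maximum value of (now - earliest_timestamp_of_segment) among all segments
    //    that need to be compacted, i.e. how far behind the cleaner currently is.
    static long maxCompactionDelayMs(List<Long> earliestUncompactedTsMs,
                                     long nowMs, long maxCompactionLagMs) {
        long maxDelayMs = 0;
        for (long ts : earliestUncompactedTsMs) {
            if (nowMs - ts > maxCompactionLagMs) {
                maxDelayMs = Math.max(maxDelayMs, nowMs - ts);
            }
        }
        return maxDelayMs;
    }
}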

8) The Performance Impact suggests user to use the existing metrics to
> monitor the performance impact of this KIP. It i useful to list mean of
> each jmx metrics that we want user to monitor, and possibly explain how to
> interpret the value of these metrics to determine whether there is
> performance issue.
>
> =>  I will update the KIP.

> Thanks,
> Dong
>
> On Tue, Oct 16, 2018 at 10:53 AM xiongqi wu  wrote:
>
> > Mayures

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-10-29 Thread xiongqi wu
Hi Dong,
I have updated the KIP to address your comments.
One correction to the previous email:
after an offline discussion with Dong, we decided to use MAX_LONG as the
default value for max.compaction.lag.ms.


Xiongqi (Wesley) Wu


On Mon, Oct 29, 2018 at 12:15 PM xiongqi wu  wrote:

> Hi Dong,
>
> Thank you for your comment.  See my inline comments.
> I will update the KIP shortly.
>
> Xiongqi (Wesley) Wu
>
>
> On Sun, Oct 28, 2018 at 9:17 PM Dong Lin  wrote:
>
>> Hey Xiongqi,
>>
>> Sorry for late reply. I have some comments below:
>>
>> 1) As discussed earlier in the email list, if the topic is configured with
>> both deletion and compaction, in some cases messages produced a long time
>> ago can not be deleted based on time. This is a valid use-case because we
>> actually have topic which is configured with both deletion and compaction
>> policy. And we should enforce the semantics for both policy. Solution A
>> sounds good. We do not need interface change (e.g. extra config) to
>> enforce
>> solution A. All we need is to update implementation so that when broker
>> compacts a topic, if the message has timestamp (which is the common case),
>> messages that are too old (based on the time-based retention config) will
>> be discarded. Since this is a valid issue and it is also related to the
>> guarantee of when a message can be deleted, can we include the solution of
>> this problem in the KIP?
>>
> ==  This makes sense.  We can use similar approach to increase the log
> start offset.
>
>>
>> 2) It is probably OK to assume that all messages have timestamp. The
>> per-message timestamp was introduced into Kafka 0.10.0 with KIP-31 and
>> KIP-32 as of Feb 2016. Kafka 0.10.0 or earlier versions are no longer
>> supported. Also, since the use-case for this feature is primarily for
>> GDPR,
>> we can assume that client library has already been upgraded to support
>> SSL,
>> which feature is added after KIP-31 and KIP-32.
>>
>>  =>  Ok. We can use message timestamp to delete expired records
> if both compaction and retention are enabled.
>
>
> 3) In Proposed Change section 2.a, it is said that segment.largestTimestamp
>> - maxSegmentMs can be used to determine the timestamp of the earliest
>> message. Would it be simpler to just use the create time of the file to
>> determine the time?
>>
>> >  Linux/Java doesn't provide API for file creation time because
> some filesystem type doesn't provide file creation time.
>
>
>> 4) The KIP suggests to use must-clean-ratio to select the partition to be
>> compacted. Unlike dirty ratio which is mostly for performance, the logs
>> whose "must-clean-ratio" is non-zero must be compacted immediately for
>> correctness reason (and for GDPR). And if this can no be achieved because
>> e.g. broker compaction throughput is too low, investigation will be
>> needed.
>> So it seems simpler to first compact logs which has segment whose earliest
>> timetamp is earlier than now - max.compaction.lag.ms, instead of defining
>> must-clean-ratio and sorting logs based on this value.
>>
>>
> ==>  Good suggestion. This can simply the implementation quite a bit
> if we are not too concerned about compaction of GDPR required partition
> queued behind some large partition.  The actual compaction completion time
> is not guaranteed anyway.
>
>
>> 5) The KIP says max.compaction.lag.ms is 0 by default and it is also
>> suggested that 0 means disable. Should we set this value to MAX_LONG by
>> default to effectively disable the feature added in this KIP?
>>
>> > I would rather use 0 so the corresponding code path will not be
> exercised.  By using MAX_LONG, we would theoretically go through related
> code to find out whether the partition is required to be compacted to
> satisfy MAX_LONG.
>
> 6) It is probably cleaner and readable not to include in Public Interface
>> section those configs whose meaning is not changed.
>>
>> > I will clean that up.
>
> 7) The goal of this KIP is to ensure that log segment whose earliest
>> message is earlier than a given threshold will be compacted. This goal may
>> not be achieved if the compact throughput can not catchup with the total
>> bytes-in-rate for the compacted topics on the broker. Thus we need an easy
>> way to tell operator whether this goal is achieved. If we don't already
>> have such metric, maybe we can include metrics to show 1) the total number
>> of log segments (or logs) which needs to be immediately compacted as
>> determined by max.compacti

Re: [DISCUSS] KIP-370: Remove Orphan Partitions

2018-10-29 Thread xiongqi wu
Dong,

Thanks for the comments.

1) With KIP-380, in theory we don't need the timeout phase.
However, once orphan partitions are removed, they cannot be recovered.
The question is whether we should rely on the first leaderAndISR request
always containing correct information.

For retention-enabled topics, the deletion phase (step 3 in this KIP) will
protect against deletion of new segments.
For log-compacted topics, since log segments can be relatively old, the
delete phase might delete useful segments if by any chance the first
leaderAndISR is incorrect.

Here is the difference with/without the timeout phase:
Solution 1: without the timeout phase, we rely on the first leaderAndISR and
accept that if the first leaderAndISR is incorrect, we might lose data.  We
don't protect against bugs.
Solution 2: with the timeout phase, we rely on the fact that, during the
timeout period, there is at least one valid leaderAndISR for any given
partition hosted by the broker, at the cost of adding a timeout
configuration.

Solution 2 is the safer option, but it comes with the cost of a timeout
configuration.
*What is your opinion on these two solutions?*


For your second comment:

I will change the metric description. Thanks for pointing out the right
metric format.


Xiongqi (Wesley) Wu


On Sun, Oct 28, 2018 at 9:39 PM Dong Lin  wrote:

> Hey Xiongqi,
>
> Thanks for the KIP. Here are some comments:
>
> 1) KIP provides two motivation for the timeout/correction phase. One
> motivation is to handle outdated requests. Would this still be an issue
> after KIP-380? The second motivation seems to be mainly for performance
> optimization when there is reassignment. In general we expect data movement
> when we reassign partitions to new brokers. So this is probably not a
> strong reason for adding a new config.
>
> 2) The KIP says "Adding metrics to keep track of the number of orphan
> partitions and the size of these orphan partitions". Can you add the
> specification of these new metrics? Here are example doc in
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-237%3A+More+Controller+Health+Metrics
> .
>
> Thanks,
> Dong
>
> On Thu, Sep 20, 2018 at 5:40 PM xiongqi wu  wrote:
>
> > Colin,
> >
> > Thanks for the comment.
> > 1)
> > auto.orphan.partition.removal.delay.ms refers to timeout since the first
> > leader and ISR request was received.  The idea is we want to wait enough
> > time to receive up-to-dated leaderandISR request and any old or new
> > partitions reassignment requests.
> >
> > 2)
> > Is there any logic to remove the partition folders on disk?  I can only
> > find references to removing older log segments, but not the folder, in
> the
> > KIP.
> > ==> yes, the plan is to remove partition folders as well.
> >
> > I will update the KIP to make it more clear.
> >
> >
> > Xiongqi (Wesley) Wu
> >
> >
> > On Thu, Sep 20, 2018 at 5:02 PM Colin McCabe  wrote:
> >
> > > Hi Xiongqi,
> > >
> > > Thanks for the KIP.
> > >
> > > Can you be a bit more clear what the timeout
> > > auto.orphan.partition.removal.delay.ms refers to?  Is the timeout
> > > measured since the partition was supposed to be on the broker?  Or is
> the
> > > timeout measured since the broker started up?
> > >
> > > Is there any logic to remove the partition folders on disk?  I can only
> > > find references to removing older log segments, but not the folder, in
> > the
> > > KIP.
> > >
> > > best,
> > > Colin
> > >
> > > On Wed, Sep 19, 2018, at 10:53, xiongqi wu wrote:
> > > > Any comments?
> > > >
> > > > Xiongqi (Wesley) Wu
> > > >
> > > >
> > > > On Mon, Sep 10, 2018 at 3:04 PM xiongqi wu 
> > wrote:
> > > >
> > > > > Here is the implementation for the KIP 370.
> > > > >
> > > > >
> > > > >
> > >
> >
> https://github.com/xiowu0/kafka/commit/f1bd3085639f41a7af02567550a8e3018cfac3e9
> > > > >
> > > > >
> > > > > The purpose is to do one time cleanup (after a configured delay) of
> > > orphan
> > > > > partitions when a broker starts up.
> > > > >
> > > > >
> > > > > Xiongqi (Wesley) Wu
> > > > >
> > > > >
> > > > > On Wed, Sep 5, 2018 at 10:51 AM xiongqi wu 
> > > wrote:
> > > > >
> > > > >>
> > > > >> This KIP enables broker to remove orphan partitions automatically.
> > > > >>
> > > > >>
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-370%3A+Remove+Orphan+Partitions
> > > > >>
> > > > >>
> > > > >> Xiongqi (Wesley) Wu
> > > > >>
> > > > >
> > >
> >
>


Re: [DISCUSS] KIP-370: Remove Orphan Partitions

2018-10-29 Thread xiongqi wu
Thanks Dong.

I have updated the KIP.
Instead of using a configuration to specify the timeout, I switched it to use
an internal timer.  Users don't need a new configuration to use this feature.

Xiongqi (Wesley) Wu


On Mon, Oct 29, 2018 at 4:40 PM xiongqi wu  wrote:

> Dong,
>
> Thanks for the comments.
>
> 1) With KIP-380, in theory we don't need the timeout phase.
> However, once orphan partitions are removed, they cannot be recovered.
> The question is should we rely on the fact that the first leaderandISR
> always contains correct information.
>
> For retention enabled topic,  the deletion phase (step 3 in this KIP) will
> protect against deletion of new segments.
> For log compaction topic,  since log segments can be relative old, delete
> phase might delete useful segments if by any chance first leaderandISR is
> incorrect.
>
> Here is the different with/without timeout phase:
> Solution 1: without timeout phase,  we rely on the first leaderandISR and
> understand that if first leaderandISR is incorrect, we might loss data.  We
> don't protect against bug.
> Solution 2:  with timeout phase,   we rely on the fact that, during
> timeout period, there is at least one valid leaderandISR for any given
> partition hosted by the broker.
> With the complexity of adding a timeout configuration.
>
> The solution 2 is a more safer option that comes with the cost of timeout
> configuration.
> *What is your opinion on these two solutions?*
>
>
> For your second comment:
>
> I will change the metric description. Thanks for pointing out the right
> metric format.
>
>
> Xiongqi (Wesley) Wu
>
>
> On Sun, Oct 28, 2018 at 9:39 PM Dong Lin  wrote:
>
>> Hey Xiongqi,
>>
>> Thanks for the KIP. Here are some comments:
>>
>> 1) KIP provides two motivation for the timeout/correction phase. One
>> motivation is to handle outdated requests. Would this still be an issue
>> after KIP-380? The second motivation seems to be mainly for performance
>> optimization when there is reassignment. In general we expect data
>> movement
>> when we reassign partitions to new brokers. So this is probably not a
>> strong reason for adding a new config.
>>
>> 2) The KIP says "Adding metrics to keep track of the number of orphan
>> partitions and the size of these orphan partitions". Can you add the
>> specification of these new metrics? Here are example doc in
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-237%3A+More+Controller+Health+Metrics
>> .
>>
>> Thanks,
>> Dong
>>
>> On Thu, Sep 20, 2018 at 5:40 PM xiongqi wu  wrote:
>>
>> > Colin,
>> >
>> > Thanks for the comment.
>> > 1)
>> > auto.orphan.partition.removal.delay.ms refers to timeout since the
>> first
>> > leader and ISR request was received.  The idea is we want to wait enough
>> > time to receive up-to-dated leaderandISR request and any old or new
>> > partitions reassignment requests.
>> >
>> > 2)
>> > Is there any logic to remove the partition folders on disk?  I can only
>> > find references to removing older log segments, but not the folder, in
>> the
>> > KIP.
>> > ==> yes, the plan is to remove partition folders as well.
>> >
>> > I will update the KIP to make it more clear.
>> >
>> >
>> > Xiongqi (Wesley) Wu
>> >
>> >
>> > On Thu, Sep 20, 2018 at 5:02 PM Colin McCabe 
>> wrote:
>> >
>> > > Hi Xiongqi,
>> > >
>> > > Thanks for the KIP.
>> > >
>> > > Can you be a bit more clear what the timeout
>> > > auto.orphan.partition.removal.delay.ms refers to?  Is the timeout
>> > > measured since the partition was supposed to be on the broker?  Or is
>> the
>> > > timeout measured since the broker started up?
>> > >
>> > > Is there any logic to remove the partition folders on disk?  I can
>> only
>> > > find references to removing older log segments, but not the folder, in
>> > the
>> > > KIP.
>> > >
>> > > best,
>> > > Colin
>> > >
>> > > On Wed, Sep 19, 2018, at 10:53, xiongqi wu wrote:
>> > > > Any comments?
>> > > >
>> > > > Xiongqi (Wesley) Wu
>> > > >
>> > > >
>> > > > On Mon, Sep 10, 2018 at 3:04 PM xiongqi wu 
>> > wrote:
>> > > >
>> > > > > Here is the implementation for the KIP 370.
>> > > > >
>> > > > >
>> > > > >
>> > >
>> >
>> https://github.com/xiowu0/kafka/commit/f1bd3085639f41a7af02567550a8e3018cfac3e9
>> > > > >
>> > > > >
>> > > > > The purpose is to do one time cleanup (after a configured delay)
>> of
>> > > orphan
>> > > > > partitions when a broker starts up.
>> > > > >
>> > > > >
>> > > > > Xiongqi (Wesley) Wu
>> > > > >
>> > > > >
>> > > > > On Wed, Sep 5, 2018 at 10:51 AM xiongqi wu 
>> > > wrote:
>> > > > >
>> > > > >>
>> > > > >> This KIP enables broker to remove orphan partitions
>> automatically.
>> > > > >>
>> > > > >>
>> > > > >>
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-370%3A+Remove+Orphan+Partitions
>> > > > >>
>> > > > >>
>> > > > >> Xiongqi (Wesley) Wu
>> > > > >>
>> > > > >
>> > >
>> >
>>
>


Re: [VOTE] KIP-354 Time-based log compaction policy

2018-11-06 Thread xiongqi wu
bump
Xiongqi (Wesley) Wu


On Thu, Sep 27, 2018 at 4:20 PM xiongqi wu  wrote:

>
> Thanks Eno, Brett, Dong, Guozhang, Colin,  and Xiaohe for feedback.
> Can I have more feedback or VOTE on this KIP?
>
>
> Xiongqi (Wesley) Wu
>
>
> On Wed, Sep 19, 2018 at 10:52 AM xiongqi wu  wrote:
>
>> Any other votes or comments?
>>
>> Xiongqi (Wesley) Wu
>>
>>
>> On Tue, Sep 11, 2018 at 11:45 AM xiongqi wu  wrote:
>>
>>> Yes, more votes and code review.
>>>
>>> Xiongqi (Wesley) Wu
>>>
>>>
>>> On Mon, Sep 10, 2018 at 11:37 PM Brett Rann 
>>> wrote:
>>>
>>>> +1 (non binding) from on 0 then, and on the KIP.
>>>>
>>>> Where do we go from here? More votes?
>>>>
>>>> On Tue, Sep 11, 2018 at 5:34 AM Colin McCabe 
>>>> wrote:
>>>>
>>>> > On Mon, Sep 10, 2018, at 11:44, xiongqi wu wrote:
>>>> > > Thank you for comments. I will use '0' for now.
>>>> > >
>>>> > > If we create topics through admin client in the future, we might
>>>> perform
>>>> > > some useful checks. (but the assumption is all brokers in the same
>>>> > cluster
>>>> > > have the same default configurations value, otherwise,it might
>>>> still be
>>>> > > tricky to do such cross validation check.)
>>>> >
>>>> > This isn't something that we might do in the future-- this is
>>>> something we
>>>> > are doing now. We already have Create Topic policies which are
>>>> enforced by
>>>> > the broker. Check KIP-108 and KIP-170 for details. This is one of the
>>>> > motivations for getting rid of direct ZK access-- making sure that
>>>> these
>>>> > policies are applied.
>>>> >
>>>> > I agree that having different configurations on different brokers can
>>>> be
>>>> > confusing and frustrating . That's why more configurations are being
>>>> made
>>>> > dynamic using KIP-226. Dynamic configurations are stored centrally in
>>>> ZK,
>>>> > so they are the same on all brokers (modulo propagation delays). In
>>>> any
>>>> > case, this is a general issue, not specific to "create topics".
>>>> >
>>>> > cheers,
>>>> > Colin
>>>> >
>>>> >
>>>> > >
>>>> > >
>>>> > > Xiongqi (Wesley) Wu
>>>> > >
>>>> > >
>>>> > > On Mon, Sep 10, 2018 at 11:15 AM Colin McCabe 
>>>> > wrote:
>>>> > >
>>>> > > > I don't have a strong opinion. But I think we should probably be
>>>> > > > consistent with how segment.ms works, and just use 0.
>>>> > > >
>>>> > > > best,
>>>> > > > Colin
>>>> > > >
>>>> > > >
>>>> > > > On Wed, Sep 5, 2018, at 21:19, Brett Rann wrote:
>>>> > > > > OK thanks for that clarification. I see why you're uncomfortable
>>>> > with 0
>>>> > > > now.
>>>> > > > >
>>>> > > > > I'm not really fussed. I just prefer consistency in
>>>> configuration
>>>> > > > options.
>>>> > > > >
>>>> > > > > Personally I lean towards treating 0 and 1 similarly in that
>>>> > scenario,
>>>> > > > > because it favours the person thinking about setting the
>>>> > configurations,
>>>> > > > > and a person doesn't care about a 1ms edge case especially when
>>>> the
>>>> > > > context
>>>> > > > > is the true minimum is tied to the log cleaner cadence.
>>>> > > > >
>>>> > > > > Introducing 0 to mean "disabled" because there is some
>>>> uniquness in
>>>> > > > > segment.ms not being able to be set to 0, reduces configuration
>>>> > > > consistency
>>>> > > > > in favour of capturing a MS gap in an edge case that nobody
>>>> would
>>>> > ever
>>>> > > > > notice. For someone to understand why everywhere else -1 is
>>>> used to
>>>> > > > 

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-11-09 Thread xiongqi wu
Thanks Dong.
I have updated the KIP.

Xiongqi (Wesley) Wu


On Fri, Nov 9, 2018 at 5:31 PM Dong Lin  wrote:

> Thanks for the KIP Xiongqi. LGTM. +1 (binding)
>
> One minor comment: it may be a bit better to clarify in the public
> interface section that the value of the newly added metric is determined
> based by applying that formula across all compactable segments. For
> example:
>
> The maximum value of Math.max(now -
> earliest_timestamp_in_ms_of_uncompacted_segment - max.compaction.lag.ms,
> 0)/1000 across all compactable partitions, where the max.compaction.lag.ms
> can be overridden on per-topic basis.
>
>
>
> On Fri, Nov 9, 2018 at 5:16 PM xiongqi wu  wrote:
>
> > Thanks Joel.
> > Tracking the delay at second granularity makes sense
> > I have updated KIP.
> >
> > Xiongqi (Wesley) Wu
> >
> >
> > On Fri, Nov 9, 2018 at 5:05 PM Joel Koshy  wrote:
> >
> > > +1 with one suggestion on the proposed metric. You should probably
> > include
> > > the unit. So for e.g., max-compaction-delay-secs.
> > >
> > > Joel
> > >
> > > On Tue, Nov 6, 2018 at 5:30 PM xiongqi wu  wrote:
> > >
> > > > bump
> > > > Xiongqi (Wesley) Wu
> > > >
> > > >
> > > > On Thu, Sep 27, 2018 at 4:20 PM xiongqi wu 
> > wrote:
> > > >
> > > > >
> > > > > Thanks Eno, Brett, Dong, Guozhang, Colin,  and Xiaohe for feedback.
> > > > > Can I have more feedback or VOTE on this KIP?
> > > > >
> > > > >
> > > > > Xiongqi (Wesley) Wu
> > > > >
> > > > >
> > > > > On Wed, Sep 19, 2018 at 10:52 AM xiongqi wu 
> > > wrote:
> > > > >
> > > > >> Any other votes or comments?
> > > > >>
> > > > >> Xiongqi (Wesley) Wu
> > > > >>
> > > > >>
> > > > >> On Tue, Sep 11, 2018 at 11:45 AM xiongqi wu 
> > > > wrote:
> > > > >>
> > > > >>> Yes, more votes and code review.
> > > > >>>
> > > > >>> Xiongqi (Wesley) Wu
> > > > >>>
> > > > >>>
> > > > >>> On Mon, Sep 10, 2018 at 11:37 PM Brett Rann
> > >  > > > >
> > > > >>> wrote:
> > > > >>>
> > > > >>>> +1 (non binding) from on 0 then, and on the KIP.
> > > > >>>>
> > > > >>>> Where do we go from here? More votes?
> > > > >>>>
> > > > >>>> On Tue, Sep 11, 2018 at 5:34 AM Colin McCabe <
> cmcc...@apache.org>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>> > On Mon, Sep 10, 2018, at 11:44, xiongqi wu wrote:
> > > > >>>> > > Thank you for comments. I will use '0' for now.
> > > > >>>> > >
> > > > >>>> > > If we create topics through admin client in the future, we
> > might
> > > > >>>> perform
> > > > >>>> > > some useful checks. (but the assumption is all brokers in
> the
> > > same
> > > > >>>> > cluster
> > > > >>>> > > have the same default configurations value, otherwise,it
> might
> > > > >>>> still be
> > > > >>>> > > tricky to do such cross validation check.)
> > > > >>>> >
> > > > >>>> > This isn't something that we might do in the future-- this is
> > > > >>>> something we
> > > > >>>> > are doing now. We already have Create Topic policies which are
> > > > >>>> enforced by
> > > > >>>> > the broker. Check KIP-108 and KIP-170 for details. This is one
> > of
> > > > the
> > > > >>>> > motivations for getting rid of direct ZK access-- making sure
> > that
> > > > >>>> these
> > > > >>>> > policies are applied.
> > > > >>>> >
> > > > >>>> > I agree that having different configurations on different
> > brokers
> > > > can
> > > > >>>> be
> > > > >>>> > confusing and frustrating . That's why more configurations are
> > > being
&g

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-11-09 Thread xiongqi wu
Thanks Joel.
Tracking the delay at second granularity makes sense.
I have updated the KIP.

Xiongqi (Wesley) Wu


On Fri, Nov 9, 2018 at 5:05 PM Joel Koshy  wrote:

> +1 with one suggestion on the proposed metric. You should probably include
> the unit. So for e.g., max-compaction-delay-secs.
>
> Joel
>
> On Tue, Nov 6, 2018 at 5:30 PM xiongqi wu  wrote:
>
> > bump
> > Xiongqi (Wesley) Wu
> >
> >
> > On Thu, Sep 27, 2018 at 4:20 PM xiongqi wu  wrote:
> >
> > >
> > > Thanks Eno, Brett, Dong, Guozhang, Colin,  and Xiaohe for feedback.
> > > Can I have more feedback or VOTE on this KIP?
> > >
> > >
> > > Xiongqi (Wesley) Wu
> > >
> > >
> > > On Wed, Sep 19, 2018 at 10:52 AM xiongqi wu 
> wrote:
> > >
> > >> Any other votes or comments?
> > >>
> > >> Xiongqi (Wesley) Wu
> > >>
> > >>
> > >> On Tue, Sep 11, 2018 at 11:45 AM xiongqi wu 
> > wrote:
> > >>
> > >>> Yes, more votes and code review.
> > >>>
> > >>> Xiongqi (Wesley) Wu
> > >>>
> > >>>
> > >>> On Mon, Sep 10, 2018 at 11:37 PM Brett Rann
>  > >
> > >>> wrote:
> > >>>
> > >>>> +1 (non binding) from on 0 then, and on the KIP.
> > >>>>
> > >>>> Where do we go from here? More votes?
> > >>>>
> > >>>> On Tue, Sep 11, 2018 at 5:34 AM Colin McCabe 
> > >>>> wrote:
> > >>>>
> > >>>> > On Mon, Sep 10, 2018, at 11:44, xiongqi wu wrote:
> > >>>> > > Thank you for comments. I will use '0' for now.
> > >>>> > >
> > >>>> > > If we create topics through admin client in the future, we might
> > >>>> perform
> > >>>> > > some useful checks. (but the assumption is all brokers in the
> same
> > >>>> > cluster
> > >>>> > > have the same default configurations value, otherwise,it might
> > >>>> still be
> > >>>> > > tricky to do such cross validation check.)
> > >>>> >
> > >>>> > This isn't something that we might do in the future-- this is
> > >>>> something we
> > >>>> > are doing now. We already have Create Topic policies which are
> > >>>> enforced by
> > >>>> > the broker. Check KIP-108 and KIP-170 for details. This is one of
> > the
> > >>>> > motivations for getting rid of direct ZK access-- making sure that
> > >>>> these
> > >>>> > policies are applied.
> > >>>> >
> > >>>> > I agree that having different configurations on different brokers
> > can
> > >>>> be
> > >>>> > confusing and frustrating . That's why more configurations are
> being
> > >>>> made
> > >>>> > dynamic using KIP-226. Dynamic configurations are stored centrally
> > in
> > >>>> ZK,
> > >>>> > so they are the same on all brokers (modulo propagation delays).
> In
> > >>>> any
> > >>>> > case, this is a general issue, not specific to "create topics".
> > >>>> >
> > >>>> > cheers,
> > >>>> > Colin
> > >>>> >
> > >>>> >
> > >>>> > >
> > >>>> > >
> > >>>> > > Xiongqi (Wesley) Wu
> > >>>> > >
> > >>>> > >
> > >>>> > > On Mon, Sep 10, 2018 at 11:15 AM Colin McCabe <
> cmcc...@apache.org
> > >
> > >>>> > wrote:
> > >>>> > >
> > >>>> > > > I don't have a strong opinion. But I think we should probably
> be
> > >>>> > > > consistent with how segment.ms works, and just use 0.
> > >>>> > > >
> > >>>> > > > best,
> > >>>> > > > Colin
> > >>>> > > >
> > >>>> > > >
> > >>>> > > > On Wed, Sep 5, 2018, at 21:19, Brett Rann wrote:
> > >>>> > > > > OK thanks for that clarification. I see why you're
> > uncomfortable
> > >>>>

Re: [DISCUSS] KIP-388 Add observer interface to record request and response

2018-11-13 Thread xiongqi wu
Lincong,

Thanks for the KIP.
I have a question about the lifecycle of the request and response objects.
With the current (requestAdapter, responseAdapter) implementation,
an observer can potentially extend the lifecycle of a request and response
through the adapter.
Anyone can implement their own observer, and some observers may want to do
asynchronous or batched processing.

Could you clarify how we can make sure this does not increase memory
pressure by holding on to large request/response objects?
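
To illustrate the concern, here is a minimal sketch in Java. The Observer
interface and the getter names below are hypothetical placeholders for
whatever KIP-388 ends up defining, not actual Kafka classes. An observer that
hands the adapter objects to an async executor keeps the underlying
request/response alive; copying out only the small fields it needs lets the
large objects be garbage-collected right away.

// Hypothetical types standing in for the proposed KIP-388 interfaces.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

interface RequestInfo {            // placeholder for the proposed request adapter
    String clientId();
    String apiKeyName();
    long totalSizeInBytes();
}

interface ResponseInfo {           // placeholder for the proposed response adapter
    long totalSizeInBytes();
}

interface Observer {               // placeholder for the proposed observer interface
    void observe(RequestInfo request, ResponseInfo response);
}

public class AsyncObserverSketch implements Observer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    public void observe(RequestInfo request, ResponseInfo response) {
        // Copy the few primitive fields we need *before* going async, so the
        // lambda does not capture (and thus retain) the large adapter objects.
        String clientId = request.clientId();
        String api = request.apiKeyName();
        long requestBytes = request.totalSizeInBytes();
        long responseBytes = response.totalSizeInBytes();

        executor.submit(() ->
            System.out.printf("client=%s api=%s reqBytes=%d respBytes=%d%n",
                    clientId, api, requestBytes, responseBytes));
    }
}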



Xiongqi (Wesley) Wu


On Mon, Nov 12, 2018 at 10:23 PM Lincong Li  wrote:

> Thanks Mayuresh, Ismael and Colin for your feedback!
>
> I updated the KIP basing on your feedback. The change is basically that two
> interfaces are introduced to prevent the internal classes from being
> exposed. These two interfaces contain getters that allow user to extract
> information from request and response in their own implementation(s) of the
> observer interface and they would not constraint future implementation
> changes in neither RequestChannel.Request nor AbstractResponse. There could
> be more getters defined in these two interfaces. The implementation of
> these two interfaces will be provided as part of the KIP.
>
> I also expanded on "Add code to the broker (in KafkaApis) to allow Kafka
> servers to invoke any
> observers defined. More specifically, change KafkaApis code to invoke all
> defined observers, in the order in which they were defined, for every
> request-response pair" by providing a sample code block which shows how
> these interfaces are used in the KafkaApis class.
>
> Let me know if you have any question, concern, or comments. Thank you very
> much!
>
> Best regards,
> Lincong Li
>
> On Fri, Nov 9, 2018 at 10:34 AM Mayuresh Gharat <
> gharatmayures...@gmail.com>
> wrote:
>
> > Hi Lincong,
> >
> > Thanks for the KIP.
> >
> > As Colin pointed out, would it better to expose certain specific pieces
> of
> > information from the request/response like api key, request headers,
> record
> > counts, client ID instead of the entire request/response objects ? This
> > enables us to change the request response apis independently of this
> > pluggable public API, in future, unless you think we have a strong reason
> > that we need to expose the request, response objects.
> >
> > Also, it would be great if you can expand on :
> > "Add code to the broker (in KafkaApis) to allow Kafka servers to invoke
> any
> > observers defined. More specifically, change KafkaApis code to invoke all
> > defined observers, in the order in which they were defined, for every
> > request-response pair."
> > probably with an example of how you visualize it. It would help the KIP
> to
> > be more concrete and easier to understand the end to end workflow.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Thu, Nov 8, 2018 at 7:44 PM Ismael Juma  wrote:
> >
> > > I agree, the current KIP doesn't discuss the public API that we would
> be
> > > exposing and it's extensive if the normal usage would allow for casting
> > > AbstractRequest into the various subclasses and potentially even
> > accessing
> > > Records and related for produce request.
> > >
> > > There are many use cases where this could be useful, but it requires
> > quite
> > > a bit of thinking around the APIs that we expose and the expected
> usage.
> > >
> > > Ismael
> > >
> > > On Thu, Nov 8, 2018, 6:09 PM Colin McCabe  > >
> > > > Hi Lincong Li,
> > > >
> > > > I agree that server-side instrumentation is helpful.  However, I
> don't
> > > > think this is the right approach.
> > > >
> > > > The problem is that RequestChannel.Request and AbstractResponse are
> > > > internal classes that should not be exposed.  These are
> implementation
> > > > details that we may change in the future.  Freezing these into a
> public
> > > API
> > > > would really hold back the project.  For example, for really large
> > > > responses, we might eventually want to avoid materializing the whole
> > > > response all at once.  It would make more sense to return it in a
> > > streaming
> > > > fashion.  But if we need to support this API forever, we can't do
> that.
> > > >
> > > > I think it's fair to say that this is, at best, half a solution to
> the
> > > > problem of tracing requests.  Users still need to write the plugin
> code
> > > and
> > > > arrange for it to be on their classpath to make this work.  I think
> the
> > > > alternative here is not client-side instrumentation, but simply
> making
> > > the
> > > > change to the broker without using a plugin interface.
> > > >
> > > > If a public interface is absolutely necessary here we should expose
> > only
> > > > things like the API key, client ID, time, etc. that don't constrain
> the
> > > > implementation a lot in the future.  I think we should also use java
> > here
> > > > to avoid the compatibility issues we have had with Scala APIs in the
> > > past.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Thu, Nov 8, 2018, at 11:34, 

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-11-06 Thread xiongqi wu
Dong,

Thanks for the comments.   I have updated the KIP based on your comments.

Below are my replies to your questions:

1.  We only calculate this metric for log compactions that are triggered by
the max compaction lag, so we only collect non-negative values.  The log
cleaner runs continuously, with some back-off time when there is no work to
do.
The max is the max across all log cleaner threads in their latest run, not
the historical max.  This is similar to the existing metric
"max-clean-time-secs".  I have now mentioned in the KIP that this metric is
reported per cleaner thread.  (A small sketch of the computation is included
at the end of this message.)
Users can look at the historical data to track how the delay changes over
time (similar to other log cleaner metrics).

Another way of defining this metric is "compaction_finish_time -
earliest_timestamp_of_first_uncompacted_segment", which is not defined
w.r.t. how often the log cleaner runs.  However, the max compaction lag may
vary for different topics, and this definition doesn't really tell how soon
a compaction request is fulfilled after the max compaction lag.  What do you
think?

2.  This is intended to track whether the most recently compacted logs were
selected because of the max compaction lag.
The metric is updated on each log cleaner run. If there are two log cleaner
threads and both worked on log partitions selected by the max compaction lag
in their last run, the value of this metric will be 2.
The previous metric doesn't provide this information when there is more than
one log cleaner thread.

3. I meant to say the partition is required to be picked up for log
compaction after this max lag. The actual compaction finish time may still
vary, since the log cleaner may take time to finish compacting this
partition, or it may work on other partitions first.
"Guarantee" may be misleading, so I have updated the KIP.

4. It is determined based on the cleaner checkpoint file.  This KIP doesn't
change how the broker determines the un-compacted segments.
5.  Done.
6.  Why should this feature depend on message timestamps?
"segment.largestTimestamp - maxSegmentMs" is a reasonable estimate for
detecting a violation of the max compaction lag, and this estimate is only
needed if the first segment of a log partition is un-compacted.
7.  I removed the unrelated part and specifically mentioned that the added
metric "num-logs-compacted-by-max-compaction-lag" can be used to measure the
performance impact.
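
To make the delay computation in point 1 concrete, here is a minimal sketch
(plain Java with illustrative names; this is not the actual LogCleaner code)
of the per-run metric, following the Math.max(now - earliest_timestamp -
max.compaction.lag.ms, 0)/1000 formula discussed in this thread:

import java.util.List;

public class MaxCompactionDelay {

    // Each entry: {earliestUncompactedTimestampMs, maxCompactionLagMs} for one
    // compactable partition considered in this cleaner run.
    static long maxCompactionDelaySecs(List<long[]> partitions, long nowMs) {
        long maxDelaySecs = 0;
        for (long[] p : partitions) {
            long earliestTs = p[0];
            long maxLagMs = p[1];
            long delaySecs = Math.max(nowMs - earliestTs - maxLagMs, 0) / 1000;
            maxDelaySecs = Math.max(maxDelaySecs, delaySecs);
        }
        return maxDelaySecs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<long[]> parts = List.of(
                new long[]{now - 10 * 60 * 1000, 5 * 60 * 1000},  // 5 minutes overdue
                new long[]{now - 60 * 1000, 5 * 60 * 1000});      // not yet overdue
        System.out.println(maxCompactionDelaySecs(parts, now));   // prints 300
    }
}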

Xiongqi (Wesley) Wu


On Tue, Nov 6, 2018 at 6:50 PM Dong Lin  wrote:

> Hey Xiongqi,
>
> Thanks for the update. A few more comments below
>
> 1) According to the definition of
> kafka.log:type=LogCleaner,name=max-compaction-delay, it seems that the
> metric value will be a large negative number if max.compaction.lag.ms is
> MAX_LONG. Would this be a problem? Also, it seems weird that the value of
> the metric is defined w.r.t. how often the log cleaner is run.
>
> 2) Not sure if we need the metric num-logs-compacted-by-max-compaction-lag
> in addition to max-compaction-delay. It seems that operator can just use
> max-compaction-delay to determine whether the max.compaction.lag is
> properly enforced in a quantitative manner. Also, the metric name
> `num-logs-compacted-by-max-compaction-lag` is inconsistent with its
> intended meaning, i.e. the number of logs that needs to be compacted due to
> max.compaction.lag but not yet compacted. So it is probably simple to just
> remove this metric.
>
> 3) The KIP currently says that "a message record has a guaranteed
> upper-bound in time to become mandatory for compaction". The word
> "guarantee" may be misleading because the message may still not be
> compacted within max.compaction.lag after its creation. Could you clarify
> the exact semantics of the max.compaction.lag.ms in the Public Interface
> section?
>
> 4) The KIP's proposed change will estimate earliest message timestamp for
> un-compacted log segments. Can you explain how broker determines whether a
> segment has been compacted after the broker is restarted?
>
> 5) 2.b in Proposed Change section provides two way to get timestamp. To
> make the KIP easier to read for future reference, could we just mention the
> method that we plan to use and move the other solution to the rejected
> alternative section?
>
> 6) Based on the discussion (i.e. point 2 in the previous email), it is said
> that we can assume all messages have timestamp and the feature added in
> this KIP can be skipped for those messages which do not have timestamp. So
> do we still need to use "segment.largestTimestamp - maxSegmentMs" in
> Proposed Change section 2.a?
>
> 7) Based on the discussion (i.e. point 8 in the previous email), if this
> KIP requires user to monitor certain existing metrics for performance
> impact added in this KIP, can we list the metrics in the KIP for user's
> convenience?
>
>
> Thanks,
> Dong
>

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-11-12 Thread xiongqi wu
Hi all,

Can I have one more vote on this KIP?
Any comment is appreciated.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Add+a+Maximum+Log+Compaction+Lag


Xiongqi (Wesley) Wu


On Fri, Nov 9, 2018 at 7:56 PM xiongqi wu  wrote:

> Thanks Dong.
> I have updated the KIP.
>
> Xiongqi (Wesley) Wu
>
>
> On Fri, Nov 9, 2018 at 5:31 PM Dong Lin  wrote:
>
>> Thanks for the KIP Xiongqi. LGTM. +1 (binding)
>>
>> One minor comment: it may be a bit better to clarify in the public
>> interface section that the value of the newly added metric is determined
>> based by applying that formula across all compactable segments. For
>> example:
>>
>> The maximum value of Math.max(now -
>> earliest_timestamp_in_ms_of_uncompacted_segment - max.compaction.lag.ms,
>> 0)/1000 across all compactable partitions, where the
>> max.compaction.lag.ms
>> can be overridden on per-topic basis.
>>
>>
>>
>> On Fri, Nov 9, 2018 at 5:16 PM xiongqi wu  wrote:
>>
>> > Thanks Joel.
>> > Tracking the delay at second granularity makes sense
>> > I have updated KIP.
>> >
>> > Xiongqi (Wesley) Wu
>> >
>> >
>> > On Fri, Nov 9, 2018 at 5:05 PM Joel Koshy  wrote:
>> >
>> > > +1 with one suggestion on the proposed metric. You should probably
>> > include
>> > > the unit. So for e.g., max-compaction-delay-secs.
>> > >
>> > > Joel
>> > >
>> > > On Tue, Nov 6, 2018 at 5:30 PM xiongqi wu 
>> wrote:
>> > >
>> > > > bump
>> > > > Xiongqi (Wesley) Wu
>> > > >
>> > > >
>> > > > On Thu, Sep 27, 2018 at 4:20 PM xiongqi wu 
>> > wrote:
>> > > >
>> > > > >
>> > > > > Thanks Eno, Brett, Dong, Guozhang, Colin,  and Xiaohe for
>> feedback.
>> > > > > Can I have more feedback or VOTE on this KIP?
>> > > > >
>> > > > >
>> > > > > Xiongqi (Wesley) Wu
>> > > > >
>> > > > >
>> > > > > On Wed, Sep 19, 2018 at 10:52 AM xiongqi wu 
>> > > wrote:
>> > > > >
>> > > > >> Any other votes or comments?
>> > > > >>
>> > > > >> Xiongqi (Wesley) Wu
>> > > > >>
>> > > > >>
>> > > > >> On Tue, Sep 11, 2018 at 11:45 AM xiongqi wu > >
>> > > > wrote:
>> > > > >>
>> > > > >>> Yes, more votes and code review.
>> > > > >>>
>> > > > >>> Xiongqi (Wesley) Wu
>> > > > >>>
>> > > > >>>
>> > > > >>> On Mon, Sep 10, 2018 at 11:37 PM Brett Rann
>> > > > > > > >
>> > > > >>> wrote:
>> > > > >>>
>> > > > >>>> +1 (non binding) from on 0 then, and on the KIP.
>> > > > >>>>
>> > > > >>>> Where do we go from here? More votes?
>> > > > >>>>
>> > > > >>>> On Tue, Sep 11, 2018 at 5:34 AM Colin McCabe <
>> cmcc...@apache.org>
>> > > > >>>> wrote:
>> > > > >>>>
>> > > > >>>> > On Mon, Sep 10, 2018, at 11:44, xiongqi wu wrote:
>> > > > >>>> > > Thank you for comments. I will use '0' for now.
>> > > > >>>> > >
>> > > > >>>> > > If we create topics through admin client in the future, we
>> > might
>> > > > >>>> perform
>> > > > >>>> > > some useful checks. (but the assumption is all brokers in
>> the
>> > > same
>> > > > >>>> > cluster
>> > > > >>>> > > have the same default configurations value, otherwise,it
>> might
>> > > > >>>> still be
>> > > > >>>> > > tricky to do such cross validation check.)
>> > > > >>>> >
>> > > > >>>> > This isn't something that we might do in the future-- this is
>> > > > >>>> something we
>> > > > >>>> > are doing now. We already have Create Topic policies which
>> are
>> > > > >>>> enforced by
>> > 

Re: [DISCUSS] KIP-354 Time-based log compaction policy

2018-09-03 Thread xiongqi wu
Brett,

Yes, I will post a PR tomorrow.

Xiongqi (Wesley) Wu


On Sun, Sep 2, 2018 at 6:28 PM Brett Rann  wrote:

> +1 (non-binding) from me on the interface. I'd like to see someone familiar
> with
> the code comment on the approach, and note there's a couple of different
> approaches: what's documented in the KIP, and what Xiaohe Dong was working
> on
> here:
>
> https://github.com/dongxiaohe/kafka/tree/dongxiaohe/log-cleaner-compaction-max-lifetime-2.0
>
> If you have code working already Xiongqi Wu could you share a PR? I'd be
> happy
> to start testing.
>
> On Tue, Aug 28, 2018 at 5:57 AM xiongqi wu  wrote:
>
> > Hi All,
> >
> > Do you have any additional comments on this KIP?
> >
> >
> > On Thu, Aug 16, 2018 at 9:17 PM, xiongqi wu  wrote:
> >
> > > on 2)
> > > The offsetmap is built starting from dirty segment.
> > > The compaction starts from the beginning of the log partition. That's
> how
> > > it ensure the deletion of tomb keys.
> > > I will double check tomorrow.
> > >
> > > Xiongqi (Wesley) Wu
> > >
> > >
> > > On Thu, Aug 16, 2018 at 6:46 PM Brett Rann 
> > > wrote:
> > >
> > >> To just clarify a bit on 1. whether there's an external storage/DB
> isn't
> > >> relevant here.
> > >> Compacted topics allow a tombstone record to be sent (a null value
> for a
> > >> key) which
> > >> currently will result in old values for that key being deleted if some
> > >> conditions are met.
> > >> There are existing controls to make sure the old values will stay
> around
> > >> for a minimum
> > >> time at least, but no dedicated control to ensure the tombstone will
> > >> delete
> > >> within a
> > >> maximum time.
> > >>
> > >> One popular reason that maximum time for deletion is desirable right
> now
> > >> is
> > >> GDPR with
> > >> PII. But we're not proposing any GDPR awareness in kafka, just being
> > able
> > >> to guarantee
> > >> a max time where a tombstoned key will be removed from the compacted
> > >> topic.
> > >>
> > >> on 2)
> > >> huh, i thought it kept track of the first dirty segment and didn't
> > >> recompact older "clean" ones.
> > >> But I didn't look at code or test for that.
> > >>
> > >> On Fri, Aug 17, 2018 at 10:57 AM xiongqi wu 
> > wrote:
> > >>
> > >> > 1, Owner of data (in this sense, kafka is the not the owner of data)
> > >> > should keep track of lifecycle of the data in some external
> > storage/DB.
> > >> > The owner determines when to delete the data and send the delete
> > >> request to
> > >> > kafka. Kafka doesn't know about the content of data but to provide a
> > >> mean
> > >> > for deletion.
> > >> >
> > >> > 2 , each time compaction runs, it will start from first segments (no
> > >> > matter if it is compacted or not). The time estimation here is only
> > used
> > >> > to determine whether we should run compaction on this log partition.
> > So
> > >> we
> > >> > only need to estimate uncompacted segments.
> > >> >
> > >> > On Thu, Aug 16, 2018 at 5:35 PM, Dong Lin 
> > wrote:
> > >> >
> > >> > > Hey Xiongqi,
> > >> > >
> > >> > > Thanks for the update. I have two questions for the latest KIP.
> > >> > >
> > >> > > 1) The motivation section says that one use case is to delete PII
> > >> > (Personal
> > >> > > Identifiable information) data within 7 days while keeping non-PII
> > >> > > indefinitely in compacted format. I suppose the use-case depends
> on
> > >> the
> > >> > > application to determine when to delete those PII data. Could you
> > >> explain
> > >> > > how can application reliably determine the set of keys that should
> > be
> > >> > > deleted? Is application required to always messages from the topic
> > >> after
> > >> > > every restart and determine the keys to be deleted by looking at
> > >> message
> > >> > > timestamp, or is application supposed to persist the key->
> timstamp
> > >> > > information in a separate persistent st

Re: [DISCUSS] KIP-370: Remove Orphan Partitions

2018-09-20 Thread xiongqi wu
Colin,

Thanks for the comment.
1)
auto.orphan.partition.removal.delay.ms refers to the timeout since the first
LeaderAndIsr request was received.  The idea is that we want to wait long
enough to receive an up-to-date LeaderAndIsr request and any old or new
partition reassignment requests.

2)
Is there any logic to remove the partition folders on disk?  I can only
find references to removing older log segments, but not the folder, in the
KIP.
==> Yes, the plan is to remove the partition folders as well.

I will update the KIP to make it more clear.


Xiongqi (Wesley) Wu


On Thu, Sep 20, 2018 at 5:02 PM Colin McCabe  wrote:

> Hi Xiongqi,
>
> Thanks for the KIP.
>
> Can you be a bit more clear what the timeout
> auto.orphan.partition.removal.delay.ms refers to?  Is the timeout
> measured since the partition was supposed to be on the broker?  Or is the
> timeout measured since the broker started up?
>
> Is there any logic to remove the partition folders on disk?  I can only
> find references to removing older log segments, but not the folder, in the
> KIP.
>
> best,
> Colin
>
> On Wed, Sep 19, 2018, at 10:53, xiongqi wu wrote:
> > Any comments?
> >
> > Xiongqi (Wesley) Wu
> >
> >
> > On Mon, Sep 10, 2018 at 3:04 PM xiongqi wu  wrote:
> >
> > > Here is the implementation for the KIP 370.
> > >
> > >
> > >
> https://github.com/xiowu0/kafka/commit/f1bd3085639f41a7af02567550a8e3018cfac3e9
> > >
> > >
> > > The purpose is to do one time cleanup (after a configured delay) of
> orphan
> > > partitions when a broker starts up.
> > >
> > >
> > > Xiongqi (Wesley) Wu
> > >
> > >
> > > On Wed, Sep 5, 2018 at 10:51 AM xiongqi wu 
> wrote:
> > >
> > >>
> > >> This KIP enables broker to remove orphan partitions automatically.
> > >>
> > >>
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-370%3A+Remove+Orphan+Partitions
> > >>
> > >>
> > >> Xiongqi (Wesley) Wu
> > >>
> > >
>


Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-19 Thread xiongqi wu
Any other votes or comments?

Xiongqi (Wesley) Wu


On Tue, Sep 11, 2018 at 11:45 AM xiongqi wu  wrote:

> Yes, more votes and code review.
>
> Xiongqi (Wesley) Wu
>
>
> On Mon, Sep 10, 2018 at 11:37 PM Brett Rann 
> wrote:
>
>> +1 (non binding) from on 0 then, and on the KIP.
>>
>> Where do we go from here? More votes?
>>
>> On Tue, Sep 11, 2018 at 5:34 AM Colin McCabe  wrote:
>>
>> > On Mon, Sep 10, 2018, at 11:44, xiongqi wu wrote:
>> > > Thank you for comments. I will use '0' for now.
>> > >
>> > > If we create topics through admin client in the future, we might
>> perform
>> > > some useful checks. (but the assumption is all brokers in the same
>> > cluster
>> > > have the same default configurations value, otherwise,it might still
>> be
>> > > tricky to do such cross validation check.)
>> >
>> > This isn't something that we might do in the future-- this is something
>> we
>> > are doing now. We already have Create Topic policies which are enforced
>> by
>> > the broker. Check KIP-108 and KIP-170 for details. This is one of the
>> > motivations for getting rid of direct ZK access-- making sure that these
>> > policies are applied.
>> >
>> > I agree that having different configurations on different brokers can be
>> > confusing and frustrating . That's why more configurations are being
>> made
>> > dynamic using KIP-226. Dynamic configurations are stored centrally in
>> ZK,
>> > so they are the same on all brokers (modulo propagation delays). In any
>> > case, this is a general issue, not specific to "create topics".
>> >
>> > cheers,
>> > Colin
>> >
>> >
>> > >
>> > >
>> > > Xiongqi (Wesley) Wu
>> > >
>> > >
>> > > On Mon, Sep 10, 2018 at 11:15 AM Colin McCabe 
>> > wrote:
>> > >
>> > > > I don't have a strong opinion. But I think we should probably be
>> > > > consistent with how segment.ms works, and just use 0.
>> > > >
>> > > > best,
>> > > > Colin
>> > > >
>> > > >
>> > > > On Wed, Sep 5, 2018, at 21:19, Brett Rann wrote:
>> > > > > OK thanks for that clarification. I see why you're uncomfortable
>> > with 0
>> > > > now.
>> > > > >
>> > > > > I'm not really fussed. I just prefer consistency in configuration
>> > > > options.
>> > > > >
>> > > > > Personally I lean towards treating 0 and 1 similarly in that
>> > scenario,
>> > > > > because it favours the person thinking about setting the
>> > configurations,
>> > > > > and a person doesn't care about a 1ms edge case especially when
>> the
>> > > > context
>> > > > > is the true minimum is tied to the log cleaner cadence.
>> > > > >
>> > > > > Introducing 0 to mean "disabled" because there is some uniquness
>> in
>> > > > > segment.ms not being able to be set to 0, reduces configuration
>> > > > consistency
>> > > > > in favour of capturing a MS gap in an edge case that nobody would
>> > ever
>> > > > > notice. For someone to understand why everywhere else -1 is used
>> to
>> > > > > disable, but here 0 is used, they would need to learn about
>> > segment.ms
>> > > > > having a 1ms minimum and then after learning would think "who
>> cares
>> > about
>> > > > > 1ms?" in this context. I would anyway :)
>> > > > >
>> > > > > my 2c anyway. Will again defer to majority. Curious which way
>> Colin
>> > falls
>> > > > > now.
>> > > > >
>> > > > > Don't want to spend more time on this though, It's well into
>> > > > bikeshedding
>> > > > > territory now. :)
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Thu, Sep 6, 2018 at 1:31 PM xiongqi wu 
>> > wrote:
>> > > > >
>> > > > > > I want to honor the minimum value of segment.ms (which is 1ms)
>> to
>> > > > force
>> > > > > > roll an active segment.
>> > > > > > So if we set "max.compaction.l

Re: [DISCUSS] KIP-370: Remove Orphan Partitions

2018-09-19 Thread xiongqi wu
Any comments?

Xiongqi (Wesley) Wu


On Mon, Sep 10, 2018 at 3:04 PM xiongqi wu  wrote:

> Here is the implementation for the KIP 370.
>
>
> https://github.com/xiowu0/kafka/commit/f1bd3085639f41a7af02567550a8e3018cfac3e9
>
>
> The purpose is to do one time cleanup (after a configured delay) of orphan
> partitions when a broker starts up.
>
>
> Xiongqi (Wesley) Wu
>
>
> On Wed, Sep 5, 2018 at 10:51 AM xiongqi wu  wrote:
>
>>
>> This KIP enables broker to remove orphan partitions automatically.
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-370%3A+Remove+Orphan+Partitions
>>
>>
>> Xiongqi (Wesley) Wu
>>
>


Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-27 Thread xiongqi wu
Thanks Eno, Brett, Dong, Guozhang, Colin,  and Xiaohe for feedback.
Can I have more feedback or VOTE on this KIP?


Xiongqi (Wesley) Wu


On Wed, Sep 19, 2018 at 10:52 AM xiongqi wu  wrote:

> Any other votes or comments?
>
> Xiongqi (Wesley) Wu
>
>
> On Tue, Sep 11, 2018 at 11:45 AM xiongqi wu  wrote:
>
>> Yes, more votes and code review.
>>
>> Xiongqi (Wesley) Wu
>>
>>
>> On Mon, Sep 10, 2018 at 11:37 PM Brett Rann 
>> wrote:
>>
>>> +1 (non binding) from on 0 then, and on the KIP.
>>>
>>> Where do we go from here? More votes?
>>>
>>> On Tue, Sep 11, 2018 at 5:34 AM Colin McCabe  wrote:
>>>
>>> > On Mon, Sep 10, 2018, at 11:44, xiongqi wu wrote:
>>> > > Thank you for comments. I will use '0' for now.
>>> > >
>>> > > If we create topics through admin client in the future, we might
>>> perform
>>> > > some useful checks. (but the assumption is all brokers in the same
>>> > cluster
>>> > > have the same default configurations value, otherwise,it might still
>>> be
>>> > > tricky to do such cross validation check.)
>>> >
>>> > This isn't something that we might do in the future-- this is
>>> something we
>>> > are doing now. We already have Create Topic policies which are
>>> enforced by
>>> > the broker. Check KIP-108 and KIP-170 for details. This is one of the
>>> > motivations for getting rid of direct ZK access-- making sure that
>>> these
>>> > policies are applied.
>>> >
>>> > I agree that having different configurations on different brokers can
>>> be
>>> > confusing and frustrating . That's why more configurations are being
>>> made
>>> > dynamic using KIP-226. Dynamic configurations are stored centrally in
>>> ZK,
>>> > so they are the same on all brokers (modulo propagation delays). In any
>>> > case, this is a general issue, not specific to "create topics".
>>> >
>>> > cheers,
>>> > Colin
>>> >
>>> >
>>> > >
>>> > >
>>> > > Xiongqi (Wesley) Wu
>>> > >
>>> > >
>>> > > On Mon, Sep 10, 2018 at 11:15 AM Colin McCabe 
>>> > wrote:
>>> > >
>>> > > > I don't have a strong opinion. But I think we should probably be
>>> > > > consistent with how segment.ms works, and just use 0.
>>> > > >
>>> > > > best,
>>> > > > Colin
>>> > > >
>>> > > >
>>> > > > On Wed, Sep 5, 2018, at 21:19, Brett Rann wrote:
>>> > > > > OK thanks for that clarification. I see why you're uncomfortable
>>> > with 0
>>> > > > now.
>>> > > > >
>>> > > > > I'm not really fussed. I just prefer consistency in configuration
>>> > > > options.
>>> > > > >
>>> > > > > Personally I lean towards treating 0 and 1 similarly in that
>>> > scenario,
>>> > > > > because it favours the person thinking about setting the
>>> > configurations,
>>> > > > > and a person doesn't care about a 1ms edge case especially when
>>> the
>>> > > > context
>>> > > > > is the true minimum is tied to the log cleaner cadence.
>>> > > > >
>>> > > > > Introducing 0 to mean "disabled" because there is some uniquness
>>> in
>>> > > > > segment.ms not being able to be set to 0, reduces configuration
>>> > > > consistency
>>> > > > > in favour of capturing a MS gap in an edge case that nobody would
>>> > ever
>>> > > > > notice. For someone to understand why everywhere else -1 is used
>>> to
>>> > > > > disable, but here 0 is used, they would need to learn about
>>> > segment.ms
>>> > > > > having a 1ms minimum and then after learning would think "who
>>> cares
>>> > about
>>> > > > > 1ms?" in this context. I would anyway :)
>>> > > > >
>>> > > > > my 2c anyway. Will again defer to majority. Curious which way
>>> Colin
>>> > falls
>>> > > > > now.
>>> > > > >
>

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-12-06 Thread xiongqi wu
Thanks Colin, Dong, and Joel for voting.
I have marked this KIP as accepted.

The pull request is up:
https://github.com/apache/kafka/pull/6009



Xiongqi (Wesley) Wu


On Thu, Dec 6, 2018 at 10:47 AM Joel Koshy  wrote:

> +1 on the updated KIP.
>
> On Wed, Dec 5, 2018 at 11:56 AM Dong Lin  wrote:
>
> > Thanks for the update. +1 (binding)
> >
> > On Wed, Dec 5, 2018 at 8:19 AM Colin McCabe  wrote:
> >
> > > Thanks, Xiongqi Wu.  +1 (binding)
> > >
> > > regards,
> > > Colin
> > >
> > >
> > > On Tue, Dec 4, 2018, at 20:58, xiongqi (wesley) wu wrote:
> > > > Colin,
> > > >
> > > > Thanks for comments.
> > > > Out of ordered message timestamp is a very good point.
> > > > We can combine max.compaction.lag.ms  with
> > > > log.message.timestamp.difference.max.ms to achieve what we want in
> an
> > > > environment that message timestamp can be shifted a lot.
> > > >
> > > > There are similar discussions regarding log.retention.ms and
> > > > log.message.timestamp.difference.max.ms  in KAFKA-4340.
> > > >
> > > > I agree that we can always use first message timestamp not the
> > > maxTimestamp
> > > > of the previous log segment to make it simple.
> > > >
> > > > I have updated the KIP.
> > > >
> > > > Xiongqi (wesley) Wu
> > > >
> > > >
> > > > On Tue, Dec 4, 2018 at 5:13 PM Colin McCabe 
> > wrote:
> > > >
> > > > > Hi Xiongqi,
> > > > >
> > > > > Thinking about this a little bit more, it seems like we don't have
> > any
> > > > > guarantees just by looking at the timestamp of the first message
> in a
> > > log
> > > > > segment.  Similarly, we don't have any guarantees just by looking
> at
> > > the
> > > > > maxTimestamp of the previous log segment.  Old data could appear
> > > anywhere--
> > > > > you could put data that was years old in the middle of a segment
> from
> > > 2018.
> > > > >
> > > > > However, if log.message.timestamp.difference.max.ms is set, then
> we
> > > can
> > > > > make some actual guarantees that old data gets purged-- which is
> what
> > > the
> > > > > GDPR requires, of course.
> > > > >
> > > > > Overall, maybe we can make KIP-354 simpler by just always looking
> at
> > > the
> > > > > timestamp of the first log message.  I don't think looking at the
> > > > > maxTimestamp of the previous segment is any more accurate.  Aside
> > from
> > > > > that, looks good, since we can get what we need with the
> combination
> > of
> > > > > this and log.message.timestamp.difference.max.ms.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > On Mon, Nov 26, 2018, at 13:10, xiongqi wu wrote:
> > > > > > Thanks for binding and non-binding votes.
> > > > > > Can I get one more binding vote?
> > > > > >
> > > > > > Thanks in advance!
> > > > > >
> > > > > > Xiongqi (Wesley) Wu
> > > > > >
> > > > > >
> > > > > > On Wed, Nov 14, 2018 at 7:29 PM Matt Farmer 
> wrote:
> > > > > >
> > > > > > > I'm a +1 (non-binding) — This looks like it would have saved
> us a
> > > lot
> > > > > of
> > > > > > > pain in an issue we had to debug recently. I can't go into
> > > details, but
> > > > > > > figuring out how to achieve this effect gave me quite a
> headache.
> > > :)
> > > > > > >
> > > > > > > On Mon, Nov 12, 2018 at 1:00 PM xiongqi wu <
> xiongq...@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > Can I have one more vote on this KIP?
> > > > > > > > Any comment is appreciated.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Add+a+Maximum+Log+Compac

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-12-04 Thread xiongqi wu
Colin,

Thanks for comments.
Out-of-order message timestamps are a very good point.
We can combine max.compaction.lag.ms with
log.message.timestamp.difference.max.ms to achieve what we want in an
environment where message timestamps can be shifted a lot.
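
For example, here is a minimal sketch of creating such a topic with the Java
AdminClient. The topic name and the 7-day / 1-day values are illustrative
assumptions only; max.compaction.lag.ms is the new config proposed by this
KIP (so it only takes effect once the KIP is implemented), and
message.timestamp.difference.max.ms is the existing topic-level counterpart
of the broker config mentioned above:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "compact");
        // Tombstones / overwritten keys must be compacted away within ~7 days.
        configs.put("max.compaction.lag.ms", String.valueOf(7L * 24 * 60 * 60 * 1000));
        // Reject records whose timestamp deviates more than 1 day from append time,
        // so the timestamp-based compaction lag estimate stays meaningful.
        configs.put("message.timestamp.difference.max.ms", String.valueOf(24L * 60 * 60 * 1000));

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("pii-events", 3, (short) 3).configs(configs);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}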

There are similar discussions regarding log.retention.ms and
log.message.timestamp.difference.max.ms  in KAFKA-4340.

I agree that we can always use the first message's timestamp rather than the
maxTimestamp of the previous log segment, to keep it simple.

I have updated the KIP.

Xiongqi (Wesley) Wu


On Tue, Dec 4, 2018 at 5:13 PM Colin McCabe  wrote:

> Hi Xiongqi,
>
> Thinking about this a little bit more, it seems like we don't have any
> guarantees just by looking at the timestamp of the first message in a log
> segment.  Similarly, we don't have any guarantees just by looking at the
> maxTimestamp of the previous log segment.  Old data could appear anywhere--
> you could put data that was years old in the middle of a segment from 2018.
>
> However, if log.message.timestamp.difference.max.ms is set, then we can
> make some actual guarantees that old data gets purged-- which is what the
> GDPR requires, of course.
>
> Overall, maybe we can make KIP-354 simpler by just always looking at the
> timestamp of the first log message.  I don't think looking at the
> maxTimestamp of the previous segment is any more accurate.  Aside from
> that, looks good, since we can get what we need with the combination of
> this and log.message.timestamp.difference.max.ms.
>
> best,
> Colin
>
>
> On Mon, Nov 26, 2018, at 13:10, xiongqi wu wrote:
> > Thanks for binding and non-binding votes.
> > Can I get one more binding vote?
> >
> > Thanks in advance!
> >
> > Xiongqi (Wesley) Wu
> >
> >
> > On Wed, Nov 14, 2018 at 7:29 PM Matt Farmer  wrote:
> >
> > > I'm a +1 (non-binding) — This looks like it would have saved us a lot
> of
> > > pain in an issue we had to debug recently. I can't go into details, but
> > > figuring out how to achieve this effect gave me quite a headache. :)
> > >
> > > On Mon, Nov 12, 2018 at 1:00 PM xiongqi wu 
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Can I have one more vote on this KIP?
> > > > Any comment is appreciated.
> > > >
> > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Add+a+Maximum+Log+Compaction+Lag
> > > >
> > > >
> > > > Xiongqi (Wesley) Wu
> > > >
> > > >
> > > > On Fri, Nov 9, 2018 at 7:56 PM xiongqi wu 
> wrote:
> > > >
> > > > > Thanks Dong.
> > > > > I have updated the KIP.
> > > > >
> > > > > Xiongqi (Wesley) Wu
> > > > >
> > > > >
> > > > > On Fri, Nov 9, 2018 at 5:31 PM Dong Lin 
> wrote:
> > > > >
> > > > >> Thanks for the KIP Xiongqi. LGTM. +1 (binding)
> > > > >>
> > > > >> One minor comment: it may be a bit better to clarify in the public
> > > > >> interface section that the value of the newly added metric is
> > > determined
> > > > >> based by applying that formula across all compactable segments.
> For
> > > > >> example:
> > > > >>
> > > > >> The maximum value of Math.max(now -
> > > > >> earliest_timestamp_in_ms_of_uncompacted_segment -
> > > max.compaction.lag.ms
> > > > ,
> > > > >> 0)/1000 across all compactable partitions, where the
> > > > >> max.compaction.lag.ms
> > > > >> can be overridden on per-topic basis.
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Fri, Nov 9, 2018 at 5:16 PM xiongqi wu 
> > > wrote:
> > > > >>
> > > > >> > Thanks Joel.
> > > > >> > Tracking the delay at second granularity makes sense
> > > > >> > I have updated KIP.
> > > > >> >
> > > > >> > Xiongqi (Wesley) Wu
> > > > >> >
> > > > >> >
> > > > >> > On Fri, Nov 9, 2018 at 5:05 PM Joel Koshy 
> > > > wrote:
> > > > >> >
> > > > >> > > +1 with one suggestion on the proposed metric. You should
> probably
> > > > >> > include
> > > > >> > > the unit. So for e.g., max-compaction-delay-secs.
> > > > >>

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-11-26 Thread xiongqi wu
Thanks for binding and non-binding votes.
Can I get one more binding vote?

Thanks in advance!

Xiongqi (Wesley) Wu


On Wed, Nov 14, 2018 at 7:29 PM Matt Farmer  wrote:

> I'm a +1 (non-binding) — This looks like it would have saved us a lot of
> pain in an issue we had to debug recently. I can't go into details, but
> figuring out how to achieve this effect gave me quite a headache. :)
>
> On Mon, Nov 12, 2018 at 1:00 PM xiongqi wu  wrote:
>
> > Hi all,
> >
> > Can I have one more vote on this KIP?
> > Any comment is appreciated.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Add+a+Maximum+Log+Compaction+Lag
> >
> >
> > Xiongqi (Wesley) Wu
> >
> >
> > On Fri, Nov 9, 2018 at 7:56 PM xiongqi wu  wrote:
> >
> > > Thanks Dong.
> > > I have updated the KIP.
> > >
> > > Xiongqi (Wesley) Wu
> > >
> > >
> > > On Fri, Nov 9, 2018 at 5:31 PM Dong Lin  wrote:
> > >
> > >> Thanks for the KIP Xiongqi. LGTM. +1 (binding)
> > >>
> > >> One minor comment: it may be a bit better to clarify in the public
> > >> interface section that the value of the newly added metric is
> determined
> > >> based by applying that formula across all compactable segments. For
> > >> example:
> > >>
> > >> The maximum value of Math.max(now -
> > >> earliest_timestamp_in_ms_of_uncompacted_segment -
> max.compaction.lag.ms
> > ,
> > >> 0)/1000 across all compactable partitions, where the
> > >> max.compaction.lag.ms
> > >> can be overridden on per-topic basis.
> > >>
> > >>
> > >>
> > >> On Fri, Nov 9, 2018 at 5:16 PM xiongqi wu 
> wrote:
> > >>
> > >> > Thanks Joel.
> > >> > Tracking the delay at second granularity makes sense
> > >> > I have updated KIP.
> > >> >
> > >> > Xiongqi (Wesley) Wu
> > >> >
> > >> >
> > >> > On Fri, Nov 9, 2018 at 5:05 PM Joel Koshy 
> > wrote:
> > >> >
> > >> > > +1 with one suggestion on the proposed metric. You should probably
> > >> > include
> > >> > > the unit. So for e.g., max-compaction-delay-secs.
> > >> > >
> > >> > > Joel
> > >> > >
> > >> > > On Tue, Nov 6, 2018 at 5:30 PM xiongqi wu 
> > >> wrote:
> > >> > >
> > >> > > > bump
> > >> > > > Xiongqi (Wesley) Wu
> > >> > > >
> > >> > > >
> > >> > > > On Thu, Sep 27, 2018 at 4:20 PM xiongqi wu  >
> > >> > wrote:
> > >> > > >
> > >> > > > >
> > >> > > > > Thanks Eno, Brett, Dong, Guozhang, Colin,  and Xiaohe for
> > >> feedback.
> > >> > > > > Can I have more feedback or VOTE on this KIP?
> > >> > > > >
> > >> > > > >
> > >> > > > > Xiongqi (Wesley) Wu
> > >> > > > >
> > >> > > > >
> > >> > > > > On Wed, Sep 19, 2018 at 10:52 AM xiongqi wu <
> > xiongq...@gmail.com>
> > >> > > wrote:
> > >> > > > >
> > >> > > > >> Any other votes or comments?
> > >> > > > >>
> > >> > > > >> Xiongqi (Wesley) Wu
> > >> > > > >>
> > >> > > > >>
> > >> > > > >> On Tue, Sep 11, 2018 at 11:45 AM xiongqi wu <
> > xiongq...@gmail.com
> > >> >
> > >> > > > wrote:
> > >> > > > >>
> > >> > > > >>> Yes, more votes and code review.
> > >> > > > >>>
> > >> > > > >>> Xiongqi (Wesley) Wu
> > >> > > > >>>
> > >> > > > >>>
> > >> > > > >>> On Mon, Sep 10, 2018 at 11:37 PM Brett Rann
> > >> > >  > >> > > > >
> > >> > > > >>> wrote:
> > >> > > > >>>
> > >> > > > >>>> +1 (non binding) from on 0 then, and on the KIP.
> > >> > > > >>>>
> > >> > > > >>>> Wh

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-09-11 Thread xiongqi wu
Yes, more votes and code review.

Xiongqi (Wesley) Wu


On Mon, Sep 10, 2018 at 11:37 PM Brett Rann 
wrote:

> +1 (non binding) from on 0 then, and on the KIP.
>
> Where do we go from here? More votes?
>
> On Tue, Sep 11, 2018 at 5:34 AM Colin McCabe  wrote:
>
> > On Mon, Sep 10, 2018, at 11:44, xiongqi wu wrote:
> > > Thank you for comments. I will use '0' for now.
> > >
> > > If we create topics through admin client in the future, we might
> perform
> > > some useful checks. (but the assumption is all brokers in the same
> > cluster
> > > have the same default configurations value, otherwise,it might still be
> > > tricky to do such cross validation check.)
> >
> > This isn't something that we might do in the future-- this is something
> we
> > are doing now. We already have Create Topic policies which are enforced
> by
> > the broker. Check KIP-108 and KIP-170 for details. This is one of the
> > motivations for getting rid of direct ZK access-- making sure that these
> > policies are applied.
> >
> > I agree that having different configurations on different brokers can be
> > confusing and frustrating . That's why more configurations are being made
> > dynamic using KIP-226. Dynamic configurations are stored centrally in ZK,
> > so they are the same on all brokers (modulo propagation delays). In any
> > case, this is a general issue, not specific to "create topics".
> >
> > cheers,
> > Colin
> >
> >
> > >
> > >
> > > Xiongqi (Wesley) Wu
> > >
> > >
> > > On Mon, Sep 10, 2018 at 11:15 AM Colin McCabe 
> > wrote:
> > >
> > > > I don't have a strong opinion. But I think we should probably be
> > > > consistent with how segment.ms works, and just use 0.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Wed, Sep 5, 2018, at 21:19, Brett Rann wrote:
> > > > > OK thanks for that clarification. I see why you're uncomfortable
> > with 0
> > > > now.
> > > > >
> > > > > I'm not really fussed. I just prefer consistency in configuration
> > > > options.
> > > > >
> > > > > Personally I lean towards treating 0 and 1 similarly in that
> > scenario,
> > > > > because it favours the person thinking about setting the
> > configurations,
> > > > > and a person doesn't care about a 1ms edge case especially when the
> > > > context
> > > > > is the true minimum is tied to the log cleaner cadence.
> > > > >
> > > > > Introducing 0 to mean "disabled" because there is some uniquness in
> > > > > segment.ms not being able to be set to 0, reduces configuration
> > > > consistency
> > > > > in favour of capturing a MS gap in an edge case that nobody would
> > ever
> > > > > notice. For someone to understand why everywhere else -1 is used to
> > > > > disable, but here 0 is used, they would need to learn about
> > segment.ms
> > > > > having a 1ms minimum and then after learning would think "who cares
> > about
> > > > > 1ms?" in this context. I would anyway :)
> > > > >
> > > > > my 2c anyway. Will again defer to majority. Curious which way Colin
> > falls
> > > > > now.
> > > > >
> > > > > Don't want to spend more time on this though, It's well into
> > > > bikeshedding
> > > > > territory now. :)
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Sep 6, 2018 at 1:31 PM xiongqi wu 
> > wrote:
> > > > >
> > > > > > I want to honor the minimum value of segment.ms (which is 1ms)
> to
> > > > force
> > > > > > roll an active segment.
> > > > > > So if we set "max.compaction.lag.ms" any value > 0, the minimum
> of
> > > > > > max.compaction.lag.ms and segment.ms will be used to seal an
> > active
> > > > > > segment.
> > > > > >
> > > > > > If we set max.compaction.lag.ms to 0, the current implementation
> > will
> > > > > > treat it as disabled.
> > > > > >
> > > > > > It is a little bit weird to treat max.compaction.lag=0 the same
> as
> > > > > > max.compaction.lag=1.
> > > > > 

[jira] [Created] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

2018-08-21 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7321:
-

 Summary: ensure timely processing of deletion requests in Kafka 
topic (Time-based log compaction)
 Key: KAFKA-7321
 URL: https://issues.apache.org/jira/browse/KAFKA-7321
 Project: Kafka
  Issue Type: Improvement
  Components: log
Reporter: xiongqi wu


Compaction enables Kafka to remove old messages that are flagged for deletion 
while other messages are retained for a relatively longer time.  Today, a log 
segment may remain un-compacted for a long time, since the eligibility for 
log compaction is determined based on the compaction ratio 
("min.cleanable.dirty.ratio") and min compaction lag ("min.compaction.lag.ms") 
settings.  The ability to delete a log message through compaction in a timely 
manner has become an important requirement in some use cases (e.g., GDPR).  For 
example, one use case is to delete PII (Personally Identifiable Information) 
data within 7 days while keeping non-PII data indefinitely in compacted format.  
The goal of this change is to provide a time-based compaction policy that 
ensures the cleanable section is compacted after the specified time interval 
regardless of the dirty ratio and the min compaction lag.  However, the dirty 
ratio and the min compaction lag are still honored if the time-based compaction 
rule is not violated. In other words, if Kafka receives a deletion request on a 
key (e.g., a key with a null value), the corresponding log segment will be 
picked up for compaction after the configured time interval to remove the key.

 

This is to track the effort in KIP-354:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7322) race between compaction thread and retention thread when changing topic cleanup policy

2018-08-21 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7322:
-

 Summary: race between compaction thread and retention thread when 
changing topic cleanup policy
 Key: KAFKA-7322
 URL: https://issues.apache.org/jira/browse/KAFKA-7322
 Project: Kafka
  Issue Type: Bug
  Components: log
Reporter: xiongqi wu
Assignee: xiongqi wu


The deletion thread grabs the log lock when it tries to rename a log segment 
and schedule it for actual deletion.

The compaction thread only grabs the log lock when it tries to replace the 
original segments with the cleaned segment. It does not hold the lock while it 
reads records from the original segments to build the offset map and the new 
segments. As a result, if the deletion and compaction threads work on the same 
log partition, we have a race condition.

This race happens when the topic cleanup policy is updated on the fly.

One sequence that hits this race condition:

1: the topic cleanup policy is "compact" initially

2: the log cleaner (compaction) thread picks up the partition for compaction 
and is still in progress

3: the topic cleanup policy is updated to "delete"

4: the retention thread picks up the topic partition and deletes some old 
segments

5: the log cleaner thread reads from the deleted log and raises an IO exception

The proposed solution is to use the "in progress" map that the cleaner manager 
already maintains to protect against such a race.
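
A minimal, self-contained sketch of that guard idea (the class and method
names here are illustrative; this is not the real LogCleanerManager API):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class InProgressGuard {
    // Partitions currently being compacted or having segments deleted.
    private final Set<String> inProgress = ConcurrentHashMap.newKeySet();

    /** Returns true if the caller now owns the partition and may modify its segments. */
    public boolean tryAcquire(String topicPartition) {
        return inProgress.add(topicPartition);
    }

    /** Must be called by the owner once compaction or deletion finishes. */
    public void release(String topicPartition) {
        inProgress.remove(topicPartition);
    }

    // Example: the retention thread skips partitions that the log cleaner is
    // currently compacting instead of deleting segments out from under it.
    public void deleteOldSegments(String topicPartition, Runnable doDelete) {
        if (tryAcquire(topicPartition)) {
            try {
                doDelete.run();
            } finally {
                release(topicPartition);
            }
        }
        // else: compaction is in progress; retry on the next retention pass.
    }
}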

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2018-08-30 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7362:
-

 Summary: enable kafka broker to remove orphan partitions 
automatically 
 Key: KAFKA-7362
 URL: https://issues.apache.org/jira/browse/KAFKA-7362
 Project: Kafka
  Issue Type: Improvement
  Components: core, log
Reporter: xiongqi wu
Assignee: xiongqi wu


When partition reassignment removes topic partitions from an offline broker, 
those removed partitions become orphan partitions on that broker. When the 
offline broker comes back online, it is not able to clean up either the data or 
the folders that belong to orphan partitions.  The log manager scans all log 
dirs during startup, but the time-based retention policy on a topic partition 
will not kick in until the broker is either a follower or a leader of the 
partition.  In addition, we have no logic today to delete the folders that 
belong to orphan partitions.

Opening this ticket to provide a mechanism (when enabled) to safely remove 
orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7501) double deallocation of producer batch upon expiration of inflight requests and error response

2018-10-12 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7501:
-

 Summary: double deallocation of producer batch upon expiration of 
inflight requests and error response
 Key: KAFKA-7501
 URL: https://issues.apache.org/jira/browse/KAFKA-7501
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: xiongqi wu
Assignee: xiongqi wu


The following event sequence leads to double deallocation of a producer 
batch.

1) A producer batch is sent and the response is not received.

2) The in-flight producer batch expires once deliveryTimeoutMs is reached.  
The sender fails the producer batch via "failBatch" and the batch is 
deallocated via "accumulator.deallocate(batch)".

3) The response for the batch finally arrives after the batch expiration, and 
it contains the error "Errors.MESSAGE_TOO_LARGE".

4) The producer batch is split and the original batch is deallocated a second 
time. As a result, an "IllegalStateException" is raised.
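
A sketch of the idempotent-deallocation idea that would make step 4 a no-op
(illustrative only; this is not the actual Sender/RecordAccumulator code):

import java.util.concurrent.atomic.AtomicBoolean;

public class DeallocationGuard {
    static class Batch {
        final AtomicBoolean deallocated = new AtomicBoolean(false);
    }

    /** Returns true only for the first caller; any later caller becomes a no-op. */
    static boolean deallocate(Batch batch) {
        if (!batch.deallocated.compareAndSet(false, true)) {
            return false;   // already freed by the expiration path (or vice versa)
        }
        // ... return the batch's buffer to the pool exactly once ...
        return true;
    }

    public static void main(String[] args) {
        Batch b = new Batch();
        System.out.println(deallocate(b)); // true  -- expiration path frees the batch
        System.out.println(deallocate(b)); // false -- late MESSAGE_TOO_LARGE response is a no-op
    }
}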



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7650) make "auto.create.topics.enable" dynamically configurable.

2018-11-16 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7650:
-

 Summary: make "auto.create.topics.enable"  dynamically 
configurable. 
 Key: KAFKA-7650
 URL: https://issues.apache.org/jira/browse/KAFKA-7650
 Project: Kafka
  Issue Type: Improvement
Reporter: xiongqi wu
Assignee: xiongqi wu


There are several use cases for which we want "auto.create.topics.enable" to 
be dynamically configurable.

For example:
1) A wildcard consumer can recreate deleted topics.
2) We also see misconfigured consumers that consume from the wrong cluster and 
end up creating a lot of zombie topics in the target cluster.

In such cases, we may want to temporarily disable "auto.create.topics.enable" 
and re-enable topic creation later, after the problem is solved, without 
restarting brokers.
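
Assuming the config does become dynamically updatable as proposed here, the
toggle could be issued as a cluster-level dynamic config. The AdminClient
calls below are real APIs (Kafka 2.3+); whether auto.create.topics.enable is
accepted as a dynamic config is exactly what this ticket proposes:

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ToggleAutoCreate {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Empty resource name = cluster-wide dynamic default for all brokers.
            ConfigResource cluster = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp disable = new AlterConfigOp(
                    new ConfigEntry("auto.create.topics.enable", "false"),
                    AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> update =
                    Map.of(cluster, Collections.singleton(disable));
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}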

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8249) partition reassignment may never finish if topic deletion completes first

2019-04-17 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-8249:
-

 Summary: partition reassignment may never finish if topic deletion 
completes first 
 Key: KAFKA-8249
 URL: https://issues.apache.org/jira/browse/KAFKA-8249
 Project: Kafka
  Issue Type: Bug
Reporter: xiongqi wu
Assignee: xiongqi wu


Kafka allows topic deletion to complete successfully when there are pending 
partition reassignments for the same topics (if the topic deletion request 
comes after the partition reassignment).

This leads to several issues: 1) pending partition reassignments of a deleted 
topic never complete because the topic is gone; 2) onPartitionReassignment -> 
updateAssignedReplicasForPartition throws an IllegalStateException for a 
non-existing node. This in turn causes the controller not to resume topic 
deletion for online brokers and also to fail to register the broker 
notification handler (etc.) during onBrokerStartup.

To fix this, we need to clean up pending partition reassignments during topic 
deletion.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8527) add dynamic maintenance broker config

2019-06-11 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-8527:
-

 Summary: add dynamic maintenance broker config
 Key: KAFKA-8527
 URL: https://issues.apache.org/jira/browse/KAFKA-8527
 Project: Kafka
  Issue Type: Improvement
Reporter: xiongqi wu
Assignee: xiongqi wu


Before we remove a broker for maintenance, we want to move all partitions off 
the broker first to avoid introducing new Under Replicated Partitions (URPs). 
That is because shutting down (or killing) a broker that still hosts live 
partitions temporarily reduces the number of replicas of those partitions. 
Moving partitions off a broker can be done via partition reassignment.  
However, during the partition reassignment process, new topics can be created 
and thereby new partitions can be added to the broker that is pending removal. 
As a result, the removal process needs to repeatedly move new topic partitions 
off the maintenance broker. In a production environment in which topic creation 
is frequent and the URPs caused by broker removal cannot be tolerated, the 
removal process can take multiple iterations to complete the partition 
reassignment.  We want to provide a mechanism to mark a broker as a maintenance 
broker (via a cluster-level dynamic configuration). One action Kafka can take 
for a maintenance broker is to not assign new topic partitions to it, thereby 
facilitating the broker removal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-10806) throwable from user callback on completeFutureAndFireCallbacks can lead to unhandled exceptions

2020-12-03 Thread xiongqi wu (Jira)
xiongqi wu created KAFKA-10806:
--

 Summary: throwable from user callback on 
completeFutureAndFireCallbacks can lead to unhandled exceptions
 Key: KAFKA-10806
 URL: https://issues.apache.org/jira/browse/KAFKA-10806
 Project: Kafka
  Issue Type: Bug
Reporter: xiongqi wu


When the Kafka producer tries to complete or abort a batch, it invokes the 
user callback. However, "completeFutureAndFireCallbacks" only catches 
exceptions from the user callback, not all throwables.  An uncaught throwable 
can prevent the batch from being freed.
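
A sketch of the hardening idea: catch Throwable (not just Exception) around
each user callback and free the batch in a finally block, so a misbehaving
callback can never leak the batch. This is illustrative only, not the actual
completeFutureAndFireCallbacks implementation:

import java.util.List;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackSafety {

    static void fireCallbacksAndFree(List<Callback> callbacks,
                                     RecordMetadata metadata,
                                     Exception topLevelException,
                                     Runnable freeBatch) {
        try {
            for (Callback cb : callbacks) {
                try {
                    cb.onCompletion(metadata, topLevelException);
                } catch (Throwable t) {   // catching Exception alone would miss Errors
                    System.err.println("Error executing user-provided callback: " + t);
                }
            }
        } finally {
            freeBatch.run();              // always release the batch exactly once
        }
    }
}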



--
This message was sent by Atlassian Jira
(v8.3.4#803005)