Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-11-20 Thread 东方甲乙
Hi Guozhang,

I do agree with keeping the broker simple, but the consumed-log retention
process is not that complex (it runs on the same thread as the log retention
thread and only adds a step that queries the committed offsets), and it is
disabled by default. I will update the patch as soon as possible so you can
see it.

As for having all consumers agree on a specified point, that seems hard to
determine in a cloud environment (many consumer groups have different consume
positions); it would require all the consumers to coordinate on a specified
point, and the minimum committed offset seems to be the safest point to delete
up to. Implementing this in the client application would also have the
problems mentioned in the previous email.


Thanks,
David
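David's "the min commit offset is the safest point to delete" can be stated in
a few lines. A minimal sketch (not Kafka code; the group names, offsets, and
the zero default for an empty group set are illustrative assumptions):

```python
def safe_trim_offset(committed_offsets):
    """committed_offsets: dict of group id -> committed offset for one
    partition. Returns the highest offset every group has already consumed
    past, i.e. the safe point to trim up to (exclusive)."""
    if not committed_offsets:
        return 0  # no known groups: nothing is provably consumed
    return min(committed_offsets.values())

offsets = {"billing": 1200, "audit": 950, "search-indexer": 1500}
print(safe_trim_offset(offsets))  # 950: "audit" is the slowest group
```

Any group lagging behind pins the trim point, which is exactly why the thread
worries about coordinating many groups.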





Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-11-16 Thread Guozhang Wang
David,

I think it would be better to implement such synchronization (i.e. making
sure all consumers have finished fetching up to that point, and no one will
ever want to go back and re-consume) on the admin side, not on the broker
side, since 1) we want to keep the broker simple, and would rather have a
"layered architecture" with such admin features on top of / beside the
brokers rather than built inside them, and 2) for some synchronization
purposes, like "making sure no one will ever want to go back and re-consume",
the brokers would have no clues, and it needs to be implemented application
by application anyway.

What do you think?

Guozhang
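The admin-side coordination Guozhang describes can be sketched as a small
registry: consumer groups register interest in a topic, and only registered
groups are consulted when computing the safe trim offset. This is an
illustrative sketch, not a proposed API; a real deployment might keep such a
registry in ZooKeeper, as mentioned later in the thread.

```python
class TrimCoordinator:
    """Admin-side registry: tracks which groups consume which topic and
    the offsets they have committed, keeping the broker out of it."""

    def __init__(self):
        self.registered = {}  # topic -> {group -> highest committed offset}

    def register(self, topic, group):
        self.registered.setdefault(topic, {})[group] = 0

    def report_commit(self, topic, group, offset):
        groups = self.registered.get(topic, {})
        if group in groups:  # commits from unregistered groups are ignored
            groups[group] = max(groups[group], offset)

    def safe_trim_offset(self, topic):
        groups = self.registered.get(topic)
        if not groups:
            return None  # no registered groups: do not trim anything
        return min(groups.values())

coord = TrimCoordinator()
coord.register("clicks", "etl")
coord.register("clicks", "audit")
coord.report_commit("clicks", "etl", 500)
coord.report_commit("clicks", "audit", 300)
print(coord.safe_trim_offset("clicks"))  # 300
```

The broker only ever receives the final trim request; all the min() logic
lives in this layer.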




Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-11-13 Thread 东方甲乙
Hi Becket,

Using the trim.on.offset.commit parameter would help trim the log quickly,
but consumers in other consumer groups may then find that messages have been
trimmed. We would still need to coordinate many consumer groups to trim the
log, which seems difficult for a single consumer to do.

So it still comes down to the question of whether to implement this on the
broker side or in the admin client. Even when implemented on the broker side,
we can still use the trim API to delete the log segments of the leader and
the replicas. And we can offer an option to safely delete the log (disabled
by default); that is the motivation for this KIP.


Thanks,
David








Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-11-06 Thread Becket Qin
Hi David,

I am thinking that, depending on the use case, we may not need a separate
tool to get committed-message-based retention using the trim() method. One
way to do this is to have a configuration like trim.on.offset.commit in the
consumer, so that after committing the offset the consumer will also send a
trim request to the broker.

In some cases, the application may want to trim the log in a more flexible
way, e.g. not on every commit but every hour. In that case, it is true that
users will need to trim the log with a separate admin client. However, that
logic could be a long-running stand-alone service independent of Kafka or
the application. It may have its own configurations, as we discussed in this
KIP, so the applications in that case would just talk to that service to
trim the log instead of talking to Kafka.

Thanks,

Jiangjie (Becket) Qin
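The trim.on.offset.commit idea can be sketched as a thin consumer wrapper:
after a successful offset commit, the consumer immediately asks the broker to
trim up to that offset. Note that trim.on.offset.commit is a configuration
proposed in this thread, not an existing Kafka setting, and the Broker and
consumer classes below are stand-ins for illustration, not the real clients.

```python
class FakeBroker:
    """Stand-in for a broker that accepts trim requests."""

    def __init__(self):
        self.log_start_offset = 0

    def trim(self, before_offset):
        # A trim only ever moves the start of the log forward.
        self.log_start_offset = max(self.log_start_offset, before_offset)

class TrimmingConsumer:
    """Stand-in consumer honoring a trim.on.offset.commit-style flag."""

    def __init__(self, broker, trim_on_offset_commit=False):
        self.broker = broker
        self.trim_on_commit = trim_on_offset_commit

    def commit(self, offset):
        # ... the real client would commit to the group coordinator here ...
        if self.trim_on_commit:
            self.broker.trim(offset)

broker = FakeBroker()
consumer = TrimmingConsumer(broker, trim_on_offset_commit=True)
consumer.commit(4200)
print(broker.log_start_offset)  # 4200
```

This is the single-group case; as David points out in a later message, it
does not by itself protect other consumer groups of the same topic.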



Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-31 Thread Becket Qin
Hi David,

I think the trim() API is generally useful for consume-based retention as
well as other use cases. So we probably should have (1).

(2) is more of an optimization that does users a favor. It could be
implemented on top of (1) if we want to. So maybe we can implement (1) first
and let the applications do the trim() by themselves at this point. This puts
more burden on the application side, but it is not that bad if there is only
one downstream consumer group. If in the future we find more use cases where
multiple downstream consumer groups need to coordinate among themselves and
broker-side help would make things simpler, we can add (2) then.

Regarding the relation between this KIP and KIP-47: at a high level they are
very similar, i.e. trim() by timestamp vs. trim() by offsets, so it would be
worth thinking about them together. With KIP-79 we can search messages by
timestamp, which essentially translates a timestamp to offsets. So KIP-47 can
also be built on top of the trim()-by-offsets interface after translating the
timestamp to offsets. Jun has suggested an implementation in the KIP-47
discussion thread which introduces a new TrimRequest. Would you take a look
and see if that could be used for KIP-68 as well?

Thanks,

Jiangjie (Becket) Qin
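Becket's point that trim-by-timestamp (KIP-47) can sit on top of
trim-by-offset once timestamps are searchable (KIP-79) reduces to one lookup:
translate the cutoff timestamp to the earliest offset at or after it, then
issue an ordinary offset-based trim. A sketch, where the sorted
(timestamp, offset) index is an illustrative stand-in for the broker's time
index:

```python
import bisect

# (timestamp, offset) pairs, sorted by timestamp -- made-up sample data.
time_index = [(1000, 0), (2000, 150), (3000, 400), (4000, 900)]

def offset_for_timestamp(index, ts):
    """Earliest offset whose timestamp is >= ts (None if past the end)."""
    timestamps = [t for t, _ in index]
    i = bisect.bisect_left(timestamps, ts)
    return index[i][1] if i < len(index) else None

def trim_by_timestamp(index, ts):
    # Translate the timestamp, then a real tool would send trim(offset).
    return offset_for_timestamp(index, ts)

print(trim_by_timestamp(time_index, 2500))  # 400: first offset at/after ts 2500
```

With this translation in place, the broker only ever needs to implement the
offset-based TrimRequest.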




Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-30 Thread 东方甲乙
Hi All,

As per our discussion, there are two ways to clean the consumed log:

1) Use an admin tool to find the min commit offset of some topics for the
specified set of consumer groups, then send the trim API to all the replicas
on the brokers; the brokers then start to trim the log segments of these
topics.

The benefit of this method is that it keeps the broker simple and is more
flexible for the users, but it is more complicated for users to clean all the
messages that have been consumed.

2) The broker periodically performs the consumed log retention, as the KIP
describes. This method is simple for the users and cleans the consumed log
automatically, but it adds more offset-query work to the brokers.

Which method is better?


Thanks,
David
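Method (2) above can be sketched as the check the broker would run on each
retention pass: find the minimum committed offset over the groups consuming
the topic and delete only the segments that lie entirely below it. The
segment layout and group offsets here are made up for illustration; this is
not the KIP's actual implementation.

```python
def consumed_retention(segments, group_offsets):
    """segments: list of (base_offset, next_offset) per log segment, oldest
    first. Returns (deletable, remaining), split at the minimum committed
    offset; the active (last) segment is never deleted."""
    if not group_offsets:
        return [], segments  # no consumers known: keep everything
    safe = min(group_offsets.values())
    deletable = [s for s in segments[:-1] if s[1] <= safe]
    remaining = [s for s in segments if s not in deletable]
    return deletable, remaining

segments = [(0, 100), (100, 200), (200, 300), (300, 350)]
deletable, remaining = consumed_retention(segments, {"g1": 250, "g2": 220})
print(deletable)  # [(0, 100), (100, 200)]: segments fully below offset 220
```

The extra broker work the message mentions is the per-pass offset query
feeding `group_offsets`; the deletion itself is the same segment removal the
time-based retention thread already does.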








-- Original Message --
From: "Mayuresh Gharat" <gharatmayures...@gmail.com>
Sent: 2016-10-29 (Saturday) 1:43 AM
To: "dev" <dev@kafka.apache.org>

Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention



I do agree with Guozhang on having applications ask an external (admin)
service that talks to Kafka to do the trimming, in which case this external
admin service can check whether it is OK to send the trim request to the
Kafka brokers based on certain conditions.
On the broker side we can have authorization, maybe by way of ACLs, saying
that only this external admin service is allowed to call trim(). In this way
we can actually move the main decision-making process out of core.

Thanks,

Mayuresh

On Fri, Oct 28, 2016 at 10:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Yes trim() should be an admin API and, if security is concerned, it should
> be under admin authorization as well.
>
> For applications that need this feature, it then boils down to the problem
> that they should request the authorization token from whoever operates Kafka
> before starting their app, to use in their own client, which I think is a
> feasible requirement.
>
>
> Guozhang
>
>
> On Fri, Oct 28, 2016 at 9:42 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Hi Guozhang,
> >
> > I agree that pushing out the complexity of coordination to the client
> > application makes it simpler for the broker, in the sense that it does
> > not have to be the decision maker regarding when to trim and up to what
> > offset. And I agree that if we go in this direction, providing an offset
> > parameter makes sense.
> >
> >
> > But since the main motivation for this seems to be saving or reclaiming
> > disk space on the broker side, I am not 100% sure how good it is to rely
> > on the client application to be a good citizen and call the trim API.
> > Also, I see the trim() API as more of an admin API rather than a client
> > API.
> >
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Fri, Oct 28, 2016 at 7:12 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Here are my thoughts:
> > >
> > > If there are indeed multiple consumer groups on the same topic that
> > > need to coordinate, it is equally complex whether the coordination is
> > > on the broker or among the applications themselves: for the latter
> > > case, you would imagine some coordination service (like ZK) being used
> > > to register groups for that topic and let these groups agree upon the
> > > minimum offset that is safe to trim for all of them; for the former
> > > case, we just move this coordination service into the broker, which to
> > > me is not a good design under the principle of keeping the broker
> > > simple.
> > >
> > > And as we discussed, there are scenarios where the offset to trim at
> > > is not necessarily dependent on the committed offsets, even if the
> > > topic is only consumed by a single consumer group and we do not need
> > > any coordination. So I think it is appropriate to require an "offset
> > > parameter" in the trim API.
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Fri, Oct 28, 2016 at 1:27 AM, Becket Qin <becket@gmail.com> wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > I think the trim() interface is generally useful. What I was
> > > > wondering is the following: if the user has multiple applications to
> > > > coordinate, it seems simpler for the broker to coordinate instead of
> > > > asking the applications to coordinate among themselves. If we let
> > > > the broker do the coordination
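Mayuresh's suggestion in this message, that only a designated admin service
be allowed to call trim(), amounts to an authorization check in front of the
trim path. A sketch with an in-memory ACL table; the principal names and the
Broker class are illustrative, not Kafka's actual authorizer:

```python
class AuthorizationError(Exception):
    pass

class Broker:
    """Stand-in broker that ACL-gates the trim() operation."""

    def __init__(self, trim_acl):
        self.trim_acl = set(trim_acl)  # principals allowed to trim
        self.log_start_offset = 0

    def trim(self, principal, before_offset):
        if principal not in self.trim_acl:
            raise AuthorizationError(f"{principal} may not trim")
        self.log_start_offset = max(self.log_start_offset, before_offset)

broker = Broker(trim_acl=["User:trim-admin-service"])
broker.trim("User:trim-admin-service", 100)   # allowed
try:
    broker.trim("User:some-app", 200)          # denied
except AuthorizationError as e:
    print(e)
```

This keeps the decision-making (when and how far to trim) in the external
service, while the broker only enforces who may ask.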

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-28 Thread Mayuresh Gharat
> ... in this case I think it makes sense to allow such consumer-driven log
> retention features.
>
> 2. In this case, my question is then whether this bookkeeping of
> min-committed-offsets should be done on the broker side or on the app side.
> My gut feeling is that it could be better bookkept on the app (i.e. client)
> side, which has the full information about the "registered consumer groups"
> for certain topics and therefore knows the min-committed-offsets. And a
> slightly-modified KIP-47, as mentioned by Dong, could be a better fit,
> where a) the app side bookkeeps the consumer-driven min offset based on the
> committed offsets, by either talking to the consumer clients directly or
> querying the broker for the committed offsets of those registered consumer
> groups, and then b) writes *log.retention.min.offset* periodically to the
> broker to let it delete old segments before that offset (NOTE that the
> semantics is exactly the same as in KIP-47, the only difference being that
> we use an offset instead of a timestamp to indicate it, which can be
> honored by the same implementation of KIP-47 on the broker side).
>
> My arguments for letting the app side bookkeep such min-offsets and only
> letting brokers take requests to delete segments accordingly are 1) keeping
> the broker simple, without brokers querying each other about such offsets
> and doing the min() calculation, rather only keeping / deleting messages on
> client admin requests, and 2) allowing more generalized client-driven log
> retention policies with KIP-47 (i.e. the broker is brainless and only takes
> requests, while the client app can apply any customized logic to determine
> the config values of *log.retention.min.offset* or
> *log.retention.min.timestamp* that it sends to the brokers).
>
> Guozhang
>
> On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin <becket@gmail.com> wrote:
>
> > Hi David,
> >
> > > 1. What scenario is used to this configuration?
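Guozhang's app-side bookkeeping proposal can be sketched as a small client
component: the app tracks the committed offsets of the consumer groups it
knows about and periodically derives the log.retention.min.offset value it
would push to the broker. Note that log.retention.min.offset is a config
name proposed in this thread, not an existing broker setting, and the class
below is an illustrative sketch:

```python
class AppSideBookkeeper:
    """Client-side tracker of downstream groups' committed offsets;
    the broker stays 'brainless' and only receives the final value."""

    def __init__(self, groups):
        self.offsets = {g: 0 for g in groups}  # known downstream groups

    def observe_commit(self, group, offset):
        if group in self.offsets:
            self.offsets[group] = max(self.offsets[group], offset)

    def retention_min_offset(self):
        # The value the app would periodically write to the broker.
        return min(self.offsets.values())

bk = AppSideBookkeeper(["enricher", "aggregator"])
bk.observe_commit("enricher", 800)
bk.observe_commit("aggregator", 650)
print(bk.retention_min_offset())  # 650
```

All min() computation and any customized policy live in the app; the broker
only ever deletes segments before the offset it is handed.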

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-28 Thread Guozhang Wang
On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin <becket@gmail.com> wrote:

> Hi David,
>
> > 1. What scenario is used to this configuration?
>
> One scenario is a stream processing pipeline. In a stream processing DAG
> there will be a bunch of intermediate results; we only care about the
> consumer groups that are downstream in the DAG, not other groups. Ideally
> we want to delete the log of the intermediate topics right after all the
> downstream processing jobs have successfully processed the messages. In
> that case we only care about the downstream processing jobs, not other
> groups. That means if a downstream job did not commit an offset for some
> reason, we want to wait for that job. Without the predefined interested
> groups, this is hard to achieve.
>
> 2. Yes, the configuration should be at topic level and set dynamically.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com> wrote:
>
> > Hi Mayuresh,
> > Thanks for the reply:
> > 1. In the log retention check schedule, the broker first finds all the
> > consumer groups which are consuming this topic, and queries the
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-28 Thread Mayuresh Gharat
…within the organization and lots of
> > > temporary
> > > > > > > consumers (for debugging, trouble shooting, prototype
> > development,
> > > > etc)
> > > > > > can
> > > > > > > come and go dynamically, in which case it is hard to track all
> of
> > > > such
> > > > > > > consumer and maintain the minimum committed offsets; on the
> other
> > > > hand,
> > > > > > > there are another category of topics (think: stream-app owned
> > > > > > intermediate
> > > > > > > topics like "pricing-enriched-bid-activity", as Becket
> mentioned
> > > > > above)
> > > > > > > which are particularly own but only one or a few apps, and
> hence
> > > the
> > > > > > > consumer groups for those topics are pre-defined and roughly
> > > static.
> > > > In
> > > > > > > this case I think it makes sense to allow such consumer-drive
> log
> > > > > > retention
> > > > > > > features.
> > > > > > >
> > > > > > > 2. In this case, my question is then whether this bookkeeping
> of
> > > > > > > min-committed-offsets should be done at the brokers side or it
> > > should
> > > > > be
> > > > > > on
> > > > > > > the app side. My gut feeling is that it could be better
> bookkept
> > on
> > > > the
> > > > > > app
> > > > > > > (i.e. client) side which has the full information of the
> > > "registered
> > > > > > > consumer groups" for certain topics, and then knows the
> > > > > > > min-committed-offsets. And a slightly-modified KIP-47 mentioned
> > by
> > > > Dong
> > > > > > > could a better fit, where a) app side bookkeep the
> > consumer-driven
> > > > min
> > > > > > > offset based on their committed offsets, by either talking to
> the
> > > > > > consumer
> > > > > > > clients directly or query broker for the committed offsets of
> > those
> > > > > > > registered consumer groups, and then b) write
> > > > > > > *log.retention.min.offset* periodically
> > > > > > > to broker to let it delete old segments before that offset
> (NOTE
> > > that
> > > > > the
> > > > > > > semantics is exactly the same as to KIP-47, while the only
> > > difference
> > > > > is
> > > > > > > that we use offset instead of timestamp to indicate, which can
> be
> > > > honor
> > > > > > by
> > > > > > > the same implementation of KIP-47 on broker side).
> > > > > > >
> > > > > > > My arguments for letting the app side to bookkeep such
> > min-offsets
> > > > and
> > > > > > only
> > > > > > > let brokers to take requests to delete segments accordingly are
> > 1)
> > > > > > keeping
> > > > > > > the broker simple without any querying each other about such
> > > offsets
> > > > > and
> > > > > > > does the min() calculation, rather only keeping / deleting
> > messages
> > > > > from
> > > > > > > client admin requests, and 2) allowing more generalized
> > > client-driven
> > > > > log
> > > > > > > retention policies with KIP-47 (i.e. broker is brainless and
> only
> > > > take
> > > > > > > requests while client-app can apply any customized logic to
> > > determine
> > > > > the
> > > > > > > config values of *log.retention.min.offset* or
> > > > > > *log.retention.min.timestamp*
> > > > > > > that
> > > > > > > they send to the brokers).
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin <
> > becket@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi David,
> > > > > > > >
>

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-28 Thread Guozhang Wang
…"registered consumer groups" for certain topics, and then knows the
> > > > > > min-committed-offsets. And a slightly-modified KIP-47 mentioned
> by
> > > Dong
> > > > > > could a better fit, where a) app side bookkeep the
> consumer-driven
> > > min
> > > > > > offset based on their committed offsets, by either talking to the
> > > > > consumer
> > > > > > clients directly or query broker for the committed offsets of
> those
> > > > > > registered consumer groups, and then b) write
> > > > > > *log.retention.min.offset* periodically
> > > > > > to broker to let it delete old segments before that offset (NOTE
> > that
> > > > the
> > > > > > semantics is exactly the same as to KIP-47, while the only
> > difference
> > > > is
> > > > > > that we use offset instead of timestamp to indicate, which can be
> > > honor
> > > > > by
> > > > > > the same implementation of KIP-47 on broker side).
> > > > > >
> > > > > > My arguments for letting the app side to bookkeep such
> min-offsets
> > > and
> > > > > only
> > > > > > let brokers to take requests to delete segments accordingly are
> 1)
> > > > > keeping
> > > > > > the broker simple without any querying each other about such
> > offsets
> > > > and
> > > > > > does the min() calculation, rather only keeping / deleting
> messages
> > > > from
> > > > > > client admin requests, and 2) allowing more generalized
> > client-driven
> > > > log
> > > > > > retention policies with KIP-47 (i.e. broker is brainless and only
> > > take
> > > > > > requests while client-app can apply any customized logic to
> > determine
> > > > the
> > > > > > config values of *log.retention.min.offset* or
> > > > > *log.retention.min.timestamp*
> > > > > > that
> > > > > > they send to the brokers).
> > > > > >
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin <
> becket@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi David,
> > > > > > >
> > > > > > > > 1. What scenario is used to this configuration?
> > > > > > >
> > > > > > > One scenario is stream processing pipeline. In a stream
> > processing
> > > > DAG,
> > > > > > > there will be a bunch of intermediate result, we only care
> about
> > > the
> > > > > > > consumer group that is in the downstream of the DAG, but not
> > other
> > > > > > groups.
> > > > > > > Ideally we want to delete the log of the intermediate topics
> > right
> > > > > after
> > > > > > > all the downstream processing jobs has successfully processed
> the
> > > > > > messages.
> > > > > > > In that case, we only care about the downstream processing
> jobs,
> > > but
> > > > > not
> > > > > > > other groups. That means if a down stream job did not commit
> > offset
> > > > for
> > > > > > > some reason, we want to wait for that job. Without the
> predefined
> > > > > > > interested group, it is hard to achieve this.
> > > > > > >
> > > > > > >
> > > > > > > 2. Yes, the configuration should be at topic level and set
> > > > dynamically.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com>
> wrote:
> > > > > > >
> > > > > > > > Hi Mayuresh,
> > > > > > > > Thanks for the reply:
> > > > > > > > 1.  In the log retention check schedule, the broker first
> find
> > > the
> > > > > all
> > > > > > > the
> > > > > > > > consumed group which ar

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-28 Thread Becket Qin
> > > > > config values of *log.retention.min.offset* or
> > > > *log.retention.min.timestamp*
> > > > > that
> > > > > they send to the brokers).
> > > > >
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin <becket@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi David,
> > > > > >
> > > > > > > 1. What scenario is used to this configuration?
> > > > > >
> > > > > > One scenario is stream processing pipeline. In a stream
> processing
> > > DAG,
> > > > > > there will be a bunch of intermediate result, we only care about
> > the
> > > > > > consumer group that is in the downstream of the DAG, but not
> other
> > > > > groups.
> > > > > > Ideally we want to delete the log of the intermediate topics
> right
> > > > after
> > > > > > all the downstream processing jobs has successfully processed the
> > > > > messages.
> > > > > > In that case, we only care about the downstream processing jobs,
> > but
> > > > not
> > > > > > other groups. That means if a down stream job did not commit
> offset
> > > for
> > > > > > some reason, we want to wait for that job. Without the predefined
> > > > > > interested group, it is hard to achieve this.
> > > > > >
> > > > > >
> > > > > > 2. Yes, the configuration should be at topic level and set
> > > dynamically.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com> wrote:
> > > > > >
> > > > > > > Hi Mayuresh,
> > > > > > > Thanks for the reply:
> > > > > > > 1.  In the log retention check schedule, the broker first find
> > the
> > > > all
> > > > > > the
> > > > > > > consumed group which are consuming this topic, and query the
> > commit
> > > > > > offset
> > > > > > > of this consumed group for the topic
> > > > > > > using the OffsetFetch API. And the min commit offset is the
> > minimal
> > > > > > commit
> > > > > > > offset between these commit offsets.
> > > > > > >
> > > > > > >
> > > > > > > 2.  If the console consumer reading and commit, its commit
> offset
> > > > will
> > > > > be
> > > > > > > used to calculate the min commit offset for this topic.
> > > > > > > We can avoid the random consumer using the method Becket
> > suggested.
> > > > > > >
> > > > > > >
> > > > > > > 3. It will not delete the log immediately, the log will stay
> some
> > > > time
> > > > > (
> > > > > > > retention.commitoffset.ms), and after that we only delete
> > > > > > > the log segments whose offsets are less than the min commit
> > offset.
> > > > So
> > > > > > > the user can rewind its offset in the log.retention.ms.
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > David
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > -- Original Message --
> > > > > > > From: "Mayuresh Gharat";<gharatmayures...@gmail.com>;
> > > > > > > Sent: Wednesday, October 19, 2016, 10:25 AM
> > > > > > > To: "dev"<dev@kafka.apache.org>;
> > > > > > >
> > > > > > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before
> log
> > > > > > retention
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Hi David,
> > > > > > >
> > > > > > > Thanks for the KIP.
> > > > > > 

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-26 Thread Guozhang Wang
…interested consumer groups, or it does not have such a set of
> > > > consumer
> > > > > > > groups.
> > > > > > >
> > > > > > > For example, for topic T, assume we know that there will be two
> > > > > > downstream
> > > > > > > consumer groups CG1 and CG2 consuming data from topic T. Will
> we
> > > add
> > > > a
> > > > > > > topic configurations such as
> > > > > > > "log.retention.commitoffset.interested.groups=CG1,CG2" to
> topic
> > T
> > > so
> > > > > > that
> > > > > > > the brokers only care about CG1 and CG2. The committed offsets
> of
> > > > other
> > > > > > > groups are not interested and won't have any impact on the
> > > committed
> > > > > > offset
> > > > > > > based log retention.
> > > > > > >
> > > > > > > It seems the current proposal does not have an "interested
> > consumer
> > > > > group
> > > > > > > set" configuration, so that means any random consumer group may
> > > > affect
> > > > > > the
> > > > > > > committed offset based log retention.
> > > > > > >
> > > > > > > I think the committed offset based log retention seems more
> > useful
> > > in
> > > > > > cases
> > > > > > > where we already know which consumer groups will be consuming
> > from
> > > > this
> > > > > > > topic, so we will only wait for those consumer groups but
> ignore
> > > the
> > > > > > > others. If a topic will be consumed by many unknown or
> > > unpredictable
> > > > > > > consumer groups, it seems the existing time based log retention
> > is
> > > > much
> > > > > > > simple and clear enough. So I would argue we don't need to
> > address
> > > > the
> > > > > > case
> > > > > > > that some groups may come later in the committed offset based
> > > > > retention.
> > > > > > >
> > > > > > > That said, there may still be value to keep the data for some
> > time
> > > > even
> > > > > > > after all the interested consumer groups have consumed the
> > > messages.
> > > > > For
> > > > > > > example, in a pipelined stream processing DAG, we may want to
> > keep
> > > > the
> > > > > > data
> > > > > > > of an intermediate topic for some time in case the job fails.
> So
> > we
> > > > can
> > > > > > > resume from a previously succeeded stage instead of restart the
> > > > entire
> > > > > > > pipeline. Or we can use the intermediate topic for some
> debugging
> > > > work.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Sun, Oct 16, 2016 at 2:15 AM, 东方甲乙 <254479...@qq.com>
> wrote:
> > > > > > >
> > > > > > > > Hi Dong,
> > > > > > > > The KIP is used to solve both these 2 cases, we specify a
> > > small
> > > > > > > > consumed log retention time to delete the consumed data and
> > > avoid
> > > > > > losing
> > > > > > > > un-consumed data.
> > > > > > > > And then specify a large force log retention time used as a higher
> higher
> > > > bound
> > > > > > for
> > > > > > > > the data.  I will update the KIP for this info.
> > > > > > > > Another solution I think may be ok is to support an API
> to
> > > > delete
> > > > > > the
> > > > > > > > inactive group?  If the group is inactive, but its commit
> > > > offset
> > > > > is
> > > > > > > > also in the __commit_offsets topic and
> > > > > > > > stay in the offset cache,  we can delete it via this API.
> > > > > > > >
> > > > > > > >
>

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-25 Thread Becket Qin
> registered consumer groups, and then b) write
> > > *log.retention.min.offset* periodically
> > > to broker to let it delete old segments before that offset (NOTE that
> the
> > > semantics is exactly the same as to KIP-47, while the only difference
> is
> > > that we use offset instead of timestamp to indicate, which can be honor
> > by
> > > the same implementation of KIP-47 on broker side).
> > >
> > > My arguments for letting the app side to bookkeep such min-offsets and
> > only
> > > let brokers to take requests to delete segments accordingly are 1)
> > keeping
> > > the broker simple without any querying each other about such offsets
> and
> > > does the min() calculation, rather only keeping / deleting messages
> from
> > > client admin requests, and 2) allowing more generalized client-driven
> log
> > > retention policies with KIP-47 (i.e. broker is brainless and only take
> > > requests while client-app can apply any customized logic to determine
> the
> > > config values of *log.retention.min.offset* or
> > *log.retention.min.timestamp*
> > > that
> > > they send to the brokers).
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin <becket@gmail.com>
> > wrote:
> > >
> > > > Hi David,
> > > >
> > > > > 1. What scenario is used to this configuration?
> > > >
> > > > One scenario is stream processing pipeline. In a stream processing
> DAG,
> > > > there will be a bunch of intermediate result, we only care about the
> > > > consumer group that is in the downstream of the DAG, but not other
> > > groups.
> > > > Ideally we want to delete the log of the intermediate topics right
> > after
> > > > all the downstream processing jobs has successfully processed the
> > > messages.
> > > > In that case, we only care about the downstream processing jobs, but
> > not
> > > > other groups. That means if a down stream job did not commit offset
> for
> > > > some reason, we want to wait for that job. Without the predefined
> > > > interested group, it is hard to achieve this.
> > > >
> > > >
> > > > 2. Yes, the configuration should be at topic level and set
> dynamically.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com> wrote:
> > > >
> > > > > Hi Mayuresh,
> > > > > Thanks for the reply:
> > > > > 1.  In the log retention check schedule, the broker first find the
> > all
> > > > the
> > > > > consumed group which are consuming this topic, and query the commit
> > > > offset
> > > > > of this consumed group for the topic
> > > > > using the OffsetFetch API. And the min commit offset is the minimal
> > > > commit
> > > > > offset between these commit offsets.
> > > > >
> > > > >
> > > > > 2.  If the console consumer reading and commit, its commit offset
> > will
> > > be
> > > > > used to calculate the min commit offset for this topic.
> > > > > We can avoid the random consumer using the method Becket suggested.
> > > > >
> > > > >
> > > > > 3. It will not delete the log immediately, the log will stay some
> > time
> > > (
> > > > > retention.commitoffset.ms), and after that we only delete
> > > > > the log segments whose offsets are less than the min commit offset.
> > So
> > > > > the user can rewind its offset in the log.retention.ms.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > David
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > -- Original Message --
> > > > > From: "Mayuresh Gharat";<gharatmayures...@gmail.com>;
> > > > > Sent: Wednesday, October 19, 2016, 10:25 AM
> > > > > To: "dev"<dev@kafka.apache.org>;
> > > > >
> > > > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log
> > > > retention
> > > > >
> > > > >

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-25 Thread Joel Koshy
…2. Yes, the configuration should be at topic level and set dynamically.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com> wrote:
> > >
> > > > Hi Mayuresh,
> > > > Thanks for the reply:
> > > > 1.  In the log retention check schedule, the broker first find the
> all
> > > the
> > > > consumed group which are consuming this topic, and query the commit
> > > offset
> > > > of this consumed group for the topic
> > > > using the OffsetFetch API. And the min commit offset is the minimal
> > > commit
> > > > offset between these commit offsets.
> > > >
> > > >
> > > > 2.  If the console consumer reading and commit, its commit offset
> will
> > be
> > > > used to calculate the min commit offset for this topic.
> > > > We can avoid the random consumer using the method Becket suggested.
> > > >
> > > >
> > > > 3. It will not delete the log immediately, the log will stay some
> time
> > (
> > > > retention.commitoffset.ms), and after that we only delete
> > > > the log segments whose offsets are less than the min commit offset.
> So
> > > > the user can rewind its offset in the log.retention.ms.
> > > >
> > > >
> > > > Thanks,
> > > > David
> > > >
> > > >
> > > >
> > > >
> > > > -- Original Message --
> > > > From: "Mayuresh Gharat";<gharatmayures...@gmail.com>;
> > > > Sent: Wednesday, October 19, 2016, 10:25 AM
> > > > To: "dev"<dev@kafka.apache.org>;
> > > >
> > > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log
> > > retention
> > > >
> > > >
> > > >
> > > > Hi David,
> > > >
> > > > Thanks for the KIP.
> > > >
> > > > I had some questions/suggestions :
> > > >
> > > > It would be great if you can explain with an example about how the
> min
> > > > offset for all the consumers will be calculated, in the KIP.
> > > > What I meant was, it would be great to understand with a pseudo
> > > > code/workflow if possible, how each broker knows all the consumers
> for
> > > the
> > > > given topic-partition and how the min is calculated.
> > > >
> > > > Also it would be good to understand what happens if we start a
> console
> > > > consumer which would actually start reading from the beginning offset
> > and
> > > > commit and crash immediately. How will the segments get deleted?
> > > >
> > > > Will it delete all the log segments if all the consumers have read
> till
> > > > latest? If Yes, would we be able to handle a scenario where we say
> that
> > > user
> > > > can rewind its offset to reprocess the data since log.retention.ms
> > might
> > > > not have been reached.
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > > On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin <becket@gmail.com>
> > > wrote:
> > > >
> > > > > Hey David,
> > > > >
> > > > > Thanks for replies to the questions.
> > > > >
> > > > > I think one major thing still not clear at this point is that
> whether
> > > the
> > > > > brokers will only apply the consumed log retention to a specific
> set
> > of
> > > > > interested consumer groups, or it does not have such a set of
> > consumer
> > > > > groups.
> > > > >
> > > > > For example, for topic T, assume we know that there will be two
> > > > downstream
> > > > > consumer groups CG1 and CG2 consuming data from topic T. Will we
> add
> > a
> > > > > topic configurations such as
> > > > > "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T
> so
> > > > that
> > > > > the brokers only care about CG1 and CG2. The committed offsets of
> > other
> > > > > groups are not interested and won't have any impact on the
> committed
> > > > offset
> > > > > based log retention.
> > > > >
> > > > > It seems the current proposal does not h

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-25 Thread Jun Rao
…And the min commit offset is the minimal
> > commit
> > > offset between these commit offsets.
> > >
> > >
> > > 2.  If the console consumer reading and commit, its commit offset will
> be
> > > used to calculate the min commit offset for this topic.
> > > We can avoid the random consumer using the method Becket suggested.
> > >
> > >
> > > 3. It will not delete the log immediately, the log will stay some time
> (
> > > retention.commitoffset.ms), and after that we only delete
> > > the log segments whose offsets are less than the min commit offset.  So
> > > the user can rewind its offset in the log.retention.ms.
> > >
> > >
> > > Thanks,
> > > David
> > >
> > >
> > >
> > >
> > > -- Original Message --
> > > From: "Mayuresh Gharat";<gharatmayures...@gmail.com>;
> > > Sent: Wednesday, October 19, 2016, 10:25 AM
> > > To: "dev"<dev@kafka.apache.org>;
> > >
> > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log
> > retention
> > >
> > >
> > >
> > > Hi David,
> > >
> > > Thanks for the KIP.
> > >
> > > I had some questions/suggestions :
> > >
> > > It would be great if you can explain with an example about how the min
> > > offset for all the consumers will be calculated, in the KIP.
> > > What I meant was, it would be great to understand with a pseudo
> > > code/workflow if possible, how each broker knows all the consumers for
> > the
> > > given topic-partition and how the min is calculated.
> > >
> > > Also it would be good to understand what happens if we start a console
> > > consumer which would actually start reading from the beginning offset
> and
> > > commit and crash immediately. How will the segments get deleted?
> > >
> > > Will it delete all the log segments if all the consumers have read till
> > > latest? If Yes, would we be able to handle a scenario where we say that
> > user
> > > can rewind its offset to reprocess the data since log.retention.ms
> might
> > > not have been reached.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin <becket@gmail.com>
> > wrote:
> > >
> > > > Hey David,
> > > >
> > > > Thanks for replies to the questions.
> > > >
> > > > I think one major thing still not clear at this point is that whether
> > the
> > > > brokers will only apply the consumed log retention to a specific set
> of
> > > > interested consumer groups, or it does not have such a set of
> consumer
> > > > groups.
> > > >
> > > > For example, for topic T, assume we know that there will be two
> > > downstream
> > > > consumer groups CG1 and CG2 consuming data from topic T. Will we add
> a
> > > > topic configurations such as
> > > > "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T so
> > > that
> > > > the brokers only care about CG1 and CG2. The committed offsets of
> other
> > > > groups are not interested and won't have any impact on the committed
> > > offset
> > > > based log retention.
> > > >
> > > > It seems the current proposal does not have an "interested consumer
> > group
> > > > set" configuration, so that means any random consumer group may
> affect
> > > the
> > > > committed offset based log retention.
> > > >
> > > > I think the committed offset based log retention seems more useful in
> > > cases
> > > > where we already know which consumer groups will be consuming from
> this
> > > > topic, so we will only wait for those consumer groups but ignore the
> > > > others. If a topic will be consumed by many unknown or unpredictable
> > > > consumer groups, it seems the existing time based log retention is
> much
> > > > simple and clear enough. So I would argue we don't need to address
> the
> > > case
> > > > that some groups may come later in the committed offset based
> > retention.
> > > >
> > > > That said, there may still be value to keep the data for some time
> even
> > > > after all the interested consumer groups have consumed the messages.
> > For

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-24 Thread Guozhang Wang
Overall I think the motivation is common and of interests to lots of users.
Would like to throw my two cents on this discussion:

1. Kafka topics can be used in different ways. For some categories of
topics (think: "pageView" event topics), the topic is shared among
different teams / apps within the organization, and lots of temporary
consumers (for debugging, troubleshooting, prototype development, etc.) can
come and go dynamically, in which case it is hard to track all such
consumers and maintain the minimum committed offsets; on the other hand,
there is another category of topics (think: stream-app-owned intermediate
topics like "pricing-enriched-bid-activity", as Becket mentioned above)
which are owned by only one or a few apps, and hence the
consumer groups for those topics are pre-defined and roughly static. In
this case I think it makes sense to allow such consumer-driven log retention
features.

2. In this case, my question is then whether this bookkeeping of
min-committed-offsets should be done on the broker side or on
the app side. My gut feeling is that it could be better bookkept on the app
(i.e. client) side, which has the full information of the "registered
consumer groups" for certain topics and then knows the
min-committed-offsets. A slightly-modified KIP-47, mentioned by Dong,
could be a better fit, where a) the app side bookkeeps the consumer-driven min
offset based on the committed offsets, by either talking to the consumer
clients directly or querying the broker for the committed offsets of those
registered consumer groups, and then b) writes
*log.retention.min.offset* periodically
to the broker to let it delete old segments before that offset (NOTE that the
semantics is exactly the same as KIP-47, while the only difference is
that we use an offset instead of a timestamp to indicate it, which can be
honored by the same implementation of KIP-47 on the broker side).

My arguments for letting the app side bookkeep such min-offsets and only
letting brokers take requests to delete segments accordingly are 1) keeping
the broker simple, without brokers querying each other about such offsets
and doing the min() calculation, rather only keeping / deleting messages per
client admin requests, and 2) allowing more generalized client-driven log
retention policies with KIP-47 (i.e. the broker is brainless and only takes
requests, while the client app can apply any customized logic to determine the
config values of *log.retention.min.offset* or *log.retention.min.timestamp* that
it sends to the brokers).
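The app-side bookkeeping described above can be sketched as a small pure function. This is only an illustrative sketch, not Kafka code: the group names and offsets are made up, and in practice the committed offsets would be fetched from the brokers (e.g. via OffsetFetch requests) for the registered groups only.

```python
# Minimal sketch of app-side min-offset bookkeeping (illustrative only).
# committed: committed offset per consumer group for one partition.
# registered: the pre-defined groups whose progress must gate deletion.

def min_retention_offset(committed, registered):
    """Return the offset to send as log.retention.min.offset, or None
    if some registered group has no commit yet (deletion must wait)."""
    offsets = []
    for group in registered:
        if group not in committed:
            return None  # a registered downstream job has not committed
        offsets.append(committed[group])
    return min(offsets) if offsets else None

committed = {"CG1": 1200, "CG2": 950, "console-consumer-4711": 10}
# Temporary consumers are ignored because they are not registered:
print(min_retention_offset(committed, ["CG1", "CG2"]))  # -> 950
```

Note how a random console consumer cannot hold back (or trigger) deletion, since only the registered groups enter the min() calculation.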



Guozhang


On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin <becket@gmail.com> wrote:

> Hi David,
>
> > 1. What scenario is used to this configuration?
>
> One scenario is stream processing pipeline. In a stream processing DAG,
> there will be a bunch of intermediate result, we only care about the
> consumer group that is in the downstream of the DAG, but not other groups.
> Ideally we want to delete the log of the intermediate topics right after
> all the downstream processing jobs has successfully processed the messages.
> In that case, we only care about the downstream processing jobs, but not
> other groups. That means if a down stream job did not commit offset for
> some reason, we want to wait for that job. Without the predefined
> interested group, it is hard to achieve this.
>
>
> 2. Yes, the configuration should be at topic level and set dynamically.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com> wrote:
>
> > Hi Mayuresh,
> > Thanks for the reply:
> > 1.  In the log retention check schedule, the broker first find the all
> the
> > consumed group which are consuming this topic, and query the commit
> offset
> > of this consumed group for the topic
> > using the OffsetFetch API. And the min commit offset is the minimal
> commit
> > offset between these commit offsets.
> >
> >
> > 2.  If the console consumer reading and commit, its commit offset will be
> > used to calculate the min commit offset for this topic.
> > We can avoid the random consumer using the method Becket suggested.
> >
> >
> > 3. It will not delete the log immediately, the log will stay some time (
> > retention.commitoffset.ms), and after that we only delete
> > the log segments whose offsets are less than the min commit offset.  So
> > the user can rewind its offset in the log.retention.ms.
> >
> >
> > Thanks,
> > David
> >
> >
> >
> >
> > -- Original Message --
> > From: "Mayuresh Gharat";<gharatmayures...@gmail.com>;
> > Sent: Wednesday, October 19, 2016, 10:25 AM
> > To: "dev"<dev@kafka.apache.org>;
> >
> > Subject: Re: [D

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-22 Thread Becket Qin
Hi David,

> 1. What scenario is used to this configuration?

One scenario is a stream processing pipeline. In a stream processing DAG,
there will be a bunch of intermediate results; we only care about the
consumer groups that are downstream in the DAG, but not other groups.
Ideally we want to delete the log of the intermediate topics right after
all the downstream processing jobs have successfully processed the messages.
In that case, we only care about the downstream processing jobs, but not
other groups. That means if a downstream job did not commit an offset for
some reason, we want to wait for that job. Without the predefined
interested groups, it is hard to achieve this.


2. Yes, the configuration should be at topic level and set dynamically.

Thanks,

Jiangjie (Becket) Qin

On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com> wrote:

> Hi Mayuresh,
> Thanks for the reply:
> 1.  In the log retention check schedule, the broker first finds all the
> consumer groups which are consuming this topic, and queries the committed
> offset of each of these groups for the topic using the OffsetFetch API.
> The min commit offset is the minimum among these committed offsets.
>
>
> 2.  If the console consumer is reading and committing, its committed offset
> will be used to calculate the min commit offset for this topic.
> We can avoid the random consumer using the method Becket suggested.
>
>
> 3. It will not delete the log immediately; the log will stay for some time
> (retention.commitoffset.ms), and after that we only delete
> the log segments whose offsets are less than the min commit offset.  So
> the user can rewind its offset within log.retention.ms.
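Points 1-3 above amount to a per-segment check along these lines. This is a sketch under the thread's assumptions: `retention.commitoffset.ms` is the config name proposed here, the rest of the names are illustrative.

```python
def consumed_retention_eligible(segment_last_offset, segment_age_ms,
                                min_commit_offset, commit_retention_ms):
    """A segment may be deleted by consumed log retention only if every
    message in it is below the min committed offset of all groups, AND the
    segment has already lived past retention.commitoffset.ms, so a consumer
    can still rewind within that window."""
    if min_commit_offset is None:  # some group has no committed offset yet
        return False
    return (segment_last_offset < min_commit_offset
            and segment_age_ms > commit_retention_ms)

print(consumed_retention_eligible(99, 7_200_000, 100, 3_600_000))   # True
print(consumed_retention_eligible(150, 7_200_000, 100, 3_600_000))  # False: not fully consumed
print(consumed_retention_eligible(99, 1_000, 100, 3_600_000))       # False: still in rewind window
```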
>
>
> Thanks,
> David
>
>
>
>
> -- Original Message --
> From: "Mayuresh Gharat" <gharatmayures...@gmail.com>
> Sent: Wednesday, October 19, 2016, 10:25 AM
> To: "dev" <dev@kafka.apache.org>
>
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
>
>
> Hi David,
>
> Thanks for the KIP.
>
> I had some questions/suggestions :
>
> It would be great if you could explain with an example, in the KIP, how the
> min offset for all the consumers will be calculated.
> What I meant was, it would be great to understand, with pseudo
> code/workflow if possible, how each broker knows all the consumers for the
> given topic-partition and how the min is calculated.
>
> Also, it would be good to understand what happens if we start a console
> consumer which would actually start reading from the beginning offset,
> commit, and crash immediately. How will the segments get deleted?
>
> Will it delete all the log segments if all the consumers have read till the
> latest? If yes, would we be able to handle a scenario where we say that the
> user can rewind its offset to reprocess the data, since log.retention.ms
> might not have been reached?
>
> Thanks,
>
> Mayuresh
>
> On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin <becket@gmail.com> wrote:
>
> > Hey David,
> >
> > Thanks for replies to the questions.
> >
> > I think one major thing still not clear at this point is whether the
> > brokers will only apply the consumed log retention to a specific set of
> > interested consumer groups, or whether there is no such set of consumer
> > groups.
> >
> > For example, for topic T, assume we know that there will be two downstream
> > consumer groups CG1 and CG2 consuming data from topic T. Will we add a
> > topic configuration such as
> > "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T so that
> > the brokers only care about CG1 and CG2? The committed offsets of other
> > groups are not of interest and won't have any impact on the committed offset
> > based log retention.
> >
> > It seems the current proposal does not have an "interested consumer group
> > set" configuration, so that means any random consumer group may affect the
> > committed offset based log retention.
> >
> > I think the committed offset based log retention seems more useful in cases
> > where we already know which consumer groups will be consuming from this
> > topic, so we will only wait for those consumer groups but ignore the
> > others. If a topic will be consumed by many unknown or unpredictable
> > consumer groups, the existing time based log retention is much
> > simpler and clearer. So I would argue we don't need to address the case
> > that some groups may come later in the committed offset based retention.
> >
> > That said, there may still be value to keep the data for some time even
>

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-18 Thread Mayuresh Gharat
Hi David,

Thanks for the KIP.

I had some questions/suggestions :

It would be great if you could explain with an example, in the KIP, how the
min offset for all the consumers will be calculated.
What I meant was, it would be great to understand, with pseudo
code/workflow if possible, how each broker knows all the consumers for the
given topic-partition and how the min is calculated.

Also, it would be good to understand what happens if we start a console
consumer which would actually start reading from the beginning offset,
commit, and crash immediately. How will the segments get deleted?

Will it delete all the log segments if all the consumers have read till the
latest? If yes, would we be able to handle a scenario where we say that the
user can rewind its offset to reprocess the data, since log.retention.ms
might not have been reached?

Thanks,

Mayuresh

On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin <becket@gmail.com> wrote:

> Hey David,
>
> Thanks for replies to the questions.
>
> I think one major thing still not clear at this point is whether the
> brokers will only apply the consumed log retention to a specific set of
> interested consumer groups, or whether there is no such set of consumer
> groups.
>
> For example, for topic T, assume we know that there will be two downstream
> consumer groups CG1 and CG2 consuming data from topic T. Will we add a
> topic configuration such as
> "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T so that
> the brokers only care about CG1 and CG2? The committed offsets of other
> groups are not of interest and won't have any impact on the committed offset
> based log retention.
>
> It seems the current proposal does not have an "interested consumer group
> set" configuration, so that means any random consumer group may affect the
> committed offset based log retention.
>
> I think the committed offset based log retention seems more useful in cases
> where we already know which consumer groups will be consuming from this
> topic, so we will only wait for those consumer groups but ignore the
> others. If a topic will be consumed by many unknown or unpredictable
> consumer groups, the existing time based log retention is much
> simpler and clearer. So I would argue we don't need to address the case
> that some groups may come later in the committed offset based retention.
>
> That said, there may still be value in keeping the data for some time even
> after all the interested consumer groups have consumed the messages. For
> example, in a pipelined stream processing DAG, we may want to keep the data
> of an intermediate topic for some time in case a job fails, so we can
> resume from a previously succeeded stage instead of restarting the entire
> pipeline. Or we can use the intermediate topic for some debugging work.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Sun, Oct 16, 2016 at 2:15 AM, 东方甲乙 <254479...@qq.com> wrote:
>
> > Hi Dong,
> > The KIP is used to solve both of these 2 cases: we specify a small
> > consumed log retention time to delete the consumed data and avoid losing
> > un-consumed data,
> > and we specify a large force log retention time used as an upper bound for
> > the data.  I will update the KIP with this info.
> > Another solution I think may be ok is to support an API to delete an
> > inactive group.  If the group is inactive, but its commit offset is
> > still in the __consumer_offsets topic and
> > stays in the offset cache, we can delete it via this API.
> >
> >
> > Thanks,
> > David
> >
> >
> > -- Original Message --
> > From: "Dong Lin" <lindon...@gmail.com>
> > Sent: Friday, October 14, 2016, 5:01 AM
> > To: "dev" <dev@kafka.apache.org>
> >
> > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log
> retention
> >
> >
> >
> > Hi David,
> >
> > As explained in the motivation section of the KIP, the problem is that if
> > log retention is too small, we may lose data; and if log retention is too
> > large, then we waste disk space. Therefore, we need to solve one of the two
> > problems -- allow data to be persisted longer for consumption if log
> > retention is set too small, or allow data to be expired earlier if log
> > retention is too large. I think the KIP probably needs to make this clear
> > and explain which one is rejected and why. Note that the choice between the two
> > affects the solution -- if we want to address the first problem then
> > log.retention.ms should be used as a lower bound on the actual retention
> > time, and if we want to address the

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-17 Thread Becket Qin
Hey David,

Thanks for replies to the questions.

I think one major thing still not clear at this point is whether the
brokers will only apply the consumed log retention to a specific set of
interested consumer groups, or whether there is no such set of consumer
groups.

For example, for topic T, assume we know that there will be two downstream
consumer groups CG1 and CG2 consuming data from topic T. Will we add a
topic configuration such as
"log.retention.commitoffset.interested.groups=CG1,CG2" to topic T so that
the brokers only care about CG1 and CG2? The committed offsets of other
groups are not of interest and won't have any impact on the committed offset
based log retention.

It seems the current proposal does not have an "interested consumer group
set" configuration, so that means any random consumer group may affect the
committed offset based log retention.

I think the committed offset based log retention seems more useful in cases
where we already know which consumer groups will be consuming from this
topic, so we will only wait for those consumer groups but ignore the
others. If a topic will be consumed by many unknown or unpredictable
consumer groups, the existing time based log retention is much
simpler and clearer. So I would argue we don't need to address the case
that some groups may come later in the committed offset based retention.

That said, there may still be value in keeping the data for some time even
after all the interested consumer groups have consumed the messages. For
example, in a pipelined stream processing DAG, we may want to keep the data
of an intermediate topic for some time in case a job fails, so we can
resume from a previously succeeded stage instead of restarting the entire
pipeline. Or we can use the intermediate topic for some debugging work.

Thanks,

Jiangjie (Becket) Qin



On Sun, Oct 16, 2016 at 2:15 AM, 东方甲乙 <254479...@qq.com> wrote:

> Hi Dong,
> The KIP is used to solve both of these 2 cases: we specify a small
> consumed log retention time to delete the consumed data and avoid losing
> un-consumed data,
> and we specify a large force log retention time used as an upper bound for
> the data.  I will update the KIP with this info.
> Another solution I think may be ok is to support an API to delete an
> inactive group.  If the group is inactive, but its commit offset is
> still in the __consumer_offsets topic and
> stays in the offset cache, we can delete it via this API.
>
>
> Thanks,
> David
>
>
> -- Original Message --
> From: "Dong Lin" <lindon...@gmail.com>
> Sent: Friday, October 14, 2016, 5:01 AM
> To: "dev" <dev@kafka.apache.org>
>
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
>
>
> Hi David,
>
> As explained in the motivation section of the KIP, the problem is that if
> log retention is too small, we may lose data; and if log retention is too
> large, then we waste disk space. Therefore, we need to solve one of the two
> problems -- allow data to be persisted longer for consumption if log
> retention is set too small, or allow data to be expired earlier if log
> retention is too large. I think the KIP probably needs to make this clear
> and explain which one is rejected and why. Note that the choice between the two
> affects the solution -- if we want to address the first problem then
> log.retention.ms should be used as a lower bound on the actual retention
> time, and if we want to address the second problem then
> log.retention.ms
> should be used as an upper bound on the actual retention time.
>
> In both cases, we probably need to figure out a way to determine "active
> consumer group". Maybe we can compare the time-since-last-commit against a
> threshold to determine this. In addition, the threshold can be overridden
> either per-topic or per-groupId. If we go along this route, the rejected
> solution (per-topic vs. per-groupId) should probably be explained in the
> KIP.
>
>
> Thanks,
> Dong
>
>
>
> On Thu, Oct 13, 2016 at 10:23 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi David,
> >
> > Thanks for your explanation. There still seems to be an issue with this
> > solution. Please see my comment inline.
> >
> >
> > On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <254479...@qq.com> wrote:
> >
> >> Hi Dong,
> >> Sorry for the delay, here are the comments:
> >> 1. I think we should distinguish these two cases:
> >> (1) group has no member, but has a committed offset: in this case we
> >> should consider its committed offset
> >> (2) group has no member, no committed offset: skip this group
> >> Is that ok?
> >>
> >>
> >> L

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-13 Thread Dong Lin
Hi David,

As explained in the motivation section of the KIP, the problem is that if
log retention is too small, we may lose data; and if log retention is too
large, then we waste disk space. Therefore, we need to solve one of the two
problems -- allow data to be persisted longer for consumption if log
retention is set too small, or allow data to be expired earlier if log
retention is too large. I think the KIP probably needs to make this clear
and explain which one is rejected and why. Note that the choice between the two
affects the solution -- if we want to address the first problem then
log.retention.ms should be used as a lower bound on the actual retention
time, and if we want to address the second problem then log.retention.ms
should be used as an upper bound on the actual retention time.

In both cases, we probably need to figure out a way to determine "active
consumer group". Maybe we can compare the time-since-last-commit against a
threshold to determine this. In addition, the threshold can be overridden
either per-topic or per-groupId. If we go along this route, the rejected
solution (per-topic vs. per-groupId) should probably be explained in the
KIP.
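The time-since-last-commit idea can be sketched as follows. The threshold config name is hypothetical; the sketch only illustrates filtering stale groups out of the min-offset computation.

```python
def groups_blocking_retention(groups, now_ms, inactive_threshold_ms):
    """Keep only groups whose last offset commit is recent enough; stale
    groups (e.g. a renamed group or a one-off console consumer) no longer
    hold back consumed log retention."""
    return [g for g in groups
            if now_ms - g["last_commit_ts_ms"] <= inactive_threshold_ms]

groups = [
    {"name": "CG1", "last_commit_ts_ms": 9_900_000},  # committed 100 s ago
    {"name": "CG2", "last_commit_ts_ms": 1_000_000},  # committed long ago
]
active = groups_blocking_retention(groups, now_ms=10_000_000,
                                   inactive_threshold_ms=600_000)
print([g["name"] for g in active])  # prints ['CG1']
```

With a per-topic or per-groupId override of the threshold, the same filter just reads a different `inactive_threshold_ms` per group.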


Thanks,
Dong



On Thu, Oct 13, 2016 at 10:23 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hi David,
>
> Thanks for your explanation. There still seems to be an issue with this
> solution. Please see my comment inline.
>
>
> On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <254479...@qq.com> wrote:
>
>> Hi Dong,
>> Sorry for the delay, here are the comments:
>> 1. I think we should distinguish these two cases:
>> (1) group has no member, but has a committed offset: in this case we should
>> consider its committed offset
>> (2) group has no member, no committed offset: skip this group
>> Is that ok?
>>
>>
>> The ListGroup API can list the groups, but this API only shows the online
>> groups, so we should enhance the ListGroup API to also list the groups in
>> case (1)
>>
> Say some user starts a consumer to consume topic A with
> enable.auto.commit = true. Later they change the group name in the config.
> Then the proposed solution will never execute consumed log retention for
> topic A, right? I think group name change is pretty common and we
> should take care of this issue. One possible solution is to add a config to
> specify the maximum time since the last offset commit before we consider a
> group inactive.
>
>
>>
>> 2. Because every consumer group may appear at a different time -- say, group
>> 1 starts to consume on day 1, group 2 starts to consume on day 2 -- if we
>> delete the log segment right away,
>> group 2 cannot consume these messages.  So we hope the messages can be held
>> for a specified time.  I think many use-cases will need these configs, if
>> there are many consumer groups.
>>
>>
> If we want to take care of group 2, can we simply disable consumed log
> retention for the topic and set log retention to 1 day? Can you explain the
> benefit of enabling consumed log retention and setting consumed log retention
> to 1 day?
>
> Currently the flow graph in the KIP suggests that we delete data iff
> (consumed log retention is triggered OR forced log retention is triggered).
> An alternative solution is to delete data iff ((consumed log retention is
> disabled OR consumed log retention is triggered) AND forced log retention
> is triggered). I would argue that the 2nd scheme is better. Say the
> consumed log retention is enabled. The 1st scheme basically interprets
> forced log retention as the upper bound of the time the data can stay in
> Kafka. The 2nd scheme interprets forced log retention as the lower bound of
> the time the data can stay in Kafka, which is more consistent with the
> purpose of having this forced log retention (to save disk space). And if we
> adopt the 2nd solution, the use-case you suggested can be easily addressed
> by setting forced log retention to 1 day and enabling consumed log retention.
> What do you think?
>
>
>
>>
>> Thanks,
>> David
>>
>>
>>
>>
>> -- Original Message --
>> From: "Dong Lin" <lindon...@gmail.com>
>> Sent: Monday, October 10, 2016, 4:05 PM
>> To: "dev" <dev@kafka.apache.org>
>>
>> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>>
>>
>>
>> Hey David,
>>
>> Thanks for reply. Please see comment inline.
>>
>> On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) <pengwei...@huawei.com>
>> wrote:
>>
>> > Hi Dong
>> >Thanks for the questions:
>> >
>> > 1.  Now we don't distinguish inactive or active groups. Because in 

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-13 Thread Dong Lin
Hi David,

Thanks for your explanation. There still seems to be an issue with this
solution. Please see my comment inline.


On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <254479...@qq.com> wrote:

> Hi Dong,
> Sorry for the delay, here are the comments:
> 1. I think we should distinguish these two cases:
> (1) group has no member, but has a committed offset: in this case we should
> consider its committed offset
> (2) group has no member, no committed offset: skip this group
> Is that ok?
>
>
> The ListGroup API can list the groups, but this API only shows the online
> groups, so we should enhance the ListGroup API to also list the groups in
> case (1)
>
Say some user starts a consumer to consume topic A with enable.auto.commit
= true. Later they change the group name in the config. Then the proposed
solution will never execute consumed log retention for topic A, right?
I think group name change is pretty common and we should take care of this
issue. One possible solution is to add a config to specify the maximum time
since the last offset commit before we consider a group inactive.


>
> 2. Because every consumer group may appear at a different time -- say, group 1
> starts to consume on day 1, group 2 starts to consume on day 2 -- if we delete
> the log segment right away,
> group 2 cannot consume these messages.  So we hope the messages can be held
> for a specified time.  I think many use-cases will need these configs, if
> there are many consumer groups.
>
>
If we want to take care of group 2, can we simply disable consumed log
retention for the topic and set log retention to 1 day? Can you explain the
benefit of enabling consumed log retention and setting consumed log retention
to 1 day?

Currently the flow graph in the KIP suggests that we delete data iff
(consumed log retention is triggered OR forced log retention is triggered).
An alternative solution is to delete data iff ((consumed log retention is
disabled OR consumed log retention is triggered) AND forced log retention
is triggered). I would argue that the 2nd scheme is better. Say the
consumed log retention is enabled. The 1st scheme basically interprets
forced log retention as the upper bound of the time the data can stay in
Kafka. The 2nd scheme interprets forced log retention as the lower bound of
the time the data can stay in Kafka, which is more consistent with the
purpose of having this forced log retention (to save disk space). And if we
adopt the 2nd solution, the use-case you suggested can be easily addressed
by setting forced log retention to 1 day and enabling consumed log retention.
What do you think?
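The difference between the two schemes is just how the two retention predicates combine; a minimal sketch (function names are illustrative, not from the KIP):

```python
def delete_scheme_1(consumed_triggered, forced_triggered):
    # 1st scheme (current KIP flow graph): forced log retention acts as an
    # UPPER bound -- either condition alone is enough to delete.
    return consumed_triggered or forced_triggered

def delete_scheme_2(consumed_enabled, consumed_triggered, forced_triggered):
    # 2nd scheme: forced log retention acts as a LOWER bound -- data is
    # deleted only after the forced retention expires AND consumed
    # retention is satisfied (or disabled).
    return (not consumed_enabled or consumed_triggered) and forced_triggered

# With consumed retention enabled and triggered, but forced retention (say,
# 1 day) not yet expired: scheme 1 deletes, scheme 2 keeps the data.
print(delete_scheme_1(consumed_triggered=True, forced_triggered=False))        # True
print(delete_scheme_2(True, consumed_triggered=True, forced_triggered=False))  # False
```

When consumed retention is disabled, scheme 2 reduces to plain forced (time based) retention, which is why it addresses the group-2 use-case above with a 1-day forced retention.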



>
> Thanks,
> David
>
>
>
>
> -- Original Message --
> From: "Dong Lin" <lindon...@gmail.com>
> Sent: Monday, October 10, 2016, 4:05 PM
> To: "dev" <dev@kafka.apache.org>
>
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
>
>
> Hey David,
>
> Thanks for reply. Please see comment inline.
>
> On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) <pengwei...@huawei.com>
> wrote:
>
> > Hi Dong
> >Thanks for the questions:
> >
> > 1.  Now we don't distinguish inactive or active groups, because in some
> > cases an inactive group may become active again and use the previous
> > committed offset.
> >
> > So we will not delete the log segment in the consumed retention if there
> > are some groups that consume but do not commit, but the log segment can be
> > deleted by the force retention.
> >
>
> So in the example I provided, the consumed log retention will be
> effectively disabled, right? This seems to be a real problem in operation
> -- we don't want log retention to be un-intentionally disabled simply
> because someone starts a tool to consume from that topic. Either this KIP
> should provide a way to handle this, or there should be a way for the operator
> to be aware of such a case and be able to re-enable consumed log retention
> for the topic. What do you think?
>
>
>
> > 2.  These configs are used to determine the out-of-date time of the
> > consumed retention, like the parameters of the force retention
> > (log.retention.hours, log.retention.minutes, log.retention.ms). For
> > example, users want to save the log for 3 days; after 3 days, Kafka will
> > delete the log segments which are
> >
> > consumed by all the consumer groups.  The log retention thread needs these
> > parameters.
> >
> > It makes sense to have configs such as log.retention.ms -- it is used to
> make data available for up to a configured amount of time before it is
> deleted. My question is what is the use-case for making the log available for
> another e.g. 3 days after it has been consumed by all

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-13 Thread 东方甲乙
Hi Dong,
Sorry for the delay, here are the comments:
1. I think we should distinguish these two cases:
(1) group has no member, but has a committed offset: in this case we should
consider its committed offset
(2) group has no member, no committed offset: skip this group
Is that ok?


The ListGroup API can list the groups, but this API only shows the online
groups, so we should enhance the ListGroup API to also list the groups in
case (1)
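David's two cases can be folded directly into the offset lookup; a sketch with illustrative names only (the real data would come from the enhanced ListGroup plus OffsetFetch):

```python
def effective_commit_offsets(groups, partition, fetch_committed_offset):
    """Case (1): a group with no members but a committed offset still
    counts toward the min commit offset. Case (2): a group with no members
    and no committed offset is skipped entirely."""
    offsets = {}
    for group in groups:
        offset = fetch_committed_offset(group, partition)
        if offset is not None:       # case (2) groups fall out here
            offsets[group] = offset  # case (1) groups are kept
    return offsets

committed = {("CG1", "T-0"): 42}   # CG1 committed once, then all members left
store = lambda g, p: committed.get((g, p))
print(effective_commit_offsets(["CG1", "CG2"], "T-0", store))  # {'CG1': 42}
```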


2. Because every consumer group may appear at a different time -- say, group 1
starts to consume on day 1, group 2 starts to consume on day 2 -- if we delete
the log segment right away,
group 2 cannot consume these messages.  So we hope the messages can be held
for a specified time.  I think many use-cases will need these configs, if
there are many consumer groups.


Thanks,
David




-- Original Message --
From: "Dong Lin" <lindon...@gmail.com>
Sent: Monday, October 10, 2016, 4:05 PM
To: "dev" <dev@kafka.apache.org>

Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention



Hey David,

Thanks for reply. Please see comment inline.

On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) <pengwei...@huawei.com> wrote:

> Hi Dong
>Thanks for the questions:
>
> 1.  Now we don't distinguish inactive or active groups, because in some
> cases an inactive group may become active again and use the previous
> committed offset.
>
> So we will not delete the log segment in the consumed retention if there
> are some groups that consume but do not commit, but the log segment can be
> deleted by the force retention.
>

So in the example I provided, the consumed log retention will be
effectively disabled, right? This seems to be a real problem in operation
-- we don't want log retention to be un-intentionally disabled simply
because someone starts a tool to consume from that topic. Either this KIP
should provide a way to handle this, or there should be a way for the operator
to be aware of such a case and be able to re-enable consumed log retention
for the topic. What do you think?



> 2.  These configs are used to determine the out-of-date time of the
> consumed retention, like the parameters of the force retention
> (log.retention.hours, log.retention.minutes, log.retention.ms). For
> example, users want to save the log for 3 days; after 3 days, Kafka will
> delete the log segments which are
>
> consumed by all the consumer groups.  The log retention thread needs these
> parameters.
>
It makes sense to have configs such as log.retention.ms -- it is used to
make data available for up to a configured amount of time before it is
deleted. My question is what is the use-case for making the log available for
another e.g. 3 days after it has been consumed by all consumer groups. The
purpose of this KIP is to allow the log to be deleted as soon as all
interested consumer groups have consumed it. Can you provide a use-case for
keeping the log available for a longer time after it has been consumed by all
groups?


>
> Thanks,
> David
>
>
> > Hey David,
> >
> > Thanks for the KIP. Can you help with the following two questions:
> >
> > 1) If someone starts a consumer (e.g. kafka-console-consumer) to consume a
> > topic for debug/validation purposes, a random consumer group may be created
> > and offsets may be committed for this consumer group. If no offset commit is
> > made for this consumer group in the future, will this effectively
> > disable consumed log retention for this topic? In other words, how does
> > this KIP distinguish active consumer groups from inactive ones?
> >
> > 2) Why do we need new configs such as log.retention.commitoffset.hours?
> > Can we simply delete log segments if consumed log retention is enabled for
> > this topic and all consumer groups have consumed messages in the log segment?
> >
> > Thanks,
> > Dong
> >
> >
> >
> >On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) <pengwei...@huawei.com>
> wrote:
> >
> > > Hi Becket,
> > >
> > >   Thanks for the feedback:
> > > 1.  We use the simple consumer API to query the committed offset, so we
> > > don't need to specify the consumer group.
> > > 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> > > the committed offset in the log retention process.  The client can commit
> > > offsets or not.
> > > 3.  It does not need to distinguish the follower brokers from leader
> > > brokers; every broker can query.
> > > 4.  We don't need to change the protocols; we mainly change the log
> > > retention process in the log manager.
> > >
> > >   One question is the query min offset need O(par

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-10 Thread Renu Tewari
Hi David
  This is a very timely KIP given the number of use cases in the stream
processing pipeline that need consumed log retention management.

Some questions that Becket and Dong asked -- I just want to make sure they
are described in the KIP.

1. How is the configuration setup per topic to know what is the set of
consumer groups that are "subscribed" to this topic whose committed offsets
will be tracked. Can we have more details on how this will be dynamically
tracked as consumers come and go.
2. Is there a timeout to determine if a consumer group has stopped
committing offsets to topic partitions that it had earlier consumed? Or
will the consumed log retention track each known consumer/consumer group's
committed offset and stop any cleaning if a consumer disappears after
consuming? This is to Dong's earlier question.
3. Can the log.retention value be set to 0 to indicate the log is set to be
cleaned to the min committed offset immediately after it has been consumed?

4. What guarantee is given on when the consumed log will eventually be
cleaned? If the log.retention timeout is enabled for a consumed offset and
a new consumer starts consuming from the beginning, then is the min
committed offset value changed and the timer based on the log.retention
timeout restarted?

 This all relates to active and inactive consumers, and if the set
changes dynamically, how the consumed log retention actually makes
progress.

regards
renu


On Mon, Oct 10, 2016 at 1:05 AM, Dong Lin  wrote:

> Hey David,
>
> Thanks for reply. Please see comment inline.
>
> On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) 
> wrote:
>
> > Hi Dong
> >Thanks for the questions:
> >
> > 1.  Now we don't distinguish inactive or active groups, because in some
> > cases an inactive group may become active again and use the previous
> > committed offset.
> >
> > So we will not delete the log segment in the consumed retention if there
> > are some groups that consume but do not commit, but the log segment can be
> > deleted by the force retention.
> >
>
> So in the example I provided, the consumed log retention will be
> effectively disabled, right? This seems to be a real problem in operation
> -- we don't want log retention to be un-intentionally disabled simply
> because someone starts a tool to consume from that topic. Either this KIP
> should provide a way to handle this, or there should be a way for the operator
> to be aware of such a case and be able to re-enable consumed log retention
> for the topic. What do you think?
>
>
>
> > 2.  These configs are used to determine the out-of-date time of the
> > consumed retention, like the parameters of the force retention
> > (log.retention.hours, log.retention.minutes, log.retention.ms). For
> > example, users want to save the log for 3 days; after 3 days, Kafka will
> > delete the log segments which are
> >
> > consumed by all the consumer groups.  The log retention thread needs these
> > parameters.
> >
> It makes sense to have configs such as log.retention.ms -- it is used to
> make data available for up to a configured amount of time before it is
> deleted. My question is what is the use-case for making the log available for
> another e.g. 3 days after it has been consumed by all consumer groups. The
> purpose of this KIP is to allow the log to be deleted as soon as all
> interested consumer groups have consumed it. Can you provide a use-case for
> keeping the log available for a longer time after it has been consumed by all
> groups?
>
>
> >
> > Thanks,
> > David
> >
> >
> > > Hey David,
> > >
> > > Thanks for the KIP. Can you help with the following two questions:
> > >
> > > 1) If someone starts a consumer (e.g. kafka-console-consumer) to consume a
> > > topic for debug/validation purposes, a random consumer group may be created
> > > and offsets may be committed for this consumer group. If no offset commit is
> > > made for this consumer group in the future, will this effectively
> > > disable consumed log retention for this topic? In other words, how does
> > > this KIP distinguish active consumer groups from inactive ones?
> > >
> > > 2) Why do we need new configs such as log.retention.commitoffset.hours?
> > > Can we simply delete log segments if consumed log retention is enabled for
> > > this topic and all consumer groups have consumed messages in the log segment?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > >On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) 
> > wrote:
> > >
> > > > Hi Becket,
> > > >
> > > >   Thanks for the feedback:
> > > > 1.  We use the simple consumer API to query the committed offset, so we
> > > > don't need to specify the consumer group.
> > > > 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> > > > the committed offset in the log retention process.  The client can commit
> > > > offsets or not.
> > > > 3.  It does not need to distinguish the 

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-10 Thread Dong Lin
Hey David,

Thanks for reply. Please see comment inline.

On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L)  wrote:

> Hi Dong
>Thanks for the questions:
>
> 1.  Now we don't distinguish inactive from active groups, because in some
> cases an inactive group may become active again and resume from its previous
> committed offset.
>
> So we will not delete a log segment under consumed retention if some groups
> consume it but do not commit offsets; the segment can still be deleted by
> the forced retention.
>

So in the example I provided, the consumed log retention will be
effectively disabled, right? This seems to be a real operational problem
-- we don't want log retention to be unintentionally disabled simply
because someone starts a tool to consume from that topic. Either this KIP
should provide a way to handle this, or there should be a way for the operator
to be aware of such a case and be able to re-enable consumed log retention
for the topic. What do you think?



> 2.  These configs determine when the consumed retention considers a log out
> of date, like the parameters of the forced retention (log.retention.hours,
> log.retention.minutes, log.retention.ms). For example, if users want to keep
> the log for 3 days, then after 3 days Kafka will delete the log segments
> that have been consumed by all the consumer groups.  The log retention
> thread needs these parameters.
>
It makes sense to have configs such as log.retention.ms -- it is used to
make data available for up to a configured amount of time before it is
deleted. My question is what the use-case is for keeping the log available for
another e.g. 3 days after it has been consumed by all consumer groups. The
purpose of this KIP is to allow the log to be deleted as soon as all
interested consumer groups have consumed it. Can you provide a use-case for
keeping the log available for a longer time after it has been consumed by all
groups?


>
> Thanks,
> David
>
>
> > Hey David,
> >
> > Thanks for the KIP. Can you help with the following two questions:
> >
> > 1) If someone starts a consumer (e.g. kafka-console-consumer) to consume a
> > topic for debug/validation purposes, a random consumer group may be created
> > and offsets may be committed for this consumer group. If no offset commit is
> > made for this consumer group in the future, will this effectively
> > disable consumed log retention for this topic? In other words, how does this
> > KIP distinguish active consumer groups from inactive ones?
> >
> > 2) Why do we need new configs such as log.retention.commitoffset.hours? Can
> > we simply delete log segments if consumed log retention is enabled for this
> > topic and all consumer groups have consumed messages in the log segment?
> >
> > Thanks,
> > Dong
> >
> >
> >
> >On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) 
> wrote:
> >
> > > Hi Becket,
> > >
> > >   Thanks for the feedback:
> > > 1.  We use the simple consumer API to query the commit offset, so we
> > > don't need to specify the consumer group.
> > > 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> > > the commit offset in the log retention process.  The client can commit
> > > offsets or not.
> > > 3.  It does not need to distinguish follower brokers from leader
> > > brokers; every broker can query.
> > > 4.  We don't need to change the protocols; we mainly change the log
> > > retention process in the log manager.
> > >
> > >   One concern is that querying the min offset needs O(partitions *
> > > groups) time; an alternative is to build an internal topic to save
> > > every partition's min offset, which reduces this to O(1).
> > > I will update the wiki for more details.
> > >
> > > Thanks,
> > > David
> > >
> > >
> > > > Hi Pengwei,
> > > >
> > > > Thanks for the KIP proposal. It is a very useful KIP. At a high
> level,
> > > the
> > > > proposed behavior looks reasonable to me.
> > > >
> > > > However, it seems that some of the details are not mentioned in the
> KIP.
> > > > For example,
> > > >
> > > > 1. How will the expected consumer group be specified? Is it through a per
> > > > topic dynamic configuration?
> > > > 2. How do the brokers detect the consumer offsets? Is it required for a
> > > > consumer to commit offsets?
> > > > 3. How do all the replicas know about the committed offsets? e.g. 1)
> > > > non-coordinator brokers which do not have the committed offsets, 2)
> > > > follower brokers which do not have consumers directly consuming from it.
> > > > 4. Are there any other changes that need to be made (e.g. new protocols) in
> > > > addition to the configuration change?
> > > >
> > > > It would be great if you can update the wiki to have more details.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) 
> > > wrote:
> > > >
> > > > > Hi All,
> > > > >I have made a KIP to enhance the log retention, details as follows:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 68+Add+a+consumed+log+retention+before+log+retention

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-10 Thread Pengwei (L)
Hi Dong
   Thanks for the questions:

1.  Now we don't distinguish inactive from active groups, because in some
cases an inactive group may become active again and resume from its previous
committed offset.

So we will not delete a log segment under consumed retention if some groups
consume it but do not commit offsets; the segment can still be deleted by
the forced retention.

2.  These configs determine when the consumed retention considers a log out of
date, like the parameters of the forced retention (log.retention.hours,
log.retention.minutes, log.retention.ms). For example, if users want to keep
the log for 3 days, then after 3 days Kafka will delete the log segments that
have been consumed by all the consumer groups.  The log retention thread needs
these parameters.
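The windowing described here can be sketched as follows (a minimal Python sketch; the names `fully_consumed_at_ms` and `consumed_retention_ms` are illustrative placeholders, not the KIP's actual config keys):

```python
import time

def segment_deletable(fully_consumed_at_ms, consumed_retention_ms, now_ms=None):
    """True once a segment that every group has consumed has also aged
    past the consumed-retention window (e.g. 3 days)."""
    if fully_consumed_at_ms is None:  # some group has not consumed it yet
        return False
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - fully_consumed_at_ms > consumed_retention_ms

DAY_MS = 24 * 3600 * 1000
now = 1_000_000 * DAY_MS  # any fixed "current" timestamp works for the demo
print(segment_deletable(now - 4 * DAY_MS, 3 * DAY_MS, now))  # True: 4 days > 3-day window
print(segment_deletable(now - 1 * DAY_MS, 3 * DAY_MS, now))  # False: still inside window
print(segment_deletable(None, 3 * DAY_MS, now))              # False: not fully consumed
```

The point of the extra window is simply that "consumed" only makes deletion eligible; the configured time still bounds when it happens.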


Thanks,
David


> Hey David,
>
> Thanks for the KIP. Can you help with the following two questions:
>
> 1) If someone starts a consumer (e.g. kafka-console-consumer) to consume a
> topic for debug/validation purposes, a random consumer group may be created
> and offsets may be committed for this consumer group. If no offset commit is
> made for this consumer group in the future, will this effectively
> disable consumed log retention for this topic? In other words, how does this
> KIP distinguish active consumer groups from inactive ones?
>
> 2) Why do we need new configs such as log.retention.commitoffset.hours? Can
> we simply delete log segments if consumed log retention is enabled for this
> topic and all consumer groups have consumed messages in the log segment?
>
> Thanks,
> Dong
>
>
>
>On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L)  wrote:
>
> > Hi Becket,
> >
> >   Thanks for the feedback:
> > 1.  We use the simple consumer API to query the commit offset, so we don't
> > need to specify the consumer group.
> > 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> > the commit offset in the log retention process.  The client can commit
> > offsets or not.
> > 3.  It does not need to distinguish follower brokers from leader
> > brokers; every broker can query.
> > 4.  We don't need to change the protocols; we mainly change the log
> > retention process in the log manager.
> >
> >   One concern is that querying the min offset needs O(partitions * groups)
> > time; an alternative is to build an internal topic to save every
> > partition's min offset, which reduces this to O(1).
> > I will update the wiki for more details.
> >
> > Thanks,
> > David
> >
> >
> > > Hi Pengwei,
> > >
> > > Thanks for the KIP proposal. It is a very useful KIP. At a high level,
> > the
> > > proposed behavior looks reasonable to me.
> > >
> > > However, it seems that some of the details are not mentioned in the KIP.
> > > For example,
> > >
> > > 1. How will the expected consumer group be specified? Is it through a per
> > > topic dynamic configuration?
> > > 2. How do the brokers detect the consumer offsets? Is it required for a
> > > consumer to commit offsets?
> > > 3. How do all the replicas know about the committed offsets? e.g. 1)
> > > non-coordinator brokers which do not have the committed offsets, 2)
> > > follower brokers which do not have consumers directly consuming from it.
> > > 4. Are there any other changes that need to be made (e.g. new protocols) in
> > > addition to the configuration change?
> > >
> > > It would be great if you can update the wiki to have more details.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) 
> > wrote:
> > >
> > > > Hi All,
> > > >I have made a KIP to enhance the log retention, details as follows:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 68+Add+a+consumed+log+retention+before+log+retention
> > > >Now start a discuss thread for this KIP , looking forward to the
> > > > feedback.
> > > >
> > > > Thanks,
> > > > David
> > > >
> > > >



Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-09 Thread Becket Qin
Hey David,

Thanks for updating the wiki.

1. I was actually thinking of letting every broker just consume the
__consumer_offsets topic. But it seems less efficient if there are only a
few topics configured for committed offsets based retention. So querying
the committed offsets seems reasonable. From the wiki it is not clear
whether the committed offset query happens synchronously or asynchronously. It
is probably better to do this asynchronously, i.e. in a thread other than the
log-deleting thread. Otherwise querying the committed offsets may slow down or
even block log deletion due to a remote call failure.
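One way to sketch this asynchronous query with a bounded wait (illustrative Python; `fetch_committed_offset` is a stand-in for the remote OffsetFetch call, not a real client API):

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_committed_offset(group, partition):
    # Stand-in for the remote OffsetFetchRequest to the group's coordinator;
    # a real implementation would go over the network and could fail or hang.
    return 42

def min_committed_offset(groups, partition, executor, timeout_s=1.0):
    """Run the per-group offset queries off the log-deletion thread and
    bound the wait; on any failure or timeout return None so the caller
    simply skips consumed retention for this pass instead of blocking."""
    futures = [executor.submit(fetch_committed_offset, g, partition)
               for g in groups]
    offsets = []
    for f in futures:
        try:
            offsets.append(f.result(timeout=timeout_s))
        except Exception:
            return None  # fail open: never delete on incomplete information
    return min(offsets) if offsets else None

with ThreadPoolExecutor(max_workers=4) as ex:
    print(min_committed_offset(["group-a", "group-b"], 0, ex))  # 42
```

Failing open (returning `None`) matches the safety argument in the thread: a remote failure should delay deletion, never cause it.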

2. Using new consumer does not necessarily introduce a new group unless we
use Kafka based group management. But using KafkaConsumer directly to query
the committed offsets may not work in this case because by default it uses
the consumer group in the ConsumerConfig. We can use NetworkClient and see
if we can reuse some of the code in the new consumer. Since there has been
a lot of efforts spent on deprecating the SimpleConsumer, we probably want
to avoid introducing any new usage. Anyway, this is an implementation detail
and we can figure it out when writing the patch.

3. What I am thinking is that we should consider whether we will allow
multiple policies to be set at the same time. If we do allow that, which one
of the policies takes precedence? Otherwise it might be confusing
for users to have multiple retention policies set.
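One possible precedence rule, sketched here purely as an illustration (not the KIP's decided semantics): the existing forced time-based retention stays the upper bound, and the consumed policy may only delete earlier.

```python
DAY_MS = 24 * 3600 * 1000

def should_delete(segment_age_ms, forced_retention_ms,
                  consumed_retention_ms, fully_consumed):
    """Forced (time-based) retention always wins as an upper bound; the
    consumed policy can only delete *earlier*, and only once every
    tracked group has consumed past the segment."""
    if segment_age_ms > forced_retention_ms:
        return True  # existing log.retention.* behaviour, unchanged
    if consumed_retention_ms is not None and fully_consumed:
        return segment_age_ms > consumed_retention_ms
    return False

print(should_delete(2 * DAY_MS, 7 * DAY_MS, 1 * DAY_MS, fully_consumed=True))   # True
print(should_delete(2 * DAY_MS, 7 * DAY_MS, 1 * DAY_MS, fully_consumed=False))  # False
print(should_delete(8 * DAY_MS, 7 * DAY_MS, None, fully_consumed=False))        # True
```

Under this reading the two policies compose rather than conflict, which sidesteps the precedence question for the common case.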

In addition to the above, it seems that we need some way to configure the
set of consumer groups a topic's retention should track. If it is through a
topic config, it would be good to document the configuration name and the
format of its value in the wiki as well.

Thanks,

Jiangjie (Becket) Qin



On Sun, Oct 9, 2016 at 7:14 AM, 东方甲乙 <254479...@qq.com> wrote:

> Hi Becket,
> This is David, thanks for the comments.  I have updated some info in
> the wiki; all the changes are described in the workflow.
> Answers to the comments:
> 1. Every broker only has some of the groups' commit offsets, which are
> stored in the __consumer_offsets topic; it still has to query other
> coordinators (other brokers) for some groups' commit offsets.
> So we use the OffsetFetchRequest to query one group's commit offset.
>
>
> 2. Using the new consumer to query the commit offset would introduce a new
> group, but if we use the OffsetFetchRequest to query (like the
> consumer-offset-checker tool: first find the coordinator and build a
> channel to query), we will not introduce a new group.
>
>
> 3. I think KIP-47's functionality seems a little different from this
> KIP, though both are modifying the log retention.
>
>
> Thanks,
> David.
>
>
>
>
>
>
>
>
> -- Original Message --
> From: "Becket Qin" <becket....@gmail.com>
> Sent: Sunday, October 9, 2016, 1:00 PM
> To: "dev" <dev@kafka.apache.org>
>
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
>
>
> Hi David,
>
> Thanks for the explanation. Could you update the KIP-68 wiki to include the
> changes that need to be made?
>
> I have a few more comments below:
>
> 1. We already have an internal topic __consumer_offsets to store all the
> committed offsets. So the brokers can probably just consume from that to
> get the committed offsets for all the partitions of each group.
>
> 2. It is probably better to use o.a.k.clients.consumer.KafkaConsumer
> instead of SimpleConsumer. It handles all the leader movements and
> potential failures.
>
> 3. KIP-47 also has a proposal for a new time based log retention policy and
> propose a new configuration on log retention. It may be worth thinking
> about the behavior together.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) <pengwei...@huawei.com> wrote:
>
> > Hi Becket,
> >
> >   Thanks for the feedback:
> 1.  We use the simple consumer API to query the commit offset, so we don't
> need to specify the consumer group.
> 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> the commit offset in the log retention process.  The client can commit
> offsets or not.
> 3.  It does not need to distinguish follower brokers from leader
> brokers; every broker can query.
> 4.  We don't need to change the protocols; we mainly change the log
> retention process in the log manager.
>
>   One concern is that querying the min offset needs O(partitions * groups)
> time; an alternative is to build an internal topic to save every
> partition's min offset, which reduces this to O(1).
> I will update the wiki for more details.
>

RE: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-09 Thread 东方甲乙
Hi Becket,
This is David, thanks for the comments.  I have updated some info in the
wiki; all the changes are described in the workflow.
Answers to the comments:
1. Every broker only has some of the groups' commit offsets, which are stored
in the __consumer_offsets topic; it still has to query other
coordinators (other brokers) for some groups' commit offsets.
So we use the OffsetFetchRequest to query one group's commit offset.


2. Using the new consumer to query the commit offset would introduce a new group,
but if we use the OffsetFetchRequest to query (like the consumer-offset-checker
tool: first find the coordinator and build a channel to query), we will not
introduce a new group.


3. I think KIP-47's functionality seems a little different from this KIP,
though both are modifying the log retention.


Thanks,
David.








-- Original Message --
From: "Becket Qin" <becket....@gmail.com>
Sent: Sunday, October 9, 2016, 1:00 PM
To: "dev" <dev@kafka.apache.org>

Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention



Hi David,

Thanks for the explanation. Could you update the KIP-68 wiki to include the
changes that need to be made?

I have a few more comments below:

1. We already have an internal topic __consumer_offsets to store all the
committed offsets. So the brokers can probably just consume from that to
get the committed offsets for all the partitions of each group.

2. It is probably better to use o.a.k.clients.consumer.KafkaConsumer
instead of SimpleConsumer. It handles all the leader movements and
potential failures.

3. KIP-47 also has a proposal for a new time based log retention policy and
propose a new configuration on log retention. It may be worth thinking
about the behavior together.

Thanks,

Jiangjie (Becket) Qin

On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) <pengwei...@huawei.com> wrote:

> Hi Becket,
>
>   Thanks for the feedback:
> 1.  We use the simple consumer API to query the commit offset, so we don't
> need to specify the consumer group.
> 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> the commit offset in the log retention process.  The client can commit
> offsets or not.
> 3.  It does not need to distinguish follower brokers from leader
> brokers; every broker can query.
> 4.  We don't need to change the protocols; we mainly change the log
> retention process in the log manager.
>
>   One concern is that querying the min offset needs O(partitions * groups)
> time; an alternative is to build an internal topic to save every
> partition's min offset, which reduces this to O(1).
> I will update the wiki for more details.
>
> Thanks,
> David
>
>
> > Hi Pengwei,
> >
> > Thanks for the KIP proposal. It is a very useful KIP. At a high level,
> the
> > proposed behavior looks reasonable to me.
> >
> > However, it seems that some of the details are not mentioned in the KIP.
> > For example,
> >
> > 1. How will the expected consumer group be specified? Is it through a per
> > topic dynamic configuration?
> > 2. How do the brokers detect the consumer offsets? Is it required for a
> > consumer to commit offsets?
> > 3. How do all the replicas know about the committed offsets? e.g. 1)
> > non-coordinator brokers which do not have the committed offsets, 2)
> > follower brokers which do not have consumers directly consuming from it.
> > 4. Are there any other changes that need to be made (e.g. new protocols) in
> > addition to the configuration change?
> >
> > It would be great if you can update the wiki to have more details.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) <pengwei...@huawei.com>
> wrote:
> >
> > > Hi All,
> > >I have made a KIP to enhance the log retention, details as follows:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 68+Add+a+consumed+log+retention+before+log+retention
> > >Now start a discuss thread for this KIP , looking forward to the
> > > feedback.
> > >
> > > Thanks,
> > > David
> > >
> > >
>

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-09 Thread Dong Lin
Hey David,

Thanks for the KIP. Can you help with the following two questions:

1) If someone starts a consumer (e.g. kafka-console-consumer) to consume a
topic for debug/validation purposes, a random consumer group may be created
and offsets may be committed for this consumer group. If no offset commit is
made for this consumer group in the future, will this effectively
disable consumed log retention for this topic? In other words, how does this
KIP distinguish active consumer groups from inactive ones?

2) Why do we need new configs such as log.retention.commitoffset.hours? Can
we simply delete log segments if consumed log retention is enabled for this
topic and all consumer groups have consumed messages in the log segment?

Thanks,
Dong



On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L)  wrote:

> Hi Becket,
>
>   Thanks for the feedback:
> 1.  We use the simple consumer API to query the commit offset, so we don't
> need to specify the consumer group.
> 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> the commit offset in the log retention process.  The client can commit
> offsets or not.
> 3.  It does not need to distinguish follower brokers from leader
> brokers; every broker can query.
> 4.  We don't need to change the protocols; we mainly change the log
> retention process in the log manager.
>
>   One concern is that querying the min offset needs O(partitions * groups)
> time; an alternative is to build an internal topic to save every
> partition's min offset, which reduces this to O(1).
> I will update the wiki for more details.
>
> Thanks,
> David
>
>
> > Hi Pengwei,
> >
> > Thanks for the KIP proposal. It is a very useful KIP. At a high level,
> the
> > proposed behavior looks reasonable to me.
> >
> > However, it seems that some of the details are not mentioned in the KIP.
> > For example,
> >
> > 1. How will the expected consumer group be specified? Is it through a per
> > topic dynamic configuration?
> > 2. How do the brokers detect the consumer offsets? Is it required for a
> > consumer to commit offsets?
> > 3. How do all the replicas know about the committed offsets? e.g. 1)
> > non-coordinator brokers which do not have the committed offsets, 2)
> > follower brokers which do not have consumers directly consuming from it.
> > 4. Are there any other changes that need to be made (e.g. new protocols) in
> > addition to the configuration change?
> >
> > It would be great if you can update the wiki to have more details.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) 
> wrote:
> >
> > > Hi All,
> > >I have made a KIP to enhance the log retention, details as follows:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 68+Add+a+consumed+log+retention+before+log+retention
> > >Now start a discuss thread for this KIP , looking forward to the
> > > feedback.
> > >
> > > Thanks,
> > > David
> > >
> > >
>


Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-08 Thread Becket Qin
Hi David,

Thanks for the explanation. Could you update the KIP-68 wiki to include the
changes that need to be made?

I have a few more comments below:

1. We already have an internal topic __consumer_offsets to store all the
committed offsets. So the brokers can probably just consume from that to
get the committed offsets for all the partitions of each group.

2. It is probably better to use o.a.k.clients.consumer.KafkaConsumer
instead of SimpleConsumer. It handles all the leader movements and
potential failures.

3. KIP-47 also has a proposal for a new time based log retention policy and
propose a new configuration on log retention. It may be worth thinking
about the behavior together.

Thanks,

Jiangjie (Becket) Qin

On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L)  wrote:

> Hi Becket,
>
>   Thanks for the feedback:
> 1.  We use the simple consumer API to query the commit offset, so we don't
> need to specify the consumer group.
> 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> the commit offset in the log retention process.  The client can commit
> offsets or not.
> 3.  It does not need to distinguish follower brokers from leader
> brokers; every broker can query.
> 4.  We don't need to change the protocols; we mainly change the log
> retention process in the log manager.
>
>   One concern is that querying the min offset needs O(partitions * groups)
> time; an alternative is to build an internal topic to save every
> partition's min offset, which reduces this to O(1).
> I will update the wiki for more details.
>
> Thanks,
> David
>
>
> > Hi Pengwei,
> >
> > Thanks for the KIP proposal. It is a very useful KIP. At a high level,
> the
> > proposed behavior looks reasonable to me.
> >
> > However, it seems that some of the details are not mentioned in the KIP.
> > For example,
> >
> > 1. How will the expected consumer group be specified? Is it through a per
> > topic dynamic configuration?
> > 2. How do the brokers detect the consumer offsets? Is it required for a
> > consumer to commit offsets?
> > 3. How do all the replicas know about the committed offsets? e.g. 1)
> > non-coordinator brokers which do not have the committed offsets, 2)
> > follower brokers which do not have consumers directly consuming from it.
> > 4. Are there any other changes that need to be made (e.g. new protocols) in
> > addition to the configuration change?
> >
> > It would be great if you can update the wiki to have more details.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) 
> wrote:
> >
> > > Hi All,
> > >I have made a KIP to enhance the log retention, details as follows:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 68+Add+a+consumed+log+retention+before+log+retention
> > >Now start a discuss thread for this KIP , looking forward to the
> > > feedback.
> > >
> > > Thanks,
> > > David
> > >
> > >
>


Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-08 Thread Pengwei (L)
Hi Becket,

  Thanks for the feedback:
1.  We use the simple consumer API to query the commit offset, so we don't need
to specify the consumer group.
2.  Every broker uses the simple consumer API (OffsetFetchKey) to query the
commit offset in the log retention process.  The client can commit offsets or
not.
3.  It does not need to distinguish follower brokers from leader brokers;
every broker can query.
4.  We don't need to change the protocols; we mainly change the log retention
process in the log manager.

  One concern is that querying the min offset needs O(partitions * groups) time;
an alternative is to build an internal topic to save every partition's min
offset, which reduces this to O(1).
I will update the wiki for more details.
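The complexity trade-off mentioned above can be sketched as follows (illustrative Python bookkeeping, not broker code; `on_commit` stands in for materializing the proposed internal min-offset topic):

```python
# committed[group][partition] -> that group's committed offset
committed = {
    "group-a": {0: 120, 1: 300},
    "group-b": {0: 95,  1: 310},
}

def min_offset_scan(partition):
    """Naive query: O(groups) per partition, so O(partitions * groups)
    for a full retention pass."""
    return min(offsets[partition] for offsets in committed.values()
               if partition in offsets)

# Alternative: maintain a per-partition minimum as commits arrive (e.g.
# materialized from an internal "min offset" topic). The update still
# scans that partition's groups, but the retention thread's read becomes
# an O(1) dictionary lookup.
min_offset = {}

def on_commit(group, partition, offset):
    committed.setdefault(group, {})[partition] = offset
    min_offset[partition] = min_offset_scan(partition)

on_commit("group-a", 0, 130)
on_commit("group-b", 0, 100)
print(min_offset_scan(0), min_offset[0])  # 100 100
```

A segment whose end offset is at or below the partition's minimum committed offset is the candidate for consumed retention either way; only the cost of finding that minimum differs.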

Thanks,
David


> Hi Pengwei,
>
> Thanks for the KIP proposal. It is a very useful KIP. At a high level, the
> proposed behavior looks reasonable to me.
>
> However, it seems that some of the details are not mentioned in the KIP.
> For example,
>
> 1. How will the expected consumer group be specified? Is it through a per
> topic dynamic configuration?
> 2. How do the brokers detect the consumer offsets? Is it required for a
> consumer to commit offsets?
> 3. How do all the replicas know about the committed offsets? e.g. 1)
> non-coordinator brokers which do not have the committed offsets, 2)
> follower brokers which do not have consumers directly consuming from it.
> 4. Are there any other changes that need to be made (e.g. new protocols) in
> addition to the configuration change?
>
> It would be great if you can update the wiki to have more details.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L)  wrote:
>
> > Hi All,
> >I have made a KIP to enhance the log retention, details as follows:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 68+Add+a+consumed+log+retention+before+log+retention
> >Now start a discuss thread for this KIP , looking forward to the
> > feedback.
> >
> > Thanks,
> > David
> >
> >