[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519864#comment-16519864
 ] 

ASF GitHub Bot commented on KAFKA-4682:
---

hachikuji closed pull request #4896: KAFKA-4682: Revise expiration semantics of 
consumer group offsets (KIP-211 - Part 1)
URL: https://github.com/apache/kafka/pull/4896
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9c19af17037..b484e110aee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -798,8 +798,7 @@ public void onComplete(Map offsets, Exception
 
 OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData).
 setGenerationId(generation.generationId).
-setMemberId(generation.memberId).
-setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);
+setMemberId(generation.memberId);
 
 log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 570c4d5a66a..8a51e84e76a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -111,9 +111,15 @@
  */
 private static final Schema OFFSET_COMMIT_REQUEST_V4 = OFFSET_COMMIT_REQUEST_V3;
 
+private static final Schema OFFSET_COMMIT_REQUEST_V5 = new Schema(
+GROUP_ID,
+GENERATION_ID,
+MEMBER_ID,
+new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2), "Topics to commit offsets."));
+
 public static Schema[] schemaVersions() {
 return new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2,
-OFFSET_COMMIT_REQUEST_V3, OFFSET_COMMIT_REQUEST_V4};
+OFFSET_COMMIT_REQUEST_V3, OFFSET_COMMIT_REQUEST_V4, OFFSET_COMMIT_REQUEST_V5};
 }
 
 // default values for the current version
@@ -166,7 +172,6 @@ public String toString() {
 private final Map offsetData;
 private String memberId = DEFAULT_MEMBER_ID;
 private int generationId = DEFAULT_GENERATION_ID;
-private long retentionTime = DEFAULT_RETENTION_TIME;
 
 public Builder(String groupId, Map offsetData) {
 super(ApiKeys.OFFSET_COMMIT);
@@ -184,11 +189,6 @@ public Builder setGenerationId(int generationId) {
 return this;
 }
 
-public Builder setRetentionTime(long retentionTime) {
-this.retentionTime = retentionTime;
-return this;
-}
-
 @Override
 public OffsetCommitRequest build(short version) {
 switch (version) {
@@ -199,8 +199,8 @@ public OffsetCommitRequest build(short version) {
 case 2:
 case 3:
 case 4:
-long retentionTime = version == 1 ? DEFAULT_RETENTION_TIME : this.retentionTime;
-return new OffsetCommitRequest(groupId, generationId, memberId, retentionTime, offsetData, version);
+case 5:
+return new OffsetCommitRequest(groupId, generationId, memberId, DEFAULT_RETENTION_TIME, offsetData, version);
 default:
 throw new UnsupportedVersionException("Unsupported version " + version);
 }
@@ -213,7 +213,6 @@ public String toString() {
 append(", groupId=").append(groupId).
 append(", memberId=").append(memberId).
 append(", generationId=").append(generationId).
-append(", retentionTime=").append(retentionTime).
 append(", offsetData=").append(offsetData).
 append(")");
 return bld.toString();
@@ -316,6 +315,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
 return new OffsetCommitResponse(responseData);
 case 3:
 case 4:
+case 5:
 return new OffsetCommitResponse(throttleTimeMs, responseData);
 default:
 throw new IllegalArgumentExceptio

[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)

2018-04-25 Thread Biju Nair (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16452781#comment-16452781
 ] 

Biju Nair commented on KAFKA-4682:
--

This change will also help Kafka Streams use cases where {{enable.auto.commit}} 
is overridden to {{false}}. Currently we are seeing issues with 
partitions that have no activity for periods longer than 
{{offsets.retention.minutes}}.

> Committed offsets should not be deleted if a consumer is still active 
> (KIP-211)
> ---
>
> Key: KAFKA-4682
> URL: https://issues.apache.org/jira/browse/KAFKA-4682
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: kip
>
> Kafka will delete committed offsets that are older than 
> offsets.retention.minutes
> If there is an active consumer on a low traffic partition, it is possible 
> that Kafka will delete the committed offset for that consumer. Once the 
> offset is deleted, a restart or a rebalance of that consumer will cause the 
> consumer to not find any committed offset and start consuming from 
> earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker 
> failover might also cause you to start reading from auto.offset.reset (due to 
> broker restart, or coordinator failover).
> I think that Kafka should only delete offsets for inactive consumers. The 
> timer should only start after a consumer group goes inactive. For example, if 
> a consumer group goes inactive, then after 1 week, delete the offsets for 
> that consumer group. This is a solution that [~junrao] mentioned in 
> https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521
> The current workarounds are to:
> # Commit an offset on every partition you own on a regular basis, making sure 
> that it is more frequent than offsets.retention.minutes (a broker-side 
> setting that a consumer might not be aware of)
> or
> # Turn the value of offsets.retention.minutes up really really high. You have 
> to make sure it is higher than any valid low-traffic rate that you want to 
> support. For example, if you want to support a topic where someone produces 
> once a month, you would have to set offsets.retention.minutes to 1 month. 
> or
> # Turn on enable.auto.commit (this is essentially #1, but easier to 
> implement).
> None of these are ideal. 
> #1 can be spammy. It requires your consumers know something about how the 
> brokers are configured. Sometimes it is out of your control. Mirrormaker, for 
> example, only commits offsets on partitions where it receives data. And it is 
> duplication that you need to put into all of your consumers.
> #2 has disk-space impact on the broker (in __consumer_offsets) as well as 
> memory-size on the broker (to answer OffsetFetch).
> #3 I think has the potential for message loss (the consumer might commit on 
> messages that are not yet fully processed)
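
To make the failure mode in the description concrete, here is a minimal, self-contained sketch in plain Java (no Kafka dependency) of the expiration rule as the description states it: an offset is dropped once it is older than offsets.retention.minutes, whether or not its consumer is still active. The class and method names are hypothetical and are not taken from the broker code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the behavior described in the issue; not the broker's implementation.
class OffsetExpirySketch {
    // partition name -> time (ms) of the most recent commit for that partition
    private final Map<String, Long> lastCommitMs = new HashMap<>();
    private final long retentionMs;

    OffsetExpirySketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Records a commit; re-committing refreshes the timestamp (this is workaround #1).
    void commit(String partition, long nowMs) {
        lastCommitMs.put(partition, nowMs);
    }

    // Drops every offset older than the retention period, even if its consumer is still polling.
    void expire(long nowMs) {
        lastCommitMs.values().removeIf(committedAt -> nowMs - committedAt > retentionMs);
    }

    boolean hasOffset(String partition) {
        return lastCommitMs.containsKey(partition);
    }
}
```

With a one-day retention, a quiet partition committed once at time 0 has lost its offset by day 2, while a partition that was re-committed in the meantime keeps it. That asymmetry is what workaround #1 relies on.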



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443661#comment-16443661
 ] 

ASF GitHub Bot commented on KAFKA-4682:
---

vahidhashemian opened a new pull request #4896: KAFKA-4682: Revise expiration 
semantics of consumer group offsets (KIP-211)
URL: https://github.com/apache/kafka/pull/4896
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)

2017-11-15 Thread John Crowley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16253655#comment-16253655
 ] 

John Crowley commented on KAFKA-4682:
-

Just found this entry - had previously commented on 
https://issues.apache.org/jira/browse/KAFKA-3806

Is it possible to allow the offsets.retention.minutes to be set per groupId (in 
a similar way that retention.ms can be set per topic)?

This would allow a fairly short default (1 day, as is current) to remove 
abandoned groupId metadata, yet allow the user to indicate that a particular 
groupId should be handled differently. The example in KAFKA-3806 was a PubSub using Kafka 
as a persistent, reliable store supporting multiple subscribers. Some of the 
source data has very low volatility, e.g. next year's holiday calendar for a 
company, which probably only changes once a year. A consumer must still poll in 
case an error update is posted, but will in the normal case not do a real 
commit for 12 months!
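
A rough sketch of what such a per-groupId lookup could look like, assuming a broker-wide default plus optional overrides. Everything here is hypothetical: the thread does not define such a setting, and the class, method names and group ids are made up for illustration.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-group retention lookup; illustrates the proposal only.
class GroupRetentionSketch {
    private final Duration defaultRetention;
    // groupId -> retention that replaces the default for that group
    private final Map<String, Duration> overrides = new HashMap<>();

    GroupRetentionSketch(Duration defaultRetention) {
        this.defaultRetention = defaultRetention;
    }

    void setOverride(String groupId, Duration retention) {
        overrides.put(groupId, retention);
    }

    // Falls back to the default when the group has no override.
    Duration retentionFor(String groupId) {
        return overrides.getOrDefault(groupId, defaultRetention);
    }
}
```

The holiday-calendar subscriber would get an override of a year or more, while abandoned groups would still be cleaned up after the short default.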



[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)

2017-11-03 Thread Drew Kutcharian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238126#comment-16238126
 ] 

Drew Kutcharian commented on KAFKA-4682:


This just happened to us and I just stumbled upon this JIRA while trying to 
figure out the cause. A few questions:

1. Aren't consumer offset topics compacted? Shouldn't at least the last entry 
stay on disk after cleanup?

2. Considering that they are compacted, what is the real concern with 
workaround 2 in the description: "2. Turn the value of 
offsets.retention.minutes up really really high"?

3. As a workaround, would it make sense to set {{offsets.retention.minutes}} to the 
same duration as {{log.retention.ms}} and {{auto.offset.reset}} to {{earliest}}? 
That way consumers and logs would "reset" at the same time?

4. Is there a timeline for the release of KIP-211?




[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active

2017-10-18 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210323#comment-16210323
 ] 

Vahid Hashemian commented on KAFKA-4682:


I just started a KIP discussion for this JIRA. The KIP can be found 
[here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets].



[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active

2017-10-12 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202763#comment-16202763
 ] 

Vahid Hashemian commented on KAFKA-4682:


[~hachikuji] I have started drafting a KIP for the changes discussed here. 
Could you please clarify what you mean by
{quote}... we could probably also remove the commit timestamp and use the 
timestamp from the message itself. ...{quote}
I see that the commit timestamp is set to the time the request is processed 
(which supposedly is when the offset is committed). So I'm not clear what you 
mean by "timestamp from the message itself".
Thanks.



[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active

2017-08-11 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16123742#comment-16123742
 ] 

Vahid Hashemian commented on KAFKA-4682:


[~hachikuji] Thank you for your comments. You seem to be looking at this with 
an inclination to get rid of the retention time from the OffsetCommit protocol. 
I think with my comments below I'm considering the alternative:

# Ewen's KIP proposes to increase the default retention from 1 day to 7 days. 
So, allowing consumers to set a lower timeout (for the console consumer) seems 
to be helpful after his KIP, in the same way that allowing them to set a higher 
timeout (for actual consumer applications) is helpful before his KIP.

# Even if we have offset-level expiration, all offsets in the group should 
expire together, because the expiration timer starts ticking for all partitions 
at the same time (when the group becomes empty). The only exception is when a 
consumer has set a non-default retention time for particular partitions (e.g. 
using the OffsetCommit API).

# Agreed. The expiration timestamp won't make sense. Perhaps the retention time 
should be stored, and whether to expire or not could be calculated on the fly 
as the time the group became empty + retention time (we would need to somehow 
keep the timestamp of the group becoming empty). This expiration check needs to 
be performed only if the group is empty; otherwise there is no need to expire 
at all.

# I don't have a strong feeling about this. It's for sure simpler to let all 
offsets expire at the same time. And if we keep the individual offset retention 
it would be easier to change this in case the cache size becomes an issue.

I think there is a risk involved in removing the individual retention from the 
protocol: could some requirement arise in the future that makes us bring it 
back to the protocol? One option is to let that field stay for now, and remove 
it later once we are more certain that it won't be needed back.
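
The on-the-fly check from the third point could be sketched roughly as below. This is only an illustration under stated assumptions: the class and parameter names are hypothetical, time is plain epoch milliseconds, and an absent "empty since" value stands for a group that still has members.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the on-the-fly expiration check; not broker code.
class OnTheFlyExpirySketch {
    // Expired only if the group is empty and (time the group became empty + retention) has passed.
    static boolean isExpired(OptionalLong groupEmptySinceMs, long retentionMs, long nowMs) {
        if (!groupEmptySinceMs.isPresent()) {
            return false; // the group still has members, so nothing expires
        }
        return nowMs >= groupEmptySinceMs.getAsLong() + retentionMs;
    }
}
```

Because the retention is passed in per call, an offset committed with a non-default retention would expire at a different time than the rest of the group's offsets, which is the exception mentioned in the second point.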



[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active

2017-08-10 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16122580#comment-16122580
 ] 

Jason Gustafson commented on KAFKA-4682:


Sorry for the late response. There seem to be a few open questions:

1. Does the consumer need the ability to control the retention timeout or is a 
broker config sufficient? I am not too sure about this. There is at least one 
use case (ConsoleConsumer) where we might intentionally set a low value, but 
I'm not sure how bad it would be to let it stick with the default. It certainly 
would have been helpful prior to Ewen's KIP.

2. Do we still need offset-level expiration or should we move it to the group? 
Personally, it feels a little odd to expire offsets at different times once a 
group is empty. It's a little more intuitive to expire them all at once. 
Another way to view this would be that we deprecate the offset retention 
setting and add a group metadata retention setting. Once the group has gone 
empty, we start its retention timer. If it expires, we clear all of its state 
including offsets. 

3. Do we need to change the format of the offset metadata messages? Currently 
the offset metadata that is stored in the log includes an expiration timestamp. 
This won't make much sense any more because we won't know what timestamp to use 
when the offset is first stored. While we're at it, we could probably also 
remove the commit timestamp and use the timestamp from the message itself. This 
also depends on the answer to the first question.

4. Should we start the expiration timer for an individual offset if the group 
is no longer subscribed to the corresponding topic? My inclination is to keep 
it simple and say no, but I guess there is a risk that this grows the 
cache more than the existing behavior does. If we're concerned about this, then we 
probably need to keep the individual offset expiration timer. Unfortunately 
because of the generic group protocol (which is also used in Connect), we don't 
currently have the ability to inspect subscriptions to know if a topic is still 
subscribed.
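
To make the group-level option in point 2 concrete, here is a minimal sketch of a retention timer that only starts once the group goes empty and then clears all offsets together. The class and method names ({{GroupRetentionSketch}}, {{memberJoined}}, {{expireIfDue}}) are hypothetical and are not the coordinator's actual code; timestamps are passed in explicitly to keep the example deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical model of one group's state on the coordinator; not Kafka's actual classes. */
class GroupRetentionSketch {
    private final long retentionMs;
    // "topic-partition" -> committed offset
    private final Map<String, Long> offsets = new HashMap<>();
    private int memberCount = 0;
    // Set only while the group is empty; this is the start of the retention timer.
    private OptionalLong emptySinceMs = OptionalLong.empty();

    GroupRetentionSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void memberJoined() {
        memberCount++;
        emptySinceMs = OptionalLong.empty(); // an active group has no running timer
    }

    void memberLeft(long nowMs) {
        if (memberCount == 0) {
            return;
        }
        memberCount--;
        if (memberCount == 0) {
            emptySinceMs = OptionalLong.of(nowMs); // timer starts when the group goes empty
        }
    }

    void commit(String topicPartition, long offset) {
        offsets.put(topicPartition, offset);
    }

    /** Clears all offsets at once if the group has been empty for at least retentionMs. */
    boolean expireIfDue(long nowMs) {
        if (emptySinceMs.isPresent() && nowMs - emptySinceMs.getAsLong() >= retentionMs) {
            offsets.clear();
            return true;
        }
        return false;
    }

    int offsetCount() {
        return offsets.size();
    }
}
```

With this shape there is no per-offset expiration timestamp to store, which is the reason point 3 would follow from point 2.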



> Committed offsets should not be deleted if a consumer is still active
> -
>
> Key: KAFKA-4682
> URL: https://issues.apache.org/jira/browse/KAFKA-4682
> Project: Kafka
> Issue Type: Bug
> Reporter: James Cheng
>
> Kafka will delete committed offsets that are older than 
> offsets.retention.minutes.
> If there is an active consumer on a low traffic partition, it is possible 
> that Kafka will delete the committed offset for that consumer. Once the 
> offset is deleted, a restart or a rebalance of that consumer will cause the 
> consumer to not find any committed offset and start consuming from 
> earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker 
> failover might also cause you to start reading from auto.offset.reset (due to 
> broker restart, or coordinator failover).
> I think that Kafka should only delete offsets for inactive consumers. The 
> timer should only start after a consumer group goes inactive. For example, if 
> a consumer group goes inactive, then after 1 week, delete the offsets for 
> that consumer group. This is a solution that [~junrao] mentioned in 
> https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521
> The current workarounds are to:
> # Commit an offset on every partition you own on a regular basis, making sure 
> that it is more frequent than offsets.retention.minutes (a broker-side 
> setting that a consumer might not be aware of)
> or
> # Turn the value of offsets.retention.minutes up really really high. You have 
> to make sure it is higher than any valid low-traffic rate that you want to 
> support. For example, if you want to support a topic where someone produces 
> once a month, you would have to set offsets.retention.minutes to 1 month. 
> or
> # Turn on enable.auto.commit (this is essentially #1, but easier to 
> implement).
> None of these are ideal. 
> #1 can be spammy. It requires that your consumers know something about how the 
> brokers are configured. Sometimes it is out of your control. MirrorMaker, for 
> example, only commits offsets on partitions where it receives data. And it is 
> logic that you need to duplicate in all of your consumers.
> #2 has disk-space impact on the broker (in __consumer_offsets) as well as 
> memory-size on the broker (to answer OffsetFetch).
> #3 I think has the potential for message loss (the consumer might commit on 
> messages that are not yet fully processed)
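
The failure mode described above can be sketched roughly as follows, assuming (as the description implies) that each committed offset gets its own expiration time of commit time plus retention. {{PerOffsetExpirySketch}} and its methods are hypothetical names for illustration only, not broker code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of per-offset expiration as the issue describes it; not broker code. */
class PerOffsetExpirySketch {
    private final long retentionMs;
    // "topic-partition" -> time at which the committed offset is removed
    private final Map<String, Long> expireAtMs = new HashMap<>();

    PerOffsetExpirySketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Each commit resets the expiration time for that partition only. */
    void commit(String topicPartition, long nowMs) {
        expireAtMs.put(topicPartition, nowMs + retentionMs);
    }

    /** Removes every offset whose expiration time has passed, whether or not the group is active. */
    void expire(long nowMs) {
        expireAtMs.values().removeIf(t -> t <= nowMs);
    }

    boolean hasCommittedOffset(String topicPartition) {
        return expireAtMs.containsKey(topicPartition);
    }
}
```

A consumer that owns a busy partition and a quiet one, and that only commits where it receives data, would lose the quiet partition's offset once the retention period elapses even though the consumer never went away.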





[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active

2017-08-10 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16122498#comment-16122498
 ] 

Vahid Hashemian commented on KAFKA-4682:


[~wushujames] Thanks for your feedback. Regarding the other details you brought 
up:

* [~hachikuji]'s suggestion on 
[KIP-186|https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days]
 makes sense to me. The {{OffsetCommit}} API can be used to override the 
default broker-level property {{offsets.retention.minutes}} for specific 
group/topic/partitions. This means we probably wouldn't need a 
group-level retention config. What a potential KIP for this JIRA would add 
is that the retention timer starts at the moment the group becomes empty, 
and while the group is stable no offset is removed (as the retention timer 
is not ticking yet).
* Regarding your second point, I guess we could pick either method. It all 
would depend on the criteria for triggering the retention timer for a 
partition. If we trigger it when the group is empty (as in the previous bullet) 
then we would be expiring the offset for {{B-0}} with all other group 
partitions. If, on the other hand, we decide to trigger the timer when the 
partition stops being consumed within the group, then {{B-0}}'s offset could 
expire while the group is still active. I'm not sure how common this scenario 
is in real applications. If it's not that common perhaps it wouldn't cost a lot 
to keep {{B-0}}'s offsets around with the rest of the group. In any case, we 
should be able to pick one approach or the other depending on what you and 
others believe is more reasonable.
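
As a rough illustration of the two trigger criteria, the sketch below computes which offsets would be removed under each one. Everything here ({{TriggerPolicySketch}}, the {{Policy}} names, the parameters) is hypothetical and only meant to show how {{B-0}} is treated differently; it is not a proposal for the actual implementation.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical comparison of the two retention-timer triggers discussed above. */
class TriggerPolicySketch {
    enum Policy { GROUP_EMPTY, PARTITION_UNCONSUMED }

    /**
     * Returns the partitions whose offsets would be removed at nowMs.
     * stoppedAtMs maps a partition to the time the group stopped consuming it
     * (absent if it is still consumed). groupEmptySinceMs is the time the group
     * became empty, or -1 while it still has members.
     */
    static Set<String> expired(Policy policy, Set<String> committed,
                               Map<String, Long> stoppedAtMs, long groupEmptySinceMs,
                               long retentionMs, long nowMs) {
        Set<String> result = new HashSet<>();
        for (String tp : committed) {
            long timerStart;
            if (policy == Policy.GROUP_EMPTY) {
                // One timer for the whole group.
                timerStart = groupEmptySinceMs;
            } else {
                // One timer per partition.
                Long stopped = stoppedAtMs.get(tp);
                timerStart = (stopped == null) ? -1L : stopped;
            }
            if (timerStart >= 0 && nowMs - timerStart >= retentionMs) {
                result.add(tp);
            }
        }
        return result;
    }
}
```

For a group that committed {{A-0}} and {{B-0}} and then dropped topic B while staying active, {{GROUP_EMPTY}} keeps both offsets, whereas {{PARTITION_UNCONSUMED}} removes {{B-0}} once the retention period has passed.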

What do you think? [~hachikuji], what are your thoughts on this?






[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active

2017-08-09 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120621#comment-16120621
 ] 

James Cheng commented on KAFKA-4682:


Yes, that's a reasonable summary.

Other details to consider:
* [~hachikuji] asked whether we should have a single broker-level config for the 
expiration time that applies to all groups, or whether we need to allow individual 
consumer groups to specify their own expiration time. In the discussion for 
[KIP-186|https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days],
 [~hachikuji] raised the same question. I'm not sure.
* Offsets are normally saved per (partition, groupId). Do we want to allow 
offsets to be expired for individual partitions separately from the group? As 
an example, say I have a groupId="foo" that commits for (Topic A, partition 0) 
and (Topic B, partition 0). Then the group stops subscribing to Topic B, and 
only subscribes to (Topic A, partition 0). Should the offset for (Topic B, 
partition 0) stay around as long as the group is active? Or, should it be 
expired, since it is not part of the group anymore?






[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active

2017-08-03 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113290#comment-16113290
 ] 

Vahid Hashemian commented on KAFKA-4682:


[~hachikuji] et al.
I'm interested in taking this on. Just to make sure I understand the proper 
approach (KIP) before starting to write the draft:

We don't want to expire group offsets while the group is active, so we would 
want to phase out individual offset expirations inside the group (that would 
mean removing the unused retention time field from the OffsetCommit protocol). 
On the other hand, we now seem to need some sort of expiration time per 
consumer group so we can remove offsets within the group once that expiration 
time has passed and the group is no longer active. This expiration time is set 
and takes effect only when the group becomes empty.

Is this a reasonable summary of what needs to happen?



