[jira] [Comment Edited] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group

2018-06-25 Thread Narayan Periwal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523095#comment-16523095
 ] 

Narayan Periwal edited comment on KAFKA-6681 at 6/26/18 4:02 AM:
-

[~steven.aerts], We are using the RangeAssignor (which is the default), not the 
StickyAssignor that KAFKA-7026 mentions.

One observation is that there is a spike in the number of under-replicated 
partitions, after which multiple consumer instances start consuming the same 
topic partition.

Our Kafka brokers and consumers are both on version 0.10.2.1.


was (Author: nperiwal):
[~steven.aerts], We are using the RangeAssignor (which is the default), not the 
StickyAssignor that KAFKA-7026 mentions.

> Two instances of kafka consumer reading the same partition within a consumer 
> group
> --
>
> Key: KAFKA-6681
> URL: https://issues.apache.org/jira/browse/KAFKA-6681
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.10.2.1
>Reporter: Narayan Periwal
>Priority: Critical
> Attachments: server-1.log, server-2.log
>
>
> We have seen this issue with the Kafka consumer, the new client library that 
> was introduced in 0.9.
> With this new client, group management is done by the Kafka coordinator, 
> which is one of the Kafka brokers.
> We are using Kafka broker 0.10.2.1, and the consumer client version is also 
> 0.10.2.1.
> The issue we have faced is that, after rebalancing, some of the partitions 
> get consumed by two instances within a consumer group, leading to duplication 
> of the entire partition data. Both instances continue to read until the next 
> rebalance, or until those clients are restarted.
> It looks like a particular consumer keeps fetching data from a partition, but 
> the broker is not able to identify this "stale" consumer instance.
> We have hit this twice in production. Please look into it at the earliest.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-25 Thread Narayan Periwal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523146#comment-16523146
 ] 

Narayan Periwal edited comment on KAFKA-7026 at 6/26/18 4:01 AM:
-

[~vahid], Can this issue occur with the RangeAssignor as well? We have seen it 
occur multiple times in our Kafka consumer (0.10.2.1) with the RangeAssignor. 
Jira: KAFKA-6681.

One observation is that there is a spike in the number of under-replicated 
partitions in our Kafka cluster, after which multiple consumer instances start 
consuming the same topic partition.

The Kafka brokers are also at version 0.10.2.1.


was (Author: nperiwal):
[~vahid], Can this issue occur with the RangeAssignor as well? We have seen it 
occur multiple times in our Kafka consumer (0.10.2.1) with the RangeAssignor. 
Jira: KAFKA-6681.

One observation is that there is a spike in the number of under-replicated 
partitions, after which multiple consumer instances start consuming the same 
topic partition.

> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].
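
A minimal illustration of the missing check described above (hypothetical helper, not 
the actual StickyAssignor code): scanning the member-reported prior assignments for a 
partition, such as test-0, that more than one consumer claims to own.

{code:java}
import org.apache.kafka.common.TopicPartition;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PriorAssignmentCheck {
    /** Returns partitions claimed as previously owned by more than one consumer. */
    public static Set<TopicPartition> duplicates(Map<String, List<TopicPartition>> priorAssignments) {
        Set<TopicPartition> seen = new HashSet<>();
        Set<TopicPartition> dups = new HashSet<>();
        for (List<TopicPartition> owned : priorAssignments.values()) {
            for (TopicPartition tp : owned) {
                if (!seen.add(tp)) {
                    dups.add(tp); // e.g. test-0 reported by both c1 and c2 in the scenario above
                }
            }
        }
        return dups;
    }
}
{code}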



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-25 Thread Narayan Periwal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523146#comment-16523146
 ] 

Narayan Periwal edited comment on KAFKA-7026 at 6/26/18 3:59 AM:
-

[~vahid], Can this issue occur with the RangeAssignor as well? We have seen it 
occur multiple times in our Kafka consumer (0.10.2.1) with the RangeAssignor. 
Jira: KAFKA-6681.

One observation is that there is a spike in the number of under-replicated 
partitions, after which multiple consumer instances start consuming the same 
topic partition.


was (Author: nperiwal):
[~vahid], Can this issue occur with the RangeAssignor as well? We have seen it 
occur multiple times in our Kafka consumer (0.10.2.1) with the RangeAssignor. 
Jira: KAFKA-6681.

> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-25 Thread Narayan Periwal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523146#comment-16523146
 ] 

Narayan Periwal commented on KAFKA-7026:


[~vahid], Can this issue occur with the RangeAssignor as well? We have seen it 
occur multiple times in our Kafka consumer (0.10.2.1) with the RangeAssignor. 
Jira: KAFKA-6681.

> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group

2018-06-25 Thread Narayan Periwal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523095#comment-16523095
 ] 

Narayan Periwal commented on KAFKA-6681:


[~steven.aerts], We are using the RangeAssignor (which is the default), not the 
StickyAssignor that KAFKA-7026 mentions.

> Two instances of kafka consumer reading the same partition within a consumer 
> group
> --
>
> Key: KAFKA-6681
> URL: https://issues.apache.org/jira/browse/KAFKA-6681
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.10.2.1
>Reporter: Narayan Periwal
>Priority: Critical
> Attachments: server-1.log, server-2.log
>
>
> We have seen this issue with the Kafka consumer, the new client library that 
> was introduced in 0.9.
> With this new client, group management is done by the Kafka coordinator, 
> which is one of the Kafka brokers.
> We are using Kafka broker 0.10.2.1, and the consumer client version is also 
> 0.10.2.1.
> The issue we have faced is that, after rebalancing, some of the partitions 
> get consumed by two instances within a consumer group, leading to duplication 
> of the entire partition data. Both instances continue to read until the next 
> rebalance, or until those clients are restarted.
> It looks like a particular consumer keeps fetching data from a partition, but 
> the broker is not able to identify this "stale" consumer instance.
> We have hit this twice in production. Please look into it at the earliest.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7097) VerifiableProducer does not work properly with --message-create-time argument

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523074#comment-16523074
 ] 

ASF GitHub Bot commented on KAFKA-7097:
---

tedyu opened a new pull request #5292: KAFKA-7097 VerifiableProducer does not 
work properly with --message-create-time argument
URL: https://github.com/apache/kafka/pull/5292
 
 
   Currently the create time is interpreted as an integer.
   
   This PR makes the tool accept long values.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> VerifiableProducer does not work properly with --message-create-time argument
> -
>
> Key: KAFKA-7097
> URL: https://issues.apache.org/jira/browse/KAFKA-7097
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Jasper Knulst
>Priority: Major
>
> If you run:
>  
> ./bin/kafka-verifiable-producer.sh --broker-list  --topic 
> test_topic_increasing_p2 --message-create-time  --acks -1 
> --max-messages 100
> the "" for --message-create-time doesn't accept a 13-digit long 
> like 1529656934000. 
> The error message:
> verifiable-producer: error: argument --message-create-time: could not convert 
> '1529656934000' to Integer (For input string: "1529656934000")
>  
> When you provide a 10-digit epoch (1529656934) for the argument it does work, 
> but this leads to your topic being cleaned up within a few minutes since the 
> retention time has expired.
>  
> The error seems to be obvious since VerifiableProducer.java has:
>         Long createTime = (long) res.getInt("createTime");
> when parsing the argument. This should be parsed as a Long instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523060#comment-16523060
 ] 

ASF GitHub Bot commented on KAFKA-7026:
---

vahidhashemian opened a new pull request #5291: KAFKA-7026: Sticky Assignor 
Partition Assignment Improvement
URL: https://github.com/apache/kafka/pull/5291
 
 
   In the current implementation of the sticky assignor, the leader does not 
cache the most recently calculated assignment. It relies on the fact that each 
consumer in the group sends its subscribed topics and also its current 
assignment when a rebalance occurs. This can lead to the issue described in 
[KAFKA-7026](https://issues.apache.org/jira/browse/KAFKA-7026), in which the 
current assignment of a consumer is no longer valid and should be ignored. The 
solution implemented in this PR involves the leader caching the most recent 
assignment of each consumer, so the assignment reported by a consumer can be 
properly ignored if necessary.
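
   A minimal sketch of the caching idea described above (hypothetical names, not 
the actual assignor internals): the leader keeps the assignment it last handed 
out and prefers it over whatever a rejoining member reports about itself.

{code:java}
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeaderAssignmentCache {
    // consumerId -> partitions the leader handed out in the previous generation
    private final Map<String, List<TopicPartition>> lastIssued = new HashMap<>();

    /** Remember what was actually assigned so stale member reports can be ignored later. */
    public void record(Map<String, List<TopicPartition>> newAssignment) {
        lastIssued.clear();
        lastIssued.putAll(newAssignment);
    }

    /** Prefer the leader's own record of a member's assignment over what the member reports. */
    public List<TopicPartition> priorOwnedFor(String consumerId, List<TopicPartition> reportedByMember) {
        return lastIssued.getOrDefault(consumerId, reportedByMember);
    }
}
{code}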
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7098) Improve accuracy of the log cleaner throttle rate

2018-06-25 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7098:
---

 Summary: Improve accuracy of the log cleaner throttle rate
 Key: KAFKA-7098
 URL: https://issues.apache.org/jira/browse/KAFKA-7098
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin


LogCleaner uses the Throttler class to throttle the log cleaning rate to the 
user-specified limit, i.e. log.cleaner.io.max.bytes.per.second. However, in 
Throttler.maybeThrottle(), periodStartNs is reset to the time captured before 
the sleep even after sleep() has been called, which artificially increases the 
actual window size and under-estimates the actual log cleaning rate. This 
causes the log cleaning I/O to be higher than the user-specified limit.
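
A minimal sketch of the measurement issue (assumed structure, not the actual 
kafka.utils.Throttler code): if the new window start is the timestamp captured 
before the sleep, the next window silently includes the sleep time, so the 
computed rate comes out lower than the real cleaning I/O rate.

{code:java}
public class ThrottleWindowSketch {
    private long periodStartNs = System.nanoTime();
    private double observedSoFar = 0.0;

    public void maybeThrottle(double bytes, double maxBytesPerSec) throws InterruptedException {
        observedSoFar += bytes;
        long now = System.nanoTime();
        double elapsedSec = (now - periodStartNs) / 1_000_000_000.0;
        if (elapsedSec > 0 && observedSoFar / elapsedSec > maxBytesPerSec) {
            // Sleep just long enough to bring the average rate back under the limit.
            long sleepMs = Math.round((observedSoFar / maxBytesPerSec - elapsedSec) * 1000);
            if (sleepMs > 0) Thread.sleep(sleepMs);
            periodStartNs = now; // the issue: 'now' predates the sleep, stretching the next window
            // periodStartNs = System.nanoTime(); // would start the new window after the sleep
            observedSoFar = 0.0;
        }
    }
}
{code}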



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-25 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522967#comment-16522967
 ] 

Vahid Hashemian commented on KAFKA-7026:


Thanks! Yes, this is a valid bug. Working on a PR and should submit one soon.

> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7089) Extend `kafka-consumer-groups.sh` to show "beginning offsets"

2018-06-25 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522963#comment-16522963
 ] 

Vahid Hashemian commented on KAFKA-7089:


Thanks [~hachikuji]. I'll create a short KIP.

> Extend `kafka-consumer-groups.sh` to show "beginning offsets"
> -
>
> Key: KAFKA-7089
> URL: https://issues.apache.org/jira/browse/KAFKA-7089
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Matthias J. Sax
>Assignee: Vahid Hashemian
>Priority: Minor
>
> Currently, `kafka-consumer-groups.sh` only shows "current offset", "end 
> offset" and "lag". It would be helpful to extend the tool to also show 
> "beginning/earliest offset".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6098) Delete and Re-create topic operation could result in race condition

2018-06-25 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522943#comment-16522943
 ] 

Dhruvil Shah commented on KAFKA-6098:
-

I don't think we can provide any formal guarantees for these APIs if topics are 
being created and deleted concurrently. From the discussion so far, it looks 
like we want to be able to define the semantics for what happens in a 
single-threaded application trying to delete, list, and create topics. Is this 
correct?

One way to fix this problem could be to have deleteTopics return success only 
after the topic has been completely deleted (i.e. the topic znode has been 
deleted). listTopics could continue returning the topic information for this 
duration. Would this address the issue?

> Delete and Re-create topic operation could result in race condition
> ---
>
> Key: KAFKA-6098
> URL: https://issues.apache.org/jira/browse/KAFKA-6098
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Dhruvil Shah
>Priority: Major
>  Labels: reliability
>
> Here is the following process to re-produce this issue:
> 1. Delete a topic using the delete topic request.
> 2. Confirm the topic is deleted using the list topics request.
> 3. Create the topic using the create topic request.
> In step 3) a race condition can happen that the response returns a 
> {{TOPIC_ALREADY_EXISTS}} error code, indicating the topic has already existed.
> The root cause of the above issue is in the {{TopicDeletionManager}} class:
> {code}
> controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq,
>  OfflinePartition)
> controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq,
>  NonExistentPartition)
> topicsToBeDeleted -= topic
> partitionsToBeDeleted.retain(_.topic != topic)
> kafkaControllerZkUtils.deleteTopicZNode(topic)
> kafkaControllerZkUtils.deleteTopicConfigs(Seq(topic))
> kafkaControllerZkUtils.deleteTopicDeletions(Seq(topic))
> controllerContext.removeTopic(topic)
> {code}
> I.e. it first updates the brokers' metadata caches through the ISR and 
> metadata update requests, then deletes the topic ZK path, and then deletes 
> the topic-deletion ZK path. However, upon handling the create topic request, 
> the broker will simply try to write to the topic ZK path directly. Hence 
> there is a race condition between the brokers updating their metadata caches 
> (so the list topics request no longer returns this topic) and the ZK path for 
> the topic being deleted (so the create topic request succeeds).
> The reason this problem can be exposed is the current handling logic for the 
> create topic response, most of which takes {{TOPIC_ALREADY_EXISTS}} as "OK" 
> and moves on; the ZK path is then deleted later, leaving the topic not 
> created at all:
> https://github.com/apache/kafka/blob/249e398bf84cdd475af6529e163e78486b43c570/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java#L221
> https://github.com/apache/kafka/blob/1a653c813c842c0b67f26fb119d7727e272cf834/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L232
> Looking at the code history, it seems this race condition has always existed, 
> but when testing on trunk / 1.0 with the above steps it is more likely to 
> happen than before. I wonder if the ZK async calls have an effect here. cc 
> [~junrao] [~onurkaraman]
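
A sketch of the delete/list/create sequence from the reproduction steps using 
the Java AdminClient, with a retry on TOPIC_ALREADY_EXISTS as a client-side 
workaround (assumed broker address and topic name; this does not fix the 
controller-side race itself):

{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class RecreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        try (AdminClient admin = AdminClient.create(props)) {
            // Step 1: delete the topic and wait for the future to complete.
            admin.deleteTopics(Collections.singleton("test")).all().get();
            // Steps 2/3: listTopics() may already omit the topic while its znode still
            // exists, so createTopics() can still fail with TOPIC_ALREADY_EXISTS for a while.
            boolean created = false;
            while (!created) {
                try {
                    admin.createTopics(Collections.singleton(new NewTopic("test", 1, (short) 1)))
                         .all().get();
                    created = true;
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof TopicExistsException) {
                        Thread.sleep(500); // deletion not finished on the brokers yet; retry
                    } else {
                        throw e;
                    }
                }
            }
        }
    }
}
{code}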



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6591) Move check for super user in SimpleAclProvider before ACL evaluation

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522905#comment-16522905
 ] 

ASF GitHub Bot commented on KAFKA-6591:
---

hachikuji closed pull request #4618: KAFKA-6591: Move super user check before 
ACL matching 
URL: https://github.com/apache/kafka/pull/4618
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 504d71ad63c..1dc3a1fb412 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -112,38 +112,46 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
     val principal = session.principal
     val host = session.clientAddress.getHostAddress
-    val acls = getMatchingAcls(resource.resourceType, resource.name)
-
-    // Check if there is any Deny acl match that would disallow this operation.
-    val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls)
-
-    // Check if there are any Allow ACLs which would allow this operation.
-    // Allowing read, write, delete, or alter implies allowing describe.
-    // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
-    val allowOps = operation match {
-      case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
-      case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
-      case _ => Set[Operation](operation)
+
+    def isEmptyAclAndAuthorized(acls: Set[Acl]): Boolean = {
+      if (acls.isEmpty) {
+        // No ACLs found for this resource, permission is determined by value of config allow.everyone.if.no.acl.found
+        authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
+        shouldAllowEveryoneIfNoAclIsFound
+      } else false
+    }
+
+    def denyAclExists(acls: Set[Acl]): Boolean = {
+      // Check if there are any Deny ACLs which would forbid this operation.
+      aclMatch(operation, resource, principal, host, Deny, acls)
     }
-    val allowMatch = allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))
 
-    //we allow an operation if a user is a super user or if no acls are found and user has configured to allow all users
-    //when no acls are found or if no deny acls are found and at least one allow acls matches.
-    val authorized = isSuperUser(operation, resource, principal, host) ||
-      isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
-      (!denyMatch && allowMatch)
+    def allowAclExists(acls: Set[Acl]): Boolean = {
+      // Check if there are any Allow ACLs which would allow this operation.
+      // Allowing read, write, delete, or alter implies allowing describe.
+      // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
+      val allowOps = operation match {
+        case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
+        case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
+        case _ => Set[Operation](operation)
+      }
+      allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))
+    }
+
+    def aclsAllowAccess = {
+      //we allow an operation if no acls are found and user has configured to allow all users
+      //when no acls are found or if no deny acls are found and at least one allow acls matches.
+      val acls = getMatchingAcls(resource.resourceType, resource.name)
+      isEmptyAclAndAuthorized(acls) || (!denyAclExists(acls) && allowAclExists(acls))
+    }
+
+    // Evaluate if operation is allowed
+    val authorized = isSuperUser(operation, resource, principal, host) || aclsAllowAccess
 
     logAuditMessage(principal, authorized, operation, resource, host)
     authorized
   }
 
-  def isEmptyAclAndAuthorized(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String, acls: Set[Acl]): Boolean = {
-    if (acls.isEmpty) {
-      authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
-      shouldAllowEveryoneIfNoAclIsFound
-    } else false
-  }
-
   def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = {
     if (superUsers.contains(principal)) {
       authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.")


 


[jira] [Closed] (KAFKA-6978) Make Streams Window retention time strict

2018-06-25 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler closed KAFKA-6978.
---

> Make Streams Window retention time strict
> -
>
> Key: KAFKA-6978
> URL: https://issues.apache.org/jira/browse/KAFKA-6978
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently, the configured retention time for windows is a lower bound. We 
> actually keep the window around until it's time to roll a new segment. At 
> that time, we drop all windows in the oldest segment.
> As long as a window is still in a segment, we will continue to add 
> late-arriving records to it and also serve IQ queries from it. This is sort 
> of nice, because it makes optimistic use of the fact that the windows live 
> for some time after their retention expires. However, it is also a source of 
> (apparent) non-determinism, and it's arguably better for programmability if 
> we adhere strictly to the configured constraints.
> Therefore, the new behavior will be:
>  * once the retention time for a window passes, Streams will drop any 
> later-arriving records (with a warning log and a metric)
>  * likewise, IQ will first check whether the window is younger than its 
> retention time before answering queries.
> No changes need to be made to the underlying segment management; this is 
> purely to make the behavior stricter with respect to the configuration.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-25 Thread rajadayalan perumalsamy (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522861#comment-16522861
 ] 

rajadayalan perumalsamy commented on KAFKA-7012:


Tried version 1.1.2 in our Kafka cluster; performance looks good. 
 
Thank you all again!!!

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: regression
> Fix For: 2.0.0, 1.0.2, 1.1.1
>
> Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade our Kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node of the cluster, we noticed that network threads use 
> most of the cpu. It is a 3-node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1, typical usage of the servers is around 50 to 60% of a 
> vcpu (using less than 1 vcpu). After the upgrade we are noticing that cpu 
> usage is high depending on the number of network threads used. If network 
> threads is set to 8, then the cpu usage is around 850% (9 vcpus), and if it 
> is set to 4 then the cpu usage is around 450% (5 vcpus). Using the same kafka 
> server.properties for both.
> Did further analysis with git bisect and a couple of builds and deploys, and 
> traced the 
> issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine 
> for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit 
> 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have 
> attached screenshots of profiling done with both the commits. Screenshot 
> Commit-f15cdbc91b-profile shows less cpu usage by network threads and 
> Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show 
> higher cpu usage(almost entire cpu usage) by network threads. Also noticed 
> that kafka.network.Processor.poll() method is invoked 10 times more with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need the issue to be resolved to upgrade the cluster. Please let me know 
> if you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6978) Make Streams Window retention time strict

2018-06-25 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-6978.
-
   Resolution: Fixed
Fix Version/s: 2.1.0

This feature was merged in 
https://github.com/apache/kafka/commit/954be11bf2d3dc9fa11a69830d2ef5ff580ff533

> Make Streams Window retention time strict
> -
>
> Key: KAFKA-6978
> URL: https://issues.apache.org/jira/browse/KAFKA-6978
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently, the configured retention time for windows is a lower bound. We 
> actually keep the window around until it's time to roll a new segment. At 
> that time, we drop all windows in the oldest segment.
> As long as a window is still in a segment, we will continue to add 
> late-arriving records to it and also serve IQ queries from it. This is sort 
> of nice, because it makes optimistic use of the fact that the windows live 
> for some time after their retention expires. However, it is also a source of 
> (apparent) non-determinism, and it's arguably better for programmability if we 
> adhere strictly to the configured constraints.
> Therefore, the new behavior will be:
>  * once the retention time for a window passes, Streams will drop any 
> later-arriving records (with a warning log and a metric)
>  * likewise, IQ will first check whether the window is younger than its 
> retention time before answering queries.
> No changes need to be made to the underlying segment management; this is 
> purely to make the behavior stricter with respect to the configuration.
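
A minimal sketch of the stricter behavior (hypothetical class, not the actual 
Streams segment/window code): once stream time has advanced past a window's end 
plus its retention, late records are rejected instead of being added while the 
segment still exists.

{code:java}
public class RetentionGate {
    private final long retentionMs;
    private long observedStreamTimeMs = Long.MIN_VALUE;

    public RetentionGate(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Returns true if the record may still be added to its window. */
    public boolean accept(long windowEndMs, long recordTimestampMs) {
        observedStreamTimeMs = Math.max(observedStreamTimeMs, recordTimestampMs);
        boolean expired = windowEndMs + retentionMs < observedStreamTimeMs;
        if (expired) {
            // In the described behavior this would log a warning and bump a metric.
            System.out.println("WARN: skipping record for expired window ending at " + windowEndMs);
        }
        return !expired;
    }
}
{code}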



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7096) Consumer should drop the data for unassigned topic partitions

2018-06-25 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522768#comment-16522768
 ] 

Mayuresh Gharat commented on KAFKA-7096:


[~lindong] : [https://github.com/apache/kafka/pull/5289]

 

> Consumer should drop the data for unassigned topic partitions
> -
>
> Key: KAFKA-7096
> URL: https://issues.apache.org/jira/browse/KAFKA-7096
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> Currently, if a client has assigned topics T1, T2, and T3 and calls poll(), 
> the poll might fetch data for partitions of all three topics. Now if the 
> client unassigns some topics (for example T3) and calls poll(), we still hold 
> the data (for T3) in the completedFetches queue until we actually reach the 
> buffered data for the unassigned topics (T3 in our example) on subsequent 
> poll() calls, at which point we drop that data. This process of holding the 
> data is unnecessary.
> When a client creates a topic, it takes time for the broker to fetch ACLs for 
> the topic. But during this time, the client will issue fetch requests for the 
> topic and get responses for the partitions of this topic. The responses 
> consist of a TopicAuthorizationException for each of the partitions. This 
> response for each partition is wrapped in a completedFetch and added to the 
> completedFetches queue. Now when the client calls the next poll() it sees the 
> TopicAuthorizationException from the first buffered completedFetch. At this 
> point the client chooses to sleep for 1.5 min as a backoff (as per the 
> design), hoping that the broker fetches the ACL from the ACL store in the 
> meantime. Actually the broker has already fetched the ACL by this time. When 
> the client calls poll() after the sleep, it again sees the 
> TopicAuthorizationException from the second completedFetch and it sleeps 
> again. So it takes (1.5 * 60 * partitions) seconds before the client can see 
> any data. With this patch, when the client sees the first 
> TopicAuthorizationException, it can call assign(emptySet), which will get rid 
> of the buffered completedFetches (those with TopicAuthorizationException), 
> and it can then call assign(topicPartitions) again before calling poll(). 
> With this patch we found that the client was able to get the records as soon 
> as the broker fetched the ACLs from the ACL store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7097) VerifiableProducer does not work properly with --message-create-time argument

2018-06-25 Thread Jasper Knulst (JIRA)
Jasper Knulst created KAFKA-7097:


 Summary: VerifiableProducer does not work properly with 
--message-create-time argument
 Key: KAFKA-7097
 URL: https://issues.apache.org/jira/browse/KAFKA-7097
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.0.0
Reporter: Jasper Knulst


If you run:

 

./bin/kafka-verifiable-producer.sh --broker-list  --topic 
test_topic_increasing_p2 --message-create-time  --acks -1 
--max-messages 100

the "" for --message-create-time doesn't accept a 13-digit long 
like 1529656934000. 

The error message:

verifiable-producer: error: argument --message-create-time: could not convert 
'1529656934000' to Integer (For input string: "1529656934000")

 

When you provide a 10-digit epoch (1529656934) for the argument it does work, 
but this leads to your topic being cleaned up within a few minutes since the 
retention time has expired.

 

The error seems to be obvious since VerifiableProducer.java has:

        Long createTime = (long) res.getInt("createTime");

when parsing the argument. This should be parsed as a Long instead.
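
A sketch of the suggested change using argparse4j, which VerifiableProducer uses 
for argument parsing (a minimal standalone example, not the actual tool code): 
declare the argument as a Long and read it with getLong so 13-digit epoch-millis 
values are accepted.

{code:java}
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;

public class CreateTimeArgSketch {
    public static void main(String[] args) throws Exception {
        ArgumentParser parser = ArgumentParsers.newArgumentParser("verifiable-producer-sketch");
        parser.addArgument("--message-create-time")
              .type(Long.class)      // an Integer type overflows on 13-digit epoch-millis values
              .setDefault(-1L)
              .dest("createTime");
        Namespace res = parser.parseArgs(args);
        Long createTime = res.getLong("createTime"); // instead of (long) res.getInt("createTime")
        System.out.println("createTime = " + createTime);
    }
}
{code}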



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2018-06-25 Thread Collin Scangarella (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518766#comment-16518766
 ] 

Collin Scangarella edited comment on KAFKA-6817 at 6/25/18 6:21 PM:


Thank you Matthias. In case anyone arrives with the same issue - we had to 
implement ValueTransformer and use it with the transformValues step as follows: 
`streamsBuilder.stream(...).transformValues(...).filter((k, v) -> v != 
null)...`. The reason the transform is after the source (instead of before the 
sink) is that our state stores were also throwing the same 
UnknownProducerIdException. Additionally, we had to use transformValues instead 
of transform because the state store was unable to identify the correct serdes 
for the message. Finally, we had to filter out the null values, as 
transformValues sends messages even if they are null.


was (Author: col...@scangarella.com):
Thank you Matthias. In case anyone arrives with the same issue - we had to 
implement ValueTransformer and use it with the transformValues step as follows 
`streamsBuilder.stream(...).transformValues(...).filter((k, v) -> v != 
null)...`. The reason why we had to put the transform after the source (instead 
of before the sink) is that our state stores were also throwing the same 
UnknownProducerIdException. Additionally, we had to use transformValues instead 
of transform because the state store was unable to identify the correct serdes 
for the message. Finally, we had to filter out the null values, as 
transformValues sends messages even if they are null.
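
A sketch of the topology shape described in the comment above (hypothetical 
topic names and transformer logic; serde configuration omitted, and assuming a 
Streams version where ValueTransformer only requires init/transform/close, as 
its deprecated punctuate method was removed in newer releases):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class TransformValuesTopology {
    public static void build(StreamsBuilder streamsBuilder) {
        KStream<String, String> source = streamsBuilder.stream("input-topic"); // hypothetical topic
        source
            .transformValues(() -> new ValueTransformer<String, String>() {
                @Override
                public void init(final ProcessorContext context) { }

                @Override
                public String transform(final String value) {
                    // May return null for records that should be dropped downstream.
                    return (value == null || value.isEmpty()) ? null : value.toUpperCase();
                }

                @Override
                public void close() { }
            })
            .filter((k, v) -> v != null) // transformValues forwards nulls, so filter them out
            .to("output-topic");         // hypothetical topic
    }
}
{code}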

> UnknownProducerIdException when writing messages with old timestamps
> 
>
> Key: KAFKA-6817
> URL: https://issues.apache.org/jira/browse/KAFKA-6817
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Odin Standal
>Priority: Major
>
> We are seeing the following exception in our Kafka application: 
> {code:java}
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
> due to the following error: org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Abort sending since an error caught with a previous record (key 
> 22 value some-value timestamp 1519200902670) to topic 
> exactly-once-test-topic- v2 due to This exception is raised by the broker if 
> it could not locate the producer metadata associated with the producerId in 
> question. This could happen if, for instance, the producer's records were 
> deleted because their retention time had elapsed. Once the last records of 
> the producerId are removed, the producer's metadata is removed from the 
> broker, and future appends by the producer will return this exception. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
> at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>  at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.apache.kafka.common.errors.UnknownProducerIdException
> {code}
> We discovered this error when we had the need to reprocess old messages. See 
> more details on 
> [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]
> We have reproduced the error 

[jira] [Created] (KAFKA-7096) Consumer should drop the data for unassigned topic partitions

2018-06-25 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-7096:
--

 Summary: Consumer should drop the data for unassigned topic 
partitions
 Key: KAFKA-7096
 URL: https://issues.apache.org/jira/browse/KAFKA-7096
 Project: Kafka
  Issue Type: Improvement
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat


Currently, if a client has assigned topics T1, T2, and T3 and calls poll(), the 
poll might fetch data for partitions of all three topics. Now if the client 
unassigns some topics (for example T3) and calls poll(), we still hold the data 
(for T3) in the completedFetches queue until we actually reach the buffered 
data for the unassigned topics (T3 in our example) on subsequent poll() calls, 
at which point we drop that data. This process of holding the data is 
unnecessary.

When a client creates a topic, it takes time for the broker to fetch ACLs for 
the topic. But during this time, the client will issue fetch requests for the 
topic and get responses for the partitions of this topic. The responses consist 
of a TopicAuthorizationException for each of the partitions. This response for 
each partition is wrapped in a completedFetch and added to the completedFetches 
queue. Now when the client calls the next poll() it sees the 
TopicAuthorizationException from the first buffered completedFetch. At this 
point the client chooses to sleep for 1.5 min as a backoff (as per the design), 
hoping that the broker fetches the ACL from the ACL store in the meantime. 
Actually the broker has already fetched the ACL by this time. When the client 
calls poll() after the sleep, it again sees the TopicAuthorizationException 
from the second completedFetch and it sleeps again. So it takes 
(1.5 * 60 * partitions) seconds before the client can see any data. With this 
patch, when the client sees the first TopicAuthorizationException, it can call 
assign(emptySet), which will get rid of the buffered completedFetches (those 
with TopicAuthorizationException), and it can then call assign(topicPartitions) 
again before calling poll(). With this patch we found that the client was able 
to get the records as soon as the broker fetched the ACLs from the ACL store.
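
A sketch of the workaround described above (hypothetical partition set and poll 
loop, not the library's internal change): on the first 
TopicAuthorizationException, clear the assignment to drop the buffered 
completedFetches, then re-assign and poll again.

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;

import java.util.Collections;
import java.util.Set;

public class AuthRetrySketch {
    public static void consumeWithAuthRetry(KafkaConsumer<String, String> consumer,
                                            Set<TopicPartition> partitions) {
        consumer.assign(partitions);
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(500);
                records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
            } catch (TopicAuthorizationException e) {
                // Drop the buffered completedFetches that still carry the stale
                // authorization error, then re-assign and poll again instead of
                // waiting out the full backoff per partition.
                consumer.assign(Collections.<TopicPartition>emptySet());
                consumer.assign(partitions);
            }
        }
    }
}
{code}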



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7089) Extend `kafka-consumer-groups.sh` to show "beginning offsets"

2018-06-25 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522556#comment-16522556
 ] 

Jason Gustafson commented on KAFKA-7089:


Yeah, I think we should have a short KIP. Seems noncontroversial though, so 
probably won't need much discussion.

> Extend `kafka-consumer-groups.sh` to show "beginning offsets"
> -
>
> Key: KAFKA-7089
> URL: https://issues.apache.org/jira/browse/KAFKA-7089
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Matthias J. Sax
>Assignee: Vahid Hashemian
>Priority: Minor
>
> Currently, `kafka-consumer-groups.sh` only shows "current offset", "end 
> offset" and "lag". It would be helpful to extend the tool to also show 
> "beginning/earliest offset".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7092) Multiple topics (including __consumer_offsets) have not been cleaned for a few months

2018-06-25 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522545#comment-16522545
 ] 

Jason Gustafson commented on KAFKA-7092:


Yes, you are probably hitting one of those bugs. The most likely one is 
KAFKA-5413 which is fixed in all versions greater than 0.11. The fix for 
KAFKA-6264 will be in 2.0.0, which is in the process of being released. We will 
also be releasing 0.10.2.2 shortly, which will contain the fix for KAFKA-5413.

> Multiple topics (including __consumer_offsets) have not been cleaned for a 
> few months
> -
>
> Key: KAFKA-7092
> URL: https://issues.apache.org/jira/browse/KAFKA-7092
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1
> Environment: linux
> kafka 0.10.2.1
>Reporter: Noam Berman
>Priority: Critical
>
> Hi,
> We stumbled upon a scenario - our 8 node cluster (0.10.2.1) hasn't been 
> cleaning logs for many topics since 26/3/2018 (during which the brokers were 
> restarted). This includes __consumer_offsets, which has since grown to 
> enormous proportions.
> The cluster is an upgraded cluster from 0.10.1.0.
> I believe this is a reproduction of either 
> https://issues.apache.org/jira/browse/KAFKA-5413 or 
> https://issues.apache.org/jira/browse/KAFKA-6264, but I haven't found any 
> workaround for this.
> output of one of __consumer_offsets partition data folders:
> {noformat}
> user@kafka-broker0a:/var/lib/kafka/data/__consumer_offsets-100$ ls -ltr
> total 132832
> -rw-r--r-- 1 kafka kafka 0 Mar 26 07:10 .log
> -rw-r--r-- 1 kafka kafka 320648 Mar 26 07:11 004729570626.log
> -rw-r--r-- 1 kafka kafka 2697871 Mar 26 07:11 006877054274.log
> -rw-r--r-- 1 kafka kafka 3792861 Mar 26 07:11 006877066658.log
> -rw-r--r-- 1 kafka kafka 931064 Mar 26 07:11 006877084831.log
> -rw-r--r-- 1 kafka kafka 118282 Mar 26 07:11 006877089209.log
> -rw-r--r-- 1 kafka kafka 1807647 Mar 26 07:11 006877089458.log
> -rw-r--r-- 1 kafka kafka 239104 Mar 26 07:11 006877096136.log
> -rw-r--r-- 1 kafka kafka 1835988 Mar 26 07:11 006877097398.log
> -rw-r--r-- 1 kafka kafka 575040 Mar 26 07:11 006877104257.log
> -rw-r--r-- 1 kafka kafka 390255 Mar 26 07:11 006877106774.log
> -rw-r--r-- 1 kafka kafka 3105807 Mar 26 07:11 006877108341.log
> -rw-r--r-- 1 kafka kafka 2086353 Mar 26 07:11 006877120354.log
> -rw-r--r-- 1 kafka kafka 2434773 Mar 26 07:12 006877128121.log
> -rw-r--r-- 1 kafka kafka 1432500 Mar 26 07:12 006877137370.log
> -rw-r--r-- 1 kafka kafka 1901742 Mar 26 07:12 006877142384.log
> -rw-r--r-- 1 kafka kafka 2310319 Mar 26 07:12 006877149921.log
> -rw-r--r-- 1 kafka kafka 2084367 Mar 26 07:12 006877157735.log
> -rw-r--r-- 1 kafka kafka 23317 Mar 26 07:12 006877165836.log
> -rw-r--r-- 1 kafka kafka 3715029 Mar 26 07:12 006877165946.log
> -rw-r--r-- 1 kafka kafka 2217586 Mar 26 07:12 006877179092.log
> -rw-r--r-- 1 kafka kafka 1133491 Mar 26 07:12 006877187739.log
> -rw-r--r-- 1 kafka kafka 1351694 Mar 26 07:12 006877191615.log
> -rw-r--r-- 1 kafka kafka 1397573 Mar 26 19:11 006877195811.log
> -rw-r--r-- 1 kafka kafka 1439494 Mar 28 00:37 006877201824.log
> -rw-r--r-- 1 kafka kafka 1679347 Mar 29 07:10 006877207157.log
> -rw-r--r-- 1 kafka kafka 0 Mar 29 09:35 .timeindex
> -rw-r--r-- 1 kafka kafka 0 Mar 29 09:35 .index
> -rw-r--r-- 1 kafka kafka 12 Mar 29 09:35 006877089458.timeindex
> -rw-r--r-- 1 kafka kafka 8 Mar 29 09:35 006877089458.index
> -rw-r--r-- 1 kafka kafka 12 Mar 29 09:35 006877089209.timeindex
> -rw-r--r-- 1 kafka kafka 0 Mar 29 09:35 006877089209.index
> -rw-r--r-- 1 kafka kafka 12 Mar 29 09:35 006877084831.timeindex
> -rw-r--r-- 1 kafka kafka 0 Mar 29 09:35 006877084831.index
> -rw-r--r-- 1 kafka kafka 36 Mar 29 09:35 006877066658.timeindex
> -rw-r--r-- 1 kafka kafka 24 Mar 29 09:35 006877066658.index
> . 
> .
> -rw-r--r-- 1 kafka kafka 10485760 Jun 24 14:52 006877558794.index
> -rw-r--r-- 1 kafka kafka 1235857 Jun 24 14:52 
> 006877558794.log{noformat}
> As you can see, the oldest log file has a 0..0 file name. 
> Is there any version that we can upgrade to that will solve this issue for 
> us? 
>  
> Thanks
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6986) Export Admin Client metrics through Stream Threads

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522514#comment-16522514
 ] 

ASF GitHub Bot commented on KAFKA-6986:
---

guozhangwang closed pull request #5210: KAFKA-6986: Export Admin Client metrics 
through Stream Threads
URL: https://github.com/apache/kafka/pull/5210
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 0171b617e7c..75c93b63228 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
@@ -768,4 +770,11 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
     public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
         return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
     }
+
+    /**
+     * Get the metrics kept by the adminClient
+     *
+     * @return
+     */
+    public abstract Map<MetricName, ? extends Metric> metrics();
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 495095a9276..016195308b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -35,6 +35,8 @@
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -2734,4 +2736,9 @@ void handleFailure(Throwable throwable) {
 
         return new DeleteConsumerGroupsResult(new HashMap<String, KafkaFuture<Void>>(futures));
     }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return Collections.unmodifiableMap(this.metrics.metrics());
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 2fc7048b759..6fbdca265c1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
@@ -47,6 +49,8 @@
     private Node controller;
     private int timeoutNextRequests = 0;
 
+    private Map<MetricName, Metric> mockMetrics = new HashMap<>();
+
     /**
      * Creates MockAdminClient for a cluster with the given brokers. The Kafka cluster ID uses the default value from
      * DEFAULT_CLUSTER_ID.
@@ -391,4 +395,10 @@ public void close(long duration, TimeUnit unit) {}
         }
     }
 
+    public void setMockMetrics(MetricName name, Metric metric) { mockMetrics.put(name, metric); }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return mockMetrics;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 6a707ff986d..cef8116e880 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -382,12 +382,12 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState
      *
      * @return Map of all metrics.
      */
-    // TODO: we can add metrics for admin client as well
     public Map<MetricName, ? extends Metric> metrics() {
         final Map<MetricName, Metric> result = new LinkedHashMap<>();
         for (final StreamThread thread : threads) {
             result.putAll(thread.producerMetrics());
             result.putAll(thread.consumerMetrics());
+            result.putAll(thread.adminClientMetrics());
         }
         if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics());
         result.putAll(metrics.metrics());
diff 

[jira] [Commented] (KAFKA-6986) Export Admin Client metrics through Stream Threads

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522515#comment-16522515
 ] 

ASF GitHub Bot commented on KAFKA-6986:
---

shunge opened a new pull request #5210: KAFKA-6986: Export Admin Client metrics 
through Stream Threads
URL: https://github.com/apache/kafka/pull/5210
 
 
   
   KAFKA-6986:Export Admin Client metrics through Stream Threads
   
   We already exported producer and consumer metrics through KafkaStreams class:
   
   https://github.com/apache/kafka/pull/4998
   
   It makes sense to also export the Admin client metrics.
   
   I didn't add a separate unit test case for this. Let me know if one is needed. 
   
   This is my first contribution; feel free to point out any mistakes I made.
   
   @abbccdda 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Export Admin Client metrics through Stream Threads
> --
>
> Key: KAFKA-6986
> URL: https://issues.apache.org/jira/browse/KAFKA-6986
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Shun Guan
>Priority: Minor
>  Labels: Newcomer, beginner, newbie
>
> We already exported producer and consumer metrics through KafkaStreams class:
> [https://github.com/apache/kafka/pull/4998]
> It makes sense to also export the Admin client metrics.
> If any new contributor wishes to take over this one, please let me know. I 
> will revisit and close this ticket in one or two months later in case no one 
> claims it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7095) Low traffic consumer is not consuming messages after the offsets is deleted by Kafka

2018-06-25 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522504#comment-16522504
 ] 

Jason Gustafson commented on KAFKA-7095:


See 
[KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets]
 which will go into the next release. You shouldn't need to restart your 
application in order to consume the next offset even if the committed offset 
is lost. What `auto.offset.reset` policy are you using?
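
For anyone hitting the same situation, `auto.offset.reset` is the consumer setting that decides where to start when there is no valid committed offset (for example after the committed offset has expired and been deleted). A minimal illustrative consumer configuration is sketched below; the broker address, group id and topic are placeholders:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LowTrafficConsumer {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "low-traffic-group");       // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // "earliest" re-reads from the beginning of the log when no committed offset
        // is found, "latest" (the default) only picks up records produced after the
        // consumer starts, "none" raises an exception to the application.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));        // placeholder topic
            final ConsumerRecords<String, String> records = consumer.poll(1000L);
            for (final ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}
{code}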

> Low traffic consumer is not consuming messages after the offsets is deleted 
> by Kafka
> 
>
> Key: KAFKA-7095
> URL: https://issues.apache.org/jira/browse/KAFKA-7095
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: Aldo Sinanaj
>Priority: Minor
>
> Hello guys.
> I have a low traffic consumer for a given consumer group and I have the 
> default broker setting for the property *offsets.retention.minutes*. So if a 
> message arrives after 2 days and Kafka has deleted the offset for that 
> given consumer, then the consumer will not consume the new incoming messages. 
> If I restart the application it will consume from the earliest offset, which is 
> expected since the committed offset is deleted.
> My question is: why doesn't it consume the new messages if I don't restart the 
> application? And how does this version of Kafka determine whether a consumer is 
> active or inactive? Is my consumer considered inactive in this case?
> Thanks,
> Aldo



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7095) Low traffic consumer is not consuming messages after the offsets is deleted by Kafka

2018-06-25 Thread Aldo Sinanaj (JIRA)
Aldo Sinanaj created KAFKA-7095:
---

 Summary: Low traffic consumer is not consuming messages after the 
offsets is deleted by Kafka
 Key: KAFKA-7095
 URL: https://issues.apache.org/jira/browse/KAFKA-7095
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.1.1
Reporter: Aldo Sinanaj


Hello guys.

I have a low traffic consumer for a given consumer group and I have the 
default broker setting for the property *offsets.retention.minutes*. So if a 
message arrives after 2 days and Kafka has deleted the offset for that given 
consumer, then the consumer will not consume the new incoming messages. If I 
restart the application it will consume from the earliest offset, which is 
expected since the committed offset is deleted.

My question is: why doesn't it consume the new messages if I don't restart the 
application? And how does this version of Kafka determine whether a consumer is 
active or inactive? Is my consumer considered inactive in this case?

Thanks,

Aldo



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-25 Thread Steven Aerts (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522404#comment-16522404
 ] 

Steven Aerts edited comment on KAFKA-7026 at 6/25/18 3:06 PM:
--

I found three issues in Kafka which I think are all the same: this issue, 
KAFKA-6681 and KAFKA-6717.
I will comment on this one as I think it gives the best description.

We were able to see this issue both on 0.11.0 and on 1.1.0.

When we are in this state, the consumer group is marked as stable:
{code:java}
$./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group c-group  
--describe  --state --verbose
COORDINATOR (ID)ASSIGNMENT-STRATEGY   STATE#MEMBERS
broker3:9092 (1003) stickyStable   6
{code}
While the assignment is clearly broken:
{code:java}
$./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group 
wfei-aggregator-product-ap-1-2-PT6H  --describe  --members --verbose
CONSUMER-ID HOST   CLIENT-ID  #PARTITIONS 
ASSIGNMENT
consumer-1-63f5550e-fd12-4a1f-be13-fb33ac82d9d9 /host1 consumer-1 70  
coll-v2-events-beta3(7,12,17,20,24,29,38,39,45,48,49,51,55,61,64,66,69,73,80,83,94,97,99,101,111,122,128,133,134,136,139,144,149,153,160,161,168,178,179,184,188,196,210,213,215,224,243,252,254,255,258,262,281,283,285,293,294,297,302,303,304,305,311,316,319,326,331,337,342,343)
consumer-2-6490433c-f181-4d37-adb8-e3e8679bc960 /host1 consumer-2 70  
coll-v2-events-beta3(4,6,16,19,30,34,41,43,44,49,52,54,72,76,85,86,92,93,97,105,108,113,123,124,126,131,133,138,143,147,156,159,169,174,191,197,198,204,208,215,217,230,231,242,252,257,264,267,272,273,275,277,279,284,287,291,294,300,303,305,316,326,333,337,338,340,342,348,350,358)
consumer-1-c4fd0a50-456a-4994-9c85-d843b8bc4319 /host2 consumer-1 70  
coll-v2-events-beta3(1,3,5,7,13,18,22,23,24,32,33,35,36,37,40,55,68,74,77,84,87,94,98,102,122,127,135,137,141,142,148,152,154,157,158,165,177,178,193,194,199,220,221,222,228,236,238,239,249,259,262,270,281,283,285,293,295,299,301,314,320,329,331,341,344,345,346,347,352,353)
consumer-1-517ab012-fa10-4d4e-9465-861f4912b013 /host3 consumer-1 70  
coll-v2-events-beta3(2,8,9,14,17,26,28,45,46,48,53,57,58,59,60,63,66,70,73,75,78,80,82,88,91,95,103,107,109,115,119,120,134,136,151,155,160,166,170,172,182,183,186,188,189,203,205,226,232,233,237,244,246,247,248,260,263,278,282,286,292,296,308,312,313,319,328,332,343,357)
consumer-2-e4e4ab60-e94f-4242-93fa-99aa39cafc9f /host2 consumer-2 70  
coll-v2-events-beta3(0,10,11,15,20,25,27,29,31,38,47,50,79,83,96,99,101,111,112,114,125,139,140,145,150,162,168,171,175,179,187,190,192,195,200,202,206,207,209,211,213,214,218,219,223,227,229,234,240,241,245,250,251,253,254,256,261,269,271,276,288,290,298,315,323,325,334,349,354,355)
consumer-2-8bbc1a53-a626-4e5b-825e-57d71dc4658c /host3 consumer-2 70  
coll-v2-events-beta3(21,39,42,51,56,62,64,65,67,71,81,89,90,100,104,106,110,116,117,118,121,129,130,132,146,149,161,163,164,167,173,176,180,181,184,185,201,210,212,216,224,225,235,265,266,268,274,280,289,297,302,304,306,307,309,310,311,317,318,321,322,324,327,330,335,336,339,351,356,359)
{code}
So we have 360 partitions but there are 420 assigned.
 You clearly see that the partitions assigned to the first consumer are also 
assigned to other consumers (7, 17, 20, ...).

This issue is typically triggered when consumers (temporarily) lose their 
connection with the broker. After restarting the affected consumer, everything is 
rebalanced correctly.
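
A quick, unofficial way to confirm the double assignments shown above is to save the `--describe --members --verbose` output to a file and scan it for partition numbers that appear under more than one member. The sketch below assumes the assignment column has the `topic(1,2,3)` form used in the listing; the file path is passed as the first argument:

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DuplicateAssignmentCheck {
    // Matches assignment entries such as coll-v2-events-beta3(7,12,17) in the describe output.
    private static final Pattern ASSIGNMENT = Pattern.compile("([\\w.-]+)\\(([\\d,]+)\\)");

    public static void main(final String[] args) throws IOException {
        final List<String> lines = Files.readAllLines(Paths.get(args[0])); // saved describe output
        final Map<String, Integer> owners = new HashMap<>();              // topic-partition -> owner count
        for (final String line : lines) {
            final Matcher matcher = ASSIGNMENT.matcher(line);
            while (matcher.find()) {
                final String topic = matcher.group(1);
                for (final String partition : matcher.group(2).split(",")) {
                    owners.merge(topic + "-" + partition.trim(), 1, Integer::sum);
                }
            }
        }
        owners.forEach((topicPartition, count) -> {
            if (count > 1) {
                System.out.println(topicPartition + " is assigned to " + count + " consumers");
            }
        });
    }
}
{code}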


was (Author: steven.aerts):
I found three issues in kafka which are I think all the same.  This issue, 
[KAFKA-6681] and [KAFKA-6717].
I will comment on this one as I think it gives the best description.

We were able to see this issue both on 0.11.0 as on 1.1.0.

When we are in this state, the consumer group is marked as stable:

{code}
$./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group c-group  
--describe  --state --verbose
COORDINATOR (ID)ASSIGNMENT-STRATEGY   STATE#MEMBERS
broker3:9092 (1003) stickyStable   6
{code}

While the assignment is clearly broken:

{code}
$./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group 
wfei-aggregator-product-ap-1-2-PT6H  --describe  --members --verbose
CONSUMER-ID HOST   CLIENT-ID  #PARTITIONS 
ASSIGNMENT
consumer-1-63f5550e-fd12-4a1f-be13-fb33ac82d9d9 /host1 consumer-1 70  
coll-v2-events-beta3(7,12,17,20,24,29,38,39,45,48,49,51,55,61,64,66,69,73,80,83,94,97,99,101,111,122,128,133,134,136,139,144,149,153,160,161,168,178,179,184,188,196,210,213,215,224,243,252,254,255,258,262,281,283,285,293,294,297,302,303,304,305,311,316,319,326,331,337,342,343)
consumer-2-6490433c-f181-4d37-adb8-e3e8679bc960 /host1 consumer-2 70  

[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-25 Thread Steven Aerts (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522404#comment-16522404
 ] 

Steven Aerts commented on KAFKA-7026:
-

I found three issues in Kafka which I think are all the same: this issue, 
[KAFKA-6681] and [KAFKA-6717].
I will comment on this one as I think it gives the best description.

We were able to see this issue both on 0.11.0 and on 1.1.0.

When we are in this state, the consumer group is marked as stable:

{code}
$./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group c-group  
--describe  --state --verbose
COORDINATOR (ID)ASSIGNMENT-STRATEGY   STATE#MEMBERS
broker3:9092 (1003) stickyStable   6
{code}

While the assignment is clearly broken:

{code}
$./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group 
wfei-aggregator-product-ap-1-2-PT6H  --describe  --members --verbose
CONSUMER-ID HOST   CLIENT-ID  #PARTITIONS 
ASSIGNMENT
consumer-1-63f5550e-fd12-4a1f-be13-fb33ac82d9d9 /host1 consumer-1 70  
coll-v2-events-beta3(7,12,17,20,24,29,38,39,45,48,49,51,55,61,64,66,69,73,80,83,94,97,99,101,111,122,128,133,134,136,139,144,149,153,160,161,168,178,179,184,188,196,210,213,215,224,243,252,254,255,258,262,281,283,285,293,294,297,302,303,304,305,311,316,319,326,331,337,342,343)
consumer-2-6490433c-f181-4d37-adb8-e3e8679bc960 /host1 consumer-2 70  
coll-v2-events-beta3(4,6,16,19,30,34,41,43,44,49,52,54,72,76,85,86,92,93,97,105,108,113,123,124,126,131,133,138,143,147,156,159,169,174,191,197,198,204,208,215,217,230,231,242,252,257,264,267,272,273,275,277,279,284,287,291,294,300,303,305,316,326,333,337,338,340,342,348,350,358)
consumer-1-c4fd0a50-456a-4994-9c85-d843b8bc4319 /host2 consumer-1 70  
coll-v2-events-beta3(1,3,5,7,13,18,22,23,24,32,33,35,36,37,40,55,68,74,77,84,87,94,98,102,122,127,135,137,141,142,148,152,154,157,158,165,177,178,193,194,199,220,221,222,228,236,238,239,249,259,262,270,281,283,285,293,295,299,301,314,320,329,331,341,344,345,346,347,352,353)
consumer-1-517ab012-fa10-4d4e-9465-861f4912b013 /host3 consumer-1 70  
coll-v2-events-beta3(2,8,9,14,17,26,28,45,46,48,53,57,58,59,60,63,66,70,73,75,78,80,82,88,91,95,103,107,109,115,119,120,134,136,151,155,160,166,170,172,182,183,186,188,189,203,205,226,232,233,237,244,246,247,248,260,263,278,282,286,292,296,308,312,313,319,328,332,343,357)
consumer-2-e4e4ab60-e94f-4242-93fa-99aa39cafc9f /host2 consumer-2 70  
coll-v2-events-beta3(0,10,11,15,20,25,27,29,31,38,47,50,79,83,96,99,101,111,112,114,125,139,140,145,150,162,168,171,175,179,187,190,192,195,200,202,206,207,209,211,213,214,218,219,223,227,229,234,240,241,245,250,251,253,254,256,261,269,271,276,288,290,298,315,323,325,334,349,354,355)
consumer-2-8bbc1a53-a626-4e5b-825e-57d71dc4658c /host3 consumer-2 70  
coll-v2-events-beta3(21,39,42,51,56,62,64,65,67,71,81,89,90,100,104,106,110,116,117,118,121,129,130,132,146,149,161,163,164,167,173,176,180,181,184,185,201,210,212,216,224,225,235,265,266,268,274,280,289,297,302,304,306,307,309,310,311,317,318,321,322,324,327,330,335,336,339,351,356,359)
{code}

So we have 360 partitions but there are 420 assigned.
You clearly see that the partitions assigned to the first consumer are also 
assigned to other consumers (7, 17, 20, ...).

This issue is typically triggered when consumers (temporarily) lose their 
connection with the broker.


> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this 

[jira] [Commented] (KAFKA-6717) TopicPartition Assigned twice to a consumer group for 2 consumer instances

2018-06-25 Thread Yuancheng PENG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522397#comment-16522397
 ] 

Yuancheng PENG commented on KAFKA-6717:
---

Hi [~steven.aerts], I confirm that it's the same issue as KAFKA-7026. I 
discovered this issue in production. Sorry for not being able to reproduce it.
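
For completeness, a self-contained version of the subscription code quoted in the issue description below; the bootstrap server, group id, deserializers and the topic regex are placeholders standing in for the reporter's actual values:

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StickyPatternConsumer {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sticky-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Same assignment strategy as in the report: StickyAssignor for the whole group.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                Collections.singletonList(StickyAssignor.class));

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Placeholder regex standing in for the reporter's TOPIC_PATTERN (100+ topics).
        consumer.subscribe(Pattern.compile("events-.*"), new NoOpConsumerRebalanceListener());
    }
}
{code}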

> TopicPartition Assigned twice to a consumer group for 2 consumer instances 
> --
>
> Key: KAFKA-6717
> URL: https://issues.apache.org/jira/browse/KAFKA-6717
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Yuancheng PENG
>Priority: Major
>
> I'm using {{StickyAssignor}} for consuming more than 100 topics matching a 
> certain pattern.
> There are 10 consumers with the same group id.
> I expected each topic-partition to be assigned to only one consumer instance. 
> However, some topic partitions are assigned to 2 different instances, hence 
> the consumer group processes duplicate messages.
> {code:java}
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> Collections.singletonList(StickyAssignor.class));
> KafkaConsumer c = new KafkaConsumer<>(props);
> c.subscribe(Pattern.compile(TOPIC_PATTERN), new 
> NoOpConsumerRebalanceListener());
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group

2018-06-25 Thread Steven Aerts (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522373#comment-16522373
 ] 

Steven Aerts commented on KAFKA-6681:
-

I think this problem is a duplicate of [KAFKA-7026].

> Two instances of kafka consumer reading the same partition within a consumer 
> group
> --
>
> Key: KAFKA-6681
> URL: https://issues.apache.org/jira/browse/KAFKA-6681
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.10.2.1
>Reporter: Narayan Periwal
>Priority: Critical
> Attachments: server-1.log, server-2.log
>
>
> We have seen this issue with the Kafka consumer, the new library that got 
> introduced in 0.9
> With this new client, the group management is done by kafka coordinator, 
> which is one of the kafka broker.
> We are using Kafka broker 0.10.2.1 and consumer client version is also 
> 0.10.2.1 
> The issue that we have faced is that, after rebalancing, some of the 
> partitions gets consumed by 2 instances within a consumer group, leading to 
> duplication of the entire partition data. Both the instances continue to read 
> until the next rebalancing, or the restart of those clients. 
> It looks like that a particular consumer goes on fetching the data from a 
> partition, but the broker is not able to identify this "stale" consumer 
> instance. 
> We have hit this twice in production. Please look at it the earliest. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6717) TopicPartition Assigned twice to a consumer group for 2 consumer instances

2018-06-25 Thread Steven Aerts (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522372#comment-16522372
 ] 

Steven Aerts commented on KAFKA-6717:
-

I think this problem is a duplicate of [KAFKA-7026].

> TopicPartition Assigned twice to a consumer group for 2 consumer instances 
> --
>
> Key: KAFKA-6717
> URL: https://issues.apache.org/jira/browse/KAFKA-6717
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Yuancheng PENG
>Priority: Major
>
> I'm using {{StickyAssignor}} for consuming more than 100 topics matching a 
> certain pattern.
> There are 10 consumers with the same group id.
> I expected each topic-partition to be assigned to only one consumer instance. 
> However, some topic partitions are assigned to 2 different instances, hence 
> the consumer group processes duplicate messages.
> {code:java}
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> Collections.singletonList(StickyAssignor.class));
> KafkaConsumer c = new KafkaConsumer<>(props);
> c.subscribe(Pattern.compile(TOPIC_PATTERN), new 
> NoOpConsumerRebalanceListener());
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7094) Variate should unify code style in one method, and use camel name

2018-06-25 Thread Matt Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang updated KAFKA-7094:
-
Description: 
In one method there are two variables, partitionsTobeLeader and 
partitionsToBeFollower, which should use a consistent naming style; that will be 
helpful for code maintenance.

 

https://github.com/apache/kafka/pull/5287 

  was:
In one method, there are two variates, partitionsTobeLeader and 
partitionsToBeFollower, which should use unify code style, that will be helpful 
to code maintenance.

 


> Variate should unify code style in one method, and use  camel name
> --
>
> Key: KAFKA-7094
> URL: https://issues.apache.org/jira/browse/KAFKA-7094
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.1
>Reporter: Matt Wang
>Priority: Trivial
>
> In one method there are two variables, partitionsTobeLeader and 
> partitionsToBeFollower, which should use a consistent naming style; that will 
> be helpful for code maintenance.
>  
> https://github.com/apache/kafka/pull/5287 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7094) Variate should unify code style in one method, and use camel name

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522114#comment-16522114
 ] 

ASF GitHub Bot commented on KAFKA-7094:
---

wangzzu opened a new pull request #5287: KAFKA-7094: Variate should unify code 
style in one method, and use  camel name
URL: https://github.com/apache/kafka/pull/5287
 
 
   In one method there are two variables, partitionsTobeLeader and 
partitionsToBeFollower, which should use a consistent naming style; that will be 
helpful for code maintenance. 
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Variate should unify code style in one method, and use  camel name
> --
>
> Key: KAFKA-7094
> URL: https://issues.apache.org/jira/browse/KAFKA-7094
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.1
>Reporter: Matt Wang
>Priority: Trivial
>
> In one method there are two variables, partitionsTobeLeader and 
> partitionsToBeFollower, which should use a consistent naming style; that will 
> be helpful for code maintenance.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7094) Variate should unify code style in one method, and use camel name

2018-06-25 Thread Matt Wang (JIRA)
Matt Wang created KAFKA-7094:


 Summary: Variate should unify code style in one method, and use  
camel name
 Key: KAFKA-7094
 URL: https://issues.apache.org/jira/browse/KAFKA-7094
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.0.1
Reporter: Matt Wang


In one method there are two variables, partitionsTobeLeader and 
partitionsToBeFollower, which should use a consistent naming style; that will be 
helpful for code maintenance.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

2018-06-25 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522031#comment-16522031
 ] 

Ted Yu commented on KAFKA-7088:
---

Maybe set log level to DEBUG when you have a chance.

Thanks
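
If full TRACE proves too noisy, scoping DEBUG to the producer internals may be enough to surface the transaction initialization handshake. This assumes the application is configured through a log4j.properties file; the logger names below are simply the Kafka client packages/classes involved:

{code}
# Illustrative log4j.properties override (assuming the Streams application logs via log4j):
# raise only the producer internals (Sender / TransactionManager) and the network client
# to DEBUG instead of turning on TRACE globally.
log4j.logger.org.apache.kafka.clients.producer.internals=DEBUG
log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG
{code}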

> Kafka streams thread waits infinitely on transaction init
> -
>
> Key: KAFKA-7088
> URL: https://issues.apache.org/jira/browse/KAFKA-7088
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
> Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 
> 20:07:43 UTC 2018 
> kafka-streams (client) 1.0.1
> kafka broker 1.1.0
> Java version:
> OpenJDK Runtime Environment (build 1.8.0_171-b10)
> OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)
> kafka config overrides:
> num.stream.threads: 6
> session.timeout.ms: 1
> request.timeout.ms: 11000
> fetch.max.wait.ms: 500
> max.poll.records: 1000
> topic has 24 partitions
>Reporter: Lukasz Gluchowski
>Priority: Major
>  Labels: eos
>
> A kafka stream application thread stops processing without any feedback. The 
> topic has 24 partitions and I noticed that processing stopped only for some 
> partitions. I will describe what happened to partition:10. The application is 
> still running (now for about 8 hours), that thread is hanging there, and no 
> rebalancing took place.
> There is no error (we have a custom `Thread.UncaughtExceptionHandler` which 
> was not called). I noticed that after a couple of minutes the stream stopped 
> processing (at offset 32606948 where log-end-offset is 33472402). 
> Broker itself is not reporting any active consumer in that consumer group and 
> the only info I was able to gather was from thread dump:
> {code:java}
> "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33"
>  #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition 
> [0x7fe0215d4000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0xfec6a2f8> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at 
> org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744){code}
>  
> I tried restarting the application once but the situation repeated. The thread 
> read some data, committed the offset and stopped processing, leaving that 
> thread in wait 

[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

2018-06-25 Thread Lukasz Gluchowski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522004#comment-16522004
 ] 

Lukasz Gluchowski commented on KAFKA-7088:
--

[~yuzhih...@gmail.com] I turned on TRACE logging in a non-production environment 
to try it out and it was filling the disk very quickly. Sorry, but I can't turn 
it on in production.

> Kafka streams thread waits infinitely on transaction init
> -
>
> Key: KAFKA-7088
> URL: https://issues.apache.org/jira/browse/KAFKA-7088
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
> Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 
> 20:07:43 UTC 2018 
> kafka-streams (client) 1.0.1
> kafka broker 1.1.0
> Java version:
> OpenJDK Runtime Environment (build 1.8.0_171-b10)
> OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)
> kafka config overrides:
> num.stream.threads: 6
> session.timeout.ms: 1
> request.timeout.ms: 11000
> fetch.max.wait.ms: 500
> max.poll.records: 1000
> topic has 24 partitions
>Reporter: Lukasz Gluchowski
>Priority: Major
>  Labels: eos
>
> A kafka stream application thread stops processing without any feedback. The 
> topic has 24 partitions and I noticed that processing stopped only for some 
> partitions. I will describe what happened to partition:10. The application is 
> still running (now for about 8 hours), that thread is hanging there, and no 
> rebalancing took place.
> There is no error (we have a custom `Thread.UncaughtExceptionHandler` which 
> was not called). I noticed that after a couple of minutes the stream stopped 
> processing (at offset 32606948 where log-end-offset is 33472402). 
> Broker itself is not reporting any active consumer in that consumer group and 
> the only info I was able to gather was from thread dump:
> {code:java}
> "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33"
>  #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition 
> [0x7fe0215d4000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0xfec6a2f8> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at 
> org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744){code}
>  
> I tried restarting 

[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2018-06-25 Thread Wouter Bancken (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521948#comment-16521948
 ] 

Wouter Bancken commented on KAFKA-6817:
---

Is there any news on a solution/workaround for clients not using Kafka Streams?

We have tried setting transactional.id.expiration.ms to its maximum value (24 
days) as mentioned earlier in the comments and this greatly reduces the number 
of occurrences but it does not eliminate the error. 

In our use cases it is possible that a producer will remain active for a long 
period without writing to a certain topic.
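
For reference, the mitigation described above is a broker-side override in server.properties; the value below is 24 days expressed in milliseconds, matching the maximum mentioned in this comment:

{code}
# server.properties (broker side): keep producer/transactional id state around longer
# before it is expired; 24 days = 24 * 24 * 3600 * 1000 ms.
transactional.id.expiration.ms=2073600000
{code}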

> UnknownProducerIdException when writing messages with old timestamps
> 
>
> Key: KAFKA-6817
> URL: https://issues.apache.org/jira/browse/KAFKA-6817
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Odin Standal
>Priority: Major
>
> We are seeing the following exception in our Kafka application: 
> {code:java}
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
> due to the following error: org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Abort sending since an error caught with a previous record (key 
> 22 value some-value timestamp 1519200902670) to topic 
> exactly-once-test-topic- v2 due to This exception is raised by the broker if 
> it could not locate the producer metadata associated with the producerId in 
> question. This could happen if, for instance, the producer's records were 
> deleted because their retention time had elapsed. Once the last records of 
> the producerId are removed, the producer's metadata is removed from the 
> broker, and future appends by the producer will return this exception. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
> at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>  at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.apache.kafka.common.errors.UnknownProducerIdException
> {code}
> We discovered this error when we had the need to reprocess old messages. See 
> more details on 
> [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]
> We have reproduced the error with a smaller example application. The error 
> occurs after 10 minutes of producing messages that have old timestamps (type 
> 1 year old). The topic we are writing to has a retention.ms set to 1 year so 
> we are expecting the messages to stay there.
> After digging through the ProducerStateManager-code in the Kafka source code 
> we have a theory of what might be wrong.
> The ProducerStateManager.removeExpiredProducers() seems to remove producers 
> from memory erroneously when processing records which are older than the 
> maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` 
> configuration), which is set by default to 7 days. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)