[jira] [Comment Edited] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524543#comment-16524543 ] Narayan Periwal edited comment on KAFKA-7026 at 6/27/18 4:11 AM: - [~vahid], unfortunately we are not able to reproduce this in our QA setup. The only correlation that we have seen is that this seems to happen when there is a spike in the number of under-replicated partitions in the Kafka cluster. One more thing: when this issue happens, we have seen our consumers not processing data for more than "max.poll.interval.ms", i.e., the consumer.poll() call is not invoked within "max.poll.interval.ms", which means the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. It looks like the first consumer, after recovery (once it is able to process again), is still getting data from the earlier assigned partition, leading to this issue. was (Author: nperiwal): [~vahid], unfortunately we are not able to reproduce this in our QA setup. The only correlation that we have seen is that this seems to happen when there is a spike in the number of under-replicated partitions in the Kafka cluster. One more thing: when this issue happens, we have seen our consumers not processing data for more than "max.poll.interval.ms", i.e., the consumer.poll() call is not invoked within "max.poll.interval.ms", which means the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. It looks like the old consumer, after recovery, is still getting data from the earlier assigned partition, leading to this issue. > Sticky assignor could assign a partition to multiple consumers > -- > > Key: KAFKA-7026 > URL: https://issues.apache.org/jira/browse/KAFKA-7026 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Vahid Hashemian >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.1.0 > > > In the following scenario sticky assignor assigns a topic partition to two > consumers in the group: > # Create a topic {{test}} with a single partition > # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group > leader and gets {{test-0}}) > # Start consumer {{c2}} in group {{sticky-group}} ({{c1}} holds onto > {{test-0}}, {{c2}} does not get any partition) > # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes > over {{test-0}}, {{c1}} leaves the group) > # Resume {{c1}} > At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them. > > The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from > the last assignment it received from the leader (itself) and did not get the > next round of assignments (when {{c2}} became leader) because it was paused. > Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their > existing assignment. The sticky assignor code does not currently check and > avoid this duplication. > > Note: This issue was originally reported on > [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
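For illustration of the comment above, a minimal poll-loop sketch against the Java consumer API (the broker address, topic name, and the process() helper are placeholders; poll(Duration) requires a 2.0+ client): if handling one batch takes longer than max.poll.interval.ms between two poll() calls, the group considers the member failed and reassigns its partitions, which is exactly the situation described.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollIntervalExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sticky-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // If poll() is not called again within this window, the group considers
        // the consumer failed and rebalances its partitions to other members.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        // Limiting the batch size bounds per-iteration processing time, making it
        // less likely that slow processing exceeds max.poll.interval.ms.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // the whole batch must finish before the next poll()
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // application-specific handling
    }
}
{code}

Lowering max.poll.records is the usual lever for keeping each iteration comfortably under max.poll.interval.ms.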
[jira] [Comment Edited] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524543#comment-16524543 ] Narayan Periwal edited comment on KAFKA-7026 at 6/27/18 4:10 AM: - [~vahid], unfortunately we are not able to reproduce this in our QA setup. The only correlation that we have seen is that this seems to happen when there is a spike in the number of under-replicated partitions in the Kafka cluster. One more thing: when this issue happens, we have seen our consumers not processing data for more than "max.poll.interval.ms", i.e., the consumer.poll() call is not invoked within "max.poll.interval.ms", which means the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. It looks like the old consumer, after recovery, is still getting data from the earlier assigned partition, leading to this issue. was (Author: nperiwal): [~vahid], unfortunately we are not able to reproduce this in our QA setup. The only correlation that we have seen is that this seems to happen when there is a spike in the number of under-replicated partitions in the Kafka cluster. > Sticky assignor could assign a partition to multiple consumers > -- > > Key: KAFKA-7026 > URL: https://issues.apache.org/jira/browse/KAFKA-7026 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Vahid Hashemian >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.1.0 > > > In the following scenario sticky assignor assigns a topic partition to two > consumers in the group: > # Create a topic {{test}} with a single partition > # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group > leader and gets {{test-0}}) > # Start consumer {{c2}} in group {{sticky-group}} ({{c1}} holds onto > {{test-0}}, {{c2}} does not get any partition) > # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes > over {{test-0}}, {{c1}} leaves the group) > # Resume {{c1}} > At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them. > > The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from > the last assignment it received from the leader (itself) and did not get the > next round of assignments (when {{c2}} became leader) because it was paused. > Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their > existing assignment. The sticky assignor code does not currently check and > avoid this duplication. > > Note: This issue was originally reported on > [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524543#comment-16524543 ] Narayan Periwal commented on KAFKA-7026: [~vahid], unfortunately we are not able to reproduce this in our QA setup. The only correlation that we have seen is that this seems to happen when there is a spike in the number of under-replicated partitions in the Kafka cluster. > Sticky assignor could assign a partition to multiple consumers > -- > > Key: KAFKA-7026 > URL: https://issues.apache.org/jira/browse/KAFKA-7026 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Vahid Hashemian >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.1.0 > > > In the following scenario sticky assignor assigns a topic partition to two > consumers in the group: > # Create a topic {{test}} with a single partition > # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group > leader and gets {{test-0}}) > # Start consumer {{c2}} in group {{sticky-group}} ({{c1}} holds onto > {{test-0}}, {{c2}} does not get any partition) > # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes > over {{test-0}}, {{c1}} leaves the group) > # Resume {{c1}} > At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them. > > The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from > the last assignment it received from the leader (itself) and did not get the > next round of assignments (when {{c2}} became leader) because it was paused. > Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their > existing assignment. The sticky assignor code does not currently check and > avoid this duplication. > > Note: This issue was originally reported on > [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524538#comment-16524538 ] Vahid Hashemian commented on KAFKA-7026: [~nperiwal] Is the issue you described reproducible by following certain steps? > Sticky assignor could assign a partition to multiple consumers > -- > > Key: KAFKA-7026 > URL: https://issues.apache.org/jira/browse/KAFKA-7026 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Vahid Hashemian >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.1.0 > > > In the following scenario sticky assignor assigns a topic partition to two > consumers in the group: > # Create a topic {{test}} with a single partition > # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group > leader and gets {{test-0}}) > # Start consumer {{c2}} in group {{sticky-group}} ({{c1}} holds onto > {{test-0}}, {{c2}} does not get any partition) > # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes > over {{test-0}}, {{c1}} leaves the group) > # Resume {{c1}} > At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them. > > The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from > the last assignment it received from the leader (itself) and did not get the > next round of assignments (when {{c2}} became leader) because it was paused. > Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their > existing assignment. The sticky assignor code does not currently check and > avoid this duplication. > > Note: This issue was originally reported on > [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524517#comment-16524517 ] Narayan Periwal commented on KAFKA-7026: [~vahid], Agree that this may not be the actual cause of the issue. But we have seen this occurring multiple times in our production setup, with consumers continuing to consume the same partition unless a manual restart is triggered. So, it could be due to some other issue. [~steven.aerts], we are using custom checkpointing in zookeeper, so the kafka-consumer-groups.sh script to describe the consumer group does not work for us. However, we have a mechanism to detect when multiple consumers are consuming from the same partition. I am sharing the distribution of one such case. Topic - test, consumer group - group1, consumers - c1,c2,c3,c4,c5. Partitions 3, 4, and 5 of this topic were being consumed by multiple consumer instances. {noformat} group: group1, topic: test, partition: 0, consumer: c2 group: group1, topic: test, partition: 1, consumer: c4 group: group1, topic: test, partition: 2, consumer: c4 group: group1, topic: test, partition: 3, consumer: c3,c4 group: group1, topic: test, partition: 4, consumer: c3,c5 group: group1, topic: test, partition: 5, consumer: c3,c5 group: group1, topic: test, partition: 6, consumer: c5 group: group1, topic: test, partition: 7, consumer: c1 group: group1, topic: test, partition: 8, consumer: c1 group: group1, topic: test, partition: 9, consumer: c1 {noformat} > Sticky assignor could assign a partition to multiple consumers > -- > > Key: KAFKA-7026 > URL: https://issues.apache.org/jira/browse/KAFKA-7026 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Vahid Hashemian >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.1.0 > > > In the following scenario sticky assignor assigns a topic partition to two > consumers in the group: > # Create a topic {{test}} with a single partition > # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group > leader and gets {{test-0}}) > # Start consumer {{c2}} in group {{sticky-group}} ({{c1}} holds onto > {{test-0}}, {{c2}} does not get any partition) > # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes > over {{test-0}}, {{c1}} leaves the group) > # Resume {{c1}} > At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them. > > The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from > the last assignment it received from the leader (itself) and did not get the > next round of assignments (when {{c2}} became leader) because it was paused. > Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their > existing assignment. The sticky assignor code does not currently check and > avoid this duplication. > > Note: This issue was originally reported on > [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7095) Low traffic consumer is not consuming messages after the offset is deleted by Kafka
[ https://issues.apache.org/jira/browse/KAFKA-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524432#comment-16524432 ] Jason Gustafson commented on KAFKA-7095: The easy thing to do is to increase offsets.retention.minutes to something larger to ensure that the committed offset is not lost. However, there shouldn't be any need to restart the application. The consumer should just reset the offset if it can't find a committed offset to use. I tried this out on 0.10.1 using a very low offsets.retention.minutes and it works as expected. I would suggest using a more recent client version, and if that doesn't work, collect and post DEBUG logging from the consumer. > Low traffic consumer is not consuming messages after the offset is deleted > by Kafka > > > Key: KAFKA-7095 > URL: https://issues.apache.org/jira/browse/KAFKA-7095 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 >Reporter: Aldo Sinanaj >Priority: Minor > > Hello guys. > I have low-traffic consumers for a given consumer group and I have the > default broker setting for the property *offsets.retention.minutes*. So if a > message comes after 2 days and Kafka has deleted the offset for that > given consumer, then the consumer will not consume the new incoming messages. > If I restart the application it will consume from the earliest, which is > obvious since the offset is deleted. > My question is why it doesn't consume the new messages if I don't restart the > application? And how does this version of Kafka understand if a consumer is > active or inactive? Is my consumer considered inactive in this case? > Thanks, > Aldo -- This message was sent by Atlassian JIRA (v7.6.3#76005)
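A hedged sketch of the two knobs involved in the suggestion above (the group id and values are made up): offsets.retention.minutes is a broker-side setting, while auto.offset.reset controls what the consumer does when no committed offset can be found.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ResetPolicyExample {
    public static Properties consumerProps() {
        // Broker side (server.properties), assuming the default retention is too
        // short for a low-traffic group:
        //   offsets.retention.minutes=10080   (7 days, illustrative value)
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "low-traffic-group");       // hypothetical
        // When the committed offset has expired, fall back to a defined position
        // instead of stalling: "earliest" replays the log, "latest" skips ahead.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}
{code}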
[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup
[ https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524421#comment-16524421 ] Guozhang Wang commented on KAFKA-5037: -- If we want to only fix this forward beyond 2.0, I'd propose the following refactoring of the {{StreamsPartitionAssignor#assign()}} function. The principle is that, unlike the consumer, which assigns the partially available topics to members hoping that when the metadata gets refreshed another rebalance is triggered to complete the full subscription, in Streams we would not start at all unless all the requested source topics are available. This is a simpler rule and easier for users to reason about: in the past the most common complaint was "I need to be notified when this happens, but I still see the application in RUNNING, and was not aware we are actually only executing a partial topology". 1. At the beginning of {{assign}}, we first check that all the non-repartition source topics are included in the {{metadata}}. If not, we log an error at the leader and set an error in the Assignment userData bytes, indicating that the leader cannot complete the assignment; the error code would indicate the root cause. 2. Upon receiving the assignment, if the error is not NONE, the Streams instance will shut itself down with a log entry restating the root cause interpreted from the error code. 3. With 1) / 2) above, the non-repartition source topics should always be available, and hence we should never encounter the `NOT_AVAILABLE` case for num.partitions of those repartition topics; we can then remove `NOT_AVAILABLE` from the possible values and remove the corresponding logic in DefaultPartitionGrouper. Note that 1) above requires us to bump up the version for the assignment userData again, which should be fine with version probing added in 2.0. There is an alternative workaround: use negative version numbers as the error codes, and as long as clients know how to interpret them, that is fine. Personally I feel it is a bit hacky and would suggest we just bump up the version. WDYT? > Infinite loop if all input topics are unknown at startup > > > Key: KAFKA-5037 > URL: https://issues.apache.org/jira/browse/KAFKA-5037 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Matthias J. Sax >Priority: Major > Labels: user-experience > > See discussion: https://github.com/apache/kafka/pull/2815 > We will need some rewrite of {{StreamPartitionsAssignor}} and many > more tests for all kinds of corner cases, including pattern subscriptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
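A minimal sketch of step 1 of the proposal above, under stated assumptions: checkSourceTopics and INCOMPLETE_SOURCE_TOPIC_METADATA are hypothetical names rather than actual StreamsPartitionAssignor code; only Cluster#partitionCountForTopic is real client API.

{code:java}
import java.util.Set;
import org.apache.kafka.common.Cluster;

public final class SourceTopicCheck {
    // Hypothetical error code to embed in the assignment userData (step 1).
    public static final int INCOMPLETE_SOURCE_TOPIC_METADATA = 1;
    public static final int NONE = 0;

    // Returns NONE if every non-repartition source topic is present in the
    // cluster metadata; otherwise returns the error code that members would
    // act on by shutting themselves down (step 2).
    public static int checkSourceTopics(Cluster metadata, Set<String> sourceTopics) {
        for (String topic : sourceTopics) {
            if (metadata.partitionCountForTopic(topic) == null) {
                System.err.println("Source topic " + topic + " is missing from metadata; "
                        + "leader cannot complete the assignment");
                return INCOMPLETE_SOURCE_TOPIC_METADATA;
            }
        }
        return NONE;
    }
}
{code}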
[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup
[ https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524389#comment-16524389 ] Guozhang Wang commented on KAFKA-5037: -- I'm now grouping these three tickets: 5037, 6587 and 6437 into a single general issue, and here is my summary of it: 1. If there is a repartition topic whose num.partitions is dependent on the corresponding sub-topology's input topic, and that input topic does not exist in {{metadata}}, then it will be assigned as {{NOT_AVAILABLE}} and the corresponding topic will not be created, and any tasks that need it as an input topic will be silently dropped (see {{DefaultPartitionGrouper#maxNumPartitions}}). This is reported in KAFKA-6437. 1.1. If we happen to pass the while loop because of the {{NOT_AVAILABLE}} tag, we may still hit the issue in {{CopartitionedTopicsValidator#validate}}, which will cause a runtime exception to be thrown. 2. If there is a repartition topic whose num.partitions is dependent on the corresponding sub-topology's input topic, and that input topic is also a repartition topic with num.partitions as {{NOT_AVAILABLE}}, then there is a bug that will cause the while loop to never finish. This is reported in KAFKA-5037, and PR 2815 already contains the fix. 3. Before 1.1, we have another while loop inside {{prepareTopic}}. If the metadata cannot be fetched, then we will also fall into a blocking scenario. This is more related to KAFKA-6587 and should now be fixed automatically. {code} // wait until each one of the topic metadata has been propagated to at least one broker while (!allTopicsCreated(topicNamesToMakeReady, topicsToMakeReady)) { try { Thread.sleep(50L); } catch (InterruptedException e) { // ignore } } {code} > Infinite loop if all input topics are unknown at startup > > > Key: KAFKA-5037 > URL: https://issues.apache.org/jira/browse/KAFKA-5037 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Matthias J. Sax >Priority: Major > Labels: user-experience > > See discussion: https://github.com/apache/kafka/pull/2815 > We will need some rewrite of {{StreamPartitionsAssignor}} and many > more tests for all kinds of corner cases, including pattern subscriptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
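For point 3 above, a sketch of how the unbounded wait could be bounded instead; allTopicsCreated(...) stands in for the real readiness check, and the 60-second deadline is illustrative, not the actual fix.

{code:java}
// Bounded variant of the wait loop quoted above (sketch only).
long deadline = System.currentTimeMillis() + 60_000L;
while (!allTopicsCreated(topicNamesToMakeReady, topicsToMakeReady)) {
    if (System.currentTimeMillis() > deadline) {
        throw new org.apache.kafka.streams.errors.StreamsException(
            "Timed out waiting for internal topics to be created: " + topicNamesToMakeReady);
    }
    try {
        Thread.sleep(50L);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt instead of swallowing it
        break;
    }
}
{code}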
[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524379#comment-16524379 ] Guozhang Wang commented on KAFKA-6437: -- I think this is again correlated to https://issues.apache.org/jira/browse/KAFKA-5037; I'll write the summary of this general issue in 5037. > Streams does not warn about missing input topics, but hangs > --- > > Key: KAFKA-6437 > URL: https://issues.apache.org/jira/browse/KAFKA-6437 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 > Environment: Single client on single node broker >Reporter: Chris Schwarzfischer >Assignee: Mariam John >Priority: Minor > Labels: newbie > > *Case* > Streams application with two input topics being used for a left join. > When the left side topic is missing upon starting the streams application, it > hangs "in the middle" of the topology (at …9, see below). Only parts of > the intermediate topics are created (up to …9) > When the missing input topic is created, the streams application resumes > processing. > {noformat} > Topology: > StreamsTask taskId: 2_0 > ProcessorTopology: > KSTREAM-SOURCE-11: > topics: > [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] > children: [KTABLE-AGGREGATE-12] > KTABLE-AGGREGATE-12: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KTABLE-TOSTREAM-20] > KTABLE-TOSTREAM-20: > children: [KSTREAM-SINK-21] > KSTREAM-SINK-21: > topic: data_udr_month_customer_aggregration > KSTREAM-SOURCE-17: > topics: > [mystreams_app-KSTREAM-MAP-14-repartition] > children: [KSTREAM-LEFTJOIN-18] > KSTREAM-LEFTJOIN-18: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KSTREAM-SINK-19] > KSTREAM-SINK-19: > topic: data_UDR_joined > Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, > mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] > {noformat} > *Why this matters* > The application does quite a lot of preprocessing before joining with the > missing input topic. This preprocessing won't happen without the topic, > creating a huge backlog of data. > *Fix* > Issue a `warn` or `error` level message at start to inform about the missing > topic and its consequences. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7104) ReplicaFetcher thread may die because of inconsistent log start offset in fetch response
[ https://issues.apache.org/jira/browse/KAFKA-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anna Povzner updated KAFKA-7104: Description: What we saw: The follower fetches offset 116617, which it was able to successfully append. However, leader's log start offset in fetch request was 116753, which was higher than fetched offset 116617. When replica fetcher thread tried to increment log start offset to leader's log start offset, it failed with OffsetOutOfRangeException: [2018-06-23 00:45:37,409] ERROR Error due to (kafka.server.ReplicaFetcherThread) kafka.common.KafkaException: Error processing data for partition X-N offset 116617 Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 116753 of partition X-N since it is larger than the high watermark 116619 In leader's log, we see that log start offset was incremented almost at the same time (within 100 ms or so). [2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N to 116753 In leader's logic: ReplicaManager first calls readFromLocalLog() that reads from local log and returns LogReadResult that contains fetched data and leader's log start offset and HW. However, it then calls ReplicaManager#updateFollowerLogReadResults() which may move leader's log start offset and update leader's log start offset and HW in fetch response. If deleteRecords() happens in between, it is possible that log start offset may move beyond fetched offset. Or possibly, the leader moves log start offset because of deleting old log segments. Basically, the issue is that the log start offset can move between when records are read from the log and when the LogReadResult is updated with the new log start offset. As a result, fetch response may contain fetched data but leader's log start offset in the response could be set beyond fetched offset (and indicate the state on leader that fetched data does not actually exist anymore on leader). When a follower receives such fetch response, it will first append, then move its HW no further than its LEO, which may be less than leader's log start offset in fetch response, and then call `replica.maybeIncrementLogStartOffset(leaderLogStartOffset)`, which will throw an OffsetOutOfRangeException, causing the fetcher thread to stop. Note that this can happen if the follower is not in the ISR, otherwise the leader will not move its log start offsets beyond follower's HW. *Suggested fix:* 1) Since ReplicaFetcher bounds follower's HW to follower's LEO, we should also bound follower's log start offset to its LEO. In this situation, the follower's log start offset will be updated to the LEO. 2) In addition to #1, we could try to make sure that the leader builds the fetch response based on the state of the log as of the time of reading data from the replica (but including moving leader's HW based on the follower's fetch). That could be another JIRA potentially, since the fix could be more involved. was: What we saw: The follower fetches offset 116617, which it was able to successfully append. However, leader's log start offset in fetch request was 116753, which was higher than fetched offset 116617. 
When replica fetcher thread tried to increment log start offset to leader's log start offset, it failed with OffsetOutOfRangeException: [2018-06-23 00:45:37,409] ERROR [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) kafka.common.KafkaException: Error processing data for partition X-N offset 116617 Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 116753 of partition X-N since it is larger than the high watermark 116619 In leader's log, we see that log start offset was incremented almost at the same time (within 100 ms or so). [2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N to 116753 in dir /kafka/kafka-logs (kafka.log.Log) In leader's logic: ReplicaManager first calls readFromLocalLog() that reads from local log and returns LogReadResult that contains fetched data and leader's log start offset and HW. However, it then calls ReplicaManager#updateFollowerLogReadResults() which may move leader's log start offset and update leader's log start offset and HW in fetch response. If deleteRecords() happens in between, it is possible that log start offset may move beyond fetched offset. Or possibly, the leader moves log start offset because of deleting old log segments. Basically, the issue is that the log start offset can move between when records are read from the log and when the LogReadResult is updated with the new log start offset. As a result, fetch response may contain fetched data but leader's log start offset in the response could be set beyond fetched offset (and indicate the state on
[jira] [Updated] (KAFKA-7104) ReplicaFetcher thread may die because of inconsistent log start offset in fetch response
[ https://issues.apache.org/jira/browse/KAFKA-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anna Povzner updated KAFKA-7104: Description: What we saw: The follower fetches offset 116617, which it was able to successfully append. However, leader's log start offset in fetch request was 116753, which was higher than fetched offset 116617. When replica fetcher thread tried to increment log start offset to leader's log start offset, it failed with OffsetOutOfRangeException: [2018-06-23 00:45:37,409] ERROR [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) kafka.common.KafkaException: Error processing data for partition X-N offset 116617 Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 116753 of partition X-N since it is larger than the high watermark 116619 In leader's log, we see that log start offset was incremented almost at the same time (within 100 ms or so). [2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N to 116753 in dir /kafka/kafka-logs (kafka.log.Log) In leader's logic: ReplicaManager first calls readFromLocalLog() that reads from local log and returns LogReadResult that contains fetched data and leader's log start offset and HW. However, it then calls ReplicaManager#updateFollowerLogReadResults() which may move leader's log start offset and update leader's log start offset and HW in fetch response. If deleteRecords() happens in between, it is possible that log start offset may move beyond fetched offset. Or possibly, the leader moves log start offset because of deleting old log segments. Basically, the issue is that the log start offset can move between when records are read from the log and when the LogReadResult is updated with the new log start offset. As a result, fetch response may contain fetched data but leader's log start offset in the response could be set beyond fetched offset (and indicate the state on leader that fetched data does not actually exist anymore on leader). When a follower receives such fetch response, it will first append, then move its HW no further than its LEO, which may be less than leader's log start offset in fetch response, and then call `replica.maybeIncrementLogStartOffset(leaderLogStartOffset)`, which will throw an OffsetOutOfRangeException, causing the fetcher thread to stop. Note that this can happen if the follower is not in the ISR, otherwise the leader will not move its log start offsets beyond follower's HW. *Suggested fix:* 1) Since ReplicaFetcher bounds follower's HW to follower's LEO, we should also bound follower's log start offset to its LEO. In this situation, the follower's log start offset will be updated to the LEO. 2) In addition to #1, we could try to make sure that the leader builds the fetch response based on the state of the log as of the time of reading data from the replica (but including moving leader's HW based on the follower's fetch). That could be another JIRA potentially, since the fix could be more involved. was: What we saw: The follower fetches offset 116617, which it was able to successfully append. However, leader's log start offset in fetch request was 116753, which was higher than fetched offset 116617. 
When replica fetcher thread tried to increment log start offset to leader's log start offset, it failed with OffsetOutOfRangeException: [2018-06-23 00:45:37,409] ERROR [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) kafka.common.KafkaException: Error processing data for partition X-N offset 116617 Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 116753 of partition X-N since it is larger than the high watermark 116619 In leader's log, we see that log start offset was incremented almost at the same time (within 100 ms or so). [2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N to 116753 in dir /kafka/kafka-logs (kafka.log.Log) In leader's logic: ReplicaManager first calls readFromLocalLog() that reads from local log and returns LogReadResult that contains fetched data and leader's log start offset and HW. However, it then calls ReplicaManager#updateFollowerLogReadResults() that may move leader's log start offset and update leader's log start offset and HW in fetch response. If deleteRecords() happens in between, it is possible that log start offset may move beyond fetched offset. As a result, fetch response will contain fetched data but log start offset that is beyond fetched offset (and indicate the state on leader that fetched data does not actually exist anymore on leader). When a follower receives such fetch response, it will first append, then move its HW no further
[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, its Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524331#comment-16524331 ] Guozhang Wang commented on KAFKA-6520: -- Hi [~milindjain], are you still working on this ticket? > When a Kafka Stream can't communicate with the server, its Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Milind Jain >Priority: Major > Labels: newbie, user-experience > > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. > [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a > related issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7107) Ability to configure state store for JoinWindows in KStream-KStream join
[ https://issues.apache.org/jira/browse/KAFKA-7107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7107: --- Labels: needs-kip (was: ) > Ability to configure state store for JoinWindows in KStream-KStream join > > > Key: KAFKA-7107 > URL: https://issues.apache.org/jira/browse/KAFKA-7107 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: needs-kip > > Currently, the KStream-KStream join operation internally provisions window > stores to support the JoinWindow configuration. > > However, unlike all the other stateful processors, it does not allow > configuration of the stores. We should consider adding DSL methods taking > Materialized configs for these stores. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7106) Remove segment/segmentInterval from Window definition
[ https://issues.apache.org/jira/browse/KAFKA-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7106: --- Labels: needs-kip (was: ) > Remove segment/segmentInterval from Window definition > - > > Key: KAFKA-7106 > URL: https://issues.apache.org/jira/browse/KAFKA-7106 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Minor > Labels: needs-kip > > Currently, Window configures segment and segmentInterval properties, but > these aren't truly properties of a window in general. > Rather, they are properties of the particular implementation that we > currently have: a segmented store. Therefore, these properties should be > moved to configure only that implementation. > > This may be related to KAFKA-4730, since an in-memory window store wouldn't > necessarily need to be segmented. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6587) Kafka Streams hangs when not able to access internal topics
[ https://issues.apache.org/jira/browse/KAFKA-6587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524306#comment-16524306 ] Guozhang Wang commented on KAFKA-6587: -- I looked into the source code, and I believe this issue still exists in trunk, and the security setting is just one of the scenarios that can trigger it. The root cause is the same as https://issues.apache.org/jira/browse/KAFKA-5037. Here is the reasoning: Without the missing permissions, the TopicMetadata response will filter out any topics that are not granted permission, and hence the {{metadata}} object passed in {{assign(Cluster metadata, Map subscriptions)}} will not contain this topic, so it will be blocked as described in KAFKA-5037. > Kafka Streams hangs when not able to access internal topics > --- > > Key: KAFKA-6587 > URL: https://issues.apache.org/jira/browse/KAFKA-6587 > Project: Kafka > Issue Type: Bug > Components: security, streams >Affects Versions: 1.0.0 >Reporter: Chris Medved >Priority: Minor > > *Expectation:* Kafka Streams client will throw an exception, log errors, or > crash when a fatal error occurs. > *Observation:* Kafka Streams does not log an error or throw an exception when > necessary permissions for internal state store topics are not granted. It > will hang indefinitely and not start running the topology. > *Steps to reproduce:* > # Create a Kafka Cluster with ACLs enabled (allow.everyone.if.no.acl.found > should be set to false, or deny permissions must be set on the intermediate > topics). > # Create a simple streams application that does a stateful operation such as > count. > # Grant ACLs on source and sink topics to principal used for testing (would > recommend using ANONYMOUS user if possible for ease of testing). > # Grant ACLs for consumer group and cluster create. Add deny permissions to > state store topics if the default is "allow". You can run the application to > create the topics or use the topology describe method to get the names. > # Run streams application. It should hang on "(Re-)joining group" with no > errors printed. > *Detailed Explanation* > I spent some time trying to figure out what was wrong with my streams app. > I'm using ACLs on my Kafka cluster and it turns out I forgot to grant > read/write access to the internal topic state store for an aggregation. 
> The streams client would hang on "(Re-)joining group" until killed (note ^C > is ctrl+c, which I used to kill the app): > {code:java} > 10:29:10.064 [kafka-consumer-client-StreamThread-1] INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=kafka-consumer-client-StreamThread-1-consumer, > groupId=kafka-consumer-test] Discovered coordinator localhost:9092 (id: > 2147483647 rack: null) > 10:29:10.105 [kafka-consumer-client-StreamThread-1] INFO > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer > clientId=kafka-consumer-client-StreamThread-1-consumer, > groupId=kafka-consumer-test] Revoking previously assigned partitions [] > 10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [kafka-consumer-client-StreamThread-1] State transition from RUNNING to > PARTITIONS_REVOKED > 10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO > org.apache.kafka.streams.KafkaStreams - stream-client > [kafka-consumer-client]State transition from RUNNING to REBALANCING > 10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [kafka-consumer-client-StreamThread-1] partition revocation took 1 ms. > suspended active tasks: [] > suspended standby tasks: [] > 10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=kafka-consumer-client-StreamThread-1-consumer, > groupId=kafka-consumer-test] (Re-)joining group > ^C > 10:34:53.609 [Thread-3] INFO org.apache.kafka.streams.KafkaStreams - > stream-client [kafka-consumer-client]State transition from REBALANCING to > PENDING_SHUTDOWN > 10:34:53.610 [Thread-3] INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [kafka-consumer-client-StreamThread-1] Informed to shut down > 10:34:53.610 [Thread-3] INFO > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread > [kafka-consumer-client-StreamThread-1] State transition from > PARTITIONS_REVOKED to PENDING_SHUTDOWN{code} > The server log would show: > {code:java} > [2018-02-23 10:29:10,408] INFO [Partition > kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-05-changelog-0 > broker=0] >
[jira] [Created] (KAFKA-7107) Ability to configure state store for JoinWindows in KStream-KStream join
John Roesler created KAFKA-7107: --- Summary: Ability to configure state store for JoinWindows in KStream-KStream join Key: KAFKA-7107 URL: https://issues.apache.org/jira/browse/KAFKA-7107 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, the KStream-KStream join operation internally provisions window stores to support the JoinWindow configuration. However, unlike all the other stateful processors, it does not allow configuration of the stores. We should consider adding DSL methods taking Materialized configs for these stores. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7106) Remove segment/segmentInterval from Window definition
John Roesler created KAFKA-7106: --- Summary: Remove segment/segmentInterval from Window definition Key: KAFKA-7106 URL: https://issues.apache.org/jira/browse/KAFKA-7106 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, Window configures segment and segmentInterval properties, but these aren't truly properties of a window in general. Rather, they are properties of the particular implementation that we currently have: a segmented store. Therefore, these properties should be moved to configure only that implementation. This may be related to KAFKA-4730, since an in-memory window store wouldn't necessarily need to be segmented. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7101) Session Window store should set topic policy `compact,cleanup`
[ https://issues.apache.org/jira/browse/KAFKA-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524222#comment-16524222 ] ASF GitHub Bot commented on KAFKA-7101: --- guozhangwang opened a new pull request #5298: KAFKA-7101: Consider session store for windowed store default configs URL: https://github.com/apache/kafka/pull/5298 1. extend `isWindowStore` to consider session store as well. 2. extend the existing unit test accordingly. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Session Window store should set topic policy `compact,cleanup` > -- > > Key: KAFKA-7101 > URL: https://issues.apache.org/jira/browse/KAFKA-7101 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1 >Reporter: Matthias J. Sax >Assignee: Guozhang Wang >Priority: Major > > With > [KIP-71|https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist] > (0.10.1.0) topic config `compact,delete` was introduce to apply to windowed > store changelog topics in Kafka Streams. Later (0.10.2.0), session windows > got added in > [KIP-94|https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows]. > However, session windows do not use `compact,delete` at the moment. This > result is the same issue window stores face before KIP-71. Thus, we should > enable `compact,delete` for session window changelog topics, too. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7105) Refactor RocksDBSegmentsBatchingRestoreCallback and RocksDBBatchingRestoreCallback into a single class
Liquan Pei created KAFKA-7105: - Summary: Refactor RocksDBSegmentsBatchingRestoreCallback and RocksDBBatchingRestoreCallback into a single class Key: KAFKA-7105 URL: https://issues.apache.org/jira/browse/KAFKA-7105 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 1.1.0 Reporter: Liquan Pei Assignee: Liquan Pei -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7104) ReplicaFetcher thread may die because of inconsistent log start offset in fetch response
Anna Povzner created KAFKA-7104: --- Summary: ReplicaFetcher thread may die because of inconsistent log start offset in fetch response Key: KAFKA-7104 URL: https://issues.apache.org/jira/browse/KAFKA-7104 Project: Kafka Issue Type: Bug Affects Versions: 1.1.0, 1.0.0 Reporter: Anna Povzner Assignee: Anna Povzner What we saw: The follower fetches offset 116617, which it was able to successfully append. However, leader's log start offset in fetch request was 116753, which was higher than fetched offset 116617. When replica fetcher thread tried to increment log start offset to leader's log start offset, it failed with OffsetOutOfRangeException: [2018-06-23 00:45:37,409] ERROR [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) kafka.common.KafkaException: Error processing data for partition X-N offset 116617 Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 116753 of partition X-N since it is larger than the high watermark 116619 In leader's log, we see that log start offset was incremented almost at the same time (within 100 ms or so). [2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N to 116753 in dir /kafka/kafka-logs (kafka.log.Log) In leader's logic: ReplicaManager first calls readFromLocalLog() that reads from local log and returns LogReadResult that contains fetched data and leader's log start offset and HW. However, it then calls ReplicaManager#updateFollowerLogReadResults() that may move leader's log start offset and update leader's log start offset and HW in fetch response. If deleteRecords() happens in between, it is possible that log start offset may move beyond fetched offset. As a result, fetch response will contain fetched data but log start offset that is beyond fetched offset (and indicate the state on leader that fetched data does not actually exist anymore on leader). When a follower receives such fetch response, it will first append, then move its HW no further than its LEO, which may be less than leader's log start offset in fetch response, and then call `replica.maybeIncrementLogStartOffset(leaderLogStartOffset)`, which will throw an OffsetOutOfRangeException, causing the fetcher thread to stop. *Suggested fix:* If the leader moves log start offset beyond fetched offset, ReplicaManager#updateFollowerLogReadResults() should update the log read result with an OFFSET_OUT_OF_RANGE error, which will cause the follower to reset fetch offset to leader's log start offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
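A rough sketch of suggested fix #1 from the updated description of this ticket above: bound the follower's log start offset by its own LEO, mirroring how the HW is already bounded. replica and fetchResponse here are hypothetical stand-ins, not the actual ReplicaFetcherThread code.

{code:java}
// Follower side, after appending the fetched data (sketch only).
long leaderLogStartOffset = fetchResponse.logStartOffset();
// The leader may have advanced its log start offset past what this
// (out-of-ISR) follower holds, so cap it at the follower's own LEO
// before applying it, just as the HW is capped at the LEO.
long bounded = Math.min(leaderLogStartOffset, replica.logEndOffset());
replica.maybeIncrementLogStartOffset(bounded);
{code}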
[jira] [Created] (KAFKA-7103) Use bulkloading for RocksDBSegmentedBytesStore during init
Liquan Pei created KAFKA-7103: - Summary: Use bulkloading for RocksDBSegmentedBytesStore during init Key: KAFKA-7103 URL: https://issues.apache.org/jira/browse/KAFKA-7103 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: Liquan Pei Assignee: Liquan Pei We should use bulk loading for recovering RocksDBWindowStore, same as RocksDBStore. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6268) Tools should not swallow exceptions like resolving network names
[ https://issues.apache.org/jira/browse/KAFKA-6268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram reassigned KAFKA-6268: - Assignee: Stanislav Kozlovski Fix Version/s: 2.1.0 > Tools should not swallow exceptions like resolving network names > > > Key: KAFKA-6268 > URL: https://issues.apache.org/jira/browse/KAFKA-6268 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.1 >Reporter: Antony Stubbs >Assignee: Stanislav Kozlovski >Priority: Major > Fix For: 2.1.0 > > > The CLI consumer client shows nothing when it can't resolve a domain. This > and other errors like it should be shown to the user by default. You have to > turn on DEBUG level logging in the tools log4j to find there is an error. > {{[2017-11-23 16:40:56,401] DEBUG Error connecting to node > as-broker-1-eu-west-1b-public:9092 (id: 1 rack: null) > (org.apache.kafka.clients.NetworkClient) > java.io.IOException: Can't resolve address: as-broker-1-eu-west-1b-public:9092 > at org.apache.kafka.common.network.Selector.connect(Selector.java:195) > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:764) > at > org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:60) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:908) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:819) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:431) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:223) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:64) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:72) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > Caused by: java.nio.channels.UnresolvedAddressException > at sun.nio.ch.Net.checkAddress(Net.java:101) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) > at org.apache.kafka.common.network.Selector.connect(Selector.java:192) > ... 18 more > }} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524117#comment-16524117 ] Vahid Hashemian commented on KAFKA-7026: [~steven.aerts] is correct. The issue reported in this Jira occurs because the consumers involved keep state of their assignment and report that back to the leader when a rebalance occurs. Range Assignor is much simpler than that: every time a rebalance occurs the leader starts fresh (this is the case for Round Robin Assignor too). So a similar use case would work fine for those assignors. > Sticky assignor could assign a partition to multiple consumers > -- > > Key: KAFKA-7026 > URL: https://issues.apache.org/jira/browse/KAFKA-7026 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Vahid Hashemian >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.1.0 > > > In the following scenario sticky assignor assigns a topic partition to two > consumers in the group: > # Create a topic {{test}} with a single partition > # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group > leader and gets {{test-0}}) > # Start consumer {{c2}} in group {{sticky-group}} ({{c1}} holds onto > {{test-0}}, {{c2}} does not get any partition) > # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes > over {{test-0}}, {{c1}} leaves the group) > # Resume {{c1}} > At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them. > > The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from > the last assignment it received from the leader (itself) and did not get the > next round of assignments (when {{c2}} became leader) because it was paused. > Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their > existing assignment. The sticky assignor code does not currently check and > avoid this duplication. > > Note: This issue was originally reported on > [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
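To make the missing check concrete, a hedged sketch of validating the merged claimed assignments (generic stand-in structures, not the actual StickyAssignor internals):

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public final class AssignmentValidator {
    // Throws if any partition is claimed by two members, e.g. both c1 and c2
    // reporting test-0 as their existing assignment after the paused leader
    // rejoins with stale state.
    public static void validateNoDuplicates(Map<String, List<TopicPartition>> claimed) {
        Map<TopicPartition, String> owner = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> entry : claimed.entrySet()) {
            for (TopicPartition tp : entry.getValue()) {
                String previous = owner.put(tp, entry.getKey());
                if (previous != null) {
                    throw new IllegalStateException(tp + " claimed by both "
                            + previous + " and " + entry.getKey());
                }
            }
        }
    }
}
{code}

In the assignor itself one would presumably strip the partition from all but one claimant rather than throw, but the detection step is the part this Jira says is missing.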
[jira] [Created] (KAFKA-7102) Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@10c8df20
sankar created KAFKA-7102: - Summary: Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@10c8df20 Key: KAFKA-7102 URL: https://issues.apache.org/jira/browse/KAFKA-7102 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 0.10.1.0 Reporter: sankar Attachments: kafka_java_io_exception.txt We faced "Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@10c8df20". We have a four-node Kafka cluster in our production environment. We suddenly experienced a Kafka connect issue across the cluster. A manual restart of the Kafka service on all the nodes fixed the issue. I attached the complete log. Please check the log and kindly let me know what more information is needed from my side. Thanks in advance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6812) Async ConsoleProducer exits with 0 status even after data loss
[ https://issues.apache.org/jira/browse/KAFKA-6812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524080#comment-16524080 ] Andras Beni commented on KAFKA-6812: [~enether] In sync mode, when an exception occurs, it is logged, and the tool immediately exits with an error status. In async mode, the same situation is handled by logging and continuing operation, and the exit code will still be 0. I also see how this behavior can be derived from the current implementation. I was questioning whether it is correct and by design, because losing data should be unexpected and the difference in error handling is not obvious from the sync-async distinction. If this difference is intended, I propose to either * document that in async mode dropping records is expected and accepted or * add a flag (e.g. --error-handler-strategy) so users can choose how they want to handle errors. If the latter case is acceptable for you, I am happy to write the KIP and implement it. If neither option is acceptable, feel free to close this issue. > Async ConsoleProducer exits with 0 status even after data loss > -- > > Key: KAFKA-6812 > URL: https://issues.apache.org/jira/browse/KAFKA-6812 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 1.1.0 >Reporter: Andras Beni >Assignee: Stanislav Kozlovski >Priority: Minor > > When {{ConsoleProducer}} is run without the {{--sync}} flag and one of the > batches times out, {{ErrorLoggingCallback}} logs the error: > {code:java} > 18/04/21 04:23:01 WARN clients.NetworkClient: [Producer > clientId=console-producer] Connection to node 10 could not be established. > Broker may not be available. > 18/04/21 04:23:02 ERROR internals.ErrorLoggingCallback: Error when sending > message to topic my-topic with key: null, value: 8 bytes with error: > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > my-topic-0: 1530 ms has passed since batch creation plus linger time{code} > However, the tool exits with status code 0. > In my opinion the tool should indicate in the exit status that there was > data loss. Maybe it's reasonable to exit after the first error. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
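A minimal sketch of what the flag-based option above could build on; ErrorTrackingCallback is hypothetical, and only the org.apache.kafka.clients.producer.Callback interface is real client API.

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ErrorTrackingCallback implements Callback {
    private final AtomicBoolean sawError = new AtomicBoolean(false);

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            sawError.set(true); // remember the data loss instead of only logging it
            exception.printStackTrace();
        }
    }

    // After producer.flush() and close(), the tool could exit with this status.
    public int exitStatus() {
        return sawError.get() ? 1 : 0;
    }
}
{code}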
[jira] [Commented] (KAFKA-6809) connections-created metric does not behave as expected
[ https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524060#comment-16524060 ] Ismael Juma commented on KAFKA-6809: Yes. > connections-created metric does not behave as expected > -- > > Key: KAFKA-6809 > URL: https://issues.apache.org/jira/browse/KAFKA-6809 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Assignee: Stanislav Kozlovski >Priority: Major > Fix For: 2.1.0, 1.1.2 > > > "connections-created" sensor is described as "new connections established". > It currently records only connections that the broker/client creates, but > does not count connections received. Seems like we should also count > connections received – either include them into this metric (and also clarify > the description) or add a new metric (separately counting two types of > connections). I am not sure how useful is to separate them, so I think we > should do the first approach. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6964) Add ability to print all internal topic names
[ https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524058#comment-16524058 ] Jagadesh Adireddi commented on KAFKA-6964: -- Got it, [~bbejeck]. Should we skip this feature for now and close the ticket? > Add ability to print all internal topic names > - > > Key: KAFKA-6964 > URL: https://issues.apache.org/jira/browse/KAFKA-6964 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Bill Bejeck >Assignee: Jagadesh Adireddi >Priority: Major > Labels: needs-kip > > For security access reasons some streams users need to build all internal > topics before deploying their streams application. While it's possible to > get all internal topic names from the {{Topology#describe()}} method, it > would be nice to have a separate method that prints out only the internal > topic names to ease the process. > I think this change will require a KIP, so I've added the appropriate label. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524049#comment-16524049 ] Shun Guan commented on KAFKA-6788: -- Is this story closed? I see there are code commits and a PR, but it doesn't seem like anyone has accepted the PR. [~guozhang] > Grouping consumer requests per consumer coordinator in admin client > --- > > Key: KAFKA-6788 > URL: https://issues.apache.org/jira/browse/KAFKA-6788 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we > will first try to get the coordinator for each requested group id, and then > send the corresponding request for that group id. However, different group > ids could be hosted on the same coordinator, and these requests do support > multi group ids be sent within the same request. So we can consider optimize > it by grouping the requests per coordinator destination. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
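As a rough sketch of the optimization described in the issue (the helper below is illustrative, not actual KafkaAdminClient code), group ids whose coordinators were already resolved via FindCoordinator can be bucketed by coordinator node, so that a single multi-group request is sent per broker:
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Node;

// Illustrative helper: given each group id's coordinator, bucket the group ids
// per coordinator node. One batched DescribeGroups/DeleteGroups request can then
// be sent to each node instead of one request per group id.
public final class CoordinatorGrouping {
    public static Map<Node, List<String>> groupByCoordinator(Map<String, Node> groupToCoordinator) {
        Map<Node, List<String>> groupsPerCoordinator = new HashMap<>();
        for (Map.Entry<String, Node> entry : groupToCoordinator.entrySet()) {
            groupsPerCoordinator
                .computeIfAbsent(entry.getValue(), node -> new ArrayList<>())
                .add(entry.getKey());
        }
        return groupsPerCoordinator;
    }
}
{code}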
[jira] [Commented] (KAFKA-6809) connections-created metric does not behave as expected
[ https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523985#comment-16523985 ] Stanislav Kozlovski commented on KAFKA-6809: I'm not sure I understand what the appropriate approach would be in that case. You're saying that a `Selector` would never be used to both receive and create connections? If so, that means `Selector`s which are used for receiving connections would report an empty connections-created metric. Is that right? > connections-created metric does not behave as expected > -- > > Key: KAFKA-6809 > URL: https://issues.apache.org/jira/browse/KAFKA-6809 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Assignee: Stanislav Kozlovski >Priority: Major > Fix For: 2.1.0, 1.1.2 > > > "connections-created" sensor is described as "new connections established". > It currently records only connections that the broker/client creates, but > does not count connections received. Seems like we should also count > connections received – either include them into this metric (and also clarify > the description) or add a new metric (separately counting two types of > connections). I am not sure how useful is to separate them, so I think we > should do the first approach. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
[ https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523970#comment-16523970 ] ASF GitHub Bot commented on KAFKA-6949: --- lindong28 closed pull request #5293: KAFKA-6949; alterReplicaLogDirs() should grab partition lock when accessing log of the future replica URL: https://github.com/apache/kafka/pull/5293 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
{code}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9ab1ec47af8..b80c34475d3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -149,10 +149,10 @@ class Partition(val topic: String,
    * @return true iff the future replica is created
    */
   def maybeCreateFutureReplica(logDir: String): Boolean = {
-    // The readLock is needed to make sure that while the caller checks the log directory of the
+    // The writeLock is needed to make sure that while the caller checks the log directory of the
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
-    inReadLock(leaderIsrUpdateLock) {
+    inWriteLock(leaderIsrUpdateLock) {
       val currentReplica = getReplica().get
       if (currentReplica.log.get.dir.getParent == logDir)
         false
@@ -207,29 +207,52 @@ class Partition(val topic: String,
     allReplicasMap.remove(replicaId)
   }
 
-  def removeFutureLocalReplica() {
+  def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
+    inReadLock(leaderIsrUpdateLock) {
+      getReplica(Request.FutureLocalReplicaId) match {
+        case Some(futureReplica) =>
+          if (futureReplica.log.get.dir.getParent != newDestinationDir)
+            true
+          else
+            false
+        case None => false
+      }
+    }
+  }
+
+  def removeFutureLocalReplica(deleteFromLogDir: Boolean = true) {
     inWriteLock(leaderIsrUpdateLock) {
       allReplicasMap.remove(Request.FutureLocalReplicaId)
+      if (deleteFromLogDir)
+        logManager.asyncDelete(topicPartition, isFuture = true)
     }
   }
 
-  // Return true iff the future log has caught up with the current log for this partition
+  // Return true iff the future replica exists and it has caught up with the current replica for this partition
   // Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = getReplica().get
-    val futureReplica = getReplica(Request.FutureLocalReplicaId).get
-    if (replica.logEndOffset == futureReplica.logEndOffset) {
+    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
-        if (replica.logEndOffset == futureReplica.logEndOffset) {
-          logManager.replaceCurrentWithFutureLog(topicPartition)
-          replica.log = futureReplica.log
-          futureReplica.log = None
-          allReplicasMap.remove(Request.FutureLocalReplicaId)
-          true
-        } else false
+        getReplica(Request.FutureLocalReplicaId) match {
+          case Some(futureReplica) =>
+            if (replica.logEndOffset == futureReplica.logEndOffset) {
+              logManager.replaceCurrentWithFutureLog(topicPartition)
+              replica.log = futureReplica.log
+              futureReplica.log = None
+              allReplicasMap.remove(Request.FutureLocalReplicaId)
+              true
+            } else false
+          case None =>
+            // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
+            // In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread
+            // Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the state again to avoid race condition
+            false
+        }
       }
     } else false
   }
@@ -550,15 +573,22 @@ class Partition(val topic: String,
   }
 
   private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
-    if (isFuture)
{code}
[jira] [Commented] (KAFKA-6809) connections-created metric does not behave as expected
[ https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523905#comment-16523905 ] Ismael Juma commented on KAFKA-6809: [~enether], these metrics exist on a per selector basis. The Selector used for receiving connections is never used for creating outbound connections. So, the scenario you outline wouldn't actually happen. Separating the metrics has compatibility implications and I'm not sure it's worth it in this case. > connections-created metric does not behave as expected > -- > > Key: KAFKA-6809 > URL: https://issues.apache.org/jira/browse/KAFKA-6809 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Assignee: Stanislav Kozlovski >Priority: Major > Fix For: 2.1.0, 1.1.2 > > > "connections-created" sensor is described as "new connections established". > It currently records only connections that the broker/client creates, but > does not count connections received. Seems like we should also count > connections received – either include them into this metric (and also clarify > the description) or add a new metric (separately counting two types of > connections). I am not sure how useful is to separate them, so I think we > should do the first approach. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6809) connections-created metric does not behave as expected
[ https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523885#comment-16523885 ] Stanislav Kozlovski commented on KAFKA-6809: Changing them to the first approach (count both inbound/outbound connections) might mess up some Kafka users' metrics - imagine watching your connections blow up in numbers over a patch update. I believe we should separate out the sensors and offer both metrics - inbound, outbound. This will enable the UIs to provide the three kinds of metrics (in, out, total). [~apovzner], what do you think? > connections-created metric does not behave as expected > -- > > Key: KAFKA-6809 > URL: https://issues.apache.org/jira/browse/KAFKA-6809 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Assignee: Stanislav Kozlovski >Priority: Major > Fix For: 2.1.0, 1.1.2 > > > "connections-created" sensor is described as "new connections established". > It currently records only connections that the broker/client creates, but > does not count connections received. Seems like we should also count > connections received – either include them into this metric (and also clarify > the description) or add a new metric (separately counting two types of > connections). I am not sure how useful is to separate them, so I think we > should do the first approach. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
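A minimal sketch of that separate-sensors idea using Kafka's metrics library (sensor names, metric group, and class are illustrative, not the real Selector code):
{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Total;

// Illustrative sketch: keep the existing outbound count untouched and add a new
// inbound count, so dashboards can derive in, out, and total without any
// existing metric changing its meaning across an upgrade.
public class ConnectionMetricsSketch {
    private final Sensor connectionsCreated;  // outbound, as today
    private final Sensor connectionsAccepted; // new inbound metric

    public ConnectionMetricsSketch(Metrics metrics) {
        connectionsCreated = metrics.sensor("connections-created");
        connectionsCreated.add(metrics.metricName("connections-created", "sketch-metrics",
                "new outbound connections established"), new Total());
        connectionsAccepted = metrics.sensor("connections-accepted");
        connectionsAccepted.add(metrics.metricName("connections-accepted", "sketch-metrics",
                "new inbound connections accepted"), new Total());
    }

    public void recordConnect() { connectionsCreated.record(); }  // client-initiated connection
    public void recordAccept()  { connectionsAccepted.record(); } // listener accepted a connection
}
{code}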
[jira] [Updated] (KAFKA-7091) AdminClient should handle FindCoordinatorResponse errors
[ https://issues.apache.org/jira/browse/KAFKA-7091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-7091: - Fix Version/s: (was: 2.0.0) 2.0.1 > AdminClient should handle FindCoordinatorResponse errors > > > Key: KAFKA-7091 > URL: https://issues.apache.org/jira/browse/KAFKA-7091 > Project: Kafka > Issue Type: Improvement >Reporter: Manikumar >Assignee: Manikumar >Priority: Major > Fix For: 2.0.1 > > > Currently KafkaAdminClient.deleteConsumerGroups, listConsumerGroupOffsets > method implementation ignoring FindCoordinatorResponse errors. This causes > admin client request timeouts incase of authorization errors. We should > handle these errors. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6812) Async ConsoleProducer exits with 0 status even after data loss
[ https://issues.apache.org/jira/browse/KAFKA-6812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523867#comment-16523867 ] Stanislav Kozlovski commented on KAFKA-6812: I believe the current behavior is correct. You generally exit with a status code different from 0 when something unexpected happens. The console producer is given an `ErrorLoggingCallback`, whose responsibility is to swallow errors and simply log them - https://github.com/apache/kafka/blob/581adb5013ed996705031ca8dd9e175c40675692/core/src/main/scala/kafka/tools/ConsoleProducer.scala#L75. I believe nothing unexpected has happened, so it should exit with 0. > Async ConsoleProducer exits with 0 status even after data loss > -- > > Key: KAFKA-6812 > URL: https://issues.apache.org/jira/browse/KAFKA-6812 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 1.1.0 >Reporter: Andras Beni >Assignee: Stanislav Kozlovski >Priority: Minor > > When {{ConsoleProducer}} is run without {{--sync}} flag and one of the > batches times out, {{ErrorLoggingCallback}} logs the error: > {code:java} > 18/04/21 04:23:01 WARN clients.NetworkClient: [Producer > clientId=console-producer] Connection to node 10 could not be established. > Broker may not be available. > 18/04/21 04:23:02 ERROR internals.ErrorLoggingCallback: Error when sending > message to topic my-topic with key: null, value: 8 bytes with error: > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > my-topic-0: 1530 ms has passed since batch creation plus linger time{code} > However, the tool exits with status code 0. > In my opinion the tool should indicate in the exit status that there was > data lost. Maybe it's reasonable to exit after the first error. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (KAFKA-6964) Add ability to print all internal topic names
[ https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6964: --- Comment: was deleted (was: I did not double-check the code, but in general it is ok to use `InternalTopologyBuilder`. If (1) or (2) is better, I don't know atm. Maybe you try out both approaches to see which one works better and than open PR for the better one?) > Add ability to print all internal topic names > - > > Key: KAFKA-6964 > URL: https://issues.apache.org/jira/browse/KAFKA-6964 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Bill Bejeck >Assignee: Jagadesh Adireddi >Priority: Major > Labels: needs-kip > > For security access reasons some streams users need to build all internal > topics before deploying their streams application. While it's possible to > get all internal topic names from the {{Topology#describe()}} method, it > would be nice to have a separate method that prints out only the internal > topic names to ease the process. > I think this change will require a KIP, so I've added the appropriate label. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6964) Add ability to print all internal topic names
[ https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523853#comment-16523853 ] Bill Bejeck commented on KAFKA-6964: [~jadireddi] I was thinking along the lines of printing the contents of {{InternalTopologyBuilder#internalTopicNames}}, but having it exposed on the {{KafkaStreams}} object as something like {{KafkaStreams#printInternalTopicNames()}}. But before you start working on this Jira, note that KIP-290 (ACL prefixes) will be in the 2.0 release, meaning users can now grant ACLs allowing them to create topics prefixed with the application id (my-application-id*) of a Kafka Streams application, reducing the need for this feature. HTH, Bill > Add ability to print all internal topic names > - > > Key: KAFKA-6964 > URL: https://issues.apache.org/jira/browse/KAFKA-6964 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Bill Bejeck >Assignee: Jagadesh Adireddi >Priority: Major > Labels: needs-kip > > For security access reasons some streams users need to build all internal > topics before deploying their streams application. While it's possible to > get all internal topic names from the {{Topology#describe()}} method, it > would be nice to have a separate method that prints out only the internal > topic names to ease the process. > I think this change will require a KIP, so I've added the appropriate label. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7099) KafkaLog4jAppender - not sending any message with level DEBUG
[ https://issues.apache.org/jira/browse/KAFKA-7099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523849#comment-16523849 ] Manikumar commented on KAFKA-7099: -- Existing issue: KAFKA-6415. This temporarily handled in KAFKA-6415 by changing the log level to WARN. > KafkaLog4jAppender - not sending any message with level DEBUG > - > > Key: KAFKA-7099 > URL: https://issues.apache.org/jira/browse/KAFKA-7099 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.2.0 >Reporter: Vincent Lebreil >Priority: Major > > KafkaLog4jAppender can be stuck if it is defined at root category with level > DEBUG > {{log4j.rootLogger=DEBUG, kafka}} > {{log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender}} > {quote}DEBUG org.apache.kafka.clients.producer.KafkaProducer:131 - Exception > occurred during message send: > org.apache.kafka.common.errors.TimeoutException: Failed to update metadata > after 6 ms. > {quote} > KafkaLog4jAppender is using a KafkaProducer using itself Log4j with messages > at levels TRACE and DEBUG. The appender used in this case is also the > KafkaLog4jAppender. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7099) KafkaLog4jAppender - not sending any message with level DEBUG
[ https://issues.apache.org/jira/browse/KAFKA-7099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523849#comment-16523849 ] Manikumar edited comment on KAFKA-7099 at 6/26/18 3:10 PM: --- Existing issue: KAFKA-6415. This was temporarily handled in KAFKA-6415 by changing the log level to WARN. was (Author: omkreddy): Existing issue: KAFKA-6415. This temporarily handled in KAFKA-6415 by changing the log level to WARN. > KafkaLog4jAppender - not sending any message with level DEBUG > - > > Key: KAFKA-7099 > URL: https://issues.apache.org/jira/browse/KAFKA-7099 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.2.0 >Reporter: Vincent Lebreil >Priority: Major > > KafkaLog4jAppender can be stuck if it is defined at root category with level > DEBUG > {{log4j.rootLogger=DEBUG, kafka}} > {{log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender}} > {quote}DEBUG org.apache.kafka.clients.producer.KafkaProducer:131 - Exception > occurred during message send: > org.apache.kafka.common.errors.TimeoutException: Failed to update metadata > after 6 ms. > {quote} > KafkaLog4jAppender is using a KafkaProducer using itself Log4j with messages > at levels TRACE and DEBUG. The appender used in this case is also the > KafkaLog4jAppender. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7100) kafka.tools.GetOffsetShell with enable Kerberos Security on Kafka1.0
[ https://issues.apache.org/jira/browse/KAFKA-7100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-7100. -- Resolution: Duplicate This is being tracked in KAFKA-5235. > kafka.tools.GetOffsetShell with enable Kerberos Security on Kafka1.0 > - > > Key: KAFKA-7100 > URL: https://issues.apache.org/jira/browse/KAFKA-7100 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.1 >Reporter: abdullah toraman >Priority: Major > > Hi All, > I enabled the Kerberos Authentication on Kafka 1.0. When I try to > kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list : > --topic --time -1, It hits error. Here are the Error, > [kafka@KLBIRKFKI1 kafka_2.11-1.0.1]$ bin/kafka-run-class.sh > kafka.tools.GetOffsetShell --broker-list klbirkfki1:9092 --topic abdullah > --time -1 > [2018-06-26 12:59:00,058] WARN Fetching topic metadata with correlation id 0 > for topics [Set(abdullah)] from broker [BrokerEndPoint(0,klbirkfki1,9092)] > failed (kafka.client.ClientUtils$) > java.io.EOFException > at > org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124) > at > kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131) > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) > at kafka.producer.SyncProducer.send(SyncProducer.scala:124) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98) > at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79) > at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala) > Exception in thread "main" kafka.common.KafkaException: fetching topic > metadata for topics [Set(abdullah)] from broker > [ArrayBuffer(BrokerEndPoint(0,klbirkfki1,9092))] failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:77) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98) > at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79) > at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala) > Caused by: java.io.EOFException > at > org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124) > at > kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131) > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) > at kafka.producer.SyncProducer.send(SyncProducer.scala:124) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63) > ... 
3 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7028) super.users doesn't work with custom principals
[ https://issues.apache.org/jira/browse/KAFKA-7028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523839#comment-16523839 ] Stanislav Kozlovski commented on KAFKA-7028: After investigation, [~rsivaram] and I found out that apart from {code} if (superUsers.contains(principal)) { {code} the ACL checking logic {code} (acl.principal == principal || acl.principal == Acl.WildCardPrincipal) && {code} under `SimpleAclAuthorizer#aclMatch` also calls the `equals()` method of the `KafkaPrincipal` class, which always returns `false` when given a subclass of `KafkaPrincipal`, because it checks that the classes are exactly equal. This currently means that no custom principals can be authorized at all. Changing the `equals()` method on the base `KafkaPrincipal` to check for subclasses would not work, as `subClass.equals(baseClass)` would return true but `baseClass.equals(subClass)` would return false. An alternative that could work, but does not sound good to me, is comparing the principals as strings. Either way, I assume a fix would require a breaking change and therefore a KIP. [~ijuma], what do you think? > super.users doesn't work with custom principals > --- > > Key: KAFKA-7028 > URL: https://issues.apache.org/jira/browse/KAFKA-7028 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Stanislav Kozlovski >Priority: Major > Fix For: 2.1.0 > > > SimpleAclAuthorizer creates a KafkaPrincipal for the users defined in the > super.users broker config. However, it should use the configured > KafkaPrincipalBuilder so that it works with a custom defined one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
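A self-contained sketch of the exact-class equals() behavior described above (the classes below are illustrative stand-ins, not the real KafkaPrincipal):
{code:java}
// Stand-in for KafkaPrincipal: equals() rejects anything that is not exactly
// the same class, which is what breaks custom principal subclasses.
class BasePrincipal {
    private final String principalType;
    private final String name;

    BasePrincipal(String principalType, String name) {
        this.principalType = principalType;
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false; // exact-class check
        BasePrincipal that = (BasePrincipal) o;
        return principalType.equals(that.principalType) && name.equals(that.name);
    }

    @Override
    public int hashCode() {
        return 31 * principalType.hashCode() + name.hashCode();
    }
}

// Stand-in for a principal built by a custom KafkaPrincipalBuilder.
class CustomPrincipal extends BasePrincipal {
    CustomPrincipal(String principalType, String name) {
        super(principalType, name);
    }
}

public class PrincipalEqualsDemo {
    public static void main(String[] args) {
        BasePrincipal fromAcl = new BasePrincipal("User", "alice");        // e.g. parsed from an ACL
        BasePrincipal fromBuilder = new CustomPrincipal("User", "alice");  // e.g. built per connection
        System.out.println(fromAcl.equals(fromBuilder)); // false: classes differ
        System.out.println(fromBuilder.equals(fromAcl)); // false: classes differ
    }
}
{code}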
[jira] [Commented] (KAFKA-7099) KafkaLog4jAppender - not sending any message with level DEBUG
[ https://issues.apache.org/jira/browse/KAFKA-7099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523836#comment-16523836 ] Bastien Bussy commented on KAFKA-7099: -- Hello, I am a colleague of Vincent. *Disclaimer*: we work on Hadoop (HDP-2.6.4) with *log4j 1.2.17*. After some digging, the problem is caused by the "kafka-producer-network-thread", which is in charge of updating the metadata. In this process, the Sender executes a poll on the NetworkClient, which executes the maybeUpdate method. This method calls log.debug, but there is a synchronization mechanism in log4j.Category.callAppenders. So *there is a thread deadlock* between the main thread, where we want to use the Kafka appender, and the network thread. This is the stack trace from the network thread: {code:java} at org.apache.log4j.Category.callAppenders:204 at org.apache.log4j.Category.forcedLog:391 at org.apache.log4j.Category.log:856 at org.slf4j.impl.Log4jLoggerAdapter.debug:230 at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate:644 at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate:552 at org.apache.kafka.clients.NetworkClient.poll:258 at org.apache.kafka.clients.producer.internals.Sender.run:236 at org.apache.kafka.clients.producer.internals.Sender.run:135 at java.lang.Thread.run:748 {code} To fix this issue temporarily, we force the level of the _org.apache.kafka_ logger to INFO when we have the root logger in DEBUG mode. I think this is a common issue in Log4j 1.2. Do you have any recommendations? > KafkaLog4jAppender - not sending any message with level DEBUG > - > > Key: KAFKA-7099 > URL: https://issues.apache.org/jira/browse/KAFKA-7099 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.2.0 >Reporter: Vincent Lebreil >Priority: Major > > KafkaLog4jAppender can be stuck if it is defined at root category with level > DEBUG > {{log4j.rootLogger=DEBUG, kafka}} > {{log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender}} > {quote}DEBUG org.apache.kafka.clients.producer.KafkaProducer:131 - Exception > occurred during message send: > org.apache.kafka.common.errors.TimeoutException: Failed to update metadata > after 6 ms. > {quote} > KafkaLog4jAppender is using a KafkaProducer using itself Log4j with messages > at levels TRACE and DEBUG. The appender used in this case is also the > KafkaLog4jAppender. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
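For reference, a minimal log4j.properties sketch of that temporary workaround (broker list, topic, and pattern values are placeholders): the org.apache.kafka logger is pinned to INFO so the producer network thread's own TRACE/DEBUG messages never re-enter the Kafka appender.
{code}
log4j.rootLogger=DEBUG, kafka

log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.topic=app-logs
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d %-5p %c - %m%n

# Workaround: keep the Kafka client's own logging below DEBUG so the
# producer network thread never logs through the appender it backs.
log4j.logger.org.apache.kafka=INFO
{code}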
[jira] [Comment Edited] (KAFKA-6964) Add ability to print all internal topic names
[ https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523810#comment-16523810 ] Jagadesh Adireddi edited comment on KAFKA-6964 at 6/26/18 2:38 PM: --- [~mjsax], after revisiting the code, I have two points in mind. Can you please help me understand which one is valid? 1) As described in the ticket, we need to print only "internal topics". Does this mean we need to expose the InternalTopologyBuilder#internalTopicNames Set that got added through `InternalTopologyBuilder#addInternalTopic`? 2) As mentioned in your comment, repartition topics and changelog topics constitute the internal topics. Can we call `InternalTopologyBuilder#topicGroups` and read the `InternalTopicConfig#name` field from both repartitionTopics and stateChangelogTopics and print them? was (Author: adireddijagad...@gmail.com): [~mjsax], After revisiting code, got 2 points in mind. Can you please help me in understanding, which one is valid. 1) As described in the ticket , we need to print only "internal topic". Does this mean we need to expose InternalTopologyBuilder#internalTopicNames Set, that got added through `InternalTopologyBuilder#addInternalTopic` 2) As mentioned in your comment, repartition topic and changelog topic constitutes internal topics. Can we call `InternalTopologyBuilder#topicGroups` and read 'InternalTopicConfig name' filed from both repartitionTopics, stateChangelogTopics and print them. > Add ability to print all internal topic names > - > > Key: KAFKA-6964 > URL: https://issues.apache.org/jira/browse/KAFKA-6964 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Bill Bejeck >Assignee: Jagadesh Adireddi >Priority: Major > Labels: needs-kip > > For security access reasons some streams users need to build all internal > topics before deploying their streams application. While it's possible to > get all internal topic names from the {{Topology#describe()}} method, it > would be nice to have a separate method that prints out only the internal > topic names to ease the process. > I think this change will require a KIP, so I've added the appropriate label. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6964) Add ability to print all internal topic names
[ https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523810#comment-16523810 ] Jagadesh Adireddi commented on KAFKA-6964: -- [~mjsax], After revisiting code, got 2 points in mind. Can you please help me in understanding, which one is valid. 1) As described in the ticket , we need to print only "internal topic". Does this mean we need to expose InternalTopologyBuilder#internalTopicNames Set, that got added through `InternalTopologyBuilder#addInternalTopic` 2) As mentioned in your comment, repartition topic and changelog topic constitutes internal topics. Can we call `InternalTopologyBuilder#topicGroups` and read 'InternalTopicConfig name' filed from both repartitionTopics, stateChangelogTopics and print them. > Add ability to print all internal topic names > - > > Key: KAFKA-6964 > URL: https://issues.apache.org/jira/browse/KAFKA-6964 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Bill Bejeck >Assignee: Jagadesh Adireddi >Priority: Major > Labels: needs-kip > > For security access reasons some streams users need to build all internal > topics before deploying their streams application. While it's possible to > get all internal topic names from the {{Topology#describe()}} method, it > would be nice to have a separate method that prints out only the internal > topic names to ease the process. > I think this change will require a KIP, so I've added the appropriate label. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7100) kafka.tools.GetOffsetShell with enable Kerberos Security on Kafka1.0
abdullah toraman created KAFKA-7100: --- Summary: kafka.tools.GetOffsetShell with enable Kerberos Security on Kafka1.0 Key: KAFKA-7100 URL: https://issues.apache.org/jira/browse/KAFKA-7100 Project: Kafka Issue Type: Bug Affects Versions: 1.0.1 Reporter: abdullah toraman Hi All, I enabled the Kerberos Authentication on Kafka 1.0. When I try to kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list : --topic --time -1, It hits error. Here are the Error, [kafka@KLBIRKFKI1 kafka_2.11-1.0.1]$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list klbirkfki1:9092 --topic abdullah --time -1 [2018-06-26 12:59:00,058] WARN Fetching topic metadata with correlation id 0 for topics [Set(abdullah)] from broker [BrokerEndPoint(0,klbirkfki1,9092)] failed (kafka.client.ClientUtils$) java.io.EOFException at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124) at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) at kafka.producer.SyncProducer.send(SyncProducer.scala:124) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98) at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79) at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala) Exception in thread "main" kafka.common.KafkaException: fetching topic metadata for topics [Set(abdullah)] from broker [ArrayBuffer(BrokerEndPoint(0,klbirkfki1,9092))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:77) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98) at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79) at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala) Caused by: java.io.EOFException at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124) at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) at kafka.producer.SyncProducer.send(SyncProducer.scala:124) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63) ... 
3 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7093) Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0
[ https://issues.apache.org/jira/browse/KAFKA-7093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Suleyman updated KAFKA-7093: Issue Type: Bug (was: New Feature) > Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0 > > > Key: KAFKA-7093 > URL: https://issues.apache.org/jira/browse/KAFKA-7093 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 1.1.0 >Reporter: Suleyman >Priority: Major > > I upgraded to kafka version from 0.11.0.1 to 1.1.0. After the upgrade, I'm > getting the below warn message too much. > WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. > This implies messages have arrived out of order. New: \{epoch:0, > offset:793868383}, Current: \{epoch:4, offset:792201264} for Partition: > __consumer_offsets-42 (kafka.server.epoch.LeaderEpochFileCache) > How can I resolve this warn messages? And why I'm getting this warn messages? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init
[ https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523725#comment-16523725 ] Lukasz Gluchowski commented on KAFKA-7088: -- Sorry, but DEBUG is still too verbose. It impacts the resources used by the application. > Kafka streams thread waits infinitely on transaction init > - > > Key: KAFKA-7088 > URL: https://issues.apache.org/jira/browse/KAFKA-7088 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 > Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 > 20:07:43 UTC 2018 > kafka-streams (client) 1.0.1 > kafka broker 1.1.0 > Java version: > OpenJDK Runtime Environment (build 1.8.0_171-b10) > OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode) > kakfa config overrides: > num.stream.threads: 6 > session.timeout.ms: 1 > request.timeout.ms: 11000 > fetch.max.wait.ms: 500 > max.poll.records: 1000 > topic has 24 partitions >Reporter: Lukasz Gluchowski >Priority: Major > Labels: eos > > A kafka stream application thread stops processing without any feedback. The > topic has 24 partitions and I noticed that processing stopped only for some > partitions. I will describe what happened to partition:10. The application is > still running (now for about 8 hours) and that thread is hanging there and no > rebalancing that took place. > There is no error (we have a custom `Thread.UncaughtExceptionHandler` which > was not called). I noticed that after couple of minutes stream stopped > processing (at offset 32606948 where log-end-offset is 33472402). > Broker itself is not reporting any active consumer in that consumer group and > the only info I was able to gather was from thread dump: > {code:java} > "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33" > #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition > [0x7fe0215d4000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xfec6a2f8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50) > at > org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:151) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350) > at > org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88) > at >
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744){code} > > I tried restarting application once but the situation repeated. Thread read > some data, committed offset and
[jira] [Created] (KAFKA-7099) KafkaLog4jAppender - not sending any message with level DEBUG
Vincent Lebreil created KAFKA-7099: -- Summary: KafkaLog4jAppender - not sending any message with level DEBUG Key: KAFKA-7099 URL: https://issues.apache.org/jira/browse/KAFKA-7099 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.10.2.0 Reporter: Vincent Lebreil KafkaLog4jAppender can get stuck if it is defined at the root category with level DEBUG {{log4j.rootLogger=DEBUG, kafka}} {{log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender}} {quote}DEBUG org.apache.kafka.clients.producer.KafkaProducer:131 - Exception occurred during message send: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms. {quote} KafkaLog4jAppender uses a KafkaProducer, which itself logs via Log4j at the TRACE and DEBUG levels. The appender used in this case is also the KafkaLog4jAppender. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7095) Low traffic consumer is not consuming messages after the offsets is deleted by Kafka
[ https://issues.apache.org/jira/browse/KAFKA-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523439#comment-16523439 ] Aldo Sinanaj edited comment on KAFKA-7095 at 6/26/18 12:26 PM: --- Hi [~hachikuji], thanks for replying. The consumer configuration for this property is: _auto.offset.reset = earliest_. Since the consumer is part of a consumer group, I want to consume from the earliest uncommitted message. With anonymous consumers I don't have this issue, since the property is _auto.offset.reset = latest_. Any idea how to avoid this problem? Thanks was (Author: aldex32): Hi [~hachikuji] thanks for replying. The consumer configuration for this property is: _auto.offset.reset = earliest_ So should I set this to _latest_ in order to continue reading without restarting the consumer? Thanks > Low traffic consumer is not consuming messages after the offsets is deleted > by Kafka > > > Key: KAFKA-7095 > URL: https://issues.apache.org/jira/browse/KAFKA-7095 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 >Reporter: Aldo Sinanaj >Priority: Minor > > Hello guys. > I have a low traffic consumers for a given consumer group and I have the > default broker setting for this property *offsets.retention.minutes*. So if a > messages is coming after 2 days and Kafka has deleted the offset for that > given consumer, then the consumer will not consume the new incoming messages. > If I restart the application it will consume from the earliest which is > obvious since the offset is deleted. > My question is why it doesn't consume the new messages if I don't restart the > application? And how does this version of Kafka understands if a consumer is > active or inactive? Is my consumer considered inactive in this case? > Thanks, > Aldo -- This message was sent by Atlassian JIRA (v7.6.3#76005)
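A minimal sketch of the configuration being discussed (bootstrap servers and group id are placeholders); note that auto.offset.reset only kicks in when the group has no committed offset, e.g. after offsets.retention.minutes has expired it:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class OffsetResetConfigSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "low-traffic-group");
        // "earliest": restart from the beginning once the committed offset is gone;
        // "latest": jump to the end, as the anonymous consumers above effectively do.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}
{code}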
[jira] [Comment Edited] (KAFKA-7095) Low traffic consumer is not consuming messages after the offsets is deleted by Kafka
[ https://issues.apache.org/jira/browse/KAFKA-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523439#comment-16523439 ] Aldo Sinanaj edited comment on KAFKA-7095 at 6/26/18 12:06 PM: --- Hi [~hachikuji] thanks for replying. The consumer configuration for this property is: _auto.offset.reset = earliest_ So should I set this to _latest_ in order to continue reading without restarting the consumer? Thanks was (Author: aldex32): Hi [~hachikuji] thanks for replying. The consumer configuration for this property is: _auto.offset.reset = latest_ Any idea? > Low traffic consumer is not consuming messages after the offsets is deleted > by Kafka > > > Key: KAFKA-7095 > URL: https://issues.apache.org/jira/browse/KAFKA-7095 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 >Reporter: Aldo Sinanaj >Priority: Minor > > Hello guys. > I have a low traffic consumers for a given consumer group and I have the > default broker setting for this property *offsets.retention.minutes*. So if a > messages is coming after 2 days and Kafka has deleted the offset for that > given consumer, then the consumer will not consume the new incoming messages. > If I restart the application it will consume from the earliest which is > obvious since the offset is deleted. > My question is why it doesn't consume the new messages if I don't restart the > application? And how does this version of Kafka understands if a consumer is > active or inactive? Is my consumer considered inactive in this case? > Thanks, > Aldo -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4513) Support migration of old consumers to new consumers without downtime
[ https://issues.apache.org/jira/browse/KAFKA-4513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523625#comment-16523625 ] Manikumar commented on KAFKA-4513: -- [~ijuma] Can we close this JIRA? > Support migration of old consumers to new consumers without downtime > > > Key: KAFKA-4513 > URL: https://issues.apache.org/jira/browse/KAFKA-4513 > Project: Kafka > Issue Type: New Feature >Reporter: Ismael Juma >Assignee: Onur Karaman >Priority: Major > > Some ideas were discussed in the following thread: > http://markmail.org/message/ovngfw3ibixlquxh -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6211) PartitionAssignmentState is protected whereas KafkaConsumerGroupService.describeGroup() returns PartitionAssignmentState Object
[ https://issues.apache.org/jira/browse/KAFKA-6211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523616#comment-16523616 ] Manikumar edited comment on KAFKA-6211 at 6/26/18 11:56 AM: ConsumerGroupCommand internal classes/methods are not public API. You are encouraged to use kafka AdminClient consumer group management methods. was (Author: omkreddy): ConsumerGroupCommand internal classes/methods are not public API. You encouraged to use kafka AdminClient consumer group management methods. > PartitionAssignmentState is protected whereas > KafkaConsumerGroupService.describeGroup() returns PartitionAssignmentState > Object > --- > > Key: KAFKA-6211 > URL: https://issues.apache.org/jira/browse/KAFKA-6211 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, > 0.10.2.1, 0.11.0.0, 0.11.0.1, 1.0.0 >Reporter: Subhransu Acharya >Priority: Critical > Labels: patch > Attachments: a.png > > Original Estimate: 1m > Remaining Estimate: 1m > > KafkaConsumerGroupService.describeGroup() is returning > Tuple2, Option>> but > ConsumerGroupCommand has PartitionAssignmentState as a protected class inside > it. > There is no way to create an instance of PartitionAssignmentState. > make it default in order to use the describe command. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6211) PartitionAssignmentState is protected whereas KafkaConsumerGroupService.describeGroup() returns PartitionAssignmentState Object
[ https://issues.apache.org/jira/browse/KAFKA-6211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523616#comment-16523616 ] Manikumar commented on KAFKA-6211: -- ConsumerGroupCommand internal classes/methods are not public API. You encouraged to use kafka AdminClient consumer group management methods. > PartitionAssignmentState is protected whereas > KafkaConsumerGroupService.describeGroup() returns PartitionAssignmentState > Object > --- > > Key: KAFKA-6211 > URL: https://issues.apache.org/jira/browse/KAFKA-6211 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, > 0.10.2.1, 0.11.0.0, 0.11.0.1, 1.0.0 >Reporter: Subhransu Acharya >Priority: Critical > Labels: patch > Attachments: a.png > > Original Estimate: 1m > Remaining Estimate: 1m > > KafkaConsumerGroupService.describeGroup() is returning > Tuple2, Option>> but > ConsumerGroupCommand has PartitionAssignmentState as a protected class inside > it. > There is no way to create an instance of PartitionAssignmentState. > make it default in order to use the describe command. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-5079) ProducerBounceTest fails occasionally with a SocketTimeoutException
[ https://issues.apache.org/jira/browse/KAFKA-5079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-5079. -- Resolution: Fixed ProducerBounceTest was removed as part of the old consumer changes. > ProducerBounceTest fails occasionally with a SocketTimeoutException > --- > > Key: KAFKA-5079 > URL: https://issues.apache.org/jira/browse/KAFKA-5079 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta >Priority: Major > > {noformat} > java.net.SocketTimeoutException > at > sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229) > at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) > at > java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) > at > org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:85) > at > kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:100) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:84) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:133) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:133) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:133) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:32) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:132) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:132) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:132) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:32) > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:131) > at > kafka.api.ProducerBounceTest$$anonfun$2.apply(ProducerBounceTest.scala:116) > at > kafka.api.ProducerBounceTest$$anonfun$2.apply(ProducerBounceTest.scala:113) > {noformat} > This is expected occasionally, since the ports are preallocated and the > brokers are bounced in quick succession. Here is the relevant comment from > the code: > {noformat} > // This is the one of the few tests we currently allow to preallocate > ports, despite the fact that this can result in transient > // failures due to ports getting reused. We can't use random ports because > of bad behavior that can result from bouncing > // brokers too quickly when they get new, random ports. If we're not > careful, the client can end up in a situation > // where metadata is not refreshed quickly enough, and by the time it's > actually trying to, all the servers have > // been bounced and have new addresses. None of the bootstrap nodes or > current metadata can get them connected to a > // running server. > // > // Since such quick rotation of servers is incredibly unrealistic, we allow > this one test to preallocate ports, leaving > // a small risk of hitting errors due to port conflicts. Hopefully this is > infrequent enough to not cause problems. > {noformat} > We should try to look into handling this exception better so that the test > doesn't fail occasionally. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-5080) Convert ProducerBounceTest to use the new KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-5080. -- Resolution: Fixed ProducerBounceTest was removed as part of the old consumer changes. > Convert ProducerBounceTest to use the new KafkaConsumer > --- > > Key: KAFKA-5080 > URL: https://issues.apache.org/jira/browse/KAFKA-5080 > Project: Kafka > Issue Type: Sub-task >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > > In KAFKA-5079, the `SocketTimeoutException` indicates that the consumer is > stuck waiting for a response from a particular broker, even after the broker > has been bounced. Since a given broker in that test always uses the same port, > it is possible that this is a symptom of a bug in the SimpleConsumer where it > doesn't detect the bounce (and hence disconnect), causing it to time out. > We should use the new consumer to rule out a client bug, or fix it if it also > exists in the new consumer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523582#comment-16523582 ] Steven Aerts commented on KAFKA-7026: - [~nperiwal], looking at the code of the {{RangeAssignor}}, I do not think this is the same issue, as this topic does not have any state. Do you have a dump of the consumer groups, as I posted above, so we can get an idea of whether it follows a similar pattern? > Sticky assignor could assign a partition to multiple consumers > -- > > Key: KAFKA-7026 > URL: https://issues.apache.org/jira/browse/KAFKA-7026 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Vahid Hashemian >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.1.0 > > > In the following scenario sticky assignor assigns a topic partition to two > consumers in the group: > # Create a topic {{test}} with a single partition > # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group > leader and gets {{test-0}}) > # Start consumer {{c2}} in group {{sticky-group}} ({{c1}} holds onto > {{test-0}}, {{c2}} does not get any partition) > # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes > over {{test-0}}, {{c1}} leaves the group) > # Resume {{c1}} > At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them. > > The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from > the last assignment it received from the leader (itself) and did not get the > next round of assignments (when {{c2}} became leader) because it was paused. > Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their > existing assignment. The sticky assignor code does not currently check and > avoid this duplication. > > Note: This issue was originally reported on > [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6809) connections-created metric does not behave as expected
[ https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski reassigned KAFKA-6809: -- Assignee: Stanislav Kozlovski > connections-created metric does not behave as expected > -- > > Key: KAFKA-6809 > URL: https://issues.apache.org/jira/browse/KAFKA-6809 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Assignee: Stanislav Kozlovski >Priority: Major > Fix For: 2.1.0, 1.1.2 > > > The "connections-created" sensor is described as "new connections established". > It currently records only connections that the broker/client creates, but > does not count connections received. It seems we should also count > connections received – either include them in this metric (and also clarify > the description) or add a new metric (separately counting the two types of > connections). I am not sure how useful it is to separate them, so I think we > should take the first approach. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
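As a rough illustration of the first approach, the same sensor would simply be recorded on both the connect and accept paths. This is a schematic sketch against the public Metrics API, not the actual Selector code; the hook names are hypothetical:
{code:scala}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.Rate

val metrics = new Metrics()
val connectionsCreated = metrics.sensor("connections-created")
connectionsCreated.add(
  metrics.metricName("connection-creation-rate", "connection-metrics",
    "New connections established per second, including connections received"),
  new Rate())

// Outbound: we initiated the connection (this is what is counted today).
def onConnectionEstablished(): Unit = connectionsCreated.record()

// Inbound: a peer connected to us. Per the report, this path should be
// recorded as well, with the metric description clarified to match.
def onConnectionAccepted(): Unit = connectionsCreated.record()
{code}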
[jira] [Resolved] (KAFKA-2488) System tests: updated console_consumer.py to support new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-2488. -- Resolution: Fixed Support was added in Kafka versions newer than 0.10.1. > System tests: updated console_consumer.py to support new consumer > - > > Key: KAFKA-2488 > URL: https://issues.apache.org/jira/browse/KAFKA-2488 > Project: Kafka > Issue Type: Bug >Reporter: Geoff Anderson >Assignee: Geoff Anderson >Priority: Major > > Console consumer now supports the new consumer. > Update console_consumer.py to allow this as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-2215) Improve Randomness for ConsoleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-2215. -- Resolution: Not A Problem Closing inactive issue. Also, the console consumer's enable.auto.commit default is set to false for auto-generated group IDs. > Improve Randomness for ConsoleConsumer > -- > > Key: KAFKA-2215 > URL: https://issues.apache.org/jira/browse/KAFKA-2215 > Project: Kafka > Issue Type: Bug >Reporter: Fabian Lange >Priority: Major > > Right now the console consumer does a new Random().nextInt(100_000) > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L123 > I would propose to use UUID.randomUUID().toString() instead. > I know this is quite edgy, but Random has shown its quirks from time to time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
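A minimal before/after sketch of the proposal (the `console-consumer-` prefix mirrors how the tool names auto-generated groups; treat the exact strings as illustrative):
{code:scala}
import java.util.UUID
import scala.util.Random

// Current: a suffix drawn from a small range, so two console consumers
// started close together can collide on the same auto-generated group.
val groupIdCurrent = "console-consumer-" + new Random().nextInt(100000)

// Proposed: a UUID suffix, for which accidental collisions are negligible.
val groupIdProposed = "console-consumer-" + UUID.randomUUID().toString
{code}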
[jira] [Commented] (KAFKA-7095) Low traffic consumer is not consuming messages after the offsets is deleted by Kafka
[ https://issues.apache.org/jira/browse/KAFKA-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523439#comment-16523439 ] Aldo Sinanaj commented on KAFKA-7095: - Hi [~hachikuji], thanks for replying. The consumer configuration for this property is: _auto.offset.reset = latest_ Any idea? > Low traffic consumer is not consuming messages after the offsets is deleted > by Kafka > > > Key: KAFKA-7095 > URL: https://issues.apache.org/jira/browse/KAFKA-7095 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 >Reporter: Aldo Sinanaj >Priority: Minor > > Hello guys. > I have a low-traffic consumer for a given consumer group, and I have the > default broker setting for the property *offsets.retention.minutes*. So if a > message arrives after 2 days and Kafka has deleted the offsets for that > consumer group, the consumer will not consume the new incoming messages. > If I restart the application, it consumes from the earliest offset, which is > expected since the committed offset was deleted. > My question is: why doesn't it consume the new messages if I don't restart the > application? And how does this version of Kafka determine whether a consumer is > active or inactive? Is my consumer considered inactive in this case? > Thanks, > Aldo -- This message was sent by Atlassian JIRA (v7.6.3#76005)
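For context, the two settings involved live on different sides: `offsets.retention.minutes` is a broker setting, while `auto.offset.reset` is a consumer setting that is only consulted when no committed offset exists for a partition. A minimal sketch of the consumer side (addresses and group name are illustrative):
{code:scala}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "low-traffic-group")
// Applies only when the group has no committed offset for a partition,
// e.g. after the broker has expired it per offsets.retention.minutes.
props.put("auto.offset.reset", "latest")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
{code}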
[jira] [Updated] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
[ https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6949: -- [~lindong] You can cherry-pick and push the commit to the 2.0 branch without a PR or further reviews if major changes were not required for cherry-picking (I am assuming that is the case here). We don't currently have any blockers for which we need to create a new RC. If this issue is a blocker for 2.0 and we need to create a new RC, then do let me know. Otherwise, I will change the fix version to 2.0.1 for now and change it back to 2.0 if we require another RC. Thanks. > alterReplicaLogDirs() should grab partition lock when accessing log of the > future replica > - > > Key: KAFKA-6949 > URL: https://issues.apache.org/jira/browse/KAFKA-6949 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Dong Lin >Priority: Blocker > Fix For: 2.0.0 > > > I found this in a failed execution of > kafka.admin.ReassignPartitionsClusterTest.shouldExpandCluster. Looks like > we're missing some option checking. > {code} > [2018-05-25 08:03:53,310] ERROR [ReplicaManager broker=100] Error while > changing replica dir for partition my-topic-2 (kafka.server.ReplicaManager:76) > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) > at scala.None$.get(Option.scala:345) > at > kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:584) > at > kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:576) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > kafka.server.ReplicaManager.alterReplicaLogDirs(ReplicaManager.scala:576) > at > kafka.server.KafkaApis.handleAlterReplicaLogDirsRequest(KafkaApis.scala:2037) > at kafka.server.KafkaApis.handle(KafkaApis.scala:138) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6944) Add system tests testing the new throttling behavior using older clients/brokers
[ https://issues.apache.org/jira/browse/KAFKA-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523348#comment-16523348 ] ASF GitHub Bot commented on KAFKA-6944: --- jonlee2 opened a new pull request #5294: KAFKA-6944: Add system tests testing the new throttling behavior using older clients/brokers URL: https://github.com/apache/kafka/pull/5294 Added two additional test cases to quota_test.py, which run between brokers and clients with different throttling behaviors. More specifically: 1. clients with the new throttling behavior (i.e., post-KIP-219) and brokers with the old throttling behavior (i.e., pre-KIP-219) 2. clients with the old throttling behavior and brokers with the new throttling behavior ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add system tests testing the new throttling behavior using older > clients/brokers > > > Key: KAFKA-6944 > URL: https://issues.apache.org/jira/browse/KAFKA-6944 > Project: Kafka > Issue Type: Test > Components: system tests >Affects Versions: 2.0.0 >Reporter: Jon Lee >Priority: Major > > KAFKA-6028 (KIP-219) changes the throttling behavior on quota violation as > follows: > * the broker will send out a response with throttle time to the client > immediately and mute the channel > * upon receiving a response with a non-zero throttle time, the client will > also block sending further requests to the broker until the throttle time is > over. > The current system tests assume that both clients and brokers are of the same > version. We'll need an additional set of quota tests that test throttling > behavior between older clients and newer brokers and between newer clients > and older brokers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
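Schematically, the post-KIP-219 client behavior those tests exercise looks like the following. This is a sketch with hypothetical names, not the real NetworkClient logic:
{code:scala}
// Schematic: a client that honors broker-reported throttle times.
final class ThrottledChannel {
  private var mutedUntilMs: Long = 0L

  // On every response: a non-zero throttle time means the broker has already
  // muted our channel, so the client also stops sending for that duration.
  def onResponse(throttleTimeMs: Int, nowMs: Long): Unit =
    if (throttleTimeMs > 0)
      mutedUntilMs = math.max(mutedUntilMs, nowMs + throttleTimeMs)

  // Pre-KIP-219 clients skip the check above and keep sending, which is
  // exactly the mixed-version behavior the new system tests cover.
  def canSend(nowMs: Long): Boolean = nowMs >= mutedUntilMs
}
{code}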
[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps
[ https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523343#comment-16523343 ] Odin Standal commented on KAFKA-6817: - Would it be possible to change transactional.id.expiration.ms to a long? While not a proper solution to the problem, at least that would enable us to reprocess old messages, which would greatly increase the value of Kafka for us. > UnknownProducerIdException when writing messages with old timestamps > > > Key: KAFKA-6817 > URL: https://issues.apache.org/jira/browse/KAFKA-6817 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 1.1.0 >Reporter: Odin Standal >Priority: Major > > We are seeing the following exception in our Kafka application: > {code:java} > ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer > due to the following error: org.apache.kafka.streams.errors.StreamsException: > task [0_0] Abort sending since an error caught with a previous record (key > 22 value some-value timestamp 1519200902670) to topic > exactly-once-test-topic-v2 due to This exception is raised by the broker if > it could not locate the producer metadata associated with the producerId in > question. This could happen if, for instance, the producer's records were > deleted because their retention time had elapsed. Once the last records of > the producerId are removed, the producer's metadata is removed from the > broker, and future appends by the producer will return this exception. at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at > java.lang.Thread.run(Thread.java:748) Caused by: > org.apache.kafka.common.errors.UnknownProducerIdException > {code} > We discovered this error when we had the need to reprocess old messages. See > more details on > [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827] > We have reproduced the error with a smaller example application. The error > occurs after 10 minutes of producing messages that have old timestamps (around > 1 year old). The topic we are writing to has a retention.ms set to 1 year, so > we are expecting the messages to stay there. > After digging through the ProducerStateManager code in the Kafka source code, > we have a theory of what might be wrong. > The ProducerStateManager.removeExpiredProducers() seems to remove producers > from memory erroneously when processing records which are older than the > maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` > configuration), which is set by default to 7 days. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
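A schematic rendering of that theory: if expiry compares the wall clock against the last *record* timestamp, then appending year-old records makes the producer state look expired immediately. Names are simplified for illustration, not the exact ProducerStateManager code:
{code:scala}
// Schematic: producer-state expiry keyed off record timestamps.
case class ProducerState(producerId: Long, lastTimestampMs: Long)

def isExpired(nowMs: Long, state: ProducerState, maxProducerIdExpirationMs: Long): Boolean =
  nowMs - state.lastTimestampMs >= maxProducerIdExpirationMs

val sevenDaysMs = 7L * 24 * 60 * 60 * 1000 // default transactional.id.expiration.ms
val yearAgoMs = System.currentTimeMillis() - 365L * 24 * 60 * 60 * 1000

// A record stamped ~1 year in the past trips the check at once, so the
// producer metadata is dropped while the records themselves remain retained,
// and the next append fails with UnknownProducerIdException.
println(isExpired(System.currentTimeMillis(), ProducerState(1L, yearAgoMs), sevenDaysMs)) // true
{code}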
[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1
[ https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523303#comment-16523303 ] Dong Lin commented on KAFKA-7012: - Great to know that the fix works! > Performance issue upgrading to kafka 1.0.1 or 1.1 > - > > Key: KAFKA-7012 > URL: https://issues.apache.org/jira/browse/KAFKA-7012 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: rajadayalan perumalsamy >Assignee: Rajini Sivaram >Priority: Critical > Labels: regression > Fix For: 2.0.0, 1.0.2, 1.1.1 > > Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, > Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, > Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, > Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png > > > We are trying to upgrade our Kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. > After upgrading 1 node on the cluster, we notice that network threads use > most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. > With Kafka 0.11.0.1, typical usage of the servers is around 50 to 60% > vcpu (using less than 1 vcpu). After the upgrade, we are noticing that cpu usage is > high depending on the number of network threads used. If network threads is > set to 8, then the cpu usage is around 850% (9 vcpus), and if it is set to 4, > then the cpu usage is around 450% (5 vcpus). Using the same kafka > server.properties for both. > Did further analysis with git bisect, a couple of builds and deploys, and traced the > issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine > for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit > 47ee8e954df62b9a79099e944ec4be29afe046f6, cpu usage has increased. Have > attached screenshots of profiling done with both the commits. Screenshot > Commit-f15cdbc91b-profile shows less cpu usage by network threads, and > screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show > higher cpu usage (almost the entire cpu usage) by network threads. Also noticed > that the kafka.network.Processor.poll() method is invoked 10 times more with > commit 47ee8e954df62b9a79099e944ec4be29afe046f6. > We need the issue to be resolved to upgrade the cluster. Please let me know > if you need any additional information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
[ https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523297#comment-16523297 ] Dong Lin commented on KAFKA-6949: - [~rsivaram] The patch has been merged into trunk. I opened PR [https://github.com/apache/kafka/pull/5293] to merge it into 2.0. Typically, do we need a review from another committer in order to cherry-pick a patch from trunk into a release branch? Also, I added fix version 2.0.0 in this JIRA. Please feel free to change it as you see appropriate. Thanks. > alterReplicaLogDirs() should grab partition lock when accessing log of the > future replica > - > > Key: KAFKA-6949 > URL: https://issues.apache.org/jira/browse/KAFKA-6949 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Dong Lin >Priority: Blocker > Fix For: 2.0.0 > > > I found this in a failed execution of > kafka.admin.ReassignPartitionsClusterTest.shouldExpandCluster. Looks like > we're missing some option checking. > {code} > [2018-05-25 08:03:53,310] ERROR [ReplicaManager broker=100] Error while > changing replica dir for partition my-topic-2 (kafka.server.ReplicaManager:76) > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) > at scala.None$.get(Option.scala:345) > at > kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:584) > at > kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:576) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > kafka.server.ReplicaManager.alterReplicaLogDirs(ReplicaManager.scala:576) > at > kafka.server.KafkaApis.handleAlterReplicaLogDirsRequest(KafkaApis.scala:2037) > at kafka.server.KafkaApis.handle(KafkaApis.scala:138) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
[ https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-6949: Fix Version/s: 2.0.0 > alterReplicaLogDirs() should grab partition lock when accessing log of the > future replica > - > > Key: KAFKA-6949 > URL: https://issues.apache.org/jira/browse/KAFKA-6949 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Dong Lin >Priority: Blocker > Fix For: 2.0.0 > > > I found this in a failed execution of > kafka.admin.ReassignPartitionsClusterTest.shouldExpandCluster. Looks like > we're missing some option checking. > {code} > [2018-05-25 08:03:53,310] ERROR [ReplicaManager broker=100] Error while > changing replica dir for partition my-topic-2 (kafka.server.ReplicaManager:76) > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) > at scala.None$.get(Option.scala:345) > at > kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:584) > at > kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:576) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > kafka.server.ReplicaManager.alterReplicaLogDirs(ReplicaManager.scala:576) > at > kafka.server.KafkaApis.handleAlterReplicaLogDirsRequest(KafkaApis.scala:2037) > at kafka.server.KafkaApis.handle(KafkaApis.scala:138) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
[ https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523295#comment-16523295 ] ASF GitHub Bot commented on KAFKA-6949: --- lindong28 opened a new pull request #5293: KAFKA-6949; alterReplicaLogDirs() should grab partition lock when accessing log of the future replica URL: https://github.com/apache/kafka/pull/5293 A NoSuchElementException will be thrown if ReplicaAlterDirThread replaces the current replica with the future replica right before the request handler thread executes `futureReplica.log.get.dir.getParent` in ReplicaManager.alterReplicaLogDirs(). The solution is to grab the partition lock when the request handler thread attempts to check the destination log directory of the future replica. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > alterReplicaLogDirs() should grab partition lock when accessing log of the > future replica > - > > Key: KAFKA-6949 > URL: https://issues.apache.org/jira/browse/KAFKA-6949 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Dong Lin >Priority: Blocker > Fix For: 2.0.0 > > > I found this in a failed execution of > kafka.admin.ReassignPartitionsClusterTest.shouldExpandCluster. Looks like > we're missing some option checking. > {code} > [2018-05-25 08:03:53,310] ERROR [ReplicaManager broker=100] Error while > changing replica dir for partition my-topic-2 (kafka.server.ReplicaManager:76) > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) > at scala.None$.get(Option.scala:345) > at > kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:584) > at > kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:576) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > kafka.server.ReplicaManager.alterReplicaLogDirs(ReplicaManager.scala:576) > at > kafka.server.KafkaApis.handleAlterReplicaLogDirsRequest(KafkaApis.scala:2037) > at kafka.server.KafkaApis.handle(KafkaApis.scala:138) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
[ https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523292#comment-16523292 ] ASF GitHub Bot commented on KAFKA-6949: --- lindong28 closed pull request #5081: KAFKA-6949; alterReplicaLogDirs() should grab partition lock when accessing log of the future replica URL: https://github.com/apache/kafka/pull/5081 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9ab1ec47af8..b80c34475d3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -149,10 +149,10 @@ class Partition(val topic: String,
    * @return true iff the future replica is created
    */
   def maybeCreateFutureReplica(logDir: String): Boolean = {
-    // The readLock is needed to make sure that while the caller checks the log directory of the
+    // The writeLock is needed to make sure that while the caller checks the log directory of the
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
-    inReadLock(leaderIsrUpdateLock) {
+    inWriteLock(leaderIsrUpdateLock) {
       val currentReplica = getReplica().get
       if (currentReplica.log.get.dir.getParent == logDir)
         false
@@ -207,29 +207,52 @@ class Partition(val topic: String,
       allReplicasMap.remove(replicaId)
   }

-  def removeFutureLocalReplica() {
+  def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
+    inReadLock(leaderIsrUpdateLock) {
+      getReplica(Request.FutureLocalReplicaId) match {
+        case Some(futureReplica) =>
+          if (futureReplica.log.get.dir.getParent != newDestinationDir)
+            true
+          else
+            false
+        case None => false
+      }
+    }
+  }
+
+  def removeFutureLocalReplica(deleteFromLogDir: Boolean = true) {
     inWriteLock(leaderIsrUpdateLock) {
       allReplicasMap.remove(Request.FutureLocalReplicaId)
+      if (deleteFromLogDir)
+        logManager.asyncDelete(topicPartition, isFuture = true)
     }
   }

-  // Return true iff the future log has caught up with the current log for this partition
+  // Return true iff the future replica exists and it has caught up with the current replica for this partition
   // Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = getReplica().get
-    val futureReplica = getReplica(Request.FutureLocalReplicaId).get
-    if (replica.logEndOffset == futureReplica.logEndOffset) {
+    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
-        if (replica.logEndOffset == futureReplica.logEndOffset) {
-          logManager.replaceCurrentWithFutureLog(topicPartition)
-          replica.log = futureReplica.log
-          futureReplica.log = None
-          allReplicasMap.remove(Request.FutureLocalReplicaId)
-          true
-        } else false
+        getReplica(Request.FutureLocalReplicaId) match {
+          case Some(futureReplica) =>
+            if (replica.logEndOffset == futureReplica.logEndOffset) {
+              logManager.replaceCurrentWithFutureLog(topicPartition)
+              replica.log = futureReplica.log
+              futureReplica.log = None
+              allReplicasMap.remove(Request.FutureLocalReplicaId)
+              true
+            } else false
+          case None =>
+            // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
+            // In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread
+            // Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the state again to avoid race condition
+            false
+        }
       }
     } else false
   }
@@ -550,15 +573,22 @@ class Partition(val topic: String,
   }

   private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
-    if (isFuture)
-