[jira] [Comment Edited] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-26 Thread Narayan Periwal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524543#comment-16524543
 ] 

Narayan Periwal edited comment on KAFKA-7026 at 6/27/18 4:11 AM:
-

[~vahid], unfortunately we are not able to reproduce this in our QA setup. The only 
correlation we have seen is that it seems to happen when there is a spike in 
the number of under-replicated partitions in the Kafka cluster. 

One more observation: when this issue happens, our consumers stop processing data 
for more than "max.poll.interval.ms", so consumer.poll() is not invoked within 
"max.poll.interval.ms". That means the consumer is considered failed and the group 
rebalances in order to reassign its partitions to another member. It looks like 
the first consumer, after recovering (i.e., able to process again), keeps getting 
data from its previously assigned partition, leading to this issue.
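
For context, a minimal sketch of the consumer settings involved (the broker address, topic, 
group id, and the printing stand-in for our processing are illustrative, not our actual code): 
if processing stalls so that poll() is not called again within max.poll.interval.ms, the member 
is considered failed and its partitions are reassigned.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SlowConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
        // If poll() is not invoked again within this interval, the member is considered
        // failed and the group rebalances to reassign its partitions to another member.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                // If handling these records takes longer than max.poll.interval.ms,
                // the next poll() comes too late and a rebalance is triggered.
                records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
            }
        }
    }
}
{code}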


was (Author: nperiwal):
[~vahid], unfortunately we are not able to reproduce this in our QA setup. Only 
co-relation that we have seen is this seems to happen when there is spike in 
the number of under replicated partitions in the kafka cluster. 

One more thing is when this issue happens, we have seen our consumers not 
processing data for more than "max.poll.interval.ms", thus the consumer.poll() 
call is not invoked for "max.poll.interval.ms", which means the consumer is 
considered failed and the group will rebalance in order to reassign the 
partitions to another member. Looks like, the old consumer, after recovery, is 
still getting data from the earlier assigned partition, leading to this issue.

> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.1.0
>
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-26 Thread Narayan Periwal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524543#comment-16524543
 ] 

Narayan Periwal edited comment on KAFKA-7026 at 6/27/18 4:10 AM:
-

[~vahid], unfortunately we are not able to reproduce this in our QA setup. The only 
correlation we have seen is that it seems to happen when there is a spike in 
the number of under-replicated partitions in the Kafka cluster. 

One more observation: when this issue happens, our consumers stop processing data 
for more than "max.poll.interval.ms", so consumer.poll() is not invoked within 
"max.poll.interval.ms". That means the consumer is considered failed and the group 
rebalances in order to reassign its partitions to another member. It looks like 
the old consumer, after recovery, keeps getting data from its previously assigned 
partition, leading to this issue.


was (Author: nperiwal):
[~vahid], unfortunately we are not able to reproduce this in our QA setup. Only 
co-relation that we have seen is this seems to happen when there is spike in 
the number of under replicated partitions in the kafka cluster. 

> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.1.0
>
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-26 Thread Narayan Periwal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524543#comment-16524543
 ] 

Narayan Periwal commented on KAFKA-7026:


[~vahid], unfortunately we are not able to reproduce this in our QA setup. The only 
correlation we have seen is that it seems to happen when there is a spike in 
the number of under-replicated partitions in the Kafka cluster. 

> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.1.0
>
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-26 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524538#comment-16524538
 ] 

Vahid Hashemian commented on KAFKA-7026:


[~nperiwal] Is the issue you described reproducible by following certain steps?

> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.1.0
>
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-26 Thread Narayan Periwal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524517#comment-16524517
 ] 

Narayan Periwal commented on KAFKA-7026:


[~vahid], agreed that this may not be the actual cause of the issue. But we have 
seen this occurring multiple times in our production setup, with multiple consumers 
continuing to consume the same partition until a manual restart is triggered. 
So it could be due to some other issue.

[~steven.aerts], we are using custom checkpointing in ZooKeeper, so the 
kafka-consumer-groups.sh script for describing the consumer group does not work 
for us. 
However, we do have a mechanism to detect multiple consumers consuming from 
the same partition. I am sharing the distribution from one such case. Topic - 
test, consumer group - group1, consumers - c1,c2,c3,c4,c5.
Partitions 3, 4, and 5 of this topic were being consumed by multiple consumer 
instances.
{noformat}
group: group1, topic: test, partition: 0, consumer: c2
group: group1, topic: test, partition: 1, consumer: c4
group: group1, topic: test, partition: 2, consumer: c4
group: group1, topic: test, partition: 3, consumer: c3,c4
group: group1, topic: test, partition: 4, consumer: c3,c5
group: group1, topic: test, partition: 5, consumer: c3,c5
group: group1, topic: test, partition: 6, consumer: c5
group: group1, topic: test, partition: 7, consumer: c1
group: group1, topic: test, partition: 8, consumer: c1
group: group1, topic: test, partition: 9, consumer: c1
{noformat} 

> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.1.0
>
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7095) Low traffic consumer is not consuming messages after the offsets is deleted by Kafka

2018-06-26 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524432#comment-16524432
 ] 

Jason Gustafson commented on KAFKA-7095:


The easy thing to do is increase the offsets.retention.minutes to something 
larger to ensure that the committed offset is not lost. However, there 
shouldn't be any need to restart the application. The consumer should just 
reset the offset if it can't find a committed offset to use. I tried this out 
on 0.10.1 using a very low offsets.retention.minutes and it works as expected. 
I would suggest using a more recent client version, and if that doesn't work, 
collect and post DEBUG logging from the consumer.
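
To make the two knobs concrete, here is a minimal sketch (the values are illustrative only, 
not a recommendation): the broker-side retention for committed offsets, and the consumer-side 
reset policy that decides where to resume when no committed offset is found.

{code:java}
// Broker side (server.properties), illustrative value of 7 days:
//   offsets.retention.minutes=10080
//
// Consumer side: when the committed offset has already expired, auto.offset.reset
// decides where the consumer resumes instead of sitting idle.
Properties props = new Properties();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"
{code}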

> Low traffic consumer is not consuming messages after the offsets is deleted 
> by Kafka
> 
>
> Key: KAFKA-7095
> URL: https://issues.apache.org/jira/browse/KAFKA-7095
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: Aldo Sinanaj
>Priority: Minor
>
> Hello guys.
> I have low-traffic consumers for a given consumer group, and I have the 
> default broker setting for the property *offsets.retention.minutes*. So if a 
> message arrives after 2 days and Kafka has deleted the offset for that 
> given consumer, then the consumer will not consume the new incoming messages. 
> If I restart the application it will consume from the earliest offset, which is 
> expected since the committed offset was deleted.
> My question is why it doesn't consume the new messages if I don't restart the 
> application? And how does this version of Kafka determine whether a consumer is 
> active or inactive? Is my consumer considered inactive in this case?
> Thanks,
> Aldo



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup

2018-06-26 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524421#comment-16524421
 ] 

Guozhang Wang commented on KAFKA-5037:
--

If we want to only fix this issue forward beyond 2.0, I'd propose the following 
refactoring of the {{StreamsPartitionAssignor#assign()}} function. The principle is 
that, unlike the consumer, which assigns the partially available topics to members 
and hopes that another rebalance is triggered to complete the full subscription once 
the metadata gets refreshed, Streams would not start at all if not all of the 
requested source topics are available. This is a simpler rule and easier for users 
to reason about: in the past the most common complaint was "I need to be notified 
when this happens, but I still see the application in RUNNING and was not aware we 
are actually only executing a partial topology". A rough sketch of the proposed flow 
is included at the end of this comment.

1. At the beginning of {{assign}}, first check that all the non-repartition source 
topics are included in the {{metadata}}. If not, log an error at the leader and set 
an error in the Assignment userData bytes, indicating that the leader could not 
complete the assignment; the error code indicates the root cause.

2. Upon receiving the assignment, if the error is not NONE, the Streams instance 
shuts itself down with a log entry re-stating the root cause interpreted from the 
error code.

3. With 1) / 2) above, the non-repartition source topics should always be available, 
so we should never encounter the `NOT_AVAILABLE` case for num.partitions of the 
repartition topics. We can then remove `NOT_AVAILABLE` from the possible values and 
remove the corresponding logic in DefaultPartitionGrouper.

Note that 1) above requires us to bump up the version of the assignment userData 
again, which should be fine with version probing added in 2.0. An alternative 
workaround is to use negative version numbers as the error codes; as long as clients 
know how to interpret them, that is fine. Personally I feel it is a bit hacky and 
would suggest we just bump up the version.

WDYT?
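
A rough sketch of 1) and 2); all names here (the error constants, the userData layout, and 
the surrounding variables) are hypothetical and not the actual StreamsPartitionAssignor code:

{code:java}
// Leader side (inside assign()): verify all non-repartition source topics exist in metadata.
Set<String> missingTopics = new HashSet<>(nonRepartitionSourceTopics);   // hypothetical variable
missingTopics.removeAll(metadata.topics());
int errorCode = missingTopics.isEmpty() ? ERROR_NONE : ERROR_INCOMPLETE_SOURCE_TOPIC_METADATA;
if (errorCode != ERROR_NONE) {
    log.error("Source topics {} are missing from the cluster metadata; cannot complete assignment.",
              missingTopics);
}

// Encode the (bumped) version and the error code into the Assignment userData bytes.
ByteBuffer userData = ByteBuffer.allocate(8);
userData.putInt(USER_DATA_VERSION);   // hypothetical constant for the bumped version
userData.putInt(errorCode);
userData.flip();

// Member side (after decoding its assignment): shut down instead of running a partial topology.
if (decodedErrorCode != ERROR_NONE) {   // decodedErrorCode read back from the userData bytes
    log.error("Leader could not complete the assignment (error code {}); shutting down.",
              decodedErrorCode);
    // transition the instance to an error/shutdown state here
}
{code}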

> Infinite loop if all input topics are unknown at startup
> 
>
> Key: KAFKA-5037
> URL: https://issues.apache.org/jira/browse/KAFKA-5037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: user-experience
>
> See discusion: https://github.com/apache/kafka/pull/2815
> We will need some rewrite on {{StreamPartitionsAssignor}} and to add much 
> more test for all kind of corner cases, including pattern subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup

2018-06-26 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524389#comment-16524389
 ] 

Guozhang Wang commented on KAFKA-5037:
--

I'm now grouping these three tickets, KAFKA-5037, KAFKA-6587, and KAFKA-6437, into a 
single general issue. Here is my summary of it:

1. If there is a repartition topic whose num.partitions depends on the corresponding 
sub-topology's input topic, and that input topic does not exist in {{metadata}}, then 
it will be marked as {{NOT_AVAILABLE}}, the corresponding topic will not be created, 
and any tasks that need it as an input topic will be silently dropped (see 
{{DefaultPartitionGrouper#maxNumPartitions}}). This is what is reported in KAFKA-6437.

1.1. If we happen to get past the while loop despite the {{NOT_AVAILABLE}} tag, we may 
still hit the issue in {{CopartitionedTopicsValidator#validate}}, which will cause a 
runtime exception to be thrown.

2. If there is a repartition topic whose num.partitions depends on the corresponding 
sub-topology's input topic, and that input topic is itself a repartition topic whose 
num.partitions is {{NOT_AVAILABLE}}, then there is a bug that causes the while loop to 
never finish. This is what is reported in KAFKA-5037, and PR 2815 already contains the 
fix.

3. Before 1.1, we have another while loop inside {{prepareTopic}}. If the metadata 
cannot be fetched, we will also fall into a blocking scenario. This is more related to 
KAFKA-6587 and should have been fixed automatically by now.

{code}
// wait until each one of the topic metadata has been propagated to at least one broker
while (!allTopicsCreated(topicNamesToMakeReady, topicsToMakeReady)) {
    try {
        Thread.sleep(50L);
    } catch (InterruptedException e) {
        // ignore
    }
}
{code}

> Infinite loop if all input topics are unknown at startup
> 
>
> Key: KAFKA-5037
> URL: https://issues.apache.org/jira/browse/KAFKA-5037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: user-experience
>
> See discusion: https://github.com/apache/kafka/pull/2815
> We will need some rewrite on {{StreamPartitionsAssignor}} and to add much 
> more test for all kind of corner cases, including pattern subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-06-26 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524379#comment-16524379
 ] 

Guozhang Wang commented on KAFKA-6437:
--

I think this is again related to 
https://issues.apache.org/jira/browse/KAFKA-5037; I'll write the summary of 
this general issue in 5037.

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Assignee: Mariam John
>Priority: Minor
>  Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue a `warn` or `error` level message at start to inform about the missing 
> topic and its consequences.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7104) ReplicaFetcher thread may die because of inconsistent log start offset in fetch response

2018-06-26 Thread Anna Povzner (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-7104:

Description: 
What we saw:

The follower fetches offset 116617, which it was able to successfully append. 
However, the leader's log start offset in the fetch response was 116753, which was 
higher than the fetched offset 116617. When the replica fetcher thread tried to 
increment its log start offset to the leader's log start offset, it failed with an 
OffsetOutOfRangeException: 

[2018-06-23 00:45:37,409] ERROR  Error due to 
(kafka.server.ReplicaFetcherThread) 
 kafka.common.KafkaException: Error processing data for partition X-N offset 
116617 

Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
increment the log start offset to 116753 of partition X-N since it is larger 
than the high watermark 116619

 

In the leader's log, we see that the log start offset was incremented at almost the 
same time (within about 100 ms). 

[2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N 
to 116753

 

In the leader's logic: ReplicaManager first calls readFromLocalLog(), which reads 
from the local log and returns a LogReadResult containing the fetched data plus the 
leader's log start offset and HW. However, it then calls 
ReplicaManager#updateFollowerLogReadResults(), which may move the leader's log start 
offset and update the log start offset and HW in the fetch response. If 
deleteRecords() happens in between, the log start offset may move beyond the fetched 
offset. Or possibly, the leader moves its log start offset because of deleting old 
log segments. Basically, the issue is that the log start offset can move between the 
time records are read from the log and the time the LogReadResult is updated with 
the new log start offset. As a result, the fetch response may contain fetched data 
while the leader's log start offset in the response is set beyond the fetched offset 
(indicating a state on the leader in which the fetched data no longer exists). 

When a follower receives such a fetch response, it will first append the data, then 
move its HW no further than its LEO (which may be less than the leader's log start 
offset in the fetch response), and then call 
`replica.maybeIncrementLogStartOffset(leaderLogStartOffset)`, which throws an 
OffsetOutOfRangeException and causes the fetcher thread to stop. 

Note that this can only happen if the follower is not in the ISR; otherwise the 
leader will not move its log start offset beyond the follower's HW. 

 

*Suggested fix:*

1) Since ReplicaFetcher bounds follower's HW to follower's LEO, we should also 
bound follower's log start offset to its LEO. In this situation, the follower's 
log start offset will be updated to LEO.

2) In addition to #1, we could try to make sure that the leader builds the fetch 
response based on the state of the log as of the time the data was read from the 
replica (while still moving the leader's HW based on the follower's fetch). That 
could potentially be another JIRA, since the fix would be more involved.
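
For illustration, a sketch of fix #1 (the actual follower code is Scala in 
ReplicaFetcherThread / Replica; the names below are only indicative): instead of adopting the 
leader's log start offset directly, bound it by the follower's own LEO.

{code:java}
// Sketch only: bound the leader-reported log start offset by the follower's LEO so a
// log start offset that raced ahead of the fetched data cannot exceed the local log.
long leaderLogStartOffset = partitionData.logStartOffset;   // from the fetch response (name indicative)
long followerLogEndOffset = replica.logEndOffset();         // follower's LEO (name indicative)
long boundedLogStartOffset = Math.min(leaderLogStartOffset, followerLogEndOffset);
replica.maybeIncrementLogStartOffset(boundedLogStartOffset);
{code}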

 

  was:
What we saw:

The follower fetches offset 116617, which it was able successfully append. 
However, leader's log start offset in fetch request was 116753, which was 
higher than fetched offset 116617. When replica fetcher thread tried to 
increment log start offset to leader's log start offset, it failed with 
OffsetOutOfRangeException: 

[2018-06-23 00:45:37,409] ERROR [ReplicaFetcher replicaId=1002, leaderId=1001, 
fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) 
 kafka.common.KafkaException: Error processing data for partition X-N offset 
116617 

Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
increment the log start offset to 116753 of partition X-N since it is larger 
than the high watermark 116619

 

In leader's log, we see that log start offset was incremented almost at the 
same time (within one 100 ms or so). 

[2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N 
to 116753 in dir /kafka/kafka-logs (kafka.log.Log)

 

In leader's logic: ReplicaManager#ReplicaManager first calls readFromLocalLog() 
that reads from local log and returns LogReadResult that contains fetched data 
and leader's log start offset and HW. However, it then calls 
ReplicaManager#updateFollowerLogReadResults() which may move leader's log start 
offset and update leader's log start offset and HW in fetch response. If 
deleteRecords() happens in between, it is possible that log start offset may 
move beyond fetched offset. Or possibly, the leader moves log start offset 
because of deleting old log segments. Basically, the issue is that log start 
offset can move between records are read from the log and LogReadResult is 
updated with new log start offset. As a result, fetch response may contain 
fetched data but leader's log start offset in the response could be set beyond 
fetched offset (and indicate the state on 

[jira] [Updated] (KAFKA-7104) ReplicaFetcher thread may die because of inconsistent log start offset in fetch response

2018-06-26 Thread Anna Povzner (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-7104:

Description: 
What we saw:

The follower fetches offset 116617, which it was able to successfully append. 
However, the leader's log start offset in the fetch response was 116753, which was 
higher than the fetched offset 116617. When the replica fetcher thread tried to 
increment its log start offset to the leader's log start offset, it failed with an 
OffsetOutOfRangeException: 

[2018-06-23 00:45:37,409] ERROR [ReplicaFetcher replicaId=1002, leaderId=1001, 
fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) 
 kafka.common.KafkaException: Error processing data for partition X-N offset 
116617 

Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
increment the log start offset to 116753 of partition X-N since it is larger 
than the high watermark 116619

 

In the leader's log, we see that the log start offset was incremented at almost the 
same time (within about 100 ms). 

[2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N 
to 116753 in dir /kafka/kafka-logs (kafka.log.Log)

 

In the leader's logic: ReplicaManager first calls readFromLocalLog(), which reads 
from the local log and returns a LogReadResult containing the fetched data plus the 
leader's log start offset and HW. However, it then calls 
ReplicaManager#updateFollowerLogReadResults(), which may move the leader's log start 
offset and update the log start offset and HW in the fetch response. If 
deleteRecords() happens in between, the log start offset may move beyond the fetched 
offset. Or possibly, the leader moves its log start offset because of deleting old 
log segments. Basically, the issue is that the log start offset can move between the 
time records are read from the log and the time the LogReadResult is updated with 
the new log start offset. As a result, the fetch response may contain fetched data 
while the leader's log start offset in the response is set beyond the fetched offset 
(indicating a state on the leader in which the fetched data no longer exists). 

When a follower receives such a fetch response, it will first append the data, then 
move its HW no further than its LEO (which may be less than the leader's log start 
offset in the fetch response), and then call 
`replica.maybeIncrementLogStartOffset(leaderLogStartOffset)`, which throws an 
OffsetOutOfRangeException and causes the fetcher thread to stop. 

Note that this can only happen if the follower is not in the ISR; otherwise the 
leader will not move its log start offset beyond the follower's HW. 

 

*Suggested fix:*

1) Since ReplicaFetcher bounds follower's HW to follower's LEO, we should also 
bound follower's log start offset to its LEO. In this situation, the follower's 
log start offset will be updated to LEO.

2) In addition to #1, we could try to make sure that the leader builds the fetch 
response based on the state of the log as of the time the data was read from the 
replica (while still moving the leader's HW based on the follower's fetch). That 
could potentially be another JIRA, since the fix would be more involved.

 

  was:
What we saw:

The follower fetches offset 116617, which it was able successfully append. 
However, leader's log start offset in fetch request was 116753, which was 
higher than fetched offset 116617. When replica fetcher thread tried to 
increment log start offset to leader's log start offset, it failed with 
OffsetOutOfRangeException: 

[2018-06-23 00:45:37,409] ERROR [ReplicaFetcher replicaId=1002, leaderId=1001, 
fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) 
 kafka.common.KafkaException: Error processing data for partition X-N offset 
116617 

Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
increment the log start offset to 116753 of partition X-N since it is larger 
than the high watermark 116619

 

In leader's log, we see that log start offset was incremented almost at the 
same time (within one 100 ms or so). 

[2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N 
to 116753 in dir /kafka/kafka-logs (kafka.log.Log)

 

In leader's logic: ReplicaManager#ReplicaManager first calls readFromLocalLog() 
that reads from local log and returns LogReadResult that contains fetched data 
and leader's log start offset and HW. However, it then calls 
ReplicaManager#updateFollowerLogReadResults() that may move leader's log start 
offset and update leader's log start offset and HW in fetch response. If 
deleteRecords() happens in between, it is possible that log start offset may 
move beyond fetched offset. As a result, fetch response will contain fetched 
data but log start offset that is beyond fetched offset (and indicate the state 
on leader that fetched data does not actually exist anymore on leader).

When a follower receives such fetch response, it will first append, then move 
it's HW no further 

[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-06-26 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524331#comment-16524331
 ] 

Guozhang Wang commented on KAFKA-6520:
--

Hi [~milindjain], are you still working on this ticket?

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Milind Jain
>Priority: Major
>  Labels: newbie, user-experience
>
> When you execute the following scenario, the application stays in the RUNNING 
> state the whole time:
>   
>  1) Start Kafka
>  2) Start the app; the app connects to Kafka and starts processing
>  3) Kill Kafka (stop the Docker container)
>  4) The application gives no indication that it is no longer 
> connected (the stream state is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the stream state had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7107) Ability to configure state store for JoinWindows in KStream-KStream join

2018-06-26 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7107:
---
Labels: needs-kip  (was: )

> Ability to configure state store for JoinWindows in KStream-KStream join
> 
>
> Key: KAFKA-7107
> URL: https://issues.apache.org/jira/browse/KAFKA-7107
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, the KStream-KStream join operation internally provisions window 
> stores to support the JoinWindow configuration.
>  
> However, unlike all the other stateful processors, it does not allow 
> configuration of the stores. We should consider adding DSL methods taking 
> Materialized configs for these stores.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7106) Remove segment/segmentInterval from Window definition

2018-06-26 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7106:
---
Labels: needs-kip  (was: )

> Remove segment/segmentInterval from Window definition
> -
>
> Key: KAFKA-7106
> URL: https://issues.apache.org/jira/browse/KAFKA-7106
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: needs-kip
>
> Currently, Window configures segment and segmentInterval properties, but 
> these aren't truly properties of a window in general.
> Rather, they are properties of the particular implementation that we 
> currently have: a segmented store. Therefore, these properties should be 
> moved to configure only that implementation.
>  
> This may be related to KAFKA-4730, since an in-memory window store wouldn't 
> necessarily need to be segmented.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6587) Kafka Streams hangs when not able to access internal topics

2018-06-26 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524306#comment-16524306
 ] 

Guozhang Wang commented on KAFKA-6587:
--

I looked into the source code, and I believe this issue still exists in trunk; the 
security setting is just one of the scenarios that can trigger it. The root cause is 
the same as https://issues.apache.org/jira/browse/KAFKA-5037. Here is the reasoning:

When the required permission is missing, the TopicMetadata response filters out any 
topics that are not granted permission, and hence the {{metadata}} object 
passed into {{assign(Cluster metadata, Map<String, Subscription> subscriptions)}} 
will not contain this topic; it will therefore block as described in 
KAFKA-5037.

> Kafka Streams hangs when not able to access internal topics
> ---
>
> Key: KAFKA-6587
> URL: https://issues.apache.org/jira/browse/KAFKA-6587
> Project: Kafka
>  Issue Type: Bug
>  Components: security, streams
>Affects Versions: 1.0.0
>Reporter: Chris Medved
>Priority: Minor
>
> *Expectation:* Kafka Streams client will throw an exception, log errors, or 
> crash when a fatal error occurs.
> *Observation:* Kafka Streams does not log an error or throw an exception when 
> necessary permissions for internal state store topics are not granted. It 
> will hang indefinitely and not start running the topology.
> *Steps to reproduce:*
>  # Create a Kafka Cluster with ACLs enabled (allow.everyone.if.no.acl.found 
> should be set to false, or deny permissions must be set on the intermediate 
> topics).
>  # Create a simple streams application that does a stateful operation such as 
> count.
>  # Grant ACLs on source and sink topics to principal used for testing (would 
> recommend using ANONYMOUS user if possible for ease of testing).
>  # Grant ACLs for consumer group and cluster create. Add deny permissions to 
> state store topics if the default is "allow". You can run the application to 
> create the topics or use the topology describe method to get the names.
>  # Run streams application. It should hang on "(Re-)joining group" with no 
> errors printed.
> *Detailed Explanation*
> I spent some time trying to figure out what was wrong with my streams app. 
> I'm using ACLs on my Kafka cluster and it turns out I forgot to grant 
> read/write access to the internal topic state store for an aggregation.
> The streams client would hang on "(Re-)joining group" until killed (note ^C 
> is ctrl+c, which I used to kill the app): 
> {code:java}
> 10:29:10.064 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=kafka-consumer-client-StreamThread-1-consumer, 
> groupId=kafka-consumer-test] Discovered coordinator localhost:9092 (id: 
> 2147483647 rack: null)
> 10:29:10.105 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=kafka-consumer-client-StreamThread-1-consumer, 
> groupId=kafka-consumer-test] Revoking previously assigned partitions []
> 10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
> [kafka-consumer-client-StreamThread-1] State transition from RUNNING to 
> PARTITIONS_REVOKED
> 10:29:10.106 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [kafka-consumer-client]State transition from RUNNING to REBALANCING
> 10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
> [kafka-consumer-client-StreamThread-1] partition revocation took 1 ms.
> suspended active tasks: []
> suspended standby tasks: []
> 10:29:10.107 [kafka-consumer-client-StreamThread-1] INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=kafka-consumer-client-StreamThread-1-consumer, 
> groupId=kafka-consumer-test] (Re-)joining group
> ^C
> 10:34:53.609 [Thread-3] INFO org.apache.kafka.streams.KafkaStreams - 
> stream-client [kafka-consumer-client]State transition from REBALANCING to 
> PENDING_SHUTDOWN
> 10:34:53.610 [Thread-3] INFO 
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
> [kafka-consumer-client-StreamThread-1] Informed to shut down
> 10:34:53.610 [Thread-3] INFO 
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
> [kafka-consumer-client-StreamThread-1] State transition from 
> PARTITIONS_REVOKED to PENDING_SHUTDOWN{code}
> The server log would show:
> {code:java}
> [2018-02-23 10:29:10,408] INFO [Partition 
> kafka-consumer-test-KSTREAM-AGGREGATE-STATE-STORE-05-changelog-0 
> broker=0] 
> 

[jira] [Created] (KAFKA-7107) Ability to configure state store for JoinWindows in KStream-KStream join

2018-06-26 Thread John Roesler (JIRA)
John Roesler created KAFKA-7107:
---

 Summary: Ability to configure state store for JoinWindows in 
KStream-KStream join
 Key: KAFKA-7107
 URL: https://issues.apache.org/jira/browse/KAFKA-7107
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, the KStream-KStream join operation internally provisions window 
stores to support the JoinWindow configuration.

 

However, unlike all the other stateful processors, it does not allow 
configuration of the stores. We should consider adding DSL methods taking 
Materialized configs for these stores.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7106) Remove segment/segmentInterval from Window definition

2018-06-26 Thread John Roesler (JIRA)
John Roesler created KAFKA-7106:
---

 Summary: Remove segment/segmentInterval from Window definition
 Key: KAFKA-7106
 URL: https://issues.apache.org/jira/browse/KAFKA-7106
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, Window configures segment and segmentInterval properties, but these 
aren't truly properties of a window in general.

Rather, they are properties of the particular implementation that we currently 
have: a segmented store. Therefore, these properties should be moved to 
configure only that implementation.

 

This may be related to KAFKA-4730, since an in-memory window store wouldn't 
necessarily need to be segmented.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7101) Session Window store should set topic policy `compact,cleanup`

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524222#comment-16524222
 ] 

ASF GitHub Bot commented on KAFKA-7101:
---

guozhangwang opened a new pull request #5298: KAFKA-7101: Consider session 
store for windowed store default configs
URL: https://github.com/apache/kafka/pull/5298
 
 
   1. extend `isWindowStore` to consider session store as well.
   2. extend the existing unit test accordingly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Session Window store should set topic policy `compact,cleanup`
> --
>
> Key: KAFKA-7101
> URL: https://issues.apache.org/jira/browse/KAFKA-7101
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> With 
> [KIP-71|https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist]
>  (0.10.1.0) topic config `compact,delete` was introduced to apply to windowed 
> store changelog topics in Kafka Streams. Later (0.10.2.0), session windows 
> got added in 
> [KIP-94|https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows].
>  However, session windows do not use `compact,delete` at the moment. This 
> results in the same issue window stores faced before KIP-71. Thus, we should 
> enable `compact,delete` for session window changelog topics, too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7105) Refactor RocksDBSegmentsBatchingRestoreCallback and RocksDBBatchingRestoreCallback into a single class

2018-06-26 Thread Liquan Pei (JIRA)
Liquan Pei created KAFKA-7105:
-

 Summary: Refactor RocksDBSegmentsBatchingRestoreCallback and 
RocksDBBatchingRestoreCallback into a single class
 Key: KAFKA-7105
 URL: https://issues.apache.org/jira/browse/KAFKA-7105
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.1.0
Reporter: Liquan Pei
Assignee: Liquan Pei






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7104) ReplicaFetcher thread may die because of inconsistent log start offset in fetch response

2018-06-26 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-7104:
---

 Summary: ReplicaFetcher thread may die because of inconsistent log 
start offset in fetch response
 Key: KAFKA-7104
 URL: https://issues.apache.org/jira/browse/KAFKA-7104
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0, 1.0.0
Reporter: Anna Povzner
Assignee: Anna Povzner


What we saw:

The follower fetches offset 116617, which it was able to successfully append. 
However, the leader's log start offset in the fetch response was 116753, which was 
higher than the fetched offset 116617. When the replica fetcher thread tried to 
increment its log start offset to the leader's log start offset, it failed with an 
OffsetOutOfRangeException: 

[2018-06-23 00:45:37,409] ERROR [ReplicaFetcher replicaId=1002, leaderId=1001, 
fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) 
 kafka.common.KafkaException: Error processing data for partition X-N offset 
116617 

Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
increment the log start offset to 116753 of partition X-N since it is larger 
than the high watermark 116619

 

In the leader's log, we see that the log start offset was incremented at almost the 
same time (within about 100 ms). 

[2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N 
to 116753 in dir /kafka/kafka-logs (kafka.log.Log)

 

In the leader's logic: ReplicaManager first calls readFromLocalLog(), which reads 
from the local log and returns a LogReadResult containing the fetched data plus the 
leader's log start offset and HW. However, it then calls 
ReplicaManager#updateFollowerLogReadResults(), which may move the leader's log start 
offset and update the log start offset and HW in the fetch response. If 
deleteRecords() happens in between, the log start offset may move beyond the fetched 
offset. As a result, the fetch response will contain fetched data but a log start 
offset that is beyond the fetched offset (indicating a state on the leader in which 
the fetched data no longer exists).

When a follower receives such a fetch response, it will first append the data, then 
move its HW no further than its LEO (which may be less than the leader's log start 
offset in the fetch response), and then call 
`replica.maybeIncrementLogStartOffset(leaderLogStartOffset)`, which throws an 
OffsetOutOfRangeException and causes the fetcher thread to stop. 

 

*Suggested fix:*

If the leader moves its log start offset beyond the fetched offset, 
ReplicaManager#updateFollowerLogReadResults() should update the log read 
result with an OFFSET_OUT_OF_RANGE error, which will cause the follower to reset 
its fetch offset to the leader's log start offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7103) Use bulkloading for RocksDBSegmentedBytesStore during init

2018-06-26 Thread Liquan Pei (JIRA)
Liquan Pei created KAFKA-7103:
-

 Summary: Use bulkloading for RocksDBSegmentedBytesStore during init
 Key: KAFKA-7103
 URL: https://issues.apache.org/jira/browse/KAFKA-7103
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: Liquan Pei
Assignee: Liquan Pei


We should use bulk loading for recovering RocksDBWindowStore, same as 
RocksDBStore. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6268) Tools should not swallow exceptions like resolving network names

2018-06-26 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reassigned KAFKA-6268:
-

 Assignee: Stanislav Kozlovski
Fix Version/s: 2.1.0

> Tools should not swallow exceptions like resolving network names
> 
>
> Key: KAFKA-6268
> URL: https://issues.apache.org/jira/browse/KAFKA-6268
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Antony Stubbs
>Assignee: Stanislav Kozlovski
>Priority: Major
> Fix For: 2.1.0
>
>
> The CLI consumer client shows nothing when it can't resolve a domain. This 
> and other errors like it should be shown to the user by default. You have to 
> turn on DEBUG-level logging in the tools log4j config to find out there is an error.
> {{[2017-11-23 16:40:56,401] DEBUG Error connecting to node 
> as-broker-1-eu-west-1b-public:9092 (id: 1 rack: null) 
> (org.apache.kafka.clients.NetworkClient)
> java.io.IOException: Can't resolve address: as-broker-1-eu-west-1b-public:9092
>   at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
>   at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:764)
>   at 
> org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:60)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:908)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:819)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:431)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:223)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>   at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:64)
>   at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:72)
>   at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
>   at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: java.nio.channels.UnresolvedAddressException
>   at sun.nio.ch.Net.checkAddress(Net.java:101)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
>   at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
>   ... 18 more
> }}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-26 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524117#comment-16524117
 ] 

Vahid Hashemian commented on KAFKA-7026:


[~steven.aerts] is correct. The issue reported in this Jira occurs because the 
consumers involved keep track of their assignment and report it back to the 
leader when a rebalance occurs. The Range Assignor is much simpler than that: every 
time a rebalance occurs the leader starts fresh (this is the case for the Round 
Robin Assignor too). So a similar use case would work fine for those assignors.
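
For illustration, a sketch of the kind of duplication guard the assignor could apply to the 
reported prior assignments (the names, including reportedAssignments, are hypothetical and not 
the actual StickyAssignor code): each partition is kept only by the first member that claims it.

{code:java}
// Sketch only: drop duplicate claims so a partition ends up assigned to exactly one consumer.
Map<TopicPartition, String> owner = new HashMap<>();
Map<String, List<TopicPartition>> validPriorAssignment = new HashMap<>();
for (Map.Entry<String, List<TopicPartition>> entry : reportedAssignments.entrySet()) {
    String member = entry.getKey();
    for (TopicPartition tp : entry.getValue()) {
        String previousClaim = owner.putIfAbsent(tp, member);
        if (previousClaim == null) {
            // first member to claim this partition keeps it as its prior assignment
            validPriorAssignment.computeIfAbsent(member, m -> new ArrayList<>()).add(tp);
        }
        // otherwise: tp was already claimed by previousClaim, so this duplicate claim is ignored
    }
}
{code}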

> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.1.0
>
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7102) Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@10c8df20

2018-06-26 Thread sankar (JIRA)
sankar created KAFKA-7102:
-

 Summary:  Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@10c8df20
 Key: KAFKA-7102
 URL: https://issues.apache.org/jira/browse/KAFKA-7102
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.10.1.0
Reporter: sankar
 Attachments: kafka_java_io_exception.txt

We faced the following error:

 Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@10c8df20

We have a four-node Kafka cluster in a production environment. We suddenly 
experienced a Kafka connect issue across the cluster. 

A manual restart of the Kafka service on all the nodes fixed the issue.

I have attached the complete log; please check it.

Kindly let me know what more information is needed from my side.

Thanks in advance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6812) Async ConsoleProducer exits with 0 status even after data loss

2018-06-26 Thread Andras Beni (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524080#comment-16524080
 ] 

Andras Beni commented on KAFKA-6812:


[~enether] 
In sync mode, when an exception occurs, it is logged and the tool immediately 
exits with an error status. In async mode, the same situation is handled by 
logging and continuing operation, and the exit code will still be 0. I also see 
how this behavior can be derived from the current implementation. I was 
questioning whether it is correct and by design, because losing data should be 
unexpected and the difference in error handling is not obvious from the 
sync-async distinction. If this difference is intended, I propose to either
* document that in async mode dropping records is expected and accepted, or
* add a flag (e.g. --error-handler-strategy) so users can choose how they want 
to handle errors.

If the latter option is acceptable to you, I am happy to write the KIP and 
implement it. 
If neither option is acceptable, feel free to close this issue.
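
A minimal, hypothetical sketch of what the flag-based option could build on: a 
producer callback that remembers whether any send failed, so the tool can exit 
with a non-zero status at shutdown (class and method names are invented; this is 
not the existing ErrorLoggingCallback):

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class FailFlagCallback implements Callback {
    private static final AtomicBoolean sendFailed = new AtomicBoolean(false);

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            sendFailed.set(true);   // remember the failed send instead of only logging it
    }

    // The tool would call this at shutdown: non-zero exit status when any record was dropped.
    public static int exitCode() {
        return sendFailed.get() ? 1 : 0;
    }
}
{code}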

> Async ConsoleProducer exits with 0 status even after data loss
> --
>
> Key: KAFKA-6812
> URL: https://issues.apache.org/jira/browse/KAFKA-6812
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0
>Reporter: Andras Beni
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> When {{ConsoleProducer}} is run without {{--sync}} flag and one of the 
> batches times out, {{ErrorLoggingCallback}} logs the error:
> {code:java}
>  18/04/21 04:23:01 WARN clients.NetworkClient: [Producer 
> clientId=console-producer] Connection to node 10 could not be established. 
> Broker may not be available.
>  18/04/21 04:23:02 ERROR internals.ErrorLoggingCallback: Error when sending 
> message to topic my-topic with key: null, value: 8 bytes with error:
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> my-topic-0: 1530 ms has passed since batch creation plus linger time{code}
>  However, the tool exits with status code 0. 
>  In my opinion the tool should indicate in the exit status that there was 
> data lost. Maybe it's reasonable to exit after the first error.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6809) connections-created metric does not behave as expected

2018-06-26 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524060#comment-16524060
 ] 

Ismael Juma commented on KAFKA-6809:


Yes.

> connections-created metric does not behave as expected
> --
>
> Key: KAFKA-6809
> URL: https://issues.apache.org/jira/browse/KAFKA-6809
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Anna Povzner
>Assignee: Stanislav Kozlovski
>Priority: Major
> Fix For: 2.1.0, 1.1.2
>
>
> "connections-created" sensor is described as "new connections established". 
> It currently records only connections that the broker/client creates, but 
> does not count connections received. Seems like we should also count 
> connections received – either include them into this metric (and also clarify 
> the description) or add a new metric (separately counting two types of 
> connections). I am not sure how useful is to separate them, so I think we 
> should do the first approach.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6964) Add ability to print all internal topic names

2018-06-26 Thread Jagadesh Adireddi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524058#comment-16524058
 ] 

Jagadesh Adireddi commented on KAFKA-6964:
--

Got it, [~bbejeck]. Should we skip this feature for now and close the ticket?

> Add ability to print all internal topic names
> -
>
> Key: KAFKA-6964
> URL: https://issues.apache.org/jira/browse/KAFKA-6964
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Jagadesh Adireddi
>Priority: Major
>  Labels: needs-kip
>
> For security access reasons some streams users need to build all internal 
> topics before deploying their streams application.  While it's possible to 
> get all internal topic names from the {{Topology#describe()}} method, it 
> would be nice to have a separate method that prints out only the internal 
> topic names to ease the process.
> I think this change will require a KIP, so I've added the appropriate label.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client

2018-06-26 Thread Shun Guan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524049#comment-16524049
 ] 

Shun Guan commented on KAFKA-6788:
--

Is this story closed? I see there are code commits and a PR, but it doesn't look 
like anyone has accepted the PR. [~guozhang]

> Grouping consumer requests per consumer coordinator in admin client
> ---
>
> Key: KAFKA-6788
> URL: https://issues.apache.org/jira/browse/KAFKA-6788
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we 
> will first try to get the coordinator for each requested group id, and then 
> send the corresponding request for that group id. However, different group 
> ids could be hosted on the same coordinator, and these requests do support 
> multi group ids be sent within the same request. So we can consider optimize 
> it by grouping the requests per coordinator destination.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6809) connections-created metric does not behave as expected

2018-06-26 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523985#comment-16523985
 ] 

Stanislav Kozlovski commented on KAFKA-6809:


I'm not sure I understand what the appropriate approach would be in that case. 
Are you saying that a `Selector` would never be used to both receive and create 
connections? In that case, `Selector`s which are used for receiving connections 
would show empty connection metrics. Is that so? 

> connections-created metric does not behave as expected
> --
>
> Key: KAFKA-6809
> URL: https://issues.apache.org/jira/browse/KAFKA-6809
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Anna Povzner
>Assignee: Stanislav Kozlovski
>Priority: Major
> Fix For: 2.1.0, 1.1.2
>
>
> "connections-created" sensor is described as "new connections established". 
> It currently records only connections that the broker/client creates, but 
> does not count connections received. Seems like we should also count 
> connections received – either include them into this metric (and also clarify 
> the description) or add a new metric (separately counting two types of 
> connections). I am not sure how useful is to separate them, so I think we 
> should do the first approach.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523970#comment-16523970
 ] 

ASF GitHub Bot commented on KAFKA-6949:
---

lindong28 closed pull request #5293: KAFKA-6949; alterReplicaLogDirs() should 
grab partition lock when accessing log of the future replica
URL: https://github.com/apache/kafka/pull/5293
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9ab1ec47af8..b80c34475d3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -149,10 +149,10 @@ class Partition(val topic: String,
    * @return true iff the future replica is created
    */
   def maybeCreateFutureReplica(logDir: String): Boolean = {
-    // The readLock is needed to make sure that while the caller checks the log directory of the
+    // The writeLock is needed to make sure that while the caller checks the log directory of the
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
-    inReadLock(leaderIsrUpdateLock) {
+    inWriteLock(leaderIsrUpdateLock) {
       val currentReplica = getReplica().get
       if (currentReplica.log.get.dir.getParent == logDir)
         false
@@ -207,29 +207,52 @@ class Partition(val topic: String,
     allReplicasMap.remove(replicaId)
   }
 
-  def removeFutureLocalReplica() {
+  def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
+    inReadLock(leaderIsrUpdateLock) {
+      getReplica(Request.FutureLocalReplicaId) match {
+        case Some(futureReplica) =>
+          if (futureReplica.log.get.dir.getParent != newDestinationDir)
+            true
+          else
+            false
+        case None => false
+      }
+    }
+  }
+
+  def removeFutureLocalReplica(deleteFromLogDir: Boolean = true) {
     inWriteLock(leaderIsrUpdateLock) {
       allReplicasMap.remove(Request.FutureLocalReplicaId)
+      if (deleteFromLogDir)
+        logManager.asyncDelete(topicPartition, isFuture = true)
     }
   }
 
-  // Return true iff the future log has caught up with the current log for this partition
+  // Return true iff the future replica exists and it has caught up with the current replica for this partition
   // Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = getReplica().get
-    val futureReplica = getReplica(Request.FutureLocalReplicaId).get
-    if (replica.logEndOffset == futureReplica.logEndOffset) {
+    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
-        if (replica.logEndOffset == futureReplica.logEndOffset) {
-          logManager.replaceCurrentWithFutureLog(topicPartition)
-          replica.log = futureReplica.log
-          futureReplica.log = None
-          allReplicasMap.remove(Request.FutureLocalReplicaId)
-          true
-        } else false
+        getReplica(Request.FutureLocalReplicaId) match {
+          case Some(futureReplica) =>
+            if (replica.logEndOffset == futureReplica.logEndOffset) {
+              logManager.replaceCurrentWithFutureLog(topicPartition)
+              replica.log = futureReplica.log
+              futureReplica.log = None
+              allReplicasMap.remove(Request.FutureLocalReplicaId)
+              true
+            } else false
+          case None =>
+            // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
+            // In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread
+            // Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the state again to avoid race condition
+            false
+        }
       }
     } else false
   }
@@ -550,15 +573,22 @@ class Partition(val topic: String,
   }
 
   private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
-    if (isFuture)
-

[jira] [Commented] (KAFKA-6809) connections-created metric does not behave as expected

2018-06-26 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523905#comment-16523905
 ] 

Ismael Juma commented on KAFKA-6809:


[~enether], these metrics exist on a per-selector basis. The Selector used for 
receiving connections is never used for creating outbound connections, so the 
scenario you outline wouldn't actually happen. Separating the metrics has 
compatibility implications and I'm not sure it's worth it in this case.

> connections-created metric does not behave as expected
> --
>
> Key: KAFKA-6809
> URL: https://issues.apache.org/jira/browse/KAFKA-6809
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Anna Povzner
>Assignee: Stanislav Kozlovski
>Priority: Major
> Fix For: 2.1.0, 1.1.2
>
>
> "connections-created" sensor is described as "new connections established". 
> It currently records only connections that the broker/client creates, but 
> does not count connections received. Seems like we should also count 
> connections received – either include them into this metric (and also clarify 
> the description) or add a new metric (separately counting two types of 
> connections). I am not sure how useful is to separate them, so I think we 
> should do the first approach.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6809) connections-created metric does not behave as expected

2018-06-26 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523885#comment-16523885
 ] 

Stanislav Kozlovski commented on KAFKA-6809:


Changing them to the first approach (count both inbound/outbound connections) 
might mess up some Kafka users' metrics - imagine watching your connections 
blow up in numbers over a patch update.

I believe we should separate out the sensors and offer both metrics - inbound, 
outbound. This will enable the UIs to provide the three kinds of metrics (in, 
out, total).
[~apovzner], what do you think?
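
For illustration only, a rough sketch of what separate sensors could look like 
with the client-side Metrics API (the sensor and metric names below are made up, 
not a proposal for the final names):

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Total;

public class ConnectionMetricsSketch {
    private final Sensor inbound;
    private final Sensor outbound;

    public ConnectionMetricsSketch(Metrics metrics) {
        inbound = metrics.sensor("connections-accepted");
        inbound.add(metrics.metricName("connections-accepted-total", "selector-metrics"), new Total());
        outbound = metrics.sensor("connections-created");
        outbound.add(metrics.metricName("connections-created-total", "selector-metrics"), new Total());
    }

    public void recordInbound()  { inbound.record(); }   // connection received from a peer
    public void recordOutbound() { outbound.record(); }  // connection we established ourselves
}
{code}

A UI could then add the two values together to show the total.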


> connections-created metric does not behave as expected
> --
>
> Key: KAFKA-6809
> URL: https://issues.apache.org/jira/browse/KAFKA-6809
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Anna Povzner
>Assignee: Stanislav Kozlovski
>Priority: Major
> Fix For: 2.1.0, 1.1.2
>
>
> "connections-created" sensor is described as "new connections established". 
> It currently records only connections that the broker/client creates, but 
> does not count connections received. Seems like we should also count 
> connections received – either include them into this metric (and also clarify 
> the description) or add a new metric (separately counting two types of 
> connections). I am not sure how useful is to separate them, so I think we 
> should do the first approach.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7091) AdminClient should handle FindCoordinatorResponse errors

2018-06-26 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-7091:
-
Fix Version/s: (was: 2.0.0)
   2.0.1

> AdminClient should handle FindCoordinatorResponse errors
> 
>
> Key: KAFKA-7091
> URL: https://issues.apache.org/jira/browse/KAFKA-7091
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.1
>
>
> Currently the KafkaAdminClient.deleteConsumerGroups and listConsumerGroupOffsets 
> method implementations ignore FindCoordinatorResponse errors. This causes 
> admin client request timeouts in case of authorization errors. We should 
> handle these errors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6812) Async ConsoleProducer exits with 0 status even after data loss

2018-06-26 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523867#comment-16523867
 ] 

Stanislav Kozlovski commented on KAFKA-6812:


I believe the current behavior is correct. You generally exit with a status 
code different from 0 when something unexpected happens.
The console producer is given an `ErrorLoggingCallback` class, whose 
responsibility is to swallow errors and simply log them - 
https://github.com/apache/kafka/blob/581adb5013ed996705031ca8dd9e175c40675692/core/src/main/scala/kafka/tools/ConsoleProducer.scala#L75.
Since nothing unexpected has happened, I believe the tool should exit with 0.


> Async ConsoleProducer exits with 0 status even after data loss
> --
>
> Key: KAFKA-6812
> URL: https://issues.apache.org/jira/browse/KAFKA-6812
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0
>Reporter: Andras Beni
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> When {{ConsoleProducer}} is run without {{--sync}} flag and one of the 
> batches times out, {{ErrorLoggingCallback}} logs the error:
> {code:java}
>  18/04/21 04:23:01 WARN clients.NetworkClient: [Producer 
> clientId=console-producer] Connection to node 10 could not be established. 
> Broker may not be available.
>  18/04/21 04:23:02 ERROR internals.ErrorLoggingCallback: Error when sending 
> message to topic my-topic with key: null, value: 8 bytes with error:
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> my-topic-0: 1530 ms has passed since batch creation plus linger time{code}
>  However, the tool exits with status code 0. 
>  In my opinion the tool should indicate in the exit status that there was 
> data lost. Maybe it's reasonable to exit after the first error.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-6964) Add ability to print all internal topic names

2018-06-26 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6964:
---
Comment: was deleted

(was: I did not double-check the code, but in general it is ok to use 
`InternalTopologyBuilder`. If (1) or (2) is better, I don't know atm. Maybe you 
try out both approaches to see which one works better and than open PR for the 
better one?)

> Add ability to print all internal topic names
> -
>
> Key: KAFKA-6964
> URL: https://issues.apache.org/jira/browse/KAFKA-6964
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Jagadesh Adireddi
>Priority: Major
>  Labels: needs-kip
>
> For security access reasons some streams users need to build all internal 
> topics before deploying their streams application.  While it's possible to 
> get all internal topic names from the {{Topology#describe()}} method, it 
> would be nice to have a separate method that prints out only the internal 
> topic names to ease the process.
> I think this change will require a KIP, so I've added the appropriate label.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6964) Add ability to print all internal topic names

2018-06-26 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523853#comment-16523853
 ] 

Bill Bejeck commented on KAFKA-6964:


[~jadireddi]  I was thinking along the lines of printing the contents of 
{{InternalTopologyBuilder#internalTopicNames}}, but having it exposed on the 
{{KafkaStreams}} object, something like 
{{KafkaStreams#printInternalTopicNames()}}. 

But before you start working on this Jira: KIP-290 (ACL prefixes) will be in 
the 2.0 release, meaning users can now grant ACLs that allow creating topics 
prefixed with the application id (my-application-id*) of a Kafka Streams 
application, reducing the need for this feature.

 

HTH,

Bill
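
For anyone who ends up here looking for the ACL route, a hedged sketch using the 
Java AdminClient (APIs as of AK 2.0; the bootstrap server, principal and 
application id below are placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantPrefixedCreate {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Allow the Streams principal to create any topic whose name starts with the application id
            ResourcePattern pattern =
                new ResourcePattern(ResourceType.TOPIC, "my-application-id", PatternType.PREFIXED);
            AccessControlEntry entry =
                new AccessControlEntry("User:streams-app", "*", AclOperation.CREATE, AclPermissionType.ALLOW);
            admin.createAcls(Collections.singleton(new AclBinding(pattern, entry))).all().get();
        }
    }
}
{code}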

> Add ability to print all internal topic names
> -
>
> Key: KAFKA-6964
> URL: https://issues.apache.org/jira/browse/KAFKA-6964
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Jagadesh Adireddi
>Priority: Major
>  Labels: needs-kip
>
> For security access reasons some streams users need to build all internal 
> topics before deploying their streams application.  While it's possible to 
> get all internal topic names from the {{Topology#describe()}} method, it 
> would be nice to have a separate method that prints out only the internal 
> topic names to ease the process.
> I think this change will require a KIP, so I've added the appropriate label.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7099) KafkaLog4jAppender - not sending any message with level DEBUG

2018-06-26 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523849#comment-16523849
 ] 

Manikumar commented on KAFKA-7099:
--

Existing issue:  KAFKA-6415. This temporarily handled in  KAFKA-6415 by 
changing the log level to WARN.

> KafkaLog4jAppender - not sending any message with level DEBUG
> -
>
> Key: KAFKA-7099
> URL: https://issues.apache.org/jira/browse/KAFKA-7099
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Vincent Lebreil
>Priority: Major
>
> KafkaLog4jAppender can be stuck if it is defined at root category with level 
> DEBUG
> {{log4j.rootLogger=DEBUG, kafka}}
> {{log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender}}
> {quote}DEBUG org.apache.kafka.clients.producer.KafkaProducer:131 - Exception 
> occurred during message send:
> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
> after 6 ms.
> {quote}
> KafkaLog4jAppender is using a KafkaProducer using itself Log4j with messages 
> at levels TRACE and DEBUG. The appender used in this case is also the 
> KafkaLog4jAppender.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7099) KafkaLog4jAppender - not sending any message with level DEBUG

2018-06-26 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523849#comment-16523849
 ] 

Manikumar edited comment on KAFKA-7099 at 6/26/18 3:10 PM:
---

Existing issue: KAFKA-6415. This was temporarily handled there by changing the 
log level to WARN.


was (Author: omkreddy):
Existing issue:  KAFKA-6415. This temporarily handled in  KAFKA-6415 by 
changing the log level to WARN.

> KafkaLog4jAppender - not sending any message with level DEBUG
> -
>
> Key: KAFKA-7099
> URL: https://issues.apache.org/jira/browse/KAFKA-7099
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Vincent Lebreil
>Priority: Major
>
> KafkaLog4jAppender can be stuck if it is defined at root category with level 
> DEBUG
> {{log4j.rootLogger=DEBUG, kafka}}
> {{log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender}}
> {quote}DEBUG org.apache.kafka.clients.producer.KafkaProducer:131 - Exception 
> occurred during message send:
> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
> after 6 ms.
> {quote}
> KafkaLog4jAppender is using a KafkaProducer using itself Log4j with messages 
> at levels TRACE and DEBUG. The appender used in this case is also the 
> KafkaLog4jAppender.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7100) kafka.tools.GetOffsetShell with enable Kerberos Security on Kafka1.0

2018-06-26 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-7100.
--
Resolution: Duplicate

This is being tracked in KAFKA-5235.

> kafka.tools.GetOffsetShell with enable Kerberos Security on Kafka1.0  
> -
>
> Key: KAFKA-7100
> URL: https://issues.apache.org/jira/browse/KAFKA-7100
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.1
>Reporter: abdullah toraman
>Priority: Major
>
> Hi All,
> I enabled the Kerberos Authentication on Kafka 1.0. When I try to 
> kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list : 
> --topic  --time -1, It hits error. Here are the Error,
> [kafka@KLBIRKFKI1 kafka_2.11-1.0.1]$ bin/kafka-run-class.sh 
> kafka.tools.GetOffsetShell --broker-list klbirkfki1:9092 --topic abdullah 
> --time -1
> [2018-06-26 12:59:00,058] WARN Fetching topic metadata with correlation id 0 
> for topics [Set(abdullah)] from broker [BrokerEndPoint(0,klbirkfki1,9092)] 
> failed (kafka.client.ClientUtils$)
> java.io.EOFException
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
>     at 
> kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
>     at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)
>     at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79)
>     at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)
> Exception in thread "main" kafka.common.KafkaException: fetching topic 
> metadata for topics [Set(abdullah)] from broker 
> [ArrayBuffer(BrokerEndPoint(0,klbirkfki1,9092))] failed
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:77)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)
>     at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79)
>     at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)
> Caused by: java.io.EOFException
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
>     at 
> kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
>     at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)
>     ... 3 more
> [kafka@KLBIRKFKI1 kafka_2.11-1.0.1]$ bin/kafka-run-class.sh 
> kafka.tools.GetOffsetShell --broker-list klbirkfki1:9092 --topic abdullah 
> --time -1
> [2018-06-26 12:59:00,058] WARN Fetching topic metadata with correlation id 0 
> for topics [Set(abdullah)] from broker [BrokerEndPoint(0,klbirkfki1,9092)] 
> failed (kafka.client.ClientUtils$)
> java.io.EOFException
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
>     at 
> kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
>     at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)
>     at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79)
>     at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)
> Exception in thread "main" kafka.common.KafkaException: fetching topic 
> metadata for topics [Set(abdullah)] from broker 
> [ArrayBuffer(BrokerEndPoint(0,klbirkfki1,9092))] failed
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:77)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)
>     at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79)
>     at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)
> Caused by: java.io.EOFException
>     at 
> 

[jira] [Commented] (KAFKA-7028) super.users doesn't work with custom principals

2018-06-26 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523839#comment-16523839
 ] 

Stanislav Kozlovski commented on KAFKA-7028:


After investigation, [~rsivaram] and I found out that apart from
{code}
if (superUsers.contains(principal)) {
{code}
the ACL checking logic
{code}
(acl.principal == principal || acl.principal == Acl.WildCardPrincipal) &&
{code}
under `SimpleAclAuthorizer#aclMatch` also calls the `equals()` method of the 
`KafkaPrincipal` class, which always returns `false` when given a subclass of 
`KafkaPrincipal`, because it checks that the classes are equal.

This currently means that no custom principals can be authorized at all.
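
A minimal sketch of the failure mode described above (the {{CustomPrincipal}} 
subclass is invented for illustration):

{code:java}
import org.apache.kafka.common.security.auth.KafkaPrincipal;

public class PrincipalEqualityDemo {
    // A custom principal, e.g. one built by a custom KafkaPrincipalBuilder
    static class CustomPrincipal extends KafkaPrincipal {
        CustomPrincipal(String principalType, String name) {
            super(principalType, name);
        }
    }

    public static void main(String[] args) {
        KafkaPrincipal base = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice");
        KafkaPrincipal custom = new CustomPrincipal(KafkaPrincipal.USER_TYPE, "alice");
        // With a class-equality based equals(), both comparisons are false even though
        // type and name match, so super.users and ACL lookups never match the custom principal.
        System.out.println(base.equals(custom));
        System.out.println(custom.equals(base));
    }
}
{code}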

Changing the `equals()` method on the base `KafkaPrincipal` to check for 
subclasses would not work, as `subClass.equals(baseClass)` would return true 
but `baseClass.equals(subClass)` would return false.
An alternative that could work, but does not sound good to me, is comparing the 
principals as strings.

Either way, I assume a fix would require a breaking change and therefore a KIP.
[~ijuma], what do you think?

> super.users doesn't work with custom principals
> ---
>
> Key: KAFKA-7028
> URL: https://issues.apache.org/jira/browse/KAFKA-7028
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Stanislav Kozlovski
>Priority: Major
> Fix For: 2.1.0
>
>
> SimpleAclAuthorizer creates a KafkaPrincipal for the users defined in the 
> super.users broker config. However, it should use the configured 
> KafkaPrincipalBuilder so that it works with a custom defined one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7099) KafkaLog4jAppender - not sending any message with level DEBUG

2018-06-26 Thread Bastien Bussy (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523836#comment-16523836
 ] 

Bastien Bussy commented on KAFKA-7099:
--

Hello,

I am a colleague of Vincent.

*Disclaimer* : we work on hadoop (HDP-2.6.4) with *log4j 1.2.17*

After some digging, the problem is caused by the 
"kafka-producer-network-thread", which is in charge of updating the metadata.
In this process, the Sender executes a poll on the NetworkClient, which executes 
the maybeUpdate method.
In this method, there is a call to log.debug, but there is a synchronization 
mechanism in log4j.Category.callAppenders.

So, *there is a thread deadlock* between the main thread, where we want to use 
the Kafka appender, and the network thread.

This is the stacktrace from the network-thread :
{code:java}
at org.apache.log4j.Category.callAppenders:204
at org.apache.log4j.Category.forcedLog:391
at org.apache.log4j.Category.log:856
at org.slf4j.impl.Log4jLoggerAdapter.debug:230
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate:644
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate:552
at org.apache.kafka.clients.NetworkClient.poll:258
at org.apache.kafka.clients.producer.internals.Sender.run:236
at org.apache.kafka.clients.producer.internals.Sender.run:135
at java.lang.Thread.run:748
{code}

To fix this issue temporarily, we force the level of the _org.apache.kafka_ 
logger to INFO when we have the root logger in DEBUG mode.
I think this is a common issue in Log4j 1.2.
Do you have any recommendations?
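
For reference, a sketch of that workaround in log4j 1.2 properties form 
(assuming the appender name {{kafka}} from the issue description; other appender 
properties are omitted):

{code}
log4j.rootLogger=DEBUG, kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Suppress the producer's own TRACE/DEBUG logging so it never re-enters the Kafka appender
log4j.logger.org.apache.kafka=INFO
{code}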

> KafkaLog4jAppender - not sending any message with level DEBUG
> -
>
> Key: KAFKA-7099
> URL: https://issues.apache.org/jira/browse/KAFKA-7099
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Vincent Lebreil
>Priority: Major
>
> KafkaLog4jAppender can be stuck if it is defined at root category with level 
> DEBUG
> {{log4j.rootLogger=DEBUG, kafka}}
> {{log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender}}
> {quote}DEBUG org.apache.kafka.clients.producer.KafkaProducer:131 - Exception 
> occurred during message send:
> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
> after 6 ms.
> {quote}
> KafkaLog4jAppender is using a KafkaProducer using itself Log4j with messages 
> at levels TRACE and DEBUG. The appender used in this case is also the 
> KafkaLog4jAppender.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6964) Add ability to print all internal topic names

2018-06-26 Thread Jagadesh Adireddi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523810#comment-16523810
 ] 

Jagadesh Adireddi edited comment on KAFKA-6964 at 6/26/18 2:38 PM:
---

[~mjsax],  after revisiting the code, I have 2 points in mind. Can you please 
help me understand which one is valid?

1) As described in the ticket, we need to print only the "internal topics". Does 
this mean we need to expose the InternalTopologyBuilder#internalTopicNames Set 
that gets populated through `InternalTopologyBuilder#addInternalTopic`?
2) As mentioned in your comment, repartition topics and changelog topics 
constitute the internal topics. Can we call `InternalTopologyBuilder#topicGroups`, 
read the `InternalTopicConfig#name` field from both repartitionTopics and 
stateChangelogTopics, and print them?


was (Author: adireddijagad...@gmail.com):
[~mjsax],  After revisiting code, got 2 points in mind. Can you please help me 
in understanding, which one is valid.

1) As described in the ticket , we need to print only "internal topic". Does 
this mean we need to expose InternalTopologyBuilder#internalTopicNames Set,  
that got added through `InternalTopologyBuilder#addInternalTopic` 
2) As mentioned in your comment, repartition topic and changelog topic 
constitutes internal topics. Can we call `InternalTopologyBuilder#topicGroups` 
and read  'InternalTopicConfig name'  filed from both repartitionTopics, 
stateChangelogTopics and print them.

> Add ability to print all internal topic names
> -
>
> Key: KAFKA-6964
> URL: https://issues.apache.org/jira/browse/KAFKA-6964
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Jagadesh Adireddi
>Priority: Major
>  Labels: needs-kip
>
> For security access reasons some streams users need to build all internal 
> topics before deploying their streams application.  While it's possible to 
> get all internal topic names from the {{Topology#describe()}} method, it 
> would be nice to have a separate method that prints out only the internal 
> topic names to ease the process.
> I think this change will require a KIP, so I've added the appropriate label.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6964) Add ability to print all internal topic names

2018-06-26 Thread Jagadesh Adireddi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523810#comment-16523810
 ] 

Jagadesh Adireddi commented on KAFKA-6964:
--

[~mjsax],  After revisiting code, got 2 points in mind. Can you please help me 
in understanding, which one is valid.

1) As described in the ticket , we need to print only "internal topic". Does 
this mean we need to expose InternalTopologyBuilder#internalTopicNames Set,  
that got added through `InternalTopologyBuilder#addInternalTopic` 
2) As mentioned in your comment, repartition topic and changelog topic 
constitutes internal topics. Can we call `InternalTopologyBuilder#topicGroups` 
and read  'InternalTopicConfig name'  filed from both repartitionTopics, 
stateChangelogTopics and print them.

> Add ability to print all internal topic names
> -
>
> Key: KAFKA-6964
> URL: https://issues.apache.org/jira/browse/KAFKA-6964
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Jagadesh Adireddi
>Priority: Major
>  Labels: needs-kip
>
> For security access reasons some streams users need to build all internal 
> topics before deploying their streams application.  While it's possible to 
> get all internal topic names from the {{Topology#describe()}} method, it 
> would be nice to have a separate method that prints out only the internal 
> topic names to ease the process.
> I think this change will require a KIP, so I've added the appropriate label.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7100) kafka.tools.GetOffsetShell with enable Kerberos Security on Kafka1.0

2018-06-26 Thread abdullah toraman (JIRA)
abdullah toraman created KAFKA-7100:
---

 Summary: kafka.tools.GetOffsetShell with enable Kerberos Security 
on Kafka1.0  
 Key: KAFKA-7100
 URL: https://issues.apache.org/jira/browse/KAFKA-7100
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.1
Reporter: abdullah toraman


Hi All,

I enabled the Kerberos Authentication on Kafka 1.0. When I try to 
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list : --topic 
 --time -1, It hits error. Here are the Error,

[kafka@KLBIRKFKI1 kafka_2.11-1.0.1]$ bin/kafka-run-class.sh 
kafka.tools.GetOffsetShell --broker-list klbirkfki1:9092 --topic abdullah 
--time -1

[2018-06-26 12:59:00,058] WARN Fetching topic metadata with correlation id 0 
for topics [Set(abdullah)] from broker [BrokerEndPoint(0,klbirkfki1,9092)] 
failed (kafka.client.ClientUtils$)

java.io.EOFException

    at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)

    at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)

    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)

    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)

    at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)

    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)

    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)

    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)

    at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79)

    at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)

Exception in thread "main" kafka.common.KafkaException: fetching topic metadata 
for topics [Set(abdullah)] from broker 
[ArrayBuffer(BrokerEndPoint(0,klbirkfki1,9092))] failed

    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:77)

    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)

    at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79)

    at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)

Caused by: java.io.EOFException

    at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)

    at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)

    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)

    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)

    at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)

    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)

    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)

    ... 3 more

[kafka@KLBIRKFKI1 kafka_2.11-1.0.1]$ bin/kafka-run-class.sh 
kafka.tools.GetOffsetShell --broker-list klbirkfki1:9092 --topic abdullah 
--time -1

[2018-06-26 12:59:00,058] WARN Fetching topic metadata with correlation id 0 
for topics [Set(abdullah)] from broker [BrokerEndPoint(0,klbirkfki1,9092)] 
failed (kafka.client.ClientUtils$)

java.io.EOFException

    at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)

    at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)

    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)

    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)

    at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)

    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)

    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)

    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)

    at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79)

    at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)

Exception in thread "main" kafka.common.KafkaException: fetching topic metadata 
for topics [Set(abdullah)] from broker 
[ArrayBuffer(BrokerEndPoint(0,klbirkfki1,9092))] failed

    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:77)

    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)

    at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:79)

    at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)

Caused by: java.io.EOFException

    at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)

    at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)

    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)

    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)

    at 

[jira] [Updated] (KAFKA-7093) Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0

2018-06-26 Thread Suleyman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suleyman updated KAFKA-7093:

Issue Type: Bug  (was: New Feature)

> Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0
> 
>
> Key: KAFKA-7093
> URL: https://issues.apache.org/jira/browse/KAFKA-7093
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.1.0
>Reporter: Suleyman
>Priority: Major
>
> I upgraded Kafka from version 0.11.0.1 to 1.1.0. After the upgrade, I'm 
> getting the below warn message very frequently.
> WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. 
> This implies messages have arrived out of order. New: \{epoch:0, 
> offset:793868383}, Current: \{epoch:4, offset:792201264} for Partition: 
> __consumer_offsets-42 (kafka.server.epoch.LeaderEpochFileCache) 
> How can I resolve these warn messages? And why am I getting them?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

2018-06-26 Thread Lukasz Gluchowski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523725#comment-16523725
 ] 

Lukasz Gluchowski commented on KAFKA-7088:
--

Sorry, but DEBUG is still too verbose. It is impacting the resources used by 
the application.

> Kafka streams thread waits infinitely on transaction init
> -
>
> Key: KAFKA-7088
> URL: https://issues.apache.org/jira/browse/KAFKA-7088
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
> Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 
> 20:07:43 UTC 2018 
> kafka-streams (client) 1.0.1
> kafka broker 1.1.0
> Java version:
> OpenJDK Runtime Environment (build 1.8.0_171-b10)
> OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)
> kakfa config overrides:
> num.stream.threads: 6
> session.timeout.ms: 1
> request.timeout.ms: 11000
> fetch.max.wait.ms: 500
> max.poll.records: 1000
> topic has 24 partitions
>Reporter: Lukasz Gluchowski
>Priority: Major
>  Labels: eos
>
> A kafka stream application thread stops processing without any feedback. The 
> topic has 24 partitions and I noticed that processing stopped only for some 
> partitions. I will describe what happened to partition:10. The application is 
> still running (now for about 8 hours) and that thread is hanging there and no 
> rebalancing that took place.
> There is no error (we have a custom `Thread.UncaughtExceptionHandler` which 
> was not called). I noticed that after couple of minutes stream stopped 
> processing (at offset 32606948 where log-end-offset is 33472402). 
> Broker itself is not reporting any active consumer in that consumer group and 
> the only info I was able to gather was from thread dump:
> {code:java}
> "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33"
>  #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition 
> [0x7fe0215d4000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0xfec6a2f8> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at 
> org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:151)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744){code}
>  
> I tried restarting application once but the situation repeated. Thread read 
> some data, committed offset and 

[jira] [Created] (KAFKA-7099) KafkaLog4jAppender - not sending any message with level DEBUG

2018-06-26 Thread Vincent Lebreil (JIRA)
Vincent Lebreil created KAFKA-7099:
--

 Summary: KafkaLog4jAppender - not sending any message with level 
DEBUG
 Key: KAFKA-7099
 URL: https://issues.apache.org/jira/browse/KAFKA-7099
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.10.2.0
Reporter: Vincent Lebreil


KafkaLog4jAppender can be stuck if it is defined at root category with level 
DEBUG

{{log4j.rootLogger=DEBUG, kafka}}

{{log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender}}
{quote}DEBUG org.apache.kafka.clients.producer.KafkaProducer:131 - Exception 
occurred during message send:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
{quote}
KafkaLog4jAppender is using a KafkaProducer using itself Log4j with messages at 
levels TRACE and DEBUG. The appender used in this case is also the 
KafkaLog4jAppender.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7095) Low traffic consumer is not consuming messages after the offsets is deleted by Kafka

2018-06-26 Thread Aldo Sinanaj (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523439#comment-16523439
 ] 

Aldo Sinanaj edited comment on KAFKA-7095 at 6/26/18 12:26 PM:
---

Hi [~hachikuji] thanks for replying.

The consumer configuration for this property is: _auto.offset.reset = earliest_

Since the consumer is part of a consumer group I want to consume from the 
earliest not committed message. With anonymous consumers I don't have this 
issue since the property is _auto.offset.reset = latest_

Any idea how to avoid my problem?

Thanks


was (Author: aldex32):
Hi [~hachikuji] thanks for replying.

The consumer configuration for this property is: _auto.offset.reset = earliest_

So should I set this to _latest_ in order to continue reading without 
restarting the consumer?

Thanks

> Low traffic consumer is not consuming messages after the offsets is deleted 
> by Kafka
> 
>
> Key: KAFKA-7095
> URL: https://issues.apache.org/jira/browse/KAFKA-7095
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: Aldo Sinanaj
>Priority: Minor
>
> Hello guys.
> I have a low traffic consumers for a given consumer group and I have the 
> default broker setting for this property *offsets.retention.minutes*. So if a 
> messages is coming after 2 days and Kafka has deleted the offset for that 
> given consumer, then the consumer will not consume the new incoming messages. 
> If I restart the application it will consume from the earliest which is 
> obvious since the offset is deleted.
> My question is why it doesn't consume the new messages if I don't restart the 
> application? And how does this version of Kafka understands if a consumer is 
> active or inactive? Is my consumer considered inactive in this case?
> Thanks,
> Aldo



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7095) Low traffic consumer is not consuming messages after the offsets is deleted by Kafka

2018-06-26 Thread Aldo Sinanaj (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523439#comment-16523439
 ] 

Aldo Sinanaj edited comment on KAFKA-7095 at 6/26/18 12:06 PM:
---

Hi [~hachikuji] thanks for replying.

The consumer configuration for this property is: _auto.offset.reset = earliest_

So should I set this to _latest_ in order to continue reading without 
restarting the consumer?

Thanks


was (Author: aldex32):
Hi [~hachikuji] thanks for replying.

The consumer configuration for this property is: _auto.offset.reset = latest_

Any idea?

> Low traffic consumer is not consuming messages after the offsets is deleted 
> by Kafka
> 
>
> Key: KAFKA-7095
> URL: https://issues.apache.org/jira/browse/KAFKA-7095
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: Aldo Sinanaj
>Priority: Minor
>
> Hello guys.
> I have a low traffic consumers for a given consumer group and I have the 
> default broker setting for this property *offsets.retention.minutes*. So if a 
> messages is coming after 2 days and Kafka has deleted the offset for that 
> given consumer, then the consumer will not consume the new incoming messages. 
> If I restart the application it will consume from the earliest which is 
> obvious since the offset is deleted.
> My question is why it doesn't consume the new messages if I don't restart the 
> application? And how does this version of Kafka understands if a consumer is 
> active or inactive? Is my consumer considered inactive in this case?
> Thanks,
> Aldo



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4513) Support migration of old consumers to new consumers without downtime

2018-06-26 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523625#comment-16523625
 ] 

Manikumar commented on KAFKA-4513:
--

[~ijuma] Can we close this JIRA?

> Support migration of old consumers to new consumers without downtime
> 
>
> Key: KAFKA-4513
> URL: https://issues.apache.org/jira/browse/KAFKA-4513
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Ismael Juma
>Assignee: Onur Karaman
>Priority: Major
>
> Some ideas were discussed in the following thread:
> http://markmail.org/message/ovngfw3ibixlquxh



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6211) PartitionAssignmentState is protected whereas KafkaConsumerGroupService.describeGroup() returns PartitionAssignmentState Object

2018-06-26 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523616#comment-16523616
 ] 

Manikumar edited comment on KAFKA-6211 at 6/26/18 11:56 AM:


ConsumerGroupCommand internal classes/methods are not public API. You are 
encouraged to use kafka AdminClient consumer group management methods.


was (Author: omkreddy):
ConsumerGroupCommand internal classes/methods are not public API. You 
encouraged to use kafka AdminClient consumer group management methods.

> PartitionAssignmentState is protected whereas 
> KafkaConsumerGroupService.describeGroup() returns PartitionAssignmentState 
> Object
> ---
>
> Key: KAFKA-6211
> URL: https://issues.apache.org/jira/browse/KAFKA-6211
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, 
> 0.10.2.1, 0.11.0.0, 0.11.0.1, 1.0.0
>Reporter: Subhransu Acharya
>Priority: Critical
>  Labels: patch
> Attachments: a.png
>
>   Original Estimate: 1m
>  Remaining Estimate: 1m
>
>  KafkaConsumerGroupService.describeGroup() is returning 
> Tuple2, Option>> but 
> ConsumerGroupCommand has PartitionAssignmentState as a protected class inside 
> it.
> There is no way to create an instance of PartitionAssignmentState.
> make it default in order to use the describe command.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6211) PartitionAssignmentState is protected whereas KafkaConsumerGroupService.describeGroup() returns PartitionAssignmentState Object

2018-06-26 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523616#comment-16523616
 ] 

Manikumar commented on KAFKA-6211:
--

ConsumerGroupCommand internal classes/methods are not public API. You 
encouraged to use kafka AdminClient consumer group management methods.
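
For reference, a hedged sketch of that route using the Java AdminClient (APIs as 
of Kafka 2.0; the bootstrap server and group id below are placeholders):

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class DescribeGroupExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Describe the group instead of reaching into ConsumerGroupCommand internals
            ConsumerGroupDescription description =
                admin.describeConsumerGroups(Collections.singletonList("my-group"))
                     .all().get().get("my-group");
            Map<TopicPartition, OffsetAndMetadata> offsets =
                admin.listConsumerGroupOffsets("my-group").partitionsToOffsetAndMetadata().get();
            System.out.println(description.members());
            offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
        }
    }
}
{code}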

> PartitionAssignmentState is protected whereas 
> KafkaConsumerGroupService.describeGroup() returns PartitionAssignmentState 
> Object
> ---
>
> Key: KAFKA-6211
> URL: https://issues.apache.org/jira/browse/KAFKA-6211
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, 
> 0.10.2.1, 0.11.0.0, 0.11.0.1, 1.0.0
>Reporter: Subhransu Acharya
>Priority: Critical
>  Labels: patch
> Attachments: a.png
>
>   Original Estimate: 1m
>  Remaining Estimate: 1m
>
>  KafkaConsumerGroupService.describeGroup() is returning 
> Tuple2, Option>> but 
> ConsumerGroupCommand has PartitionAssignmentState as a protected class inside 
> it.
> There is no way to create an instance of PartitionAssignmentState.
> make it default in order to use the describe command.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5079) ProducerBounceTest fails occasionally with a SocketTimeoutException

2018-06-26 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5079.
--
Resolution: Fixed

ProducerBounceTest was removed as part of the old consumer changes.

> ProducerBounceTest fails occasionally with a SocketTimeoutException
> ---
>
> Key: KAFKA-5079
> URL: https://issues.apache.org/jira/browse/KAFKA-5079
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Priority: Major
>
> {noformat}
> java.net.SocketTimeoutException
>   at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
>   at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>   at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:85)
>   at 
> kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:100)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:84)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:133)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:133)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:133)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:32)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:132)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:132)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:132)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:32)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:131)
>   at 
> kafka.api.ProducerBounceTest$$anonfun$2.apply(ProducerBounceTest.scala:116)
>   at 
> kafka.api.ProducerBounceTest$$anonfun$2.apply(ProducerBounceTest.scala:113)
> {noformat}
> This is expected occasionally, since the ports are preallocated and the 
> brokers are bounced in quick succession. Here is the relevant comment from 
> the code: 
> {noformat}
>   // This is the one of the few tests we currently allow to preallocate 
> ports, despite the fact that this can result in transient
>   // failures due to ports getting reused. We can't use random ports because 
> of bad behavior that can result from bouncing
>   // brokers too quickly when they get new, random ports. If we're not 
> careful, the client can end up in a situation
>   // where metadata is not refreshed quickly enough, and by the time it's 
> actually trying to, all the servers have
>   // been bounced and have new addresses. None of the bootstrap nodes or 
> current metadata can get them connected to a
>   // running server.
>   //
>   // Since such quick rotation of servers is incredibly unrealistic, we allow 
> this one test to preallocate ports, leaving
>   // a small risk of hitting errors due to port conflicts. Hopefully this is 
> infrequent enough to not cause problems.
> {noformat}
> We should try to look into handling this exception better so that the test 
> doesn't fail occasionally. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5080) Convert ProducerBounceTest to use the new KafkaConsumer

2018-06-26 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5080.
--
Resolution: Fixed

ProducerBounceTest was removed as part of the old consumer changes.

> Convert ProducerBounceTest to use the new KafkaConsumer
> ---
>
> Key: KAFKA-5080
> URL: https://issues.apache.org/jira/browse/KAFKA-5080
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
>
> In KAFKA-5079, the `SocketTimeoutException` indicates that the consumer is 
> stuck waiting for a response from a particular broker, even after the broker 
> has been bounced. Since a given broker in that test always uses the same port, 
> it is possible that this is a symptom of a bug in the SimpleConsumer where it 
> doesn't detect the bounce (and hence doesn't disconnect), causing it to time out. 
> We should use the new consumer to rule out a client bug, or fix it if it also 
> exists in the new consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers

2018-06-26 Thread Steven Aerts (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523582#comment-16523582
 ] 

Steven Aerts commented on KAFKA-7026:
-

[~nperiwal] looking at the code of the {{RangeAssignor}}, I do not think this 
is the same issue, as that assignor does not keep any state.

Do you have a dump of the consumer groups like the one I posted above, so we 
can see whether it follows a similar pattern?
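
For reference, such a dump could also be produced programmatically; below is a 
hedged sketch using the public AdminClient API (the group name and bootstrap 
address are placeholders). It prints each member and its assigned partitions, 
which makes a double-assigned partition easy to spot:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

public class DumpGroupAssignment {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // Describe the group and list each member with its assigned partitions.
            ConsumerGroupDescription group = admin
                .describeConsumerGroups(Collections.singletonList("sticky-group")) // placeholder group
                .all().get().get("sticky-group");
            group.members().forEach(member ->
                System.out.printf("%s -> %s%n",
                    member.consumerId(), member.assignment().topicPartitions()));
        }
    }
}
{code}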

> Sticky assignor could assign a partition to multiple consumers
> --
>
> Key: KAFKA-7026
> URL: https://issues.apache.org/jira/browse/KAFKA-7026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.1.0
>
>
> In the following scenario sticky assignor assigns a topic partition to two 
> consumers in the group:
>  # Create a topic {{test}} with a single partition
>  # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group 
> leader and gets {{test-0}})
>  # Start consumer {{c2}}  in group {{sticky-group}} ({{c1}} holds onto 
> {{test-0}}, {{c2}} does not get any partition) 
>  # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes 
> over {{test-0}}, {{c1}} leaves the group)
>  # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>  
> The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from 
> the last assignment it received from the leader (itself) and did not get the 
> next round of assignments (when {{c2}} became leader) because it was paused. 
> Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their 
> existing assignment. The sticky assignor code does not currently check and 
> avoid this duplication.
>  
> Note: This issue was originally reported on 
> [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6809) connections-created metric does not behave as expected

2018-06-26 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski reassigned KAFKA-6809:
--

Assignee: Stanislav Kozlovski

> connections-created metric does not behave as expected
> --
>
> Key: KAFKA-6809
> URL: https://issues.apache.org/jira/browse/KAFKA-6809
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Anna Povzner
>Assignee: Stanislav Kozlovski
>Priority: Major
> Fix For: 2.1.0, 1.1.2
>
>
> "connections-created" sensor is described as "new connections established". 
> It currently records only connections that the broker/client creates, but 
> does not count connections received. Seems like we should also count 
> connections received – either include them in this metric (and also clarify 
> the description) or add a new metric (separately counting the two types of 
> connections). I am not sure how useful it is to separate them, so I think we 
> should take the first approach.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-2488) System tests: updated console_consumer.py to support new consumer

2018-06-26 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2488.
--
Resolution: Fixed

Support was added in Kafka versions after 0.10.1.

> System tests: updated console_consumer.py to support new consumer
> -
>
> Key: KAFKA-2488
> URL: https://issues.apache.org/jira/browse/KAFKA-2488
> Project: Kafka
>  Issue Type: Bug
>Reporter: Geoff Anderson
>Assignee: Geoff Anderson
>Priority: Major
>
> The console consumer now supports the new consumer.
> Update console_consumer.py to support it as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-2215) Improve Randomness for ConsoleConsumer

2018-06-26 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2215.
--
Resolution: Not A Problem

Closing inactive issue. Also, the default for the console consumer's 
enable.auto.commit is set to false for auto-generated group ids. 

> Improve Randomness for ConsoleConsumer
> --
>
> Key: KAFKA-2215
> URL: https://issues.apache.org/jira/browse/KAFKA-2215
> Project: Kafka
>  Issue Type: Bug
>Reporter: Fabian Lange
>Priority: Major
>
> Right now the console consumer does a new Random().nextInt(100_000)
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L123
> I would propose to use UUID.randomUUID().toString() instead.
> I know this is quite edgy, but Random has shown its quirks from time to time.
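
For illustration only, a minimal sketch of the two approaches being compared 
(a hypothetical snippet, not the actual ConsoleConsumer patch): a small random 
integer suffix can collide across invocations, while a random UUID makes 
collisions practically impossible.

{code:java}
import java.util.Random;
import java.util.UUID;

public class GroupIdGeneration {
    public static void main(String[] args) {
        // Current style: random integer suffix, which can repeat across runs.
        String randomSuffixId = "console-consumer-" + new Random().nextInt(100000);
        // Proposed style: UUID suffix, effectively collision-free.
        String uuidId = "console-consumer-" + UUID.randomUUID().toString();
        System.out.println(randomSuffixId);
        System.out.println(uuidId);
    }
}
{code}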



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7095) Low traffic consumer is not consuming messages after the offsets is deleted by Kafka

2018-06-26 Thread Aldo Sinanaj (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523439#comment-16523439
 ] 

Aldo Sinanaj commented on KAFKA-7095:
-

Hi [~hachikuji], thanks for replying.

The consumer configuration for this property is: _auto.offset.reset = latest_

Any idea?
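
For context, a minimal sketch of where this property is set on the consumer 
(placeholder bootstrap address and group id, not the reporter's actual 
application); note that auto.offset.reset only applies when the group has no 
valid committed offset, e.g. after the broker has removed it per 
offsets.retention.minutes:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LowTrafficConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "low-traffic-group");       // placeholder
        // "latest": with no committed offset, start from the log end offset
        // instead of re-reading the whole topic.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            // consumer.subscribe(...) and the poll loop would go here
        }
    }
}
{code}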

> Low traffic consumer is not consuming messages after the offsets is deleted 
> by Kafka
> 
>
> Key: KAFKA-7095
> URL: https://issues.apache.org/jira/browse/KAFKA-7095
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: Aldo Sinanaj
>Priority: Minor
>
> Hello guys.
> I have low-traffic consumers for a given consumer group, and I have the 
> default broker setting for the property *offsets.retention.minutes*. So if a 
> message comes in after 2 days and Kafka has deleted the offset for that 
> given consumer, then the consumer will not consume the new incoming messages. 
> If I restart the application it will consume from the earliest offset, which is 
> expected since the offset has been deleted.
> My question is: why doesn't it consume the new messages if I don't restart the 
> application? And how does this version of Kafka determine whether a consumer is 
> active or inactive? Is my consumer considered inactive in this case?
> Thanks,
> Aldo



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-06-26 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6949:
--

[~lindong] You can cherry-pick and push the commit to the 2.0 branch without a PR 
or further reviews if no major changes were required for cherry-picking (I am 
assuming that is the case here). 

We don't currently have any blockers for which we need to create a new RC. If 
this issue is a blocker for 2.0 and we need to create a new RC, then do let me 
know. Otherwise, I will change the fix version to 2.0.1 for now and change it 
back to 2.0 if we require another RC. Thanks.

> alterReplicaLogDirs() should grab partition lock when accessing log of the 
> future replica
> -
>
> Key: KAFKA-6949
> URL: https://issues.apache.org/jira/browse/KAFKA-6949
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dong Lin
>Priority: Blocker
> Fix For: 2.0.0
>
>
> I found this in a failed execution of 
> kafka.admin.ReassignPartitionsClusterTest.shouldExpandCluster. Looks like 
> we're missing some option checking.
> {code}
> [2018-05-25 08:03:53,310] ERROR [ReplicaManager broker=100] Error while 
> changing replica dir for partition my-topic-2 (kafka.server.ReplicaManager:76)
> java.util.NoSuchElementException: None.get
>   at scala.None$.get(Option.scala:347)
>   at scala.None$.get(Option.scala:345)
>   at 
> kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:584)
>   at 
> kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:576)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.ReplicaManager.alterReplicaLogDirs(ReplicaManager.scala:576)
>   at 
> kafka.server.KafkaApis.handleAlterReplicaLogDirsRequest(KafkaApis.scala:2037)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:138)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6944) Add system tests testing the new throttling behavior using older clients/brokers

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523348#comment-16523348
 ] 

ASF GitHub Bot commented on KAFKA-6944:
---

jonlee2 opened a new pull request #5294: KAFKA-6944: Add system tests testing 
the new throttling behavior using older clients/brokers
URL: https://github.com/apache/kafka/pull/5294
 
 
   
   Added two additional test cases to quota_test.py, which run between brokers 
and clients with different throttling behaviors. More specifically,
   1. clients with new throttling behavior (i.e., post-KIP-219) and brokers 
with old throttling behavior (i.e., pre-KIP-219)
   2. clients with old throttling behavior and brokers with new throttling 
behavior
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add system tests testing the new throttling behavior using older 
> clients/brokers
> 
>
> Key: KAFKA-6944
> URL: https://issues.apache.org/jira/browse/KAFKA-6944
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 2.0.0
>Reporter: Jon Lee
>Priority: Major
>
> KAFKA-6028 (KIP-219) changes the throttling behavior on quota violation as 
> follows:
>  * the broker will send out a response with throttle time to the client 
> immediately and mute the channel
>  * upon receiving a response with a non-zero throttle time, the client will 
> also block sending further requests to the broker until the throttle time is 
> over.
> The current system tests assume that both clients and brokers are of the same 
> version. We'll need an additional set of quota tests that test throttling 
> behavior between older clients and newer brokers and between newer clients 
> and older brokers. 
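
As an illustration of the client-side behavior described above, here is a 
minimal, hypothetical sketch (not the actual Kafka client code) of deferring 
sends until the throttle window reported by a response has elapsed:

{code:java}
public class ThrottleAwareSender {

    private long throttleEndMs = 0L;

    // Record the throttle time carried by a response received at nowMs.
    public synchronized void onResponse(long throttleTimeMs, long nowMs) {
        if (throttleTimeMs > 0) {
            throttleEndMs = Math.max(throttleEndMs, nowMs + throttleTimeMs);
        }
    }

    // A request may be sent only once the throttle window has passed.
    public synchronized boolean maySend(long nowMs) {
        return nowMs >= throttleEndMs;
    }

    public static void main(String[] args) {
        ThrottleAwareSender sender = new ThrottleAwareSender();
        long now = System.currentTimeMillis();
        sender.onResponse(500, now);                    // broker reported 500 ms of throttling
        System.out.println(sender.maySend(now));        // false: still throttled
        System.out.println(sender.maySend(now + 600));  // true: window elapsed
    }
}
{code}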



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2018-06-26 Thread Odin Standal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523343#comment-16523343
 ] 

Odin Standal commented on KAFKA-6817:
-

Would it be possible to change transactional.id.expiration.ms to a long? (It is 
currently an int, which caps the value at roughly 24 days.)

While not a proper solution to the problem, at least that would enable us to 
reprocess old messages, which would greatly increase the value of Kafka for us.

> UnknownProducerIdException when writing messages with old timestamps
> 
>
> Key: KAFKA-6817
> URL: https://issues.apache.org/jira/browse/KAFKA-6817
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Odin Standal
>Priority: Major
>
> We are seeing the following exception in our Kafka application: 
> {code:java}
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
> due to the following error: org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Abort sending since an error caught with a previous record (key 
> 22 value some-value timestamp 1519200902670) to topic 
> exactly-once-test-topic- v2 due to This exception is raised by the broker if 
> it could not locate the producer metadata associated with the producerId in 
> question. This could happen if, for instance, the producer's records were 
> deleted because their retention time had elapsed. Once the last records of 
> the producerId are removed, the producer's metadata is removed from the 
> broker, and future appends by the producer will return this exception. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
> at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>  at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.apache.kafka.common.errors.UnknownProducerIdException
> {code}
> We discovered this error when we had the need to reprocess old messages. See 
> more details on 
> [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]
> We have reproduced the error with a smaller example application. The error 
> occurs after 10 minutes of producing messages that have old timestamps (around 
> 1 year old). The topic we are writing to has retention.ms set to 1 year, so 
> we are expecting the messages to stay there.
> After digging through the ProducerStateManager code in the Kafka source code, 
> we have a theory of what might be wrong.
> The ProducerStateManager.removeExpiredProducers() seems to remove producers 
> from memory erroneously when processing records which are older than the 
> maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` 
> configuration), which is set by default to 7 days. 
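
A simplified illustration of this theory (hypothetical code, not the actual 
ProducerStateManager logic): if the expiration check is driven by the record 
timestamps rather than the append time, year-old records make the producer 
entry look expired immediately.

{code:java}
public class ProducerExpirationSketch {

    // transactional.id.expiration.ms default: 7 days
    static final long EXPIRATION_MS = 7L * 24 * 60 * 60 * 1000;

    // Expired if the last seen timestamp is older than the expiration window.
    static boolean isExpired(long lastRecordTimestampMs, long nowMs) {
        return nowMs - lastRecordTimestampMs >= EXPIRATION_MS;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long yearOldTimestamp = now - 365L * 24 * 60 * 60 * 1000;
        // Prints "true": the producer metadata would be removed, and later
        // appends would fail with UnknownProducerIdException.
        System.out.println(isExpired(yearOldTimestamp, now));
    }
}
{code}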



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-26 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523303#comment-16523303
 ] 

Dong Lin commented on KAFKA-7012:
-

Great to know that the fix works!

 

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: regression
> Fix For: 2.0.0, 1.0.2, 1.1.1
>
> Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node on the cluster, we notice that network threads use 
> most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% 
> vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is 
> high depending on the number of network threads used. If networks threads is 
> set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 
> then the cpu usage is around 450%(5 vcpus). Using the same kafka 
> server.properties for both.
> Did further analysis with git bisect and a couple of builds and deploys, and 
> traced the issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine 
> for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit 
> 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have 
> attached screenshots of profiling done with both the commits. Screenshot 
> Commit-f15cdbc91b-profile shows less cpu usage by network threads and 
> Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show 
> higher cpu usage(almost entire cpu usage) by network threads. Also noticed 
> that kafka.network.Processor.poll() method is invoked 10 times more with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need the issue to be resolved to upgrade the cluster. Please let me know 
> if you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-06-26 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523297#comment-16523297
 ] 

Dong Lin commented on KAFKA-6949:
-

[~rsivaram] The patch has been merged into trunk. I opened PR 
[https://github.com/apache/kafka/pull/5293] to merge it into 2.0. Typically, do 
we need a review from another committer in order to cherry-pick a patch from trunk 
into a release branch?

Also, I added fix version 2.0.0 in this JIRA. Please feel free to change it as 
you see appropriate. Thanks.

 

> alterReplicaLogDirs() should grab partition lock when accessing log of the 
> future replica
> -
>
> Key: KAFKA-6949
> URL: https://issues.apache.org/jira/browse/KAFKA-6949
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dong Lin
>Priority: Blocker
> Fix For: 2.0.0
>
>
> I found this in a failed execution of 
> kafka.admin.ReassignPartitionsClusterTest.shouldExpandCluster. Looks like 
> we're missing some option checking.
> {code}
> [2018-05-25 08:03:53,310] ERROR [ReplicaManager broker=100] Error while 
> changing replica dir for partition my-topic-2 (kafka.server.ReplicaManager:76)
> java.util.NoSuchElementException: None.get
>   at scala.None$.get(Option.scala:347)
>   at scala.None$.get(Option.scala:345)
>   at 
> kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:584)
>   at 
> kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:576)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.ReplicaManager.alterReplicaLogDirs(ReplicaManager.scala:576)
>   at 
> kafka.server.KafkaApis.handleAlterReplicaLogDirsRequest(KafkaApis.scala:2037)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:138)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-06-26 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6949:

Fix Version/s: 2.0.0

> alterReplicaLogDirs() should grab partition lock when accessing log of the 
> future replica
> -
>
> Key: KAFKA-6949
> URL: https://issues.apache.org/jira/browse/KAFKA-6949
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dong Lin
>Priority: Blocker
> Fix For: 2.0.0
>
>
> I found this in a failed execution of 
> kafka.admin.ReassignPartitionsClusterTest.shouldExpandCluster. Looks like 
> we're missing some option checking.
> {code}
> [2018-05-25 08:03:53,310] ERROR [ReplicaManager broker=100] Error while 
> changing replica dir for partition my-topic-2 (kafka.server.ReplicaManager:76)
> java.util.NoSuchElementException: None.get
>   at scala.None$.get(Option.scala:347)
>   at scala.None$.get(Option.scala:345)
>   at 
> kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:584)
>   at 
> kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:576)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.ReplicaManager.alterReplicaLogDirs(ReplicaManager.scala:576)
>   at 
> kafka.server.KafkaApis.handleAlterReplicaLogDirsRequest(KafkaApis.scala:2037)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:138)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523295#comment-16523295
 ] 

ASF GitHub Bot commented on KAFKA-6949:
---

lindong28 opened a new pull request #5293: KAFKA-6949; alterReplicaLogDirs() 
should grab partition lock when accessing log of the future replica
URL: https://github.com/apache/kafka/pull/5293
 
 
   NoSuchElementException will be thrown if ReplicaAlterDirThread replaces the 
current replica with the future replica right before the request handler thread 
executes `futureReplica.log.get.dir.getParent` in 
ReplicaManager.alterReplicaLogDirs(). The solution is to grab the partition 
lock when the request handler thread attempts to check the destination log 
directory of the future replica.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> alterReplicaLogDirs() should grab partition lock when accessing log of the 
> future replica
> -
>
> Key: KAFKA-6949
> URL: https://issues.apache.org/jira/browse/KAFKA-6949
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dong Lin
>Priority: Blocker
> Fix For: 2.0.0
>
>
> I found this in a failed execution of 
> kafka.admin.ReassignPartitionsClusterTest.shouldExpandCluster. Looks like 
> we're missing some option checking.
> {code}
> [2018-05-25 08:03:53,310] ERROR [ReplicaManager broker=100] Error while 
> changing replica dir for partition my-topic-2 (kafka.server.ReplicaManager:76)
> java.util.NoSuchElementException: None.get
>   at scala.None$.get(Option.scala:347)
>   at scala.None$.get(Option.scala:345)
>   at 
> kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:584)
>   at 
> kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:576)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.ReplicaManager.alterReplicaLogDirs(ReplicaManager.scala:576)
>   at 
> kafka.server.KafkaApis.handleAlterReplicaLogDirsRequest(KafkaApis.scala:2037)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:138)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523292#comment-16523292
 ] 

ASF GitHub Bot commented on KAFKA-6949:
---

lindong28 closed pull request #5081: KAFKA-6949; alterReplicaLogDirs() should 
grab partition lock when accessing log of the future replica
URL: https://github.com/apache/kafka/pull/5081
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 9ab1ec47af8..b80c34475d3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -149,10 +149,10 @@ class Partition(val topic: String,
 * @return true iff the future replica is created
 */
   def maybeCreateFutureReplica(logDir: String): Boolean = {
-// The readLock is needed to make sure that while the caller checks the 
log directory of the
+// The writeLock is needed to make sure that while the caller checks the 
log directory of the
 // current replica and the existence of the future replica, no other 
thread can update the log directory of the
 // current replica or remove the future replica.
-inReadLock(leaderIsrUpdateLock) {
+inWriteLock(leaderIsrUpdateLock) {
   val currentReplica = getReplica().get
   if (currentReplica.log.get.dir.getParent == logDir)
 false
@@ -207,29 +207,52 @@ class Partition(val topic: String,
 allReplicasMap.remove(replicaId)
   }
 
-  def removeFutureLocalReplica() {
+  def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
+inReadLock(leaderIsrUpdateLock) {
+  getReplica(Request.FutureLocalReplicaId) match {
+case Some(futureReplica) =>
+  if (futureReplica.log.get.dir.getParent != newDestinationDir)
+true
+  else
+false
+case None => false
+  }
+}
+  }
+
+  def removeFutureLocalReplica(deleteFromLogDir: Boolean = true) {
 inWriteLock(leaderIsrUpdateLock) {
   allReplicasMap.remove(Request.FutureLocalReplicaId)
+  if (deleteFromLogDir)
+logManager.asyncDelete(topicPartition, isFuture = true)
 }
   }
 
-  // Return true iff the future log has caught up with the current log for 
this partition
+  // Return true iff the future replica exists and it has caught up with the 
current replica for this partition
   // Only ReplicaAlterDirThread will call this method and 
ReplicaAlterDirThread should remove the partition
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
 val replica = getReplica().get
-val futureReplica = getReplica(Request.FutureLocalReplicaId).get
-if (replica.logEndOffset == futureReplica.logEndOffset) {
+val futureReplicaLEO = 
getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset)
+if (futureReplicaLEO.contains(replica.logEndOffset)) {
   // The write lock is needed to make sure that while 
ReplicaAlterDirThread checks the LEO of the
   // current replica, no other thread can update LEO of the current 
replica via log truncation or log append operation.
   inWriteLock(leaderIsrUpdateLock) {
-if (replica.logEndOffset == futureReplica.logEndOffset) {
-  logManager.replaceCurrentWithFutureLog(topicPartition)
-  replica.log = futureReplica.log
-  futureReplica.log = None
-  allReplicasMap.remove(Request.FutureLocalReplicaId)
-  true
-} else false
+getReplica(Request.FutureLocalReplicaId) match {
+  case Some(futureReplica) =>
+if (replica.logEndOffset == futureReplica.logEndOffset) {
+  logManager.replaceCurrentWithFutureLog(topicPartition)
+  replica.log = futureReplica.log
+  futureReplica.log = None
+  allReplicasMap.remove(Request.FutureLocalReplicaId)
+  true
+} else false
+  case None =>
+// Future replica is removed by a non-ReplicaAlterLogDirsThread 
before this method is called
+// In this case the partition should have been removed from state 
of the ReplicaAlterLogDirsThread
+// Return false so that ReplicaAlterLogDirsThread does not have to 
remove this partition from the state again to avoid race condition
+false
+}
   }
 } else false
   }
@@ -550,15 +573,22 @@ class Partition(val topic: String,
   }
 
   private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, 
isFuture: Boolean): Unit = {
-  if (isFuture)
-