[jira] [Updated] (KAFKA-5341) Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics

2017-07-26 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-5341:
-
Fix Version/s: 1.0.0

> Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics
> ---
>
> Key: KAFKA-5341
> URL: https://issues.apache.org/jira/browse/KAFKA-5341
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Dong Lin
>Assignee: Dong Lin
> Fix For: 1.0.0
>
>
> We currently have a metric for under-replicated partitions, but we do not have a metric to 
> track the number of partitions whose in-sync replica count < minIsr. 
> Partitions whose in-sync replica count < minIsr will be unavailable to 
> producers that use acks=all. It is important for Kafka operators to be 
> notified of the existence of such partitions because they reduce 
> the availability of the Kafka service.
> More specifically, we can define a per-broker metric 
> UnderMinIsrPartitionCount as "The number of partitions that this broker leads 
> for which the in-sync replica count < minIsr." So if the replication factor is 3 and minIsr 
> is 2, then with 2 replicas in the ISR the partition is counted among the 
> under-replicated partitions, and with only 1 replica in the ISR the 
> partition is also counted in UnderMinIsrPartitionCount.
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-164-+Add+UnderMinIsrPartitionCount+and+per-partition+UnderMinIsr+metrics
>  for more detail.
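For illustration, the proposed gauge is just a count over the partitions a broker leads. Below is a minimal sketch, assuming a hypothetical LedPartition view that exposes the ISR size and the topic's min.insync.replicas setting; this is not the broker's actual metrics code.

{code:java}
import java.util.List;

public class UnderMinIsrGauge {
    // Hypothetical view of a partition led by this broker.
    public interface LedPartition {
        int isrSize();  // current number of in-sync replicas
        int minIsr();   // min.insync.replicas configured for the topic
    }

    // UnderMinIsrPartitionCount: partitions this broker leads whose ISR size < minIsr.
    public static int underMinIsrPartitionCount(List<LedPartition> ledPartitions) {
        int count = 0;
        for (LedPartition p : ledPartitions) {
            if (p.isrSize() < p.minIsr()) {
                count++;
            }
        }
        return count;
    }
}
{code}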



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101659#comment-16101659
 ] 

Matthias J. Sax commented on KAFKA-4327:


It's a Streams tool, so it belongs in the package `o.a.k.streams.tools` -- we only 
put it into core because of the ZK dependency, and we did not want to add a ZK 
dependency to the streams module. \cc [~ijuma] [~guozhang] [~ewencp]

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the Reset Tool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" and 
> thus, the reset tool will not be backward compatible with all broker versions.
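As a side note, building the message from the class name needs the class-literal form in Java; below is a minimal, self-contained sketch where the class is only a placeholder, not the actual tool code.

{code:java}
public class ResetToolMessageSketch {
    public static void main(String[] args) {
        // Hardcoded form currently used in the exception message:
        String hardcoded = "Use 'kafka.tools.StreamsResetter' tool";

        // Deriving the name from a class literal keeps the message correct if the
        // tool is moved to another package (shown here with this placeholder class):
        String derived = "Use '" + ResetToolMessageSketch.class.getName() + "' tool";

        System.out.println(hardcoded);
        System.out.println(derived);
    }
}
{code}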



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5501:

Description: Synchronous zookeeper APIs mean that we wait an entire round 
trip before doing the next operation. We should introduce a zookeeper client 
that encourages pipelined requests to zookeeper.  (was: Synchronous zookeeper 
writes mean that we wait an entire round trip before doing the next write. 
These synchronous writes happen at a per-partition granularity in 
several places, so partition-heavy clusters suffer from the controller doing 
many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
zookeeper on transition to OnlinePartition. This gets triggered per-partition 
sequentially with synchronous writes during controlled shutdown of the shutting 
down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
triggered per-partition sequentially with synchronous writes for failed or 
controlled shutdown brokers.)

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper apis means that we wait an entire round trip before 
> doing the next operation. We should introduce a zookeeper client that 
> encourages pipelined requests to zookeeper.
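As a rough illustration of the pipelining idea (this uses the raw ZooKeeper async API, not the ZookeeperClient introduced by the PR): all requests are issued before any response is awaited, so the round trips overlap instead of being paid one at a time.

{code:java}
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.ZooKeeper;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class PipelinedZkWrites {
    // Issue all setData calls up front, then wait for the responses.
    public static void updateAll(ZooKeeper zk, List<String> paths, byte[] data)
            throws InterruptedException {
        CountDownLatch done = new CountDownLatch(paths.size());
        StatCallback cb = (rc, p, ctx, stat) -> done.countDown();
        for (String path : paths) {
            zk.setData(path, data, -1, cb, null);  // async call, returns immediately
        }
        done.await();  // wait for all responses
    }
}
{code}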



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5644) Transient test failure: ResetConsumerGroupOffsetTest.testResetOffsetsToZonedDateTime

2017-07-26 Thread Manikumar (JIRA)
Manikumar created KAFKA-5644:


 Summary: Transient test failure: 
ResetConsumerGroupOffsetTest.testResetOffsetsToZonedDateTime
 Key: KAFKA-5644
 URL: https://issues.apache.org/jira/browse/KAFKA-5644
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0
Reporter: Manikumar
Priority: Minor


{quote}
unit.kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToZonedDateTime 
FAILED
java.lang.AssertionError: Expected the consumer group to reset to when 
offset was 50.
at kafka.utils.TestUtils$.fail(TestUtils.scala:339)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:853)
at 
unit.kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsToZonedDateTime(ResetConsumerGroupOffsetTest.scala:188)
{quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101652#comment-16101652
 ] 

Matthias J. Sax commented on KAFKA-5386:


Understood. Technically, it would be possible to allow users to create 
changelog topics manually, but there are some strings attached. We have run into 
issues with naming conventions multiple times already, so maybe we need to do 
something about it. If you want to work on this, we would be more than happy. 
However, this change would require a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals 
Let us know if you need any help preparing a KIP in case you want to pick it 
up. We can also discuss a little more on this JIRA. \cc [~miguno] [~guozhang] 
[~damianguy] [~enothereska] [~bbejeck]

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>  Labels: needs-kip
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?
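For illustration only, a pluggable naming hook might look like the sketch below. The interface is hypothetical and is not an existing Kafka Streams API; the default implementation mirrors the current hardcoded behavior quoted above.

{code:java}
// Hypothetical naming hook (not an existing Kafka Streams interface) sketching
// how the hardcoded suffix-based name could be made pluggable.
public interface ChangelogTopicNamer {
    String changelogTopicFor(String applicationId, String storeName);
}

class DefaultChangelogTopicNamer implements ChangelogTopicNamer {
    private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";

    @Override
    public String changelogTopicFor(String applicationId, String storeName) {
        // Current behavior: my.app.id-storename-changelog
        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }
}
{code}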



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5501) use async zookeeper apis everywhere

2017-07-26 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-5501.

   Resolution: Fixed
Fix Version/s: 1.0.0

Issue resolved by pull request 3427
[https://github.com/apache/kafka/pull/3427]

> use async zookeeper apis everywhere
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper writes means that we wait an entire round trip before 
> doing the next write. These synchronous writes are happening at a 
> per-partition granularity in several places, so partition-heavy clusters 
> suffer from the controller doing many sequential round trips to zookeeper.
> * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
> zookeeper on transition to OnlinePartition. This gets triggered per-partition 
> sequentially with synchronous writes during controlled shutdown of the 
> shutting down broker's replicas for which it is the leader.
> * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
> OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
> triggered per-partition sequentially with synchronous writes for failed or 
> controlled shutdown brokers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4602) KIP-72 Allow putting a bound on memory consumed by Incoming requests

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101220#comment-16101220
 ] 

ASF GitHub Bot commented on KAFKA-4602:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2330


> KIP-72 Allow putting a bound on memory consumed by Incoming requests
> 
>
> Key: KAFKA-4602
> URL: https://issues.apache.org/jira/browse/KAFKA-4602
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
> Fix For: 1.0.0
>
> Attachments: screenshot-1.png
>
>
> this issue tracks the implementation of KIP-72, as outlined here - 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5641) Metadata request should always be allowed to send no regardless of value for max.in.flight.requests.per.connection

2017-07-26 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101249#comment-16101249
 ] 

huxihx commented on KAFKA-5641:
---

[~ijuma]
"The broker only processes one request at a time, so even if you send the 
metadata request, it won't be processed until the processing of the previous 
request is completed"
 It seems this does not apply to METADATA requests, since the sender thread 
only drains ProducerBatches and ensures ordering by muting all batches from 
the same partitions before sending them out. 

In my opinion, NetworkClient could always process Metadata requests. The idea 
behind this jira is that we need not honor 
`max.in.flight.requests.per.connection` when updating the metadata. That is to 
say, we could add a new method `canSendMoreNonProduceRequest` which does not check 
the in-flight queue size. 

Do I make myself clear?
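A minimal sketch of the proposed check follows; the method name comes from the comment above, while the in-flight tracking is a simplified, hypothetical stand-in for NetworkClient's internals.

{code:java}
import java.util.Map;

public class InFlightCheckSketch {
    // Simplified stand-ins for NetworkClient's internal state (hypothetical fields).
    private final Map<String, Integer> inFlightRequestsPerNode;
    private final Map<String, Boolean> connectionReadyPerNode;
    private final int maxInFlightRequestsPerConnection;

    InFlightCheckSketch(Map<String, Integer> inFlight, Map<String, Boolean> ready, int maxInFlight) {
        this.inFlightRequestsPerNode = inFlight;
        this.connectionReadyPerNode = ready;
        this.maxInFlightRequestsPerConnection = maxInFlight;
    }

    // Current behavior: produce requests respect max.in.flight.requests.per.connection.
    boolean canSendRequest(String nodeId) {
        return connectionReadyPerNode.getOrDefault(nodeId, false)
                && inFlightRequestsPerNode.getOrDefault(nodeId, 0) < maxInFlightRequestsPerConnection;
    }

    // Proposed: metadata (and other non-produce) requests only need a ready connection;
    // the queue-size check is skipped because ordering only matters for produced records.
    boolean canSendMoreNonProduceRequest(String nodeId) {
        return connectionReadyPerNode.getOrDefault(nodeId, false);
    }
}
{code}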

> Metadata request should always be allowed to send no regardless of value for 
> max.in.flight.requests.per.connection
> --
>
> Key: KAFKA-5641
> URL: https://issues.apache.org/jira/browse/KAFKA-5641
> Project: Kafka
>  Issue Type: Improvement
>  Components: network, producer 
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>
> Metadata request might not be able to be sent when 
> `max.in.flight.requests.per.connection` is set to 1 and there is already an 
> inflight request in the same node's queue, as shown below:
> {code:title=NetworkClient.java|borderStyle=solid}
> private long maybeUpdate(long now, Node node) {
> String nodeConnectionId = node.idString();
> if (canSendRequest(nodeConnectionId)) {
> ..
> }
> {code}
> However, setting `max.in.flight.requests.per.connection` to 1 only guarantees 
> ordering for the produced records; Metadata requests should not be 
> tied to this config. We don't have to check the inflight request queue 
> size when sending a Metadata request.
> [~ijuma] Does it make any sense? If yes, I will work on it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101268#comment-16101268
 ] 

Onur Karaman commented on KAFKA-5501:
-

[~ijuma] I reworded this ticket to be about introducing the client 
and KAFKA-5642 to be about using the client. With that, I went ahead and closed 
this ticket.

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper apis means that we wait an entire round trip before 
> doing the next operation. We should introduce a zookeeper client that 
> encourages pipelined requests to zookeeper.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5562) Do streams state directory cleanup on a single thread

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101401#comment-16101401
 ] 

ASF GitHub Bot commented on KAFKA-5562:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3516


> Do streams state directory cleanup on a single thread
> -
>
> Key: KAFKA-5562
> URL: https://issues.apache.org/jira/browse/KAFKA-5562
> Project: Kafka
>  Issue Type: Bug
>Reporter: Damian Guy
>Assignee: Damian Guy
>
> Currently in streams we clean up old state directories every so often (as 
> defined by {{state.cleanup.delay.ms}}). However, every StreamThread runs the 
> cleanup, which is unnecessary and can potentially lead to race 
> conditions.
> It would be better to perform the state cleanup on a single thread and only 
> when the {{KafkaStreams}} instance is in a running state.
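A hedged sketch of the single-threaded approach using a dedicated scheduled executor; the actual Streams change may look different.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class StateCleanupScheduler {
    private final ScheduledExecutorService cleaner =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "streams-state-cleanup");
                t.setDaemon(true);
                return t;
            });

    // Run cleanup on a single thread, at the configured delay, and only while running.
    public void start(long cleanupDelayMs, Runnable cleanupTask, BooleanSupplier isRunning) {
        cleaner.scheduleAtFixedRate(() -> {
            if (isRunning.getAsBoolean()) {
                cleanupTask.run();
            }
        }, cleanupDelayMs, cleanupDelayMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        cleaner.shutdownNow();
    }
}
{code}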



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5643) Using _DUCKTAPE_OPTIONS has no effect on executing tests

2017-07-26 Thread Paolo Patierno (JIRA)
Paolo Patierno created KAFKA-5643:
-

 Summary: Using _DUCKTAPE_OPTIONS has no effect on executing tests
 Key: KAFKA-5643
 URL: https://issues.apache.org/jira/browse/KAFKA-5643
 Project: Kafka
  Issue Type: Bug
  Components: system tests
Reporter: Paolo Patierno
Assignee: Paolo Patierno


Hi,
as described in the documentation, you should be able to enable debugging using 
the following line:

_DUCKTAPE_OPTIONS="--debug" bash tests/docker/run_tests.sh | tee debug_logs.txt

However, _DUCKTAPE_OPTIONS isn't read by the run_tests.sh script, so it's not 
passed on to ducker-ak and ultimately not to the ducktape command line.

Thanks,
Paolo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5642) use async ZookeeperClient everywhere

2017-07-26 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5642:
---

 Summary: use async ZookeeperClient everywhere
 Key: KAFKA-5642
 URL: https://issues.apache.org/jira/browse/KAFKA-5642
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


Synchronous zookeeper writes mean that we wait an entire round trip before 
doing the next write. These synchronous writes happen at a per-partition 
granularity in several places, so partition-heavy clusters suffer from the 
controller doing many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
zookeeper on transition to OnlinePartition. This gets triggered per-partition 
sequentially with synchronous writes during controlled shutdown of the shutting 
down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
triggered per-partition sequentially with synchronous writes for failed or 
controlled shutdown brokers.

KAFKA-5501 introduced an async ZookeeperClient that encourages pipelined 
requests to zookeeper. We should replace ZkClient's usage with this client.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5501) use async zookeeper apis everywhere

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101230#comment-16101230
 ] 

ASF GitHub Bot commented on KAFKA-5501:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3427


> use async zookeeper apis everywhere
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper writes means that we wait an entire round trip before 
> doing the next write. These synchronous writes are happening at a 
> per-partition granularity in several places, so partition-heavy clusters 
> suffer from the controller doing many sequential round trips to zookeeper.
> * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
> zookeeper on transition to OnlinePartition. This gets triggered per-partition 
> sequentially with synchronous writes during controlled shutdown of the 
> shutting down broker's replicas for which it is the leader.
> * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
> OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
> triggered per-partition sequentially with synchronous writes for failed or 
> controlled shutdown brokers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5641) Metadata request should always be allowed to send no regardless of value for max.in.flight.requests.per.connection

2017-07-26 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101231#comment-16101231
 ] 

Ismael Juma commented on KAFKA-5641:


The broker only processes one request at a time per connection, so even if you send the 
metadata request, it won't be processed until the processing of the previous 
request is completed. Given that, the benefit is reasonably small and it's not clear 
whether it's worth adding the special case. Also note that we try to find a node 
with no in-flight requests in NetworkClient.leastLoadedNode for this reason.

> Metadata request should always be allowed to send no regardless of value for 
> max.in.flight.requests.per.connection
> --
>
> Key: KAFKA-5641
> URL: https://issues.apache.org/jira/browse/KAFKA-5641
> Project: Kafka
>  Issue Type: Improvement
>  Components: network, producer 
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>
> Metadata request might not be able to be sent when 
> `max.in.flight.requests.per.connection` is set to 1 and there is already an 
> inflight request in the same node's queue, as shown below:
> {code:title=NetworkClient.java|borderStyle=solid}
> private long maybeUpdate(long now, Node node) {
> String nodeConnectionId = node.idString();
> if (canSendRequest(nodeConnectionId)) {
> ..
> }
> {code}
> However, setting `max.in.flight.requests.per.connection` to 1 only guarantees 
> ordering for the produced records; Metadata requests should not be 
> tied to this config. We don't have to check the inflight request queue 
> size when sending a Metadata request.
> [~ijuma] Does it make any sense? If yes, I will work on it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-5501) use async zookeeper apis everywhere

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reopened KAFKA-5501:


> use async zookeeper apis everywhere
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper writes means that we wait an entire round trip before 
> doing the next write. These synchronous writes are happening at a 
> per-partition granularity in several places, so partition-heavy clusters 
> suffer from the controller doing many sequential round trips to zookeeper.
> * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
> zookeeper on transition to OnlinePartition. This gets triggered per-partition 
> sequentially with synchronous writes during controlled shutdown of the 
> shutting down broker's replicas for which it is the leader.
> * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
> OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
> triggered per-partition sequentially with synchronous writes for failed or 
> controlled shutdown brokers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5501) use async zookeeper apis everywhere

2017-07-26 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101244#comment-16101244
 ] 

Ismael Juma commented on KAFKA-5501:


The merged PR adds the async ZookeeperClient, but it doesn't include the 
changes to use said client. [~onurkaraman], I reopened it, but perhaps we should 
repurpose this JIRA and create new ones for using the APIs. Probably one for the 
controller, one for the authorizer, one for configs, etc. Does that make sense?

> use async zookeeper apis everywhere
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper writes means that we wait an entire round trip before 
> doing the next write. These synchronous writes are happening at a 
> per-partition granularity in several places, so partition-heavy clusters 
> suffer from the controller doing many sequential round trips to zookeeper.
> * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
> zookeeper on transition to OnlinePartition. This gets triggered per-partition 
> sequentially with synchronous writes during controlled shutdown of the 
> shutting down broker's replicas for which it is the leader.
> * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
> OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
> triggered per-partition sequentially with synchronous writes for failed or 
> controlled shutdown brokers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-5501.
-
Resolution: Fixed

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper apis means that we wait an entire round trip before 
> doing the next operation. We should introduce a zookeeper client that 
> encourages pipelined requests to zookeeper.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5641) Metadata request should always be allowed to send no regardless of value for max.in.flight.requests.per.connection

2017-07-26 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101273#comment-16101273
 ] 

Ismael Juma edited comment on KAFKA-5641 at 7/26/17 7:27 AM:
-

My comment was about the broker. And yes, the broker applies head of line 
blocking so that it processes one request at a time for a given connection. If 
you send multiple requests, all but one are kept in the socket buffer. There is 
logic for this in the Selector.


was (Author: ijuma):
My comment was about the broker. And yes, the broker applies head of line 
blocking so that it processes one request at a time. If you send multiple 
requests, all but one are kept in the socket buffer. There is logic for this in 
the Selector.

> Metadata request should always be allowed to send no regardless of value for 
> max.in.flight.requests.per.connection
> --
>
> Key: KAFKA-5641
> URL: https://issues.apache.org/jira/browse/KAFKA-5641
> Project: Kafka
>  Issue Type: Improvement
>  Components: network, producer 
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>
> Metadata request might not be able to be sent when 
> `max.in.flight.requests.per.connection` is set to 1 and there is already an 
> inflight request in the same node's queue, as shown below:
> {code:title=NetworkClient.java|borderStyle=solid}
> private long maybeUpdate(long now, Node node) {
> String nodeConnectionId = node.idString();
> if (canSendRequest(nodeConnectionId)) {
> ..
> }
> {code}
> However, setting `max.in.flight.requests.per.connection` to 1 only guarantees 
> ordering for the produced records; Metadata requests should not be 
> tied to this config. We don't have to check the inflight request queue 
> size when sending a Metadata request.
> [~ijuma] Does it make any sense? If yes, I will work on it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5501:

Summary: introduce async ZookeeperClient  (was: use async zookeeper apis 
everywhere)

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper writes means that we wait an entire round trip before 
> doing the next write. These synchronous writes are happening at a 
> per-partition granularity in several places, so partition-heavy clusters 
> suffer from the controller doing many sequential round trips to zookeeper.
> * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
> zookeeper on transition to OnlinePartition. This gets triggered per-partition 
> sequentially with synchronous writes during controlled shutdown of the 
> shutting down broker's replicas for which it is the leader.
> * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
> OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
> triggered per-partition sequentially with synchronous writes for failed or 
> controlled shutdown brokers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-3210) Using asynchronous calls through the raw ZK API in ZkUtils

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-3210.

Resolution: Won't Fix

We are following a slightly different approach, see KAFKA-5501.

> Using asynchronous calls through the raw ZK API in ZkUtils
> --
>
> Key: KAFKA-3210
> URL: https://issues.apache.org/jira/browse/KAFKA-3210
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, zkclient
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>
> We have observed a number of issues with the controller interaction with 
> ZooKeeper mainly because ZkClient creates new sessions transparently under 
> the hood. Creating sessions transparently enables, for example, old 
> controller to successfully update znodes in ZooKeeper even when they aren't 
> the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass 
> the ZkClient lib like we did with ZKWatchedEphemeral.
> In addition to fixing such races with the controller, it would improve 
> performance significantly if we used the async API (see KAFKA-3038). The 
> async API is more efficient because it pipelines the requests to ZooKeeper, 
> and the number of requests upon controller recovery can be large.
> This jira proposes to make these two changes to the calls in ZkUtils and to 
> do it, one path is to first replace the calls in ZkUtils with raw async ZK 
> calls and block so that we don't have to change the controller code in this 
> phase. Once this step is accomplished and it is stable, we make changes to 
> the controller to handle the asynchronous calls to ZooKeeper.
> Note that in the first step, we will need to introduce some new logic for 
> session management, which is currently handled entirely by ZkClient. We will 
> also need to implement the subscription mechanism for event notifications 
> (see ZooKeeperLeaderElector as a an exemple).  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5328) consider switching json parser from scala to jackson

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-5328.

Resolution: Duplicate

Duplicate of KAFKA-1595.

> consider switching json parser from scala to jackson
> 
>
> Key: KAFKA-5328
> URL: https://issues.apache.org/jira/browse/KAFKA-5328
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> The scala json parser is significantly slower than jackson.
> This can have a nontrivial impact on controller initialization since the 
> controller loads and parses almost all zookeeper state.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-1595) Remove deprecated and slower scala JSON parser

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101715#comment-16101715
 ] 

ASF GitHub Bot commented on KAFKA-1595:
---

Github user resetius closed the pull request at:

https://github.com/apache/kafka/pull/2214


> Remove deprecated and slower scala JSON parser
> --
>
> Key: KAFKA-1595
> URL: https://issues.apache.org/jira/browse/KAFKA-1595
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.1.1
>Reporter: Jagbir
>Assignee: Ismael Juma
>  Labels: newbie
>
> The following issue is created as a follow up suggested by Jun Rao
> in a kafka news group message with the Subject
> "Blocking Recursive parsing from 
> kafka.consumer.TopicCount$.constructTopicCount"
> SUMMARY:
> An issue was detected in a typical cluster of 3 kafka instances backed
> by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3,
> java version 1.7.0_65). On the consumer end, when consumers get recycled,
> there is a troubling JSON parsing recursion which takes a busy lock and
> blocks the consumer thread pool.
> In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes
> a global lock (0xd3a7e1d0) during the rebalance, and fires an
> expensive JSON parsing, while keeping the other consumers from shutting
> down, see, e.g,
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> The deep recursive JSON parsing should be deprecated in favor
> of a better JSON parser, see, e.g,
> http://engineering.ooyala.com/blog/comparing-scala-json-libraries?
> DETAILS:
> The first dump is for a recursive blocking thread holding the lock for 
> 0xd3a7e1d0
> and the subsequent dump is for a waiting thread.
> (Please grep for 0xd3a7e1d0 to see the locked object.)
>
> -8<-
> "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"
> prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000]
> java.lang.Thread.State: RUNNABLE
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 

[jira] [Commented] (KAFKA-3038) Speeding up partition reassignment after broker failure

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101716#comment-16101716
 ] 

ASF GitHub Bot commented on KAFKA-3038:
---

Github user resetius closed the pull request at:

https://github.com/apache/kafka/pull/2213


> Speeding up partition reassignment after broker failure
> ---
>
> Key: KAFKA-3038
> URL: https://issues.apache.org/jira/browse/KAFKA-3038
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 0.9.0.0
>Reporter: Eno Thereska
>
> After a broker failure the controller does several writes to Zookeeper for 
> each partition on the failed broker. Writes are done one at a time, in closed 
> loop, which is slow especially under high latency networks. Zookeeper has 
> support for batching operations (the "multi" API). It is expected that 
> substituting serial writes with batched ones should reduce failure handling 
> time by an order of magnitude.
> This is identified as an issue in 
> https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3
>  (section End-to-end latency during a broker failure)
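For reference, ZooKeeper's "multi" API batches several operations into a single round trip. A minimal sketch follows; this is not the controller's actual code, only an illustration of the batching primitive mentioned above.

{code:java}
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooKeeper;
import java.util.ArrayList;
import java.util.List;

public class ZkMultiSketch {
    // Send all updates as one batched request instead of one round trip per path.
    public static List<OpResult> batchedSetData(ZooKeeper zk, List<String> paths, byte[] data)
            throws KeeperException, InterruptedException {
        List<Op> ops = new ArrayList<>();
        for (String path : paths) {
            ops.add(Op.setData(path, data, -1));  // -1 matches any version
        }
        return zk.multi(ops);
    }
}
{code}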



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5648) make Merger extend Aggregator

2017-07-26 Thread Clemens Valiente (JIRA)
Clemens Valiente created KAFKA-5648:
---

 Summary: make Merger extend Aggregator
 Key: KAFKA-5648
 URL: https://issues.apache.org/jira/browse/KAFKA-5648
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 0.11.0.0
Reporter: Clemens Valiente
Assignee: Clemens Valiente
Priority: Minor


Hi,

I suggest that Merger should extend Aggregator.
reason:
Both classes usually do very similar things. A merger takes two sessions and 
combines them, an aggregator takes an existing session and aggregates new 
values into it.
in some use cases it is actually the same thing, e.g. a topology that maps records 
to lists and then calls .groupByKey().aggregate() to combine those lists per key.
In this case both merger and aggregator do the same thing: take two lists and 
combine them into one.
With the proposed change we could pass the Merger as both the merger and 
aggregator to the .aggregate() method and keep our business logic within one 
merger class.

Or in other words: The Merger is simply an Aggregator that happens to aggregate 
two objects of the same class
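To make the proposal concrete, here are simplified versions of the two interfaces with signatures reduced to their essentials (not the exact Streams definitions), showing how a Merger is effectively an Aggregator whose value and aggregate types coincide.

{code:java}
// Simplified versions of the Streams interfaces, to illustrate the proposal.
interface Aggregator<K, V, VA> {
    VA apply(K key, V value, VA aggregate);
}

// Proposal: a Merger is an Aggregator whose value and aggregate types are the same,
// so it could be declared as extending Aggregator<K, V, V>.
interface Merger<K, V> extends Aggregator<K, V, V> {
    // apply(key, aggOne, aggTwo) already matches Aggregator.apply(key, value, aggregate)
    // when the value and aggregate types are both V, so no extra method is needed.
}
{code}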




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5646) Use async ZookeeperClient for Config and ISR management

2017-07-26 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-5646:
--

 Summary: Use async ZookeeperClient for Config and ISR management
 Key: KAFKA-5646
 URL: https://issues.apache.org/jira/browse/KAFKA-5646
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5642) Use async ZookeeperClient in Controller

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5642:
---
Summary: Use async ZookeeperClient in Controller  (was: use async 
ZookeeperClient everywhere)

> Use async ZookeeperClient in Controller
> ---
>
> Key: KAFKA-5642
> URL: https://issues.apache.org/jira/browse/KAFKA-5642
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Synchronous zookeeper writes means that we wait an entire round trip before 
> doing the next write. These synchronous writes are happening at a 
> per-partition granularity in several places, so partition-heavy clusters 
> suffer from the controller doing many sequential round trips to zookeeper.
> * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
> zookeeper on transition to OnlinePartition. This gets triggered per-partition 
> sequentially with synchronous writes during controlled shutdown of the 
> shutting down broker's replicas for which it is the leader.
> * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
> OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
> triggered per-partition sequentially with synchronous writes for failed or 
> controlled shutdown brokers.
> KAFKA-5501 introduced an async ZookeeperClient that encourages pipelined 
> requests to zookeeper. We should replace ZkClient's usage with this client.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-26 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101966#comment-16101966
 ] 

Ewen Cheslack-Postava commented on KAFKA-4327:
--

[~mjsax] We'd have to decide whether the Java API for the command is considered 
public or just the sh script that executes it. If the Java API is public, we could move the tool but 
would want some deprecation period for the original, with some logging about the 
deprecation. But otherwise I agree, the natural home for the tool is in streams.


> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the Reset Tool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" and 
> thus, the reset tool will not be backward compatible with all broker versions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-26 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16102147#comment-16102147
 ] 

Ismael Juma commented on KAFKA-4327:


We discussed this at the time and we documented that it's _not_ part of public 
API so that we could move it to the right location:

{code}
This class is not part of public API. For backward compatibility, use 
the provided script in "bin/" instead of calling this class directly from your 
code.
{code}

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the Reset Tool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" and 
> thus, the reset tool will not be backward compatible with all broker versions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5657) Connect REST API should include the connector type when describing a connector

2017-07-26 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-5657:


 Summary: Connect REST API should include the connector type when 
describing a connector
 Key: KAFKA-5657
 URL: https://issues.apache.org/jira/browse/KAFKA-5657
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 0.11.0.0
Reporter: Randall Hauch
 Fix For: 1.0.0


Kafka Connect's REST API's {{connectors/}} and {{connectors/{name}}} endpoints 
should include whether the connector is a source or a sink.

See KAFKA-4343 and KIP-151 for the related modification of the 
{{connector-plugins}} endpoint.

Also see KAFKA-4279 for converter-related endpoints.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4279) REST endpoint to list converter plugins

2017-07-26 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-4279:
-
Labels: needs-kip newbie  (was: )

> REST endpoint to list converter plugins
> ---
>
> Key: KAFKA-4279
> URL: https://issues.apache.org/jira/browse/KAFKA-4279
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gwen Shapira
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We have a REST resource that allows users to see the available plugins, but 
> we have no equivalent that allows listing available converters.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled

2017-07-26 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16102176#comment-16102176
 ] 

Jiangjie Qin commented on KAFKA-5621:
-

[~apurva] Yes, I agree that the expiry ms is a new concept, as it is an additional 
thing users may need to think about, i.e. "If I have a partition unavailable 
temporarily, how long am I willing to wait for it to come back?" Arguably this 
can also be derived from request timeout and retries. But the difference here 
is that those two configs are primarily for other cases, and in practice we 
found it quite tricky (if possible at all) to get them right for batch 
expiration.

> The producer should retry expired batches when retries are enabled
> --
>
> Key: KAFKA-5621
> URL: https://issues.apache.org/jira/browse/KAFKA-5621
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
> Fix For: 1.0.0
>
>
> Today, when a batch is expired in the accumulator, a {{TimeoutException}} is 
> raised to the user.
> It might be better for the producer to retry the expired batch up to the 
> configured number of retries. This is more intuitive from the user's point of 
> view. 
> Further the proposed behavior makes it easier for applications like mirror 
> maker to provide ordering guarantees even when batches expire. Today, they 
> would resend the expired batch and it would get added to the back of the 
> queue, causing the output ordering to be different from the input ordering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5648) make Merger extend Aggregator

2017-07-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16102119#comment-16102119
 ] 

Matthias J. Sax commented on KAFKA-5648:


Your observation is correct, that {{Merger}} and {{Aggregator}} are similar. 
You also stated correctly, that the types are different though, as the 
{{Merger}} merges two aggregates of the same type, while the {{Aggregator}} in general 
merges a single value (of type-A) into an aggregate (of 
type-B). Thus, {{Merger}} extending {{Aggregator}} only works in the special case where 
the value and aggregate types are the same.

> make Merger extend Aggregator
> -
>
> Key: KAFKA-5648
> URL: https://issues.apache.org/jira/browse/KAFKA-5648
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Clemens Valiente
>Assignee: Clemens Valiente
>Priority: Minor
>
> Hi,
> I suggest that Merger should extend Aggregator.
> reason:
> Both classes usually do very similar things. A merger takes two sessions and 
> combines them, an aggregator takes an existing session and aggregates new 
> values into it.
> in some use cases it is actually the same thing, e.g. a topology that maps records 
> to lists and then calls .groupByKey().aggregate() to combine those lists per key.
> In this case both merger and aggregator do the same thing: take two lists and 
> combine them into one.
> With the proposed change we could pass the Merger as both the merger and 
> aggregator to the .aggregate() method and keep our business logic within one 
> merger class.
> Or in other words: The Merger is simply an Aggregator that happens to 
> aggregate two objects of the same class



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled

2017-07-26 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101932#comment-16101932
 ] 

Jiangjie Qin commented on KAFKA-5621:
-

[~ijuma] [~apurva] The expiration for messages in the accumulator was not for 
memory footprint control, but for making progress when a partition is stuck. 
For example, if the leader of a partition becomes unavailable for some reason, 
the records in the accumulator cannot be sent. Retrying only makes sense when the 
producer can actually try. So we have to expire the records at some point when that 
partition cannot make progress; whether they are expired after request_timeout or 
retries * request_timeout could be discussed. But notice that sometimes clients 
will set retries to Integer.MAX_VALUE, which would also result in 
unexpected behavior.

The reasons for having an explicit batch.expiry.ms are: 1) We have exposed the 
concept of batching to the users through batch.size and linger.ms, so users 
should already know that the producer sends batches. No new concept is added. 
2) If a record has been sitting in the record accumulator for more than 
batch.expiry.ms, likely there is a very long queue or the producer cannot make 
progress, so users may want to get an exception and do something. This 
expiration time is kind of an SLO and is not necessarily related to 
request_timeout * retries, which is intended for the remote call. So decoupling 
them would be useful.
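To make the relationship between the settings concrete, here is a hedged producer-config sketch; batch.expiry.ms is the proposed knob discussed above and is not an existing producer config, and the values are only placeholders.

{code:java}
import java.util.Properties;

public class ProducerTimeoutConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Existing knobs: per-request timeout and number of retries for remote calls.
        props.put("request.timeout.ms", "30000");
        props.put("retries", "5");
        // Proposed (hypothetical) knob: how long a batch may sit in the accumulator
        // before being expired, decoupled from request.timeout.ms * retries.
        props.put("batch.expiry.ms", "120000");
        return props;
    }
}
{code}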

> The producer should retry expired batches when retries are enabled
> --
>
> Key: KAFKA-5621
> URL: https://issues.apache.org/jira/browse/KAFKA-5621
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
> Fix For: 1.0.0
>
>
> Today, when a batch is expired in the accumulator, a {{TimeoutException}} is 
> raised to the user.
> It might be better for the producer to retry the expired batch up to the 
> configured number of retries. This is more intuitive from the user's point of 
> view. 
> Further the proposed behavior makes it easier for applications like mirror 
> maker to provide ordering guarantees even when batches expire. Today, they 
> would resend the expired batch and it would get added to the back of the 
> queue, causing the output ordering to be different from the input ordering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5656) Support bulk attributes request on KafkaMbean where some attributes do not exist

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101941#comment-16101941
 ] 

ASF GitHub Bot commented on KAFKA-5656:
---

GitHub user ErikKringen opened a pull request:

https://github.com/apache/kafka/pull/3582

KAFKA-5656: Support bulk attributes request on KafkaMbean where some 
Support bulk attributes request on KafkaMbean where some attributes do not exist



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ErikKringen/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3582.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3582


commit 0febcdd59cee9e1f34bdd9646aee59944c28386e
Author: Erik.Kringen 
Date:   2017-07-26T17:12:04Z

KAFKA-5656: Support bulk attributes request on KafkaMbean where some 
attributes do not exist




> Support bulk attributes request on KafkaMbean where some attributes do not 
> exist
> 
>
> Key: KAFKA-5656
> URL: https://issues.apache.org/jira/browse/KAFKA-5656
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Erik Kringen
>Priority: Minor
>
> According to Oracle documentation on [Implementing a Dynamic 
> MBean|http://docs.oracle.com/cd/E19698-01/816-7609/6mdjrf83d/index.html] 
> bq. The bulk getter and setter methods usually rely on the generic getter and 
> setter, respectively. This makes them independent of the management 
> interface, which can simplify certain modifications. In this case, their 
> implementation consists mostly of error checking on the list of attributes. 
> However, all bulk getters and setters must be implemented so that an error on 
> any one attribute does not interrupt or invalidate the bulk operation on the 
> other attributes.
> bq. If an attribute cannot be read, then its name-value pair is not included 
> in the list of results. If an attribute cannot be written, it will not be 
> copied to the returned list of successful set operations. As a result, if 
> there are any errors, the lists returned by bulk operators will not have the 
> same length as the array or list passed to them. In any case, the bulk 
> operators do not guarantee that their returned lists have the same ordering 
> of attributes as the input array or list.
> The current implementation of 
> {code}org.apache.kafka.common.metrics.JmxReporter.KafkaMbean#getAttributes{code}
>  returns an empty list if any of the requested attributes are not found.
> This method should instead log the exception but allow all requested 
> attributes that are present to be returned, as prescribed via the 
> DynamicMBean interface.
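A hedged sketch of the prescribed bulk-getter behavior, where an unreadable attribute is logged and skipped instead of failing the whole call; this is illustrative only and not the actual JmxReporter code.

{code:java}
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;

public abstract class TolerantMBeanSketch {
    // Generic getter; throws if the attribute does not exist.
    protected abstract Object getAttribute(String name) throws AttributeNotFoundException;

    // Bulk getter: an error on one attribute must not invalidate the others.
    public AttributeList getAttributes(String[] names) {
        AttributeList list = new AttributeList();
        for (String name : names) {
            try {
                list.add(new Attribute(name, getAttribute(name)));
            } catch (AttributeNotFoundException e) {
                // Log and skip the missing attribute rather than returning an empty list.
                System.err.println("Attribute not found, skipping: " + name);
            }
        }
        return list;
    }
}
{code}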



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5658) Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions

2017-07-26 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-5658:
---
Description: The AdminClient does not properly clear calls from the 
callsInFlight structure.  Later, in an effort to clear the lingering call 
objects, it closes the connection they are associated with.  This disrupts new 
incoming calls, which then get {{java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.BrokerNotAvailableException}}.  (was: if i 
create an admin client and let it sit unused for some amount of time, then 
attempt to use it i will get the following 

{noformat}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.BrokerNotAvailableException
{noformat}

even though the broker is up. if before each usage of adminclient i create a 
new admin client i do not see the same behavior.)

> Fix AdminClient request timeout handling bug resulting in continual 
> BrokerNotAvailableExceptions
> 
>
> Key: KAFKA-5658
> URL: https://issues.apache.org/jira/browse/KAFKA-5658
> Project: Kafka
>  Issue Type: Bug
>Reporter: dan norwood
>Assignee: Colin P. McCabe
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> The AdminClient does not properly clear calls from the callsInFlight 
> structure.  Later, in an effort to clear the lingering call objects, it 
> closes the connection they are associated with.  This disrupts new incoming 
> calls, which then get {{java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.BrokerNotAvailableException}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5659) AdminClient#describeConfigs makes an extra empty request when only broker info is requested

2017-07-26 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-5659:
--

 Summary: AdminClient#describeConfigs makes an extra empty request 
when only broker info is requested
 Key: KAFKA-5659
 URL: https://issues.apache.org/jira/browse/KAFKA-5659
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


AdminClient#describeConfigs makes an extra empty request when only broker info 
is requested



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5658) adminclient will stop working after some amount of time

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5658:
---
Priority: Critical  (was: Major)

> adminclient will stop working after some amount of time
> ---
>
> Key: KAFKA-5658
> URL: https://issues.apache.org/jira/browse/KAFKA-5658
> Project: Kafka
>  Issue Type: Bug
>Reporter: dan norwood
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> If I create an admin client and let it sit unused for some amount of time, 
> then attempt to use it, I will get the following 
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.BrokerNotAvailableException
> {noformat}
> even though the broker is up. If before each usage of AdminClient I create a 
> new admin client, I do not see the same behavior.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5658) adminclient will stop working after some amount of time

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5658:
---
Fix Version/s: 0.11.0.1

> adminclient will stop working after some amount of time
> ---
>
> Key: KAFKA-5658
> URL: https://issues.apache.org/jira/browse/KAFKA-5658
> Project: Kafka
>  Issue Type: Bug
>Reporter: dan norwood
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> If I create an admin client and let it sit unused for some amount of time, 
> then attempt to use it, I will get the following 
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.BrokerNotAvailableException
> {noformat}
> even though the broker is up. If before each usage of AdminClient I create a 
> new admin client, I do not see the same behavior.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5664) Disable auto offset commit in ConsoleConsumer if no group is provided

2017-07-26 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-5664:
--

 Summary: Disable auto offset commit in ConsoleConsumer if no group 
is provided
 Key: KAFKA-5664
 URL: https://issues.apache.org/jira/browse/KAFKA-5664
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


In ConsoleConsumer, if no group is provided, we generate a random groupId:
{code}
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new 
Random().nextInt(10)}")
{code}
In this case, since the group is not likely to be used again, we should disable 
automatic offset commits. This avoids polluting the coordinator cache with 
offsets that will never be used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5663) LogDirFailureTest system test fails

2017-07-26 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16102403#comment-16102403
 ] 

Dong Lin commented on KAFKA-5663:
-

Thanks [~apurva]. I will look into this.

> LogDirFailureTest system test fails
> ---
>
> Key: KAFKA-5663
> URL: https://issues.apache.org/jira/browse/KAFKA-5663
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Dong Lin
>
> The recently added JBOD system test failed last night.
> {noformat}
> Producer failed to produce messages for 20s.
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/core/log_dir_failure_test.py",
>  line 166, in test_replication_with_disk_failure
> self.start_producer_and_consumer()
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 75, in start_producer_and_consumer
> self.producer_start_timeout_sec)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Producer failed to produce messages for 20s.
> {noformat}
> Complete logs here:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-07-26--001.1501074756--apache--trunk--91c207c/LogDirFailureTest/test_replication_with_disk_failure/bounce_broker=False.security_protocol=PLAINTEXT.broker_type=follower/48.tgz



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5664) Disable auto offset commit in ConsoleConsumer if no group is provided

2017-07-26 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-5664:
--

Assignee: Vahid Hashemian

> Disable auto offset commit in ConsoleConsumer if no group is provided
> -
>
> Key: KAFKA-5664
> URL: https://issues.apache.org/jira/browse/KAFKA-5664
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> In ConsoleConsumer, if no group is provided, we generate a random groupId:
> {code}
> consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new 
> Random().nextInt(10)}")
> {code}
> In this case, since the group is not likely to be used again, we should 
> disable automatic offset commits. This avoids polluting the coordinator cache 
> with offsets that will never be used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5665) Incorrect interruption invoking method used for Heartbeat thread

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16102661#comment-16102661
 ] 

ASF GitHub Bot commented on KAFKA-5665:
---

GitHub user huxihx opened a pull request:

https://github.com/apache/kafka/pull/3586

KAFKA-5665: Heartbeat thread should use correct interruption method to 
restore status

When interrupting the background heartbeat thread, `Thread.interrupted();` 
is used. Instead, `Thread.currentThread().interrupt();` should be used to 
restore the interruption status. An alternative fix is to remove 
`Thread.interrupted();` entirely: since HeartbeatThread extends Thread and all 
code higher up on the call stack is under our control, we could safely swallow 
the exception. Either way, `Thread.interrupted();` should not be used here, 
because it tests (and clears) the interrupt status rather than restoring it.
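
A minimal, self-contained sketch of the pattern being argued for; this is not the actual HeartbeatThread code, just an illustration of restoring the interrupt flag instead of clearing it:

{code}
public class InterruptRestoreExample extends Thread {
    @Override
    public void run() {
        try {
            // Stand-in for the blocking work the heartbeat thread performs.
            Thread.sleep(60_000L);
        } catch (InterruptedException e) {
            // Wrong: Thread.interrupted() only tests *and clears* the flag.
            // Right: re-assert the flag so callers higher up can observe it.
            Thread.currentThread().interrupt();
            System.out.println("interrupted, flag restored: " + isInterrupted());
        }
    }

    public static void main(String[] args) throws Exception {
        InterruptRestoreExample t = new InterruptRestoreExample();
        t.start();
        t.interrupt();
        t.join();
    }
}
{code}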

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huxihx/kafka KAFKA-5665

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3586.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3586


commit 36d489eede2229db92eda077ae4baff80044fb25
Author: huxihx 
Date:   2017-07-27T03:53:21Z

KAFKA-5665: Incorrect interruption invoking method used for Heartbeat thread

When interrupting the background heartbeat thread, `Thread.interrupted();` 
is used. Instead, `Thread.currentThread().interrupt();` should be used to 
restore the interruption status. An alternative fix is to remove 
`Thread.interrupted();` entirely: since HeartbeatThread extends Thread and all 
code higher up on the call stack is under our control, we could safely swallow 
the exception. Either way, `Thread.interrupted();` should not be used here, 
because it tests (and clears) the interrupt status rather than restoring it.




> Incorrect interruption invoking method used for Heartbeat thread 
> -
>
> Key: KAFKA-5665
> URL: https://issues.apache.org/jira/browse/KAFKA-5665
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When interrupting the background heartbeat thread, `Thread.interrupted();` is 
> used. Instead, `Thread.currentThread().interrupt();` should be used to 
> restore the interruption status. An alternative fix is to remove 
> `Thread.interrupted();` entirely: since HeartbeatThread extends Thread and all 
> code higher up on the call stack is under our control, we could safely swallow 
> the exception. Either way, `Thread.interrupted();` should not be used here, 
> because it tests (and clears) the interrupt status rather than restoring it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5007) Kafka Replica Fetcher Thread- Resource Leak

2017-07-26 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16102622#comment-16102622
 ] 

huxihx commented on KAFKA-5007:
---

[~joseph.alias...@gmail.com] What's the status of this jira? Have you 
confirmed that this is the cause? If so, I could work on a fix.

> Kafka Replica Fetcher Thread- Resource Leak
> ---
>
> Key: KAFKA-5007
> URL: https://issues.apache.org/jira/browse/KAFKA-5007
> Project: Kafka
>  Issue Type: Bug
>  Components: core, network
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.10.2.0
> Environment: Centos 7
> Jave 8
>Reporter: Joseph Aliase
>Priority: Critical
>  Labels: reliability
> Attachments: jstack-kafka.out, jstack-zoo.out, lsofkafka.txt, 
> lsofzookeeper.txt
>
>
> Kafka runs out of open file descriptors when the system network interface is 
> down.
> Issue description:
> We have a Kafka cluster of 5 nodes running version 0.10.1.1. The open file 
> descriptor limit for the account running Kafka is set to 10.
> During an upgrade, the network interface went down. The outage continued for 
> 12 hours, and eventually all the brokers crashed with a java.io.IOException: 
> Too many open files error.
> We repeated the test in a lower environment and observed that the open socket 
> count keeps increasing while the NIC is down.
> We have around 13 topics with a maximum partition count of 120, and the number 
> of replica fetcher threads is set to 8.
> Using an internal monitoring tool, we observed that the open socket descriptor 
> count for the broker pid continued to increase while the NIC was down, leading 
> to the open file descriptor error.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)